优化刷盘逻辑
This commit is contained in:
+26
-9
@@ -41,6 +41,7 @@ type Server struct {
|
|||||||
memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
|
memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
|
||||||
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
|
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
|
||||||
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
|
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
|
||||||
|
flushWg sync.WaitGroup // 追踪 flush 是否完成
|
||||||
|
|
||||||
// flush 期间的索引读缓存:避免 mergeKey 对每个 key 单独开读事务
|
// flush 期间的索引读缓存:避免 mergeKey 对每个 key 单独开读事务
|
||||||
indexCache map[string][]storage.IndexEntry
|
indexCache map[string][]storage.IndexEntry
|
||||||
@@ -570,14 +571,24 @@ func (s *Server) handleAdminPriorityStatus(w http.ResponseWriter, r *http.Reques
|
|||||||
json.NewEncoder(w).Encode(crawler.GlobalPriorityStatus())
|
json.NewEncoder(w).Encode(crawler.GlobalPriorityStatus())
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleAdminFlush 强制刷盘。
|
// handleAdminFlush 触发刷盘(不等待完成,立即返回)。
|
||||||
func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||||
if r.Method != http.MethodGet && r.Method != http.MethodPost {
|
if r.Method != http.MethodGet && r.Method != http.MethodPost {
|
||||||
http.Error(w, `{"error":"method not allowed"}`, 405)
|
http.Error(w, `{"error":"method not allowed"}`, 405)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.Flush()
|
// TryLock 尝试获取锁
|
||||||
|
if s.flushMu.TryLock() {
|
||||||
|
// 获取到锁:启动 goroutine 执行 flush,锁会在 goroutine 中释放
|
||||||
|
s.flushWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
s.doFlush() // doFlush 不获取锁,只执行核心逻辑
|
||||||
|
s.flushMu.Unlock()
|
||||||
|
s.flushWg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
// 无论是否获取到锁,都立即返回
|
||||||
w.Write([]byte("flushed"))
|
w.Write([]byte("flushed"))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1609,17 +1620,19 @@ func (s *Server) lowThreshold(key string) float64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// flush 将内存中的索引批量合并写入磁盘,然后清空内存。
|
// flush 将内存中的索引批量合并写入磁盘,然后清空内存。
|
||||||
|
// 由定时器调用,需要获取锁并等待完成。
|
||||||
func (s *Server) flush() {
|
func (s *Server) flush() {
|
||||||
if !s.flushMu.TryLock() {
|
s.flushMu.Lock()
|
||||||
return
|
s.flushWg.Add(1)
|
||||||
}
|
|
||||||
defer func() {
|
defer func() {
|
||||||
s.flushMu.Unlock()
|
s.flushMu.Unlock()
|
||||||
// 清除索引读缓存
|
s.flushWg.Done()
|
||||||
s.indexCacheMu.Lock()
|
|
||||||
s.indexCache = nil
|
|
||||||
s.indexCacheMu.Unlock()
|
|
||||||
}()
|
}()
|
||||||
|
s.doFlush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// doFlush 执行 flush 的核心逻辑(不清锁,由调用者负责)。
|
||||||
|
func (s *Server) doFlush() {
|
||||||
s.memMu.Lock()
|
s.memMu.Lock()
|
||||||
snapshot := s.mem
|
snapshot := s.mem
|
||||||
s.mem = make(map[string][]storage.IndexEntry)
|
s.mem = make(map[string][]storage.IndexEntry)
|
||||||
@@ -1688,6 +1701,10 @@ func (s *Server) flush() {
|
|||||||
// flush 完成后立即刷新 stats + recent 缓存(确保数据实时性)
|
// flush 完成后立即刷新 stats + recent 缓存(确保数据实时性)
|
||||||
go s.refreshStatsCache()
|
go s.refreshStatsCache()
|
||||||
go s.refreshRecentCache()
|
go s.refreshRecentCache()
|
||||||
|
// 清除索引读缓存
|
||||||
|
s.indexCacheMu.Lock()
|
||||||
|
s.indexCache = nil
|
||||||
|
s.indexCacheMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。
|
// getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。
|
||||||
|
|||||||
Reference in New Issue
Block a user