diff --git a/search/server.go b/search/server.go index bbbf2c7..5ccf379 100644 --- a/search/server.go +++ b/search/server.go @@ -41,6 +41,7 @@ type Server struct { memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞) rowCount int64 // 内存中累计的索引条目总数(触发刷盘) flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 + flushWg sync.WaitGroup // 追踪 flush 是否完成 // flush 期间的索引读缓存:避免 mergeKey 对每个 key 单独开读事务 indexCache map[string][]storage.IndexEntry @@ -570,14 +571,24 @@ func (s *Server) handleAdminPriorityStatus(w http.ResponseWriter, r *http.Reques json.NewEncoder(w).Encode(crawler.GlobalPriorityStatus()) } -// handleAdminFlush 强制刷盘。 +// handleAdminFlush 触发刷盘(不等待完成,立即返回)。 func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") if r.Method != http.MethodGet && r.Method != http.MethodPost { http.Error(w, `{"error":"method not allowed"}`, 405) return } - s.Flush() + // TryLock 尝试获取锁 + if s.flushMu.TryLock() { + // 获取到锁:启动 goroutine 执行 flush,锁会在 goroutine 中释放 + s.flushWg.Add(1) + go func() { + s.doFlush() // doFlush 不获取锁,只执行核心逻辑 + s.flushMu.Unlock() + s.flushWg.Done() + }() + } + // 无论是否获取到锁,都立即返回 w.Write([]byte("flushed")) } @@ -1609,17 +1620,19 @@ func (s *Server) lowThreshold(key string) float64 { } // flush 将内存中的索引批量合并写入磁盘,然后清空内存。 +// 由定时器调用,需要获取锁并等待完成。 func (s *Server) flush() { - if !s.flushMu.TryLock() { - return - } + s.flushMu.Lock() + s.flushWg.Add(1) defer func() { s.flushMu.Unlock() - // 清除索引读缓存 - s.indexCacheMu.Lock() - s.indexCache = nil - s.indexCacheMu.Unlock() + s.flushWg.Done() }() + s.doFlush() +} + +// doFlush 执行 flush 的核心逻辑(不清锁,由调用者负责)。 +func (s *Server) doFlush() { s.memMu.Lock() snapshot := s.mem s.mem = make(map[string][]storage.IndexEntry) @@ -1688,6 +1701,10 @@ func (s *Server) flush() { // flush 完成后立即刷新 stats + recent 缓存(确保数据实时性) go s.refreshStatsCache() go s.refreshRecentCache() + // 清除索引读缓存 + s.indexCacheMu.Lock() + s.indexCache = nil + s.indexCacheMu.Unlock() } // getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。