This commit is contained in:
2026-04-09 09:23:39 +08:00
parent 0f881e5581
commit c96d622366
+38 -22
View File
@@ -41,9 +41,11 @@ type Server struct {
rowCount int64 // 内存中累计的索引条目总数(触发刷盘) rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
// 统计缓存(后台定期更新,避免 API 阻塞 // 统计缓存(按需更新:请求到来时触发,5s 内有结果则用新值,否则返回旧缓存
statsCache map[string]any // 缓存的统计结果 statsCache map[string]any // 缓存的统计结果
statsCacheMu sync.RWMutex // 保护统计缓存 statsCacheMu sync.RWMutex // 保护统计缓存及更新状态
statsUpdating bool // 是否正在后台更新(防止重复启动)
statsUpdateDone chan struct{} // 当前更新完成时关闭此 channel
} }
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。 // New 创建一个 search Server(内嵌收获服务,统一在同一端口)。
@@ -60,8 +62,6 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
} }
// 启动定期刷盘 goroutine // 启动定期刷盘 goroutine
go s.runPeriodicFlush() go s.runPeriodicFlush()
// 启动统计缓存更新 goroutine(每 60 秒更新一次)
go s.runStatsCacheUpdater()
return s return s
} }
@@ -74,17 +74,32 @@ func (s *Server) runPeriodicFlush() {
} }
} }
// runStatsCacheUpdater 定期更新统计缓存(后台线程,避免 API 阻塞)。 // triggerStatsUpdate 触发一次后台统计更新(若已在更新中则不重复启动)。
func (s *Server) runStatsCacheUpdater() { // 返回一个 channel,调用方可等待本次更新完成。
ticker := time.NewTicker(60 * time.Second) func (s *Server) triggerStatsUpdate() <-chan struct{} {
defer ticker.Stop() s.statsCacheMu.Lock()
// 立即执行一次 defer s.statsCacheMu.Unlock()
s.updateStatsCache()
for range ticker.C { if s.statsUpdating {
s.updateStatsCache() // 已有更新在跑,直接返回其 done channel
return s.statsUpdateDone
} }
// 开启新一轮更新
done := make(chan struct{})
s.statsUpdating = true
s.statsUpdateDone = done
go func() {
s.updateStatsCache()
s.statsCacheMu.Lock()
s.statsUpdating = false
s.statsCacheMu.Unlock()
close(done)
}()
return done
} }
// updateStatsCache 计算统计信息并更新缓存。 // updateStatsCache 计算统计信息并更新缓存。
func (s *Server) updateStatsCache() { func (s *Server) updateStatsCache() {
domainCount := make(map[string]int) domainCount := make(map[string]int)
@@ -319,23 +334,24 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(resp) json.NewEncoder(w).Encode(resp)
} }
// handleAdminStats 返回缓存的全局统计(后台线程定期更新,API 不阻塞) // handleAdminStats 返回全局统计:触发后台更新,5s 内有结果返回新值,否则返回旧缓存
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
done := s.triggerStatsUpdate()
select {
case <-done:
// 更新完成,返回最新缓存
case <-time.After(5 * time.Second):
// 超时,使用旧缓存
}
s.statsCacheMu.RLock() s.statsCacheMu.RLock()
cache := s.statsCache cache := s.statsCache
s.statsCacheMu.RUnlock() s.statsCacheMu.RUnlock()
// 如果缓存为空,同步计算一次(首次启动时)
if len(cache) == 0 {
s.updateStatsCache()
s.statsCacheMu.RLock()
cache = s.statsCache
s.statsCacheMu.RUnlock()
}
json.NewEncoder(w).Encode(cache) json.NewEncoder(w).Encode(cache)
} }