From 0f881e5581e3ccc86f4f025f8ce71309aa4cada4 Mon Sep 17 00:00:00 2001
From: kevin
Date: Thu, 9 Apr 2026 02:41:41 +0800
Subject: [PATCH] search: cache admin stats and refresh them in the background

handleAdminStats used to scan every snippet on each request. Compute the
stats in a background goroutine (once at startup, then every 60 seconds)
and serve the cached snapshot instead.

- Run the first scan through sync.Once so a request that arrives during
  startup waits for it instead of starting a second full scan.
- Report "pending" from the live counter; everything else comes from the
  snapshot.
- Look up site languages once per domain per scan, not once per snippet.
- Break ties by key when picking the top domains/languages so the
  selection is stable across refreshes.

---
Note: the blob ids on the index line and the commit id in the first header
line are carried over from the previous revision of this patch and do not
describe this one. Context and removed lines are indented with tabs and
single-spaced; use --ignore-whitespace if the target file is aligned
differently.

 search/server.go | 180 +++++++++++++++++++++++++++++++----------------
 1 file changed, 120 insertions(+), 60 deletions(-)

diff --git a/search/server.go b/search/server.go
index e8ae2c5..6b92ff2 100644
--- a/search/server.go
+++ b/search/server.go
@@ -40,6 +40,11 @@ type Server struct {
 	memMu sync.RWMutex // 保护内存索引的读写（刷盘时读操作不阻塞）
 	rowCount int64 // 内存中累计的索引条目总数（触发刷盘）
 	flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
+
+	// Stats cache, so the admin stats handler does not rescan the DB on every request.
+	statsCache   map[string]any // latest stats snapshot; replaced wholesale, never mutated in place
+	statsCacheMu sync.RWMutex   // guards statsCache
+	statsInit    sync.Once      // runs the first scan exactly once, whoever asks first
 }
 
 // New 创建一个 search Server（内嵌收获服务，统一在同一端口）。
@@ -52,9 +57,12 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
 		httpCli: &http.Client{
 			Timeout: time.Duration(config.OnlineSnippetTimeout()) * time.Second,
 		},
+		statsCache: make(map[string]any),
 	}
 	// 启动定期刷盘 goroutine
 	go s.runPeriodicFlush()
+	// Start the background stats refresher (first scan right away, then one every 60 seconds).
+	go s.runStatsCacheUpdater()
 	return s
 }
 
@@ -67,6 +75,99 @@ func (s *Server) runPeriodicFlush() {
 	}
 }
 
+// runStatsCacheUpdater keeps the stats cache fresh from a background goroutine:
+// one scan at startup, then one every 60 seconds. It never returns.
+func (s *Server) runStatsCacheUpdater() {
+	ticker := time.NewTicker(60 * time.Second)
+	defer ticker.Stop()
+	// Populate the cache right away. statsInit makes this a no-op when a request
+	// handler has already triggered the first scan.
+	s.statsInit.Do(s.updateStatsCache)
+	for range ticker.C {
+		s.updateStatsCache()
+	}
+}
+
+// updateStatsCache scans every stored snippet, aggregates per-domain and
+// per-language counts, and publishes the result as a new cache snapshot.
+func (s *Server) updateStatsCache() {
+	domainCount := make(map[string]int)
+	langCount := make(map[string]int)
+	// Languages are resolved once per domain per scan rather than once per
+	// snippet; a nil entry means "no language data" and is not retried.
+	domainLangs := make(map[string][]string)
+	totalWords := 0
+	total := 0
+
+	// NOTE(review): anything ForEachSnippet returns is discarded here, as in the
+	// handler code this replaces; confirm whether a failed scan should keep the old snapshot.
+	s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
+		total++
+		domain := netloc(url)
+		domainCount[domain]++
+		totalWords += len(snippet.Text)
+
+		langs, seen := domainLangs[domain]
+		if !seen {
+			// A failed lookup is treated like a site with no language data.
+			siteInfo, _ := s.db.GetSiteInfo(domain)
+			if siteInfo != nil {
+				for lang, ratio := range siteInfo.Languages {
+					if ratio > 0.1 {
+						langs = append(langs, lang)
+					}
+				}
+			}
+			domainLangs[domain] = langs
+		}
+		for _, lang := range langs {
+			langCount[lang]++
+		}
+		return nil
+	})
+
+	cache := map[string]any{
+		"total_urls":    total,
+		"total_words":   totalWords,
+		"total_domains": len(domainCount), // all domains seen, not only the top 20
+		"domains":       topCounts(domainCount, 20),
+		"languages":     topCounts(langCount, 10),
+		"pending":       atomic.LoadInt64(&s.rowCount),
+		"cached_at":     time.Now().Unix(),
+	}
+
+	s.statsCacheMu.Lock()
+	s.statsCache = cache
+	s.statsCacheMu.Unlock()
+}
+
+// topCounts returns the n entries of counts with the highest values. Ties are
+// broken by key so the selection is the same on every refresh.
+func topCounts(counts map[string]int, n int) map[string]int {
+	type entry struct {
+		key   string
+		count int
+	}
+	entries := make([]entry, 0, len(counts))
+	for k, v := range counts {
+		entries = append(entries, entry{k, v})
+	}
+	sort.Slice(entries, func(i, j int) bool {
+		if entries[i].count != entries[j].count {
+			return entries[i].count > entries[j].count
+		}
+		return entries[i].key < entries[j].key
+	})
+	if len(entries) > n {
+		entries = entries[:n]
+	}
+	top := make(map[string]int, len(entries))
+	for _, e := range entries {
+		top[e.key] = e.count
+	}
+	return top
+}
+
 // Flush 公开的刷盘方法，供定时任务和外部调用。
 func (s *Server) Flush() { s.flush() }
 
@@ -234,71 +335,30 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(resp)
 }
 
-// handleAdminStats 返回全局统计：域名分布、语种分布、总 URL 数、总词数。
+// handleAdminStats serves the cached global stats (refreshed in the background)
+// with "pending" taken from the live counter. It only blocks while the very
+// first scan is still running.
 func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 	w.Header().Set("Content-Type", "application/json; charset=utf-8")
 
-	domainCount := make(map[string]int)
-	langCount := make(map[string]int)
-	totalWords := 0
-	total := 0
-
-	s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
-		total++
-		domain := netloc(url)
-		domainCount[domain]++
-		totalWords += len(snippet.Text)
-
-		siteInfo, _ := s.db.GetSiteInfo(domain)
-		if siteInfo != nil {
-			for lang, ratio := range siteInfo.Languages {
-				if ratio > 0.1 {
-					langCount[lang]++
-				}
-			}
-		}
-		return nil
-	})
-
-	// 排序取 Top
-	type kv struct{ k string; v int }
-	topDomains := make([]kv, 0, len(domainCount))
-	for k, v := range domainCount {
-		topDomains = append(topDomains, kv{k, v})
-	}
-	sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v })
-	if len(topDomains) > 20 {
-		topDomains = topDomains[:20]
-	}
-	topLangs := make([]kv, 0, len(langCount))
-	for k, v := range langCount {
-		topLangs = append(topLangs, kv{k, v})
-	}
-	sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v })
-	if len(topLangs) > 10 {
-		topLangs = topLangs[:10]
-	}
-
-	domainsMap := make(map[string]int)
-	for _, kv := range topDomains {
-		domainsMap[kv.k] = kv.v
-	}
-	langsMap := make(map[string]int)
-	for _, kv := range topLangs {
-		langsMap[kv.k] = kv.v
-	}
-
-	resp := map[string]any{
-		"total_urls": total,
-		"total_words": totalWords,
-		"total_domains": len(domainCount), // 真实的域名总数（非Top 20）
-		"domains": domainsMap,
-		"languages": langsMap,
-		"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
-	}
-
+	// On a cold start the first scan may not have run yet; statsInit runs it
+	// exactly once and makes concurrent requests wait instead of rescanning.
+	s.statsInit.Do(s.updateStatsCache)
+
+	s.statsCacheMu.RLock()
+	cache := s.statsCache
+	s.statsCacheMu.RUnlock()
+
+	// The snapshot is shared between requests, so copy it before replacing
+	// "pending" with the live counter (the cached value can be a full refresh interval old).
+	resp := make(map[string]any, len(cache))
+	for k, v := range cache {
+		resp[k] = v
+	}
+	resp["pending"] = atomic.LoadInt64(&s.rowCount)
+
 	json.NewEncoder(w).Encode(resp)
 }
 
 // handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。