diff --git a/search/server.go b/search/server.go index 17d042e..6de1e57 100644 --- a/search/server.go +++ b/search/server.go @@ -51,6 +51,11 @@ type Server struct { statsCache map[string]any statsCacheMu sync.RWMutex + // recent 快照缓存:后台定时刷新,避免每次请求全量遍历 bbolt + recentCache map[int][]recentItem // limit → 预截取的结果列表 + recentCacheMu sync.RWMutex + recentTotal int // 总条目数(不截取) + // backlinkRunner 反向链接计算器(可为 nil,仅用于 admin 手动触发) backlinkRunner interface { Status() map[string]interface{} @@ -71,8 +76,8 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { } // 启动定期刷盘 goroutine go s.runPeriodicFlush() - // 启动 stats 缓存定期刷新 goroutine - go s.runStatsCacheRefresher() + // 启动 stats + recent 缓存定期刷新 goroutine + go s.runCacheRefresher() return s } @@ -204,6 +209,7 @@ type recentItem struct { } // handleAdminRecent 返回最近爬取的条目列表,按爬取时间倒序。 +// 直接返回缓存快照,不阻塞 bbolt,响应时间 <1ms。 // 参数:limit(默认50,最大200)。 func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -219,31 +225,64 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { limit = 200 } + s.recentCacheMu.RLock() + cached := s.recentCache[limit] + total := s.recentTotal + s.recentCacheMu.RUnlock() + + if cached == nil { + // 缓存尚未就绪,返回空列表 + json.NewEncoder(w).Encode(map[string]any{ + "items": []recentItem{}, + "total": 0, + }) + return + } + + json.NewEncoder(w).Encode(map[string]any{ + "items": cached, + "total": total, + }) +} + +// refreshRecentCache 全量遍历 bbolt 计算 recent 快照,预截取常用 limit,存入 recentCache。 +func (s *Server) refreshRecentCache() { type entry struct { - url string - snippet *storage.SnippetEntry - siteInfo *storage.SiteInfo + url string + snippet *storage.SnippetEntry + domain string } var items []entry + domainSet := make(map[string]bool) + s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error { - siteInfo, _ := s.db.GetSiteInfo(netloc(url)) - items = append(items, entry{url, snippet, siteInfo}) + domain := netloc(url) + items = append(items, entry{url: url, snippet: snippet, domain: domain}) + domainSet[domain] = true return nil }) + // 遍历结束后批量查 SiteInfo(避免嵌套事务) + siteInfoMap := make(map[string]*storage.SiteInfo) + for domain := range domainSet { + if si, _ := s.db.GetSiteInfo(domain); si != nil { + siteInfoMap[domain] = si + } + } + // 按时间倒序 sort.Slice(items, func(i, j int) bool { return items[i].snippet.Timestamp > items[j].snippet.Timestamp }) - if len(items) > limit { - items = items[:limit] - } - - result := make([]recentItem, 0, len(items)) + // 转换为 recentItem + allItems := make([]recentItem, 0, len(items)) for _, e := range items { - lang := e.siteInfo.Languages + lang := make(map[string]float64) + if si := siteInfoMap[e.domain]; si != nil { + lang = si.Languages + } if lang == nil { lang = make(map[string]float64) } @@ -251,22 +290,34 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { if len(desc) > 200 { desc = desc[:200] } - result = append(result, recentItem{ + allItems = append(allItems, recentItem{ URL: e.url, Title: e.snippet.Title, Description: desc, - Domain: netloc(e.url), + Domain: e.domain, Language: lang, WordCount: len(e.snippet.Text), CrawledAt: e.snippet.Timestamp, }) } - resp := map[string]any{ - "items": result, - "total": len(items), + // 预截取常用 limit 值(50, 100, 200) + cache := make(map[int][]recentItem, 3) + for _, l := range []int{50, 100, 200} { + if len(allItems) > l { + copied := make([]recentItem, l) + copy(copied, allItems[:l]) + cache[l] = copied + } else { + cache[l] = allItems + } } - json.NewEncoder(w).Encode(resp) + + s.recentCacheMu.Lock() + s.recentCache = cache + s.recentTotal = len(allItems) + s.recentCacheMu.Unlock() + log.Printf("[recent] cache refreshed: %d items", len(allItems)) } // handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。 @@ -390,13 +441,15 @@ func (s *Server) refreshStatsCache() { log.Printf("[stats] cache refreshed: %d urls, %d domains, %d words", total, len(domainCount), totalWords) } -// runStatsCacheRefresher 后台定时刷新 stats 缓存。 -func (s *Server) runStatsCacheRefresher() { +// runCacheRefresher 后台定时刷新 stats 和 recent 缓存。 +// 统一由一个 goroutine 交替刷新,避免同时全量遍历 bbolt 造成压力。 +func (s *Server) runCacheRefresher() { interval := time.Duration(config.StatsRefreshInterval()) * time.Second ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { s.refreshStatsCache() + s.refreshRecentCache() } } @@ -1595,8 +1648,9 @@ func (s *Server) flush() { log.Printf("[harvester] flush write error: %v", err) } log.Printf("[harvester] flush done, %d keys written", len(batch)) - // flush 完成后立即刷新 stats 缓存(确保数据实时性) + // flush 完成后立即刷新 stats + recent 缓存(确保数据实时性) go s.refreshStatsCache() + go s.refreshRecentCache() } // getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。