From dddc445955769ae644928959b69cd9536d85622e Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 10 Apr 2026 00:37:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 10 +++++ search/server.go | 95 +++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 92 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index ee4a93d..44b1349 100644 --- a/config/config.go +++ b/config/config.go @@ -59,6 +59,7 @@ type SearchConfig struct { BacklinkWeight float64 `yaml:"backlink_weight"` ServerPort int `yaml:"server_port"` FlushIntervalSeconds int `yaml:"flush_interval_seconds"` + StatsRefreshInterval int `yaml:"stats_refresh_interval"` // 统计缓存刷新间隔(秒),默认 30 MissPenalty float64 `yaml:"miss_penalty"` // 缺词惩罚系数(0=不惩罚,1=完全忽略缺词URL),默认 0.15 } @@ -137,6 +138,7 @@ func GetDefaultConfig() Config { BacklinkWeight: 1.0, ServerPort: 50082, FlushIntervalSeconds: 300, + StatsRefreshInterval: 30, MissPenalty: 0.15, }, Backlink: BacklinkConfig{ @@ -249,6 +251,14 @@ func SearchServerPort() int { return Global.Search.ServerPort } // FlushIntervalSeconds 返回配置值 func FlushIntervalSeconds() int { return Global.Search.FlushIntervalSeconds } +// StatsRefreshInterval 返回统计缓存刷新间隔(秒),默认 30。 +func StatsRefreshInterval() int { + if Global.Search.StatsRefreshInterval <= 0 { + return 30 + } + return Global.Search.StatsRefreshInterval +} + // MissPenalty 返回缺词惩罚系数(0~1),值越大对缺少查询词的 URL 惩罚越重。 func MissPenalty() float64 { return Global.Search.MissPenalty } diff --git a/search/server.go b/search/server.go index a83fcc1..17d042e 100644 --- a/search/server.go +++ b/search/server.go @@ -47,6 +47,10 @@ type Server struct { indexCacheMu sync.RWMutex indexCacheHits int64 // 缓存命中计数(原子) + // stats 快照缓存:后台定时刷新,避免每次请求全量遍历 bbolt + statsCache map[string]any + statsCacheMu sync.RWMutex + // backlinkRunner 反向链接计算器(可为 nil,仅用于 admin 手动触发) backlinkRunner interface { Status() map[string]interface{} @@ -67,6 +71,8 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { } // 启动定期刷盘 goroutine go s.runPeriodicFlush() + // 启动 stats 缓存定期刷新 goroutine + go s.runStatsCacheRefresher() return s } @@ -171,10 +177,17 @@ func (h spaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.NotFound(w, r) } -// ListenAndServe 启动搜索服务器。 +// ListenAndServe 启动搜索服务器(带超时保护)。 func (s *Server) ListenAndServe(addr string) error { log.Printf("[search] listening on %s", addr) - return http.ListenAndServe(addr, s.Handler()) + srv := &http.Server{ + Addr: addr, + Handler: s.Handler(), + ReadTimeout: 10 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 120 * time.Second, + } + return srv.ListenAndServe() } // ---- Admin 接口 ---- @@ -257,10 +270,36 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { } // handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。 +// 直接返回缓存快照,不阻塞 bbolt,响应时间 <1ms。 func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Content-Type", "application/json; charset=utf-8") + s.statsCacheMu.RLock() + cached := s.statsCache + s.statsCacheMu.RUnlock() + + if cached == nil { + // 缓存尚未就绪,返回空统计 + json.NewEncoder(w).Encode(map[string]any{ + "total_urls": 0, + "total_words": 0, + "total_domains": 0, + "domains": map[string]int{}, + "languages": map[string]int{}, + "pending": atomic.LoadInt64(&s.rowCount), + "recrawl_eligible": 0, + }) + return + } + + // 将 pending(内存未刷盘数)覆盖为实时值 + cached["pending"] = atomic.LoadInt64(&s.rowCount) + json.NewEncoder(w).Encode(cached) +} + +// refreshStatsCache 全量遍历 bbolt 计算统计快照,存入 statsCache。 +func (s *Server) refreshStatsCache() { domainCount := make(map[string]int) langCount := make(map[string]int) totalWords := 0 @@ -269,6 +308,14 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { now := time.Now().Unix() maxAge := int64(config.RecrawlMaxAge()) + // 收集域名,遍历结束后批量查 SiteInfo(避免嵌套事务) + type domainStat struct { + domain string + langMap map[string]float64 + } + domainSet := make(map[string]bool) + snippetDomains := make([]string, 0) + s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error { total++ domain := netloc(url) @@ -277,7 +324,15 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { if now-snippet.Timestamp >= maxAge { recrawlEligible++ } + if !domainSet[domain] { + domainSet[domain] = true + snippetDomains = append(snippetDomains, domain) + } + return nil + }) + // 遍历结束后批量查 SiteInfo(避免 ForEachSnippet 回调中嵌套 bbolt 事务) + for _, domain := range snippetDomains { siteInfo, _ := s.db.GetSiteInfo(domain) if siteInfo != nil { for lang, ratio := range siteInfo.Languages { @@ -286,8 +341,7 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { } } } - return nil - }) + } // 排序取 Top type kv struct { @@ -320,17 +374,30 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { langsMap[kv.k] = kv.v } - resp := map[string]any{ - "total_urls": total, - "total_words": totalWords, - "total_domains": len(domainCount), // 真实的域名总数(非Top 20) - "domains": domainsMap, - "languages": langsMap, - "pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数 - "recrawl_eligible": recrawlEligible, // 已过期、可被重爬的 URL 数量 + result := map[string]any{ + "total_urls": total, + "total_words": totalWords, + "total_domains": len(domainCount), + "domains": domainsMap, + "languages": langsMap, + "pending": atomic.LoadInt64(&s.rowCount), + "recrawl_eligible": recrawlEligible, } - json.NewEncoder(w).Encode(resp) + s.statsCacheMu.Lock() + s.statsCache = result + s.statsCacheMu.Unlock() + log.Printf("[stats] cache refreshed: %d urls, %d domains, %d words", total, len(domainCount), totalWords) +} + +// runStatsCacheRefresher 后台定时刷新 stats 缓存。 +func (s *Server) runStatsCacheRefresher() { + interval := time.Duration(config.StatsRefreshInterval()) * time.Second + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range ticker.C { + s.refreshStatsCache() + } } // handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。 @@ -1528,6 +1595,8 @@ func (s *Server) flush() { log.Printf("[harvester] flush write error: %v", err) } log.Printf("[harvester] flush done, %d keys written", len(batch)) + // flush 完成后立即刷新 stats 缓存(确保数据实时性) + go s.refreshStatsCache() } // getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。