更新
This commit is contained in:
@@ -59,6 +59,7 @@ type SearchConfig struct {
|
||||
BacklinkWeight float64 `yaml:"backlink_weight"`
|
||||
ServerPort int `yaml:"server_port"`
|
||||
FlushIntervalSeconds int `yaml:"flush_interval_seconds"`
|
||||
StatsRefreshInterval int `yaml:"stats_refresh_interval"` // 统计缓存刷新间隔(秒),默认 30
|
||||
MissPenalty float64 `yaml:"miss_penalty"` // 缺词惩罚系数(0=不惩罚,1=完全忽略缺词URL),默认 0.15
|
||||
}
|
||||
|
||||
@@ -137,6 +138,7 @@ func GetDefaultConfig() Config {
|
||||
BacklinkWeight: 1.0,
|
||||
ServerPort: 50082,
|
||||
FlushIntervalSeconds: 300,
|
||||
StatsRefreshInterval: 30,
|
||||
MissPenalty: 0.15,
|
||||
},
|
||||
Backlink: BacklinkConfig{
|
||||
@@ -249,6 +251,14 @@ func SearchServerPort() int { return Global.Search.ServerPort }
|
||||
// FlushIntervalSeconds 返回配置值
|
||||
func FlushIntervalSeconds() int { return Global.Search.FlushIntervalSeconds }
|
||||
|
||||
// StatsRefreshInterval 返回统计缓存刷新间隔(秒),默认 30。
|
||||
func StatsRefreshInterval() int {
|
||||
if Global.Search.StatsRefreshInterval <= 0 {
|
||||
return 30
|
||||
}
|
||||
return Global.Search.StatsRefreshInterval
|
||||
}
|
||||
|
||||
// MissPenalty 返回缺词惩罚系数(0~1),值越大对缺少查询词的 URL 惩罚越重。
|
||||
func MissPenalty() float64 { return Global.Search.MissPenalty }
|
||||
|
||||
|
||||
+78
-9
@@ -47,6 +47,10 @@ type Server struct {
|
||||
indexCacheMu sync.RWMutex
|
||||
indexCacheHits int64 // 缓存命中计数(原子)
|
||||
|
||||
// stats 快照缓存:后台定时刷新,避免每次请求全量遍历 bbolt
|
||||
statsCache map[string]any
|
||||
statsCacheMu sync.RWMutex
|
||||
|
||||
// backlinkRunner 反向链接计算器(可为 nil,仅用于 admin 手动触发)
|
||||
backlinkRunner interface {
|
||||
Status() map[string]interface{}
|
||||
@@ -67,6 +71,8 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
|
||||
}
|
||||
// 启动定期刷盘 goroutine
|
||||
go s.runPeriodicFlush()
|
||||
// 启动 stats 缓存定期刷新 goroutine
|
||||
go s.runStatsCacheRefresher()
|
||||
return s
|
||||
}
|
||||
|
||||
@@ -171,10 +177,17 @@ func (h spaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
|
||||
// ListenAndServe 启动搜索服务器。
|
||||
// ListenAndServe 启动搜索服务器(带超时保护)。
|
||||
func (s *Server) ListenAndServe(addr string) error {
|
||||
log.Printf("[search] listening on %s", addr)
|
||||
return http.ListenAndServe(addr, s.Handler())
|
||||
srv := &http.Server{
|
||||
Addr: addr,
|
||||
Handler: s.Handler(),
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 60 * time.Second,
|
||||
IdleTimeout: 120 * time.Second,
|
||||
}
|
||||
return srv.ListenAndServe()
|
||||
}
|
||||
|
||||
// ---- Admin 接口 ----
|
||||
@@ -257,10 +270,36 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。
|
||||
// 直接返回缓存快照,不阻塞 bbolt,响应时间 <1ms。
|
||||
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
|
||||
s.statsCacheMu.RLock()
|
||||
cached := s.statsCache
|
||||
s.statsCacheMu.RUnlock()
|
||||
|
||||
if cached == nil {
|
||||
// 缓存尚未就绪,返回空统计
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"total_urls": 0,
|
||||
"total_words": 0,
|
||||
"total_domains": 0,
|
||||
"domains": map[string]int{},
|
||||
"languages": map[string]int{},
|
||||
"pending": atomic.LoadInt64(&s.rowCount),
|
||||
"recrawl_eligible": 0,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// 将 pending(内存未刷盘数)覆盖为实时值
|
||||
cached["pending"] = atomic.LoadInt64(&s.rowCount)
|
||||
json.NewEncoder(w).Encode(cached)
|
||||
}
|
||||
|
||||
// refreshStatsCache 全量遍历 bbolt 计算统计快照,存入 statsCache。
|
||||
func (s *Server) refreshStatsCache() {
|
||||
domainCount := make(map[string]int)
|
||||
langCount := make(map[string]int)
|
||||
totalWords := 0
|
||||
@@ -269,6 +308,14 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
||||
now := time.Now().Unix()
|
||||
maxAge := int64(config.RecrawlMaxAge())
|
||||
|
||||
// 收集域名,遍历结束后批量查 SiteInfo(避免嵌套事务)
|
||||
type domainStat struct {
|
||||
domain string
|
||||
langMap map[string]float64
|
||||
}
|
||||
domainSet := make(map[string]bool)
|
||||
snippetDomains := make([]string, 0)
|
||||
|
||||
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
|
||||
total++
|
||||
domain := netloc(url)
|
||||
@@ -277,7 +324,15 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
||||
if now-snippet.Timestamp >= maxAge {
|
||||
recrawlEligible++
|
||||
}
|
||||
if !domainSet[domain] {
|
||||
domainSet[domain] = true
|
||||
snippetDomains = append(snippetDomains, domain)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// 遍历结束后批量查 SiteInfo(避免 ForEachSnippet 回调中嵌套 bbolt 事务)
|
||||
for _, domain := range snippetDomains {
|
||||
siteInfo, _ := s.db.GetSiteInfo(domain)
|
||||
if siteInfo != nil {
|
||||
for lang, ratio := range siteInfo.Languages {
|
||||
@@ -286,8 +341,7 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// 排序取 Top
|
||||
type kv struct {
|
||||
@@ -320,17 +374,30 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
||||
langsMap[kv.k] = kv.v
|
||||
}
|
||||
|
||||
resp := map[string]any{
|
||||
result := map[string]any{
|
||||
"total_urls": total,
|
||||
"total_words": totalWords,
|
||||
"total_domains": len(domainCount), // 真实的域名总数(非Top 20)
|
||||
"total_domains": len(domainCount),
|
||||
"domains": domainsMap,
|
||||
"languages": langsMap,
|
||||
"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
|
||||
"recrawl_eligible": recrawlEligible, // 已过期、可被重爬的 URL 数量
|
||||
"pending": atomic.LoadInt64(&s.rowCount),
|
||||
"recrawl_eligible": recrawlEligible,
|
||||
}
|
||||
|
||||
json.NewEncoder(w).Encode(resp)
|
||||
s.statsCacheMu.Lock()
|
||||
s.statsCache = result
|
||||
s.statsCacheMu.Unlock()
|
||||
log.Printf("[stats] cache refreshed: %d urls, %d domains, %d words", total, len(domainCount), totalWords)
|
||||
}
|
||||
|
||||
// runStatsCacheRefresher 后台定时刷新 stats 缓存。
|
||||
func (s *Server) runStatsCacheRefresher() {
|
||||
interval := time.Duration(config.StatsRefreshInterval()) * time.Second
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
s.refreshStatsCache()
|
||||
}
|
||||
}
|
||||
|
||||
// handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。
|
||||
@@ -1528,6 +1595,8 @@ func (s *Server) flush() {
|
||||
log.Printf("[harvester] flush write error: %v", err)
|
||||
}
|
||||
log.Printf("[harvester] flush done, %d keys written", len(batch))
|
||||
// flush 完成后立即刷新 stats 缓存(确保数据实时性)
|
||||
go s.refreshStatsCache()
|
||||
}
|
||||
|
||||
// getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。
|
||||
|
||||
Reference in New Issue
Block a user