From a36c51de1ed178499088d212627a1d561bbd5d67 Mon Sep 17 00:00:00 2001 From: kevin Date: Thu, 9 Apr 2026 09:28:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=81=A2=E5=A4=8D=EF=BC=8C=E5=8E=BB=E9=99=A4?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- search/server.go | 241 +++++++++++++++++++---------------------------- 1 file changed, 95 insertions(+), 146 deletions(-) diff --git a/search/server.go b/search/server.go index 715385c..2be27d0 100644 --- a/search/server.go +++ b/search/server.go @@ -3,8 +3,8 @@ package search import ( - "container/heap" // 堆结构(域名交错排序) - "encoding/json" // JSON 序列化(响应输出) + "container/heap" // 堆结构(域名交错排序) + "encoding/json" // JSON 序列化(响应输出) "fmt" // 错误格式化 "io" // 读取请求体 "log" // 日志 @@ -36,16 +36,10 @@ type Server struct { httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查) // 以下为收获服务(harvester)内嵌字段 - mem map[string][]storage.IndexEntry // 内存索引聚合器:关键词 → [权重, URL] 条目 - memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞) - rowCount int64 // 内存中累计的索引条目总数(触发刷盘) - flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 - - // 统计缓存(按需更新:请求到来时触发,5s 内有结果则用新值,否则返回旧缓存) - statsCache map[string]any // 缓存的统计结果 - statsCacheMu sync.RWMutex // 保护统计缓存及更新状态 - statsUpdating bool // 是否正在后台更新(防止重复启动) - statsUpdateDone chan struct{} // 当前更新完成时关闭此 channel + mem map[string][]storage.IndexEntry // 内存索引聚合器:关键词 → [权重, URL] 条目 + memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞) + rowCount int64 // 内存中累计的索引条目总数(触发刷盘) + flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 } // New 创建一个 search Server(内嵌收获服务,统一在同一端口)。 @@ -58,7 +52,6 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { httpCli: &http.Client{ Timeout: time.Duration(config.OnlineSnippetTimeout()) * time.Second, }, - statsCache: make(map[string]any), } // 启动定期刷盘 goroutine go s.runPeriodicFlush() @@ -74,99 +67,6 @@ func (s *Server) runPeriodicFlush() { } } -// triggerStatsUpdate 触发一次后台统计更新(若已在更新中则不重复启动)。 -// 返回一个 channel,调用方可等待本次更新完成。 -func (s *Server) triggerStatsUpdate() <-chan struct{} { - s.statsCacheMu.Lock() - defer s.statsCacheMu.Unlock() - - if s.statsUpdating { - // 已有更新在跑,直接返回其 done channel - return s.statsUpdateDone - } - // 开启新一轮更新 - done := make(chan struct{}) - s.statsUpdating = true - s.statsUpdateDone = done - go func() { - s.updateStatsCache() - s.statsCacheMu.Lock() - s.statsUpdating = false - s.statsCacheMu.Unlock() - close(done) - }() - return done -} - - - -// updateStatsCache 计算统计信息并更新缓存。 -func (s *Server) updateStatsCache() { - domainCount := make(map[string]int) - langCount := make(map[string]int) - totalWords := 0 - total := 0 - - s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error { - total++ - domain := netloc(url) - domainCount[domain]++ - totalWords += len(snippet.Text) - - siteInfo, _ := s.db.GetSiteInfo(domain) - if siteInfo != nil { - for lang, ratio := range siteInfo.Languages { - if ratio > 0.1 { - langCount[lang]++ - } - } - } - return nil - }) - - // 排序取 Top - type kv struct{ k string; v int } - topDomains := make([]kv, 0, len(domainCount)) - for k, v := range domainCount { - topDomains = append(topDomains, kv{k, v}) - } - sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v }) - if len(topDomains) > 20 { - topDomains = topDomains[:20] - } - topLangs := make([]kv, 0, len(langCount)) - for k, v := range langCount { - topLangs = append(topLangs, kv{k, v}) - } - sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v }) - if len(topLangs) > 10 { - topLangs = topLangs[:10] - } - - domainsMap := make(map[string]int) - for _, kv := range topDomains { - domainsMap[kv.k] = kv.v - } - langsMap := make(map[string]int) - for _, kv := range topLangs { - langsMap[kv.k] = kv.v - } - - cache := map[string]any{ - "total_urls": total, - "total_words": totalWords, - "total_domains": len(domainCount), - "domains": domainsMap, - "languages": langsMap, - "pending": atomic.LoadInt64(&s.rowCount), - "cached_at": time.Now().Unix(), - } - - s.statsCacheMu.Lock() - s.statsCache = cache - s.statsCacheMu.Unlock() -} - // Flush 公开的刷盘方法,供定时任务和外部调用。 func (s *Server) Flush() { s.flush() } @@ -176,8 +76,8 @@ func (s *Server) Handler() http.Handler { // 搜索路由 mux.HandleFunc("/search", s.handleSearch) // 收获服务路由(爬虫数据写入) - mux.HandleFunc("/l", s.handleIngest) // /l:接收爬虫关键词索引数据 - mux.HandleFunc("/flush", s.handleFlush) // /flush:强制刷盘 + mux.HandleFunc("/l", s.handleIngest) // /l:接收爬虫关键词索引数据 + mux.HandleFunc("/flush", s.handleFlush) // /flush:强制刷盘 // 管理接口 mux.HandleFunc("/admin/recent", s.handleAdminRecent) mux.HandleFunc("/admin/stats", s.handleAdminStats) @@ -322,8 +222,8 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { Description: desc, Domain: netloc(e.url), Language: lang, - WordCount: len(e.snippet.Text), - CrawledAt: e.snippet.Timestamp, + WordCount: len(e.snippet.Text), + CrawledAt: e.snippet.Timestamp, }) } @@ -334,25 +234,74 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(resp) } -// handleAdminStats 返回全局统计:触发后台更新,5s 内有结果返回新值,否则返回旧缓存。 +// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。 func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Content-Type", "application/json; charset=utf-8") - done := s.triggerStatsUpdate() + domainCount := make(map[string]int) + langCount := make(map[string]int) + totalWords := 0 + total := 0 - select { - case <-done: - // 更新完成,返回最新缓存 - case <-time.After(5 * time.Second): - // 超时,使用旧缓存 + s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error { + total++ + domain := netloc(url) + domainCount[domain]++ + totalWords += len(snippet.Text) + + siteInfo, _ := s.db.GetSiteInfo(domain) + if siteInfo != nil { + for lang, ratio := range siteInfo.Languages { + if ratio > 0.1 { + langCount[lang]++ + } + } + } + return nil + }) + + // 排序取 Top + type kv struct { + k string + v int + } + topDomains := make([]kv, 0, len(domainCount)) + for k, v := range domainCount { + topDomains = append(topDomains, kv{k, v}) + } + sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v }) + if len(topDomains) > 20 { + topDomains = topDomains[:20] + } + topLangs := make([]kv, 0, len(langCount)) + for k, v := range langCount { + topLangs = append(topLangs, kv{k, v}) + } + sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v }) + if len(topLangs) > 10 { + topLangs = topLangs[:10] } - s.statsCacheMu.RLock() - cache := s.statsCache - s.statsCacheMu.RUnlock() + domainsMap := make(map[string]int) + for _, kv := range topDomains { + domainsMap[kv.k] = kv.v + } + langsMap := make(map[string]int) + for _, kv := range topLangs { + langsMap[kv.k] = kv.v + } - json.NewEncoder(w).Encode(cache) + resp := map[string]any{ + "total_urls": total, + "total_words": totalWords, + "total_domains": len(domainCount), // 真实的域名总数(非Top 20) + "domains": domainsMap, + "languages": langsMap, + "pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数 + } + + json.NewEncoder(w).Encode(resp) } // handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。 @@ -458,19 +407,19 @@ func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) { // searchResponse 是搜索 API 的 JSON 响应结构。 type searchResponse struct { - Tokens []string `json:"tokens"` // 查询的分词结果 + Tokens []string `json:"tokens"` // 查询的分词结果 Counts map[string]int `json:"counts"` // 每个词在索引中出现的 URL 数量 Results []searchResult `json:"results"` // 排序后的搜索结果列表 - Total int `json:"total"` // 符合 site: 过滤条件前的总候选数 + Total int `json:"total"` // 符合 site: 过滤条件前的总候选数 } // searchResult 是单条搜索结果。 type searchResult struct { - Score float64 `json:"score"` // 综合排序分数 - URL string `json:"url"` // 页面 URL + Score float64 `json:"score"` // 综合排序分数 + URL string `json:"url"` // 页面 URL Snippet *snippetInfo `json:"snippet,omitempty"` // 摘要信息(标题/描述/正文) - Relevance map[string]float64 `json:"relevance"` // 每个关键词在该 URL 下的权重 - DomainCount int `json:"domain_count"` // 该 URL 所属域名的总候选数 + Relevance map[string]float64 `json:"relevance"` // 每个关键词在该 URL 下的权重 + DomainCount int `json:"domain_count"` // 该 URL 所属域名的总候选数 Factors map[string]float64 `json:"factors,omitempty"` // 各排序因子的详细分数 } @@ -662,18 +611,18 @@ func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]sear // 12 维分数向量:分别记录各项因子,供后续多阶段调整 var vec [12]float64 - vec[0] = score // 0: 综合分数 - vec[1] = rel // 1: 相关性 - vec[2] = prosper // 2: 繁荣值 - vec[3] = 1 - bad // 3: URL 质量 - vec[4] = 1 // 4: 语种倍数(待填充) - vec[5] = 1 // 5: 重复惩罚(待填充) - vec[6] = adjust // 6: 人工调整 - vec[7] = 1 // 7: 网站时间衰减(待填充) - vec[8] = 1 // 8: 连续词加成(待填充) - vec[9] = 1 // 9: 关键词内容(预留) - vec[10] = 1 // 10: URL 时间衰减(待填充) - vec[11] = 0.1 // 11: 常数因子 + vec[0] = score // 0: 综合分数 + vec[1] = rel // 1: 相关性 + vec[2] = prosper // 2: 繁荣值 + vec[3] = 1 - bad // 3: URL 质量 + vec[4] = 1 // 4: 语种倍数(待填充) + vec[5] = 1 // 5: 重复惩罚(待填充) + vec[6] = adjust // 6: 人工调整 + vec[7] = 1 // 7: 网站时间衰减(待填充) + vec[8] = 1 // 8: 连续词加成(待填充) + vec[9] = 1 // 9: 关键词内容(预留) + vec[10] = 1 // 10: URL 时间衰减(待填充) + vec[11] = 0.1 // 11: 常数因子 candidates = append(candidates, candidate{u, rel, vec}) } @@ -761,10 +710,10 @@ func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]sear defer snippetWg.Done() snip := s.getSnippet(cand.url) r := searchResult{ - Score: cand.scoreVec[0], - URL: unescapeURL(cand.url), - Snippet: snip, - Relevance: make(map[string]float64), + Score: cand.scoreVec[0], + URL: unescapeURL(cand.url), + Snippet: snip, + Relevance: make(map[string]float64), DomainCount: 0, Factors: map[string]float64{ "relevance": cand.scoreVec[1], @@ -1021,10 +970,10 @@ func min3(a, b, c int) int { type domainHeap []rerankItem type rerankItem struct { - score float64 - url string + score float64 + url string domainMul float64 // 域名衰减倍数 - vec [12]float64 + vec [12]float64 } func (h domainHeap) Len() int { return len(h) }