diff --git a/harvester/harvester.go b/harvester/harvester.go index c1db5bc..221d335 100644 --- a/harvester/harvester.go +++ b/harvester/harvester.go @@ -7,6 +7,8 @@ package harvester import ( "encoding/json" // JSON 反序列化(解析爬虫请求) + "fmt" // 错误格式化 + "io" // 读取请求体 "log" // 日志输出 "math/rand" // 随机数(打乱合并顺序、触发概率性操作) "net/http" // HTTP 服务端 @@ -42,31 +44,108 @@ func New(db *storage.DB, infoSvc *info.Service) *Server { } } -// ingestPayload 是爬虫发送的 JSON 请求体结构。 +// ingestPayload 是爬虫发送的 JSON 请求体结构(Go 爬虫用)。 type ingestPayload struct { - URL string `json:"url"` // 被索引页面的最终 URL + URL string `json:"url"` // 被索引页面的最终 URL Keywords []struct { Word string `json:"word"` // 关键词 Weight float32 `json:"weight"` // 该 URL 在该词下的权重 } `json:"keywords"` } +// ingestPayloadLegacy 是 Python 爬虫发送的旧格式:[url, [[word, weight], ...]] +// 兼容处理:Python 端发的是数组,Go 期望的是对象。 +type ingestPayloadLegacy []any + // Handler 返回 HTTP 路由处理器。 func (s *Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/l", s.handleIngest) // /l 端点:接收爬虫数据 + mux.HandleFunc("/flush", s.handleFlush) // /flush:强制刷盘(用于手动触发或调试) + mux.HandleFunc("/admin/pending", s.handleAdminPending) // /admin/pending:返回未刷盘数据条数 return mux } +// handleAdminPending 返回内存中未刷盘的索引条目数量。 +func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + count := atomic.LoadInt64(&s.rowCount) + json.NewEncoder(w).Encode(map[string]int64{"pending": count}) +} + +// handleFlush 处理 GET /flush 请求,强制将内存索引刷到磁盘。 +func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) { + s.flush() + w.Write([]byte("flushed")) +} + +// Flush 公开的刷盘方法,供外部调用(定时刷盘、退出前刷盘)。 +func (s *Server) Flush() { s.flush() } + +// parsePayload 解析爬虫请求体,兼容新旧两种格式。 +// 新格式(Go 爬虫):{"url": "...", "keywords": [{"word": "...", "weight": 0.0}]} +// 旧格式(Python 爬虫):["url", [["word", weight], ...]] +func parsePayload(r *http.Request) (*ingestPayload, error) { + // 读取 body 为 bytes(支持重读) + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) + if err != nil { + return nil, err + } + + // 先尝试新格式(Go 爬虫) + var modern ingestPayload + if err := json.Unmarshal(body, &modern); err == nil && modern.URL != "" { + return &modern, nil + } + + // 尝试旧格式(Python 爬虫):[url, [[word, weight], ...]] + var legacy ingestPayloadLegacy + if err := json.Unmarshal(body, &legacy); err != nil { + return nil, fmt.Errorf("invalid payload: %w", err) + } + if len(legacy) < 2 { + return nil, fmt.Errorf("invalid legacy payload: too few elements") + } + url, ok := legacy[0].(string) + if !ok { + return nil, fmt.Errorf("invalid url type") + } + kwsRaw, ok := legacy[1].([]any) + if !ok { + return nil, fmt.Errorf("invalid keywords type") + } + payload := &ingestPayload{URL: url} + for _, kw := range kwsRaw { + kwSlice, ok := kw.([]any) + if !ok || len(kwSlice) < 2 { + continue + } + word, _ := kwSlice[0].(string) + weight, _ := kwSlice[1].(float64) + if word == "" { + continue + } + payload.Keywords = append(payload.Keywords, struct { + Word string `json:"word"` + Weight float32 `json:"weight"` + }{word, float32(weight)}) + } + if payload.URL == "" { + return nil, fmt.Errorf("empty url after parsing") + } + return payload, nil +} + // handleIngest 处理爬虫发来的 POST 请求,将关键词数据写入内存索引。 func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } - var payload ingestPayload - if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { - http.Error(w, "bad json: "+err.Error(), http.StatusBadRequest) + payload, err := parsePayload(r) + if err != nil { + http.Error(w, "bad payload: "+err.Error(), http.StatusBadRequest) return } diff --git a/main.go b/main.go index 16d5f67..a390b64 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "os" // 操作系统信号 "os/signal" // 信号捕获 "syscall" // 系统调用(SIGTERM) + "time" // 定期刷盘定时器 "sese-engine/analyzer" // 文本分析和关键词提取 "sese-engine/backlink" // 反向链接(繁荣值)计算 @@ -67,8 +68,17 @@ func main() { } }() + // ---- 4b. 定期刷盘:每 60 秒强制将内存索引写入磁盘,确保搜索实时可用 ---- + go func() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for range ticker.C { + harvSrv.Flush() + } + }() + // ---- 5. 搜索服务器(默认 :80):对外提供搜索 API ---- - searchSrv := search.New(db, infoSvc, anal) + searchSrv := search.New(db, infoSvc, anal, "http://localhost:5000") go func() { addr := fmt.Sprintf(":%d", config.SearchServerPort) if err := searchSrv.ListenAndServe(addr); err != nil { @@ -93,5 +103,6 @@ func main() { quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) <-quit - log.Println("shutdown signal received, exiting...") + log.Println("shutdown signal received, flushing index...") + harvSrv.Flush() // 退出前刷盘,不丢数据 } diff --git a/search/server.go b/search/server.go index 40f1384..6fdaed1 100644 --- a/search/server.go +++ b/search/server.go @@ -5,6 +5,7 @@ package search import ( "container/heap" // 堆结构(域名交错排序) "encoding/json" // JSON 序列化(响应输出) + "io" // 代理响应复制 "log" // 日志 "math" // 数学运算(Log、幂) "net/http" // HTTP 服务端 @@ -25,18 +26,21 @@ import ( // Server 是搜索 HTTP 服务器。 type Server struct { - db *storage.DB - infoSvc *info.Service - analyzer *analyzer.Analyzer - httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查) + db *storage.DB + infoSvc *info.Service + analyzer *analyzer.Analyzer + httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查) + harvesterURL string // 收获服务器地址(如 "http://localhost:5000") } // New 创建一个 search Server。 -func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { +// harvesterURL 为收获服务器的地址,用于代理刷盘和状态查询。 +func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer, harvesterURL string) *Server { return &Server{ - db: db, - infoSvc: infoSvc, - analyzer: a, + db: db, + infoSvc: infoSvc, + analyzer: a, + harvesterURL: harvesterURL, httpCli: &http.Client{ Timeout: time.Duration(config.OnlineSnippetTimeout) * time.Second, }, @@ -50,6 +54,7 @@ func (s *Server) Handler() http.Handler { mux.HandleFunc("/admin/recent", s.handleAdminRecent) mux.HandleFunc("/admin/stats", s.handleAdminStats) mux.HandleFunc("/admin/priority", s.handleAdminPriority) + mux.HandleFunc("/admin/flush", s.handleAdminFlush) return mux } @@ -199,6 +204,19 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { "domains": domainsMap, "languages": langsMap, } + + // 从 harvester 代理获取未刷盘数据条数 + if s.harvesterURL != "" { + req, _ := http.NewRequest(http.MethodGet, s.harvesterURL+"/admin/pending", nil) + if proxyResp, err := s.httpCli.Do(req); err == nil { + defer proxyResp.Body.Close() + var pendingResp map[string]int64 + if err := json.NewDecoder(proxyResp.Body).Decode(&pendingResp); err == nil { + resp["pending"] = pendingResp["pending"] + } + } + } + json.NewEncoder(w).Encode(resp) } @@ -282,6 +300,33 @@ func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) { } } +// handleAdminFlush 代理到 harvester 的 /flush 接口,执行刷盘。 +func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + if r.Method != http.MethodGet && r.Method != http.MethodPost { + http.Error(w, `{"error":"method not allowed"}`, 405) + return + } + + // 代理请求到 harvester + proxyURL := s.harvesterURL + "/flush" + req, err := http.NewRequest(http.MethodGet, proxyURL, nil) + if err != nil { + http.Error(w, `{"error":"`+err.Error()+`"}`, 500) + return + } + resp, err := s.httpCli.Do(req) + if err != nil { + http.Error(w, `{"error":"harvester unreachable: `+err.Error()+`"}`, 502) + return + } + defer resp.Body.Close() + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) +} + // ---- 搜索处理器 ---- // searchResponse 是搜索 API 的 JSON 响应结构。 @@ -874,13 +919,28 @@ func rerank(candidates []candidate, from, to int) []candidate { heap.Push(h, rerankItem{top.scoreVec[0], top.url, domainMul[domain], top.scoreVec}) } - // 从堆中依次弹出得分最高的条目(受域名衰减影响),直到取够 + // 从堆中依次弹出得分最高的条目(受域名衰减影响),直到取够 to 条, + // 然后截取 [from:to] 段返回。 var result []candidate - for h.Len() > 0 && len(result) < to { - item := heap.Pop(h).(rerankItem) - if len(result) >= from { - result = append(result, candidate{url: item.url, scoreVec: item.vec}) + for len(result) < to { + if h.Len() == 0 { + // 堆为空时,将所有域名剩余条目依次推入堆(每域一条) + anyPushed := false + for domain, items := range domainItems { + if len(items) == 0 { + continue + } + next := items[len(items)-1] + domainItems[domain] = items[:len(items)-1] + heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec}) + anyPushed = true + } + if !anyPushed { + break // 所有域名都没有剩余条目,结束 + } } + item := heap.Pop(h).(rerankItem) + result = append(result, candidate{url: item.url, scoreVec: item.vec}) domain := netloc(item.url) domainMul[domain] /= 8 // 该域名的下一次出现衰减到 1/8 remaining := domainItems[domain] @@ -890,7 +950,11 @@ func rerank(candidates []candidate, from, to int) []candidate { heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec}) } } - return result + // 截取分页段 + if from >= len(result) { + return nil + } + return result[from:] } // ---- 杂项辅助函数 ----