diff --git a/config/config.go b/config/config.go index 528b1af..4325711 100644 --- a/config/config.go +++ b/config/config.go @@ -34,7 +34,8 @@ const ( LanguageWeight = 0.5 // 语种匹配权重:与查询语种一致时加分 ConsecutiveKeyWeight = 1.3 // 连续关键词命中权重:多词连续出现时加分 BacklinkWeight = 1.0 // 反向链接权重:指向该 URL 的链接越多得分越高 - SearchServerPort = 80 // 搜索服务的 HTTP 监听端口 + SearchServerPort = 80 // 搜索服务和收获服务的统一 HTTP 监听端口 + FlushIntervalSeconds = 60 // 定期刷盘间隔(秒):将内存索引批量写入磁盘 ) // Backlink computation @@ -50,12 +51,7 @@ const StoragePath = "./savedata" // Prometheus ports // 各模块 Prometheus 监控指标的 HTTP 端口 const ( - PromPortCrawler = 14950 // 爬虫模块的 metrics 端口 - PromPortHarvester = 14951 // 收获服务器模块的 metrics 端口 - PromPortBacklink = 14952 // 反向链接计算模块的 metrics 端口 - PromPortSearch = 14953 // 搜索服务模块的 metrics 端口 + PromPortCrawler = 14950 // 爬虫模块的 metrics 端口 + PromPortBacklink = 14952 // 反向链接计算模块的 metrics 端口 + PromPortSearch = 14953 // 搜索服务(含收获功能)模块的 metrics 端口 ) - -// Harvester HTTP endpoint -// 爬虫向收获服务器发送索引数据的 HTTP 端点地址 -const HarvesterAddr = "http://127.0.0.1:5000" diff --git a/crawler/crawler.go b/crawler/crawler.go index 0bdb8b8..a76d1ef 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -5,11 +5,12 @@ package crawler import ( "bytes" // 字节缓冲(构造 HTTP POST 请求体) "context" // context 超时控制 - "encoding/json" // JSON 序列化(发送关键词数据到 harvester) + "encoding/json" // JSON 序列化(发送关键词数据到收获服务) + "fmt" // 格式化(构造目标地址) "log" // 日志输出 "math" // 数学运算(指数衰减、质量评分) "math/rand" // 随机数(加权采样、队列打乱) - "net/http" // HTTP 客户端(POST 数据到 harvester) + "net/http" // HTTP 客户端(POST 数据到收获服务) "net/url" // URL 解析 "strings" // 字符串操作 "sync" // 互斥锁(保护并发收集结果) @@ -316,8 +317,8 @@ func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, _ = c.db.SetSiteInfo(host, info) } -// sendToHarvester 将关键词索引数据通过 HTTP POST 发送到收获服务器(:5000/l 端点)。 -// 熔断器基于 atomic 实现(无 mutex,不在持有锁时做慢 I/O),确保 goroutine 不会因 harvester 故障而堆积。 +// sendToHarvester 将关键词索引数据通过 HTTP POST 发送到搜索服务器(/l 端点)。 +// 熔断器基于 atomic 实现(无 mutex,不在持有锁时做慢 I/O),确保 goroutine 不会因收获服务故障而堆积。 func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) { now := time.Now().Unix() @@ -355,7 +356,7 @@ func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - req, err := http.NewRequestWithContext(ctx, "POST", config.HarvesterAddr+"/l", bytes.NewReader(data)) + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d/l", config.SearchServerPort), bytes.NewReader(data)) if err != nil { return } @@ -370,7 +371,7 @@ func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) { if failures >= circuitFailureThreshold { atomic.StoreInt32(&c.circuitState, circuitOpen) atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds)) - log.Printf("[crawler] circuit OPEN: harvester unreachable (%d failures), cooling for %ds", + log.Printf("[crawler] circuit OPEN: harvest endpoint unreachable (%d failures), cooling for %ds", failures, circuitCooldownSeconds) } return diff --git a/harvester/harvester.go b/harvester/harvester.go deleted file mode 100644 index 221d335..0000000 --- a/harvester/harvester.go +++ /dev/null @@ -1,422 +0,0 @@ -// Package harvester implements the index-writing server (port 5000). -// 收获服务器包:接收爬虫发送的关键词索引数据,批量写入 bbolt 持久化存储。 -// -// 工作流程:爬虫每抓取一个页面,将 (URL, 关键词列表) 通过 HTTP POST 发送到本服务; -// 本服务先将数据积累在内存中,当内存中索引条目数量超过阈值时,批量合并到磁盘索引。 -package harvester - -import ( - "encoding/json" // JSON 反序列化(解析爬虫请求) - "fmt" // 错误格式化 - "io" // 读取请求体 - "log" // 日志输出 - "math/rand" // 随机数(打乱合并顺序、触发概率性操作) - "net/http" // HTTP 服务端 - "strings" // 字符串操作(URL 清洗) - "sync" // 互斥锁(保护内存索引、防止并发刷盘) - "sync/atomic" // 原子操作(计数器) - - "sese-engine/config" // 全局配置(刷盘阈值、URL 上限) - "sese-engine/info" // info 服务(查询繁荣分数用于裁剪) - "sese-engine/storage" // 持久化存储 -) - -// Server 是收获 HTTP 服务器,负责接收爬虫数据、内存聚合、批量写入。 -type Server struct { - db *storage.DB - - // 内存索引聚合器:关键词 → 该词关联的 [权重, URL] 条目列表 - mem map[string][]storage.IndexEntry - memMu sync.Mutex // 保护内存索引的并发写入 - - rowCount int64 // 内存中累计的索引条目总数(用于触发刷盘) - flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 - - infoSvc *info.Service // info 服务:用于查询繁荣分数来决定索引裁剪优先级 -} - -// New 创建一个 harvester Server 实例。 -func New(db *storage.DB, infoSvc *info.Service) *Server { - return &Server{ - db: db, - mem: make(map[string][]storage.IndexEntry), - infoSvc: infoSvc, - } -} - -// ingestPayload 是爬虫发送的 JSON 请求体结构(Go 爬虫用)。 -type ingestPayload struct { - URL string `json:"url"` // 被索引页面的最终 URL - Keywords []struct { - Word string `json:"word"` // 关键词 - Weight float32 `json:"weight"` // 该 URL 在该词下的权重 - } `json:"keywords"` -} - -// ingestPayloadLegacy 是 Python 爬虫发送的旧格式:[url, [[word, weight], ...]] -// 兼容处理:Python 端发的是数组,Go 期望的是对象。 -type ingestPayloadLegacy []any - -// Handler 返回 HTTP 路由处理器。 -func (s *Server) Handler() http.Handler { - mux := http.NewServeMux() - mux.HandleFunc("/l", s.handleIngest) // /l 端点:接收爬虫数据 - mux.HandleFunc("/flush", s.handleFlush) // /flush:强制刷盘(用于手动触发或调试) - mux.HandleFunc("/admin/pending", s.handleAdminPending) // /admin/pending:返回未刷盘数据条数 - return mux -} - -// handleAdminPending 返回内存中未刷盘的索引条目数量。 -func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Content-Type", "application/json; charset=utf-8") - count := atomic.LoadInt64(&s.rowCount) - json.NewEncoder(w).Encode(map[string]int64{"pending": count}) -} - -// handleFlush 处理 GET /flush 请求,强制将内存索引刷到磁盘。 -func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) { - s.flush() - w.Write([]byte("flushed")) -} - -// Flush 公开的刷盘方法,供外部调用(定时刷盘、退出前刷盘)。 -func (s *Server) Flush() { s.flush() } - -// parsePayload 解析爬虫请求体,兼容新旧两种格式。 -// 新格式(Go 爬虫):{"url": "...", "keywords": [{"word": "...", "weight": 0.0}]} -// 旧格式(Python 爬虫):["url", [["word", weight], ...]] -func parsePayload(r *http.Request) (*ingestPayload, error) { - // 读取 body 为 bytes(支持重读) - body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) - if err != nil { - return nil, err - } - - // 先尝试新格式(Go 爬虫) - var modern ingestPayload - if err := json.Unmarshal(body, &modern); err == nil && modern.URL != "" { - return &modern, nil - } - - // 尝试旧格式(Python 爬虫):[url, [[word, weight], ...]] - var legacy ingestPayloadLegacy - if err := json.Unmarshal(body, &legacy); err != nil { - return nil, fmt.Errorf("invalid payload: %w", err) - } - if len(legacy) < 2 { - return nil, fmt.Errorf("invalid legacy payload: too few elements") - } - url, ok := legacy[0].(string) - if !ok { - return nil, fmt.Errorf("invalid url type") - } - kwsRaw, ok := legacy[1].([]any) - if !ok { - return nil, fmt.Errorf("invalid keywords type") - } - payload := &ingestPayload{URL: url} - for _, kw := range kwsRaw { - kwSlice, ok := kw.([]any) - if !ok || len(kwSlice) < 2 { - continue - } - word, _ := kwSlice[0].(string) - weight, _ := kwSlice[1].(float64) - if word == "" { - continue - } - payload.Keywords = append(payload.Keywords, struct { - Word string `json:"word"` - Weight float32 `json:"weight"` - }{word, float32(weight)}) - } - if payload.URL == "" { - return nil, fmt.Errorf("empty url after parsing") - } - return payload, nil -} - -// handleIngest 处理爬虫发来的 POST 请求,将关键词数据写入内存索引。 -func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - payload, err := parsePayload(r) - if err != nil { - http.Error(w, "bad payload: "+err.Error(), http.StatusBadRequest) - return - } - - // 清洗 URL:去除换行符(防止注入) - payload.URL = strings.ReplaceAll(payload.URL, "\n", "") - if payload.URL == "" { - http.Error(w, "empty url", http.StatusBadRequest) - return - } - - s.memMu.Lock() - for _, kw := range payload.Keywords { - key := kw.Word - entries := s.mem[key] - - // 阈值提前过滤:若该词已有大量条目,则只接受权重足够高的新条目 - if len(entries) > 15 { - low := s.lowThreshold(key) - if float64(kw.Weight) < low { - continue - } - } - s.mem[key] = append(entries, storage.IndexEntry{ - Weight: kw.Weight, - URL: payload.URL, - }) - atomic.AddInt64(&s.rowCount, 1) - } - s.memMu.Unlock() - - // 当内存条目数超过阈值时,异步触发刷盘 - if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold) { - go s.flush() - } - - w.Write([]byte("ok")) -} - -// lowThreshold 返回某关键词在已有大量条目时,新条目所需的最低权重阈值。 -// 计算方式:找到磁盘上该词第 MaxURLsPerKey 高权重值,取其 5% 作为阈值。 -func (s *Server) lowThreshold(key string) float64 { - existing, _ := s.db.GetIndex(key) - if len(existing) < config.MaxURLsPerKey { - return -1 // 未达上限,所有条目都接受 - } - // 收集所有权重值 - weights := make([]float64, len(existing)) - for i, e := range existing { - weights[i] = float64(e.Weight) - } - // 找第 MaxURLsPerKey-1 大的值(即准入门槛) - return nthLargest(weights, config.MaxURLsPerKey-1) * 0.05 -} - -// flush 将内存中的索引批量合并写入磁盘,然后清空内存。 -// 整个过程:原子快照 → 并行合并 → 批量写入。 -func (s *Server) flush() { - // TryLock:若已有其他 flush 在执行则直接退出 - if !s.flushMu.TryLock() { - return - } - defer s.flushMu.Unlock() - - // 原子快照:取出当前内存数据并立即重置 - s.memMu.Lock() - snapshot := s.mem - s.mem = make(map[string][]storage.IndexEntry) - atomic.StoreInt64(&s.rowCount, 0) - s.memMu.Unlock() - - log.Printf("[harvester] flushing %d keys", len(snapshot)) - - // 转换为切片便于处理,打乱顺序防止热点词优先处理导致堆积 - items := make([]struct { - key string - entries []storage.IndexEntry - }, 0, len(snapshot)) - for k, v := range snapshot { - items = append(items, struct { - key string - entries []storage.IndexEntry - }{k, v}) - } - rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] }) - - // 并行合并:每个关键词独立合并到磁盘 - type result struct { - key string - entries []storage.IndexEntry - } - results := make(chan result, len(items)) - sem := make(chan struct{}, 8) // 最多 8 个并发合并协程 - - for _, item := range items { - sem <- struct{}{} - go func(k string, newEntries []storage.IndexEntry) { - defer func() { <-sem }() - merged := s.mergeKey(k, newEntries) - results <- result{k, merged} - }(item.key, item.entries) - } - - // 收集所有合并结果 - batch := make(map[string][]storage.IndexEntry, len(items)) - for range items { - r := <-results - batch[r.key] = r.entries - } - - // 批量写入 bbolt(一次事务写入所有关键词) - if err := s.db.BatchSetIndex(batch); err != nil { - log.Printf("[harvester] flush write error: %v", err) - } - log.Printf("[harvester] flush done, %d keys written", len(batch)) -} - -// mergeKey 将新条目和磁盘已有条目合并后返回最终列表。 -// 包含:去重 → 概率性 URL 归一化去重 → 超限时按繁荣分数裁剪。 -func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry { - existing, _ := s.db.GetIndex(key) - - // 新关键词:如果条目数过少则丢弃(避免索引质量下降) - if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey { - return nil - } - - // 合并新旧条目 - merged := dedup(append(newEntries, existing...)) - - // 2% 概率执行 URL 归一化去重(去除 https/http 重复、尾部斜杠差异等) - if rand.Float64() < 0.02 { - merged = dedupNormalised(merged) - } - - // 超限或 2% 概率触发裁剪:按 (权重 × 繁荣分数) 排序后截断 - if float64(len(merged)) > float64(config.MaxURLsPerKey)*1.1 || rand.Float64() < 0.02 { - merged = trim(merged, s.infoSvc, config.MaxURLsPerKey, config.MaxSameDomainPerKey) - } - - return merged -} - -// ---- 辅助函数 ---- - -// dedup 按 URL 完全匹配去重。 -func dedup(entries []storage.IndexEntry) []storage.IndexEntry { - seen := make(map[string]bool, len(entries)) - out := make([]storage.IndexEntry, 0, len(entries)) - for _, e := range entries { - if seen[e.URL] { - continue - } - seen[e.URL] = true - out = append(out, e) - } - return out -} - -// dedupNormalised 按 URL 归一化去重(去除协议前缀和尾部斜杠后比较)。 -// 按 URL 长度降序排序后处理:短 URL 优先保留(更可能是规范 URL)。 -func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry { - sorted := make([]storage.IndexEntry, len(entries)) - copy(sorted, entries) - // 降序排列(简单冒泡) - for i := 0; i < len(sorted)-1; i++ { - for j := i + 1; j < len(sorted); j++ { - if len(sorted[j].URL) > len(sorted[i].URL) { - sorted[i], sorted[j] = sorted[j], sorted[i] - } - } - } - seen := make(map[string]bool) - out := make([]storage.IndexEntry, 0, len(sorted)) - for _, e := range sorted { - k := normaliseURL(e.URL) - if seen[k] { - continue - } - seen[k] = true - out = append(out, e) - } - return out -} - -// normaliseURL 归一化 URL:去除协议前缀,尾部斜杠去除。 -func normaliseURL(u string) string { - if strings.HasPrefix(u, "https://") { - u = u[8:] - } else if strings.HasPrefix(u, "http://") { - u = u[7:] - } - return strings.TrimRight(u, "/") -} - -// trim 将条目列表裁剪到指定上限,同时限制每个域名的最大条目数。 -// 排序依据:(权重 × (1 + 繁荣分数)),使高权重且高繁荣的 URL 优先保留。 -func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry { - // 按综合分数降序排列 - scored := make([]storage.IndexEntry, len(entries)) - copy(scored, entries) - for i := 0; i < len(scored)-1; i++ { - for j := i + 1; j < len(scored); j++ { - si := float64(scored[i].Weight) * (1 + infoSvc.Prosper(scored[i].URL)) - sj := float64(scored[j].Weight) * (1 + infoSvc.Prosper(scored[j].URL)) - if sj > si { - scored[i], scored[j] = scored[j], scored[i] - } - } - } - - // 按域名计数,每个域名最多保留 sameDomainLimit 条(首页 URL 不受限制) - domainCount := make(map[string]int) - out := make([]storage.IndexEntry, 0, limit) - for _, e := range scored { - host := netloc(e.URL) - if host == "" { - host = e.URL - } - host = strings.ToLower(host) - isHome := isHomepage(e.URL) // 首页 URL 不受域名数量限制 - if !isHome && domainCount[host] >= sameDomainLimit { - continue - } - domainCount[host]++ - out = append(out, e) - if len(out) >= limit { - break - } - } - return out -} - -// isHomepage 判断 URL 是否为网站首页(不含路径层级)。 -func isHomepage(u string) bool { - u = strings.TrimPrefix(u, "https://") - u = strings.TrimPrefix(u, "http://") - return strings.Count(strings.TrimRight(u, "/"), "/") == 0 -} - -// netloc 从 URL 提取主机名(简化版,不依赖 net/url)。 -func netloc(rawURL string) string { - parts := strings.SplitN(rawURL, "/", 4) - if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" { - return parts[2] - } - return "" -} - -// nthLargest 返回切片中第 n 大的值(0-indexed,即找第 n+1 大的值)。 -// 用于获取准入权重阈值。 -func nthLargest(values []float64, n int) float64 { - if n >= len(values) { - return 0 - } - cp := make([]float64, len(values)) - copy(cp, values) - // 部分排序:只需将前 n+1 项排好序 - for i := 0; i <= n; i++ { - maxIdx := i - for j := i + 1; j < len(cp); j++ { - if cp[j] > cp[maxIdx] { - maxIdx = j - } - } - cp[i], cp[maxIdx] = cp[maxIdx], cp[i] - } - return cp[n] -} - -// ListenAndServe 启动收获服务器在指定地址监听。 -func (s *Server) ListenAndServe(addr string) error { - log.Printf("[harvester] listening on %s", addr) - return http.ListenAndServe(addr, s.Handler()) -} diff --git a/main.go b/main.go index a390b64..54e36d3 100644 --- a/main.go +++ b/main.go @@ -16,15 +16,13 @@ import ( "os" // 操作系统信号 "os/signal" // 信号捕获 "syscall" // 系统调用(SIGTERM) - "time" // 定期刷盘定时器 "sese-engine/analyzer" // 文本分析和关键词提取 "sese-engine/backlink" // 反向链接(繁荣值)计算 "sese-engine/config" // 全局配置 "sese-engine/crawler" // BFS 爬虫 - "sese-engine/harvester" // 收获服务器(索引写入) "sese-engine/info" // info 服务(繁荣表、调整表、屏蔽词) - "sese-engine/search" // 搜索服务器 + "sese-engine/search" // 搜索服务器(内嵌收获服务) "sese-engine/storage" // 持久化存储 ) @@ -60,25 +58,8 @@ func main() { } defer anal.Close() - // ---- 4. 收获服务器(:5000):接收爬虫发来的索引数据 ---- - harvSrv := harvester.New(db, infoSvc) - go func() { - if err := harvSrv.ListenAndServe(":5000"); err != nil { - log.Fatalf("[harvester] fatal: %v", err) - } - }() - - // ---- 4b. 定期刷盘:每 60 秒强制将内存索引写入磁盘,确保搜索实时可用 ---- - go func() { - ticker := time.NewTicker(60 * time.Second) - defer ticker.Stop() - for range ticker.C { - harvSrv.Flush() - } - }() - - // ---- 5. 搜索服务器(默认 :80):对外提供搜索 API ---- - searchSrv := search.New(db, infoSvc, anal, "http://localhost:5000") + // ---- 4. 搜索服务器(默认 :80):对外提供搜索 API,同时内嵌收获服务(统一端口) + searchSrv := search.New(db, infoSvc, anal) go func() { addr := fmt.Sprintf(":%d", config.SearchServerPort) if err := searchSrv.ListenAndServe(addr); err != nil { @@ -104,5 +85,5 @@ func main() { signal.Notify(quit, os.Interrupt, syscall.SIGTERM) <-quit log.Println("shutdown signal received, flushing index...") - harvSrv.Flush() // 退出前刷盘,不丢数据 + searchSrv.Flush() // 退出前刷盘,不丢数据 } diff --git a/search/server.go b/search/server.go index 6fdaed1..b7133cf 100644 --- a/search/server.go +++ b/search/server.go @@ -3,58 +3,86 @@ package search import ( - "container/heap" // 堆结构(域名交错排序) - "encoding/json" // JSON 序列化(响应输出) - "io" // 代理响应复制 - "log" // 日志 - "math" // 数学运算(Log、幂) - "net/http" // HTTP 服务端 - "net/url" // URL 解析 - "regexp" // 正则表达式(site: 过滤语法) - "sort" // 排序 - "strconv" // 字符串转整数 - "strings" // 字符串操作 - "sync" // 互斥锁(保护并发切片写入) - "time" // 时间戳 + "container/heap" // 堆结构(域名交错排序) + "encoding/json" // JSON 序列化(响应输出) + "fmt" // 错误格式化 + "io" // 读取请求体 + "log" // 日志 + "math" // 数学运算(Log、幂) + "math/rand" // 随机数(刷盘时打乱顺序、概率性去重/裁剪) + "net/http" // HTTP 服务端 + "net/url" // URL 解析 + "regexp" // 正则表达式(site: 过滤语法) + "sort" // 排序 + "strconv" // 字符串转整数 + "strings" // 字符串操作(URL 清洗) + "sync" // 互斥锁(保护内存索引、并发切片写入) + "sync/atomic" // 原子操作(计数器) + "time" // 时间戳 - "sese-engine/analyzer" // 分词和语种检测 + "sese-engine/analyzer" // 分词和语言检测 "sese-engine/config" // 排序权重配置 - "sese-engine/info" // info 服务 - "sese-engine/parser" // HTML 解析(在线摘要) - "sese-engine/storage" // 持久化存储 + "sese-engine/info" // info 服务 + "sese-engine/parser" // HTML 解析(在线摘要) + "sese-engine/storage" // 持久化存储 ) -// Server 是搜索 HTTP 服务器。 +// Server 是搜索 HTTP 服务器,同时内嵌收获服务(统一在同一端口)。 type Server struct { - db *storage.DB - infoSvc *info.Service - analyzer *analyzer.Analyzer - httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查) - harvesterURL string // 收获服务器地址(如 "http://localhost:5000") + db *storage.DB + infoSvc *info.Service + analyzer *analyzer.Analyzer + httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查) + + // 以下为收获服务(harvester)内嵌字段 + mem map[string][]storage.IndexEntry // 内存索引聚合器:关键词 → [权重, URL] 条目 + memMu sync.Mutex // 保护内存索引的并发写入 + rowCount int64 // 内存中累计的索引条目总数(触发刷盘) + flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 } -// New 创建一个 search Server。 -// harvesterURL 为收获服务器的地址,用于代理刷盘和状态查询。 -func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer, harvesterURL string) *Server { - return &Server{ - db: db, - infoSvc: infoSvc, - analyzer: a, - harvesterURL: harvesterURL, +// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。 +func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { + s := &Server{ + db: db, + infoSvc: infoSvc, + analyzer: a, + mem: make(map[string][]storage.IndexEntry), httpCli: &http.Client{ Timeout: time.Duration(config.OnlineSnippetTimeout) * time.Second, }, } + // 启动定期刷盘 goroutine + go s.runPeriodicFlush() + return s } -// Handler 返回 HTTP 路由处理器。 +// runPeriodicFlush 每隔 FlushIntervalSeconds 秒触发一次刷盘。 +func (s *Server) runPeriodicFlush() { + ticker := time.NewTicker(time.Duration(config.FlushIntervalSeconds) * time.Second) + defer ticker.Stop() + for range ticker.C { + s.Flush() + } +} + +// Flush 公开的刷盘方法,供定时任务和外部调用。 +func (s *Server) Flush() { s.flush() } + +// Handler 返回 HTTP 路由处理器(统一端口,同时提供搜索和收获服务)。 func (s *Server) Handler() http.Handler { mux := http.NewServeMux() + // 搜索路由 mux.HandleFunc("/search", s.handleSearch) + // 收获服务路由(爬虫数据写入) + mux.HandleFunc("/l", s.handleIngest) // /l:接收爬虫关键词索引数据 + mux.HandleFunc("/flush", s.handleFlush) // /flush:强制刷盘 + // 管理接口 mux.HandleFunc("/admin/recent", s.handleAdminRecent) mux.HandleFunc("/admin/stats", s.handleAdminStats) mux.HandleFunc("/admin/priority", s.handleAdminPriority) mux.HandleFunc("/admin/flush", s.handleAdminFlush) + mux.HandleFunc("/admin/pending", s.handleAdminPending) return mux } @@ -203,18 +231,7 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { "total_words": totalWords, "domains": domainsMap, "languages": langsMap, - } - - // 从 harvester 代理获取未刷盘数据条数 - if s.harvesterURL != "" { - req, _ := http.NewRequest(http.MethodGet, s.harvesterURL+"/admin/pending", nil) - if proxyResp, err := s.httpCli.Do(req); err == nil { - defer proxyResp.Body.Close() - var pendingResp map[string]int64 - if err := json.NewDecoder(proxyResp.Body).Decode(&pendingResp); err == nil { - resp["pending"] = pendingResp["pending"] - } - } + "pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数 } json.NewEncoder(w).Encode(resp) @@ -300,31 +317,23 @@ func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) { } } -// handleAdminFlush 代理到 harvester 的 /flush 接口,执行刷盘。 +// handleAdminFlush 强制刷盘。 func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Content-Type", "application/json; charset=utf-8") - if r.Method != http.MethodGet && r.Method != http.MethodPost { http.Error(w, `{"error":"method not allowed"}`, 405) return } + s.Flush() + w.Write([]byte("flushed")) +} - // 代理请求到 harvester - proxyURL := s.harvesterURL + "/flush" - req, err := http.NewRequest(http.MethodGet, proxyURL, nil) - if err != nil { - http.Error(w, `{"error":"`+err.Error()+`"}`, 500) - return - } - resp, err := s.httpCli.Do(req) - if err != nil { - http.Error(w, `{"error":"harvester unreachable: `+err.Error()+`"}`, 502) - return - } - defer resp.Body.Close() - w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) +// handleAdminPending 返回内存中未刷盘的索引条目数量。 +func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + count := atomic.LoadInt64(&s.rowCount) + json.NewEncoder(w).Encode(map[string]int64{"pending": count}) } // ---- 搜索处理器 ---- @@ -1022,3 +1031,294 @@ func min(a, b int) int { } return b } + +// ---- 以下为内嵌的收获服务(harvester)逻辑 ---- + +// ingestPayload 是爬虫发送的 JSON 请求体结构(Go 爬虫用)。 +type ingestPayload struct { + URL string `json:"url"` // 被索引页面的最终 URL + Keywords []struct { + Word string `json:"word"` // 关键词 + Weight float32 `json:"weight"` // 该 URL 在该词下的权重 + } `json:"keywords"` +} + +// ingestPayloadLegacy 是 Python 爬虫发送的旧格式:[url, [[word, weight], ...]] +type ingestPayloadLegacy []any + +// parsePayload 解析爬虫请求体,兼容新旧两种格式。 +// 新格式(Go 爬虫):{"url": "...", "keywords": [{"word": "...", "weight": 0.0}]} +// 旧格式(Python 爬虫):["url", [["word", weight], ...]] +func parsePayload(r *http.Request) (*ingestPayload, error) { + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) + if err != nil { + return nil, err + } + // 先尝试新格式(Go 爬虫) + var modern ingestPayload + if err := json.Unmarshal(body, &modern); err == nil && modern.URL != "" { + return &modern, nil + } + // 尝试旧格式(Python 爬虫):[url, [[word, weight], ...]] + var legacy ingestPayloadLegacy + if err := json.Unmarshal(body, &legacy); err != nil { + return nil, fmt.Errorf("invalid payload: %w", err) + } + if len(legacy) < 2 { + return nil, fmt.Errorf("invalid legacy payload: too few elements") + } + url, ok := legacy[0].(string) + if !ok { + return nil, fmt.Errorf("invalid url type") + } + kwsRaw, ok := legacy[1].([]any) + if !ok { + return nil, fmt.Errorf("invalid keywords type") + } + payload := &ingestPayload{URL: url} + for _, kw := range kwsRaw { + kwSlice, ok := kw.([]any) + if !ok || len(kwSlice) < 2 { + continue + } + word, _ := kwSlice[0].(string) + weight, _ := kwSlice[1].(float64) + if word == "" { + continue + } + payload.Keywords = append(payload.Keywords, struct { + Word string `json:"word"` + Weight float32 `json:"weight"` + }{word, float32(weight)}) + } + if payload.URL == "" { + return nil, fmt.Errorf("empty url after parsing") + } + return payload, nil +} + +// handleIngest 处理爬虫发来的 POST 请求,将关键词数据写入内存索引。 +func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + payload, err := parsePayload(r) + if err != nil { + http.Error(w, "bad payload: "+err.Error(), http.StatusBadRequest) + return + } + payload.URL = strings.ReplaceAll(payload.URL, "\n", "") + if payload.URL == "" { + http.Error(w, "empty url", http.StatusBadRequest) + return + } + s.memMu.Lock() + for _, kw := range payload.Keywords { + key := kw.Word + entries := s.mem[key] + if len(entries) > 15 { + low := s.lowThreshold(key) + if float64(kw.Weight) < low { + continue + } + } + s.mem[key] = append(entries, storage.IndexEntry{ + Weight: kw.Weight, + URL: payload.URL, + }) + atomic.AddInt64(&s.rowCount, 1) + } + s.memMu.Unlock() + if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold) { + go s.Flush() + } + w.Write([]byte("ok")) +} + +// handleFlush 处理 GET /flush 请求,强制将内存索引刷到磁盘。 +func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) { + s.Flush() + w.Write([]byte("flushed")) +} + +// lowThreshold 返回某关键词在已有大量条目时,新条目所需的最低权重阈值。 +func (s *Server) lowThreshold(key string) float64 { + existing, _ := s.db.GetIndex(key) + if len(existing) < config.MaxURLsPerKey { + return -1 + } + weights := make([]float64, len(existing)) + for i, e := range existing { + weights[i] = float64(e.Weight) + } + return nthLargest(weights, config.MaxURLsPerKey-1) * 0.05 +} + +// flush 将内存中的索引批量合并写入磁盘,然后清空内存。 +func (s *Server) flush() { + if !s.flushMu.TryLock() { + return + } + defer s.flushMu.Unlock() + s.memMu.Lock() + snapshot := s.mem + s.mem = make(map[string][]storage.IndexEntry) + atomic.StoreInt64(&s.rowCount, 0) + s.memMu.Unlock() + log.Printf("[harvester] flushing %d keys", len(snapshot)) + items := make([]struct { + key string + entries []storage.IndexEntry + }, 0, len(snapshot)) + for k, v := range snapshot { + items = append(items, struct { + key string + entries []storage.IndexEntry + }{k, v}) + } + rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] }) + type result struct { + key string + entries []storage.IndexEntry + } + results := make(chan result, len(items)) + sem := make(chan struct{}, 8) + for _, item := range items { + sem <- struct{}{} + go func(k string, newEntries []storage.IndexEntry) { + defer func() { <-sem }() + merged := s.mergeKey(k, newEntries) + results <- result{k, merged} + }(item.key, item.entries) + } + batch := make(map[string][]storage.IndexEntry, len(items)) + for range items { + r := <-results + batch[r.key] = r.entries + } + if err := s.db.BatchSetIndex(batch); err != nil { + log.Printf("[harvester] flush write error: %v", err) + } + log.Printf("[harvester] flush done, %d keys written", len(batch)) +} + +// mergeKey 将新条目和磁盘已有条目合并后返回最终列表。 +func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry { + existing, _ := s.db.GetIndex(key) + if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey { + return nil + } + merged := dedup(append(newEntries, existing...)) + if rand.Float64() < 0.02 { + merged = dedupNormalised(merged) + } + if float64(len(merged)) > float64(config.MaxURLsPerKey)*1.1 || rand.Float64() < 0.02 { + merged = trim(merged, s.infoSvc, config.MaxURLsPerKey, config.MaxSameDomainPerKey) + } + return merged +} + +// ---- 收获服务辅助函数 ---- + +func dedup(entries []storage.IndexEntry) []storage.IndexEntry { + seen := make(map[string]bool, len(entries)) + out := make([]storage.IndexEntry, 0, len(entries)) + for _, e := range entries { + if seen[e.URL] { + continue + } + seen[e.URL] = true + out = append(out, e) + } + return out +} + +func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry { + sorted := make([]storage.IndexEntry, len(entries)) + copy(sorted, entries) + for i := 0; i < len(sorted)-1; i++ { + for j := i + 1; j < len(sorted); j++ { + if len(sorted[j].URL) > len(sorted[i].URL) { + sorted[i], sorted[j] = sorted[j], sorted[i] + } + } + } + seen := make(map[string]bool) + out := make([]storage.IndexEntry, 0, len(sorted)) + for _, e := range sorted { + k := normaliseURL(e.URL) + if seen[k] { + continue + } + seen[k] = true + out = append(out, e) + } + return out +} + +func normaliseURL(u string) string { + if strings.HasPrefix(u, "https://") { + u = u[8:] + } else if strings.HasPrefix(u, "http://") { + u = u[7:] + } + return strings.TrimRight(u, "/") +} + +func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry { + scored := make([]storage.IndexEntry, len(entries)) + copy(scored, entries) + for i := 0; i < len(scored)-1; i++ { + for j := i + 1; j < len(scored); j++ { + si := float64(scored[i].Weight) * (1 + infoSvc.Prosper(scored[i].URL)) + sj := float64(scored[j].Weight) * (1 + infoSvc.Prosper(scored[j].URL)) + if sj > si { + scored[i], scored[j] = scored[j], scored[i] + } + } + } + domainCount := make(map[string]int) + out := make([]storage.IndexEntry, 0, limit) + for _, e := range scored { + host := netloc(e.URL) + if host == "" { + host = e.URL + } + host = strings.ToLower(host) + isHome := isHomepage(e.URL) + if !isHome && domainCount[host] >= sameDomainLimit { + continue + } + domainCount[host]++ + out = append(out, e) + if len(out) >= limit { + break + } + } + return out +} + +func isHomepage(u string) bool { + u = strings.TrimPrefix(u, "https://") + u = strings.TrimPrefix(u, "http://") + return strings.Count(strings.TrimRight(u, "/"), "/") == 0 +} + +func nthLargest(values []float64, n int) float64 { + if n >= len(values) { + return 0 + } + cp := make([]float64, len(values)) + copy(cp, values) + for i := 0; i <= n; i++ { + maxIdx := i + for j := i + 1; j < len(cp); j++ { + if cp[j] > cp[maxIdx] { + maxIdx = j + } + } + cp[i], cp[maxIdx] = cp[maxIdx], cp[i] + } + return cp[n] +}