// Package harvester implements the index-writing server (port 5000). // 收获服务器包:接收爬虫发送的关键词索引数据,批量写入 bbolt 持久化存储。 // // 工作流程:爬虫每抓取一个页面,将 (URL, 关键词列表) 通过 HTTP POST 发送到本服务; // 本服务先将数据积累在内存中,当内存中索引条目数量超过阈值时,批量合并到磁盘索引。 package harvester import ( "encoding/json" // JSON 反序列化(解析爬虫请求) "log" // 日志输出 "math/rand" // 随机数(打乱合并顺序、触发概率性操作) "net/http" // HTTP 服务端 "strings" // 字符串操作(URL 清洗) "sync" // 互斥锁(保护内存索引、防止并发刷盘) "sync/atomic" // 原子操作(计数器) "sese-engine/config" // 全局配置(刷盘阈值、URL 上限) "sese-engine/info" // info 服务(查询繁荣分数用于裁剪) "sese-engine/storage" // 持久化存储 ) // Server 是收获 HTTP 服务器,负责接收爬虫数据、内存聚合、批量写入。 type Server struct { db *storage.DB // 内存索引聚合器:关键词 → 该词关联的 [权重, URL] 条目列表 mem map[string][]storage.IndexEntry memMu sync.Mutex // 保护内存索引的并发写入 rowCount int64 // 内存中累计的索引条目总数(用于触发刷盘) flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 infoSvc *info.Service // info 服务:用于查询繁荣分数来决定索引裁剪优先级 } // New 创建一个 harvester Server 实例。 func New(db *storage.DB, infoSvc *info.Service) *Server { return &Server{ db: db, mem: make(map[string][]storage.IndexEntry), infoSvc: infoSvc, } } // ingestPayload 是爬虫发送的 JSON 请求体结构。 type ingestPayload struct { URL string `json:"url"` // 被索引页面的最终 URL Keywords []struct { Word string `json:"word"` // 关键词 Weight float32 `json:"weight"` // 该 URL 在该词下的权重 } `json:"keywords"` } // Handler 返回 HTTP 路由处理器。 func (s *Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/l", s.handleIngest) // /l 端点:接收爬虫数据 return mux } // handleIngest 处理爬虫发来的 POST 请求,将关键词数据写入内存索引。 func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var payload ingestPayload if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { http.Error(w, "bad json: "+err.Error(), http.StatusBadRequest) return } // 清洗 URL:去除换行符(防止注入) payload.URL = strings.ReplaceAll(payload.URL, "\n", "") if payload.URL == "" { http.Error(w, "empty url", http.StatusBadRequest) return } s.memMu.Lock() for _, kw := range payload.Keywords { key := kw.Word entries := s.mem[key] // 阈值提前过滤:若该词已有大量条目,则只接受权重足够高的新条目 if len(entries) > 15 { low := s.lowThreshold(key) if float64(kw.Weight) < low { continue } } s.mem[key] = append(entries, storage.IndexEntry{ Weight: kw.Weight, URL: payload.URL, }) atomic.AddInt64(&s.rowCount, 1) } s.memMu.Unlock() // 当内存条目数超过阈值时,异步触发刷盘 if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold) { go s.flush() } w.Write([]byte("ok")) } // lowThreshold 返回某关键词在已有大量条目时,新条目所需的最低权重阈值。 // 计算方式:找到磁盘上该词第 MaxURLsPerKey 高权重值,取其 5% 作为阈值。 func (s *Server) lowThreshold(key string) float64 { existing, _ := s.db.GetIndex(key) if len(existing) < config.MaxURLsPerKey { return -1 // 未达上限,所有条目都接受 } // 收集所有权重值 weights := make([]float64, len(existing)) for i, e := range existing { weights[i] = float64(e.Weight) } // 找第 MaxURLsPerKey-1 大的值(即准入门槛) return nthLargest(weights, config.MaxURLsPerKey-1) * 0.05 } // flush 将内存中的索引批量合并写入磁盘,然后清空内存。 // 整个过程:原子快照 → 并行合并 → 批量写入。 func (s *Server) flush() { // TryLock:若已有其他 flush 在执行则直接退出 if !s.flushMu.TryLock() { return } defer s.flushMu.Unlock() // 原子快照:取出当前内存数据并立即重置 s.memMu.Lock() snapshot := s.mem s.mem = make(map[string][]storage.IndexEntry) atomic.StoreInt64(&s.rowCount, 0) s.memMu.Unlock() log.Printf("[harvester] flushing %d keys", len(snapshot)) // 转换为切片便于处理,打乱顺序防止热点词优先处理导致堆积 items := make([]struct { key string entries []storage.IndexEntry }, 0, len(snapshot)) for k, v := range snapshot { items = append(items, struct { key string entries []storage.IndexEntry }{k, v}) } rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] }) // 并行合并:每个关键词独立合并到磁盘 type result struct { key string entries []storage.IndexEntry } results := make(chan result, len(items)) sem := make(chan struct{}, 8) // 最多 8 个并发合并协程 for _, item := range items { sem <- struct{}{} go func(k string, newEntries []storage.IndexEntry) { defer func() { <-sem }() merged := s.mergeKey(k, newEntries) results <- result{k, merged} }(item.key, item.entries) } // 收集所有合并结果 batch := make(map[string][]storage.IndexEntry, len(items)) for range items { r := <-results batch[r.key] = r.entries } // 批量写入 bbolt(一次事务写入所有关键词) if err := s.db.BatchSetIndex(batch); err != nil { log.Printf("[harvester] flush write error: %v", err) } log.Printf("[harvester] flush done, %d keys written", len(batch)) } // mergeKey 将新条目和磁盘已有条目合并后返回最终列表。 // 包含:去重 → 概率性 URL 归一化去重 → 超限时按繁荣分数裁剪。 func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry { existing, _ := s.db.GetIndex(key) // 新关键词:如果条目数过少则丢弃(避免索引质量下降) if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey { return nil } // 合并新旧条目 merged := dedup(append(newEntries, existing...)) // 2% 概率执行 URL 归一化去重(去除 https/http 重复、尾部斜杠差异等) if rand.Float64() < 0.02 { merged = dedupNormalised(merged) } // 超限或 2% 概率触发裁剪:按 (权重 × 繁荣分数) 排序后截断 if float64(len(merged)) > float64(config.MaxURLsPerKey)*1.1 || rand.Float64() < 0.02 { merged = trim(merged, s.infoSvc, config.MaxURLsPerKey, config.MaxSameDomainPerKey) } return merged } // ---- 辅助函数 ---- // dedup 按 URL 完全匹配去重。 func dedup(entries []storage.IndexEntry) []storage.IndexEntry { seen := make(map[string]bool, len(entries)) out := make([]storage.IndexEntry, 0, len(entries)) for _, e := range entries { if seen[e.URL] { continue } seen[e.URL] = true out = append(out, e) } return out } // dedupNormalised 按 URL 归一化去重(去除协议前缀和尾部斜杠后比较)。 // 按 URL 长度降序排序后处理:短 URL 优先保留(更可能是规范 URL)。 func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry { sorted := make([]storage.IndexEntry, len(entries)) copy(sorted, entries) // 降序排列(简单冒泡) for i := 0; i < len(sorted)-1; i++ { for j := i + 1; j < len(sorted); j++ { if len(sorted[j].URL) > len(sorted[i].URL) { sorted[i], sorted[j] = sorted[j], sorted[i] } } } seen := make(map[string]bool) out := make([]storage.IndexEntry, 0, len(sorted)) for _, e := range sorted { k := normaliseURL(e.URL) if seen[k] { continue } seen[k] = true out = append(out, e) } return out } // normaliseURL 归一化 URL:去除协议前缀,尾部斜杠去除。 func normaliseURL(u string) string { if strings.HasPrefix(u, "https://") { u = u[8:] } else if strings.HasPrefix(u, "http://") { u = u[7:] } return strings.TrimRight(u, "/") } // trim 将条目列表裁剪到指定上限,同时限制每个域名的最大条目数。 // 排序依据:(权重 × (1 + 繁荣分数)),使高权重且高繁荣的 URL 优先保留。 func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry { // 按综合分数降序排列 scored := make([]storage.IndexEntry, len(entries)) copy(scored, entries) for i := 0; i < len(scored)-1; i++ { for j := i + 1; j < len(scored); j++ { si := float64(scored[i].Weight) * (1 + infoSvc.Prosper(scored[i].URL)) sj := float64(scored[j].Weight) * (1 + infoSvc.Prosper(scored[j].URL)) if sj > si { scored[i], scored[j] = scored[j], scored[i] } } } // 按域名计数,每个域名最多保留 sameDomainLimit 条(首页 URL 不受限制) domainCount := make(map[string]int) out := make([]storage.IndexEntry, 0, limit) for _, e := range scored { host := netloc(e.URL) if host == "" { host = e.URL } host = strings.ToLower(host) isHome := isHomepage(e.URL) // 首页 URL 不受域名数量限制 if !isHome && domainCount[host] >= sameDomainLimit { continue } domainCount[host]++ out = append(out, e) if len(out) >= limit { break } } return out } // isHomepage 判断 URL 是否为网站首页(不含路径层级)。 func isHomepage(u string) bool { u = strings.TrimPrefix(u, "https://") u = strings.TrimPrefix(u, "http://") return strings.Count(strings.TrimRight(u, "/"), "/") == 0 } // netloc 从 URL 提取主机名(简化版,不依赖 net/url)。 func netloc(rawURL string) string { parts := strings.SplitN(rawURL, "/", 4) if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" { return parts[2] } return "" } // nthLargest 返回切片中第 n 大的值(0-indexed,即找第 n+1 大的值)。 // 用于获取准入权重阈值。 func nthLargest(values []float64, n int) float64 { if n >= len(values) { return 0 } cp := make([]float64, len(values)) copy(cp, values) // 部分排序:只需将前 n+1 项排好序 for i := 0; i <= n; i++ { maxIdx := i for j := i + 1; j < len(cp); j++ { if cp[j] > cp[maxIdx] { maxIdx = j } } cp[i], cp[maxIdx] = cp[maxIdx], cp[i] } return cp[n] } // ListenAndServe 启动收获服务器在指定地址监听。 func (s *Server) ListenAndServe(addr string) error { log.Printf("[harvester] listening on %s", addr) return http.ListenAndServe(addr, s.Handler()) }