合并路由

This commit is contained in:
2026-04-08 20:12:23 +08:00
parent d14c9caa56
commit 8520b104eb
5 changed files with 378 additions and 522 deletions
+362 -62
View File
@@ -3,58 +3,86 @@
package search
import (
"container/heap" // 堆结构(域名交错排序)
"encoding/json" // JSON 序列化(响应输出)
"io" // 代理响应复制
"log" // 日志
"math" // 数学运算(Log、幂)
"net/http" // HTTP 服务端
"net/url" // URL 解析
"regexp" // 正则表达式(site: 过滤语法)
"sort" // 排序
"strconv" // 字符串转整数
"strings" // 字符串操作
"sync" // 互斥锁(保护并发切片写入)
"time" // 时间戳
"container/heap" // 堆结构(域名交错排序)
"encoding/json" // JSON 序列化(响应输出)
"fmt" // 错误格式化
"io" // 读取请求体
"log" // 日志
"math" // 数学运算(Log、幂)
"math/rand" // 随机数(刷盘时打乱顺序、概率性去重/裁剪)
"net/http" // HTTP 服务端
"net/url" // URL 解析
"regexp" // 正则表达式(site: 过滤语法)
"sort" // 排序
"strconv" // 字符串转整数
"strings" // 字符串操作(URL 清洗)
"sync" // 互斥锁(保护内存索引、并发切片写入)
"sync/atomic" // 原子操作(计数器)
"time" // 时间戳
"sese-engine/analyzer" // 分词和语检测
"sese-engine/analyzer" // 分词和语检测
"sese-engine/config" // 排序权重配置
"sese-engine/info" // info 服务
"sese-engine/parser" // HTML 解析(在线摘要)
"sese-engine/storage" // 持久化存储
"sese-engine/info" // info 服务
"sese-engine/parser" // HTML 解析(在线摘要)
"sese-engine/storage" // 持久化存储
)
// Server 是搜索 HTTP 服务器。
// Server 是搜索 HTTP 服务器,同时内嵌收获服务(统一在同一端口)
type Server struct {
db *storage.DB
infoSvc *info.Service
analyzer *analyzer.Analyzer
httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查)
harvesterURL string // 收获服务器地址(如 "http://localhost:5000"
db *storage.DB
infoSvc *info.Service
analyzer *analyzer.Analyzer
httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查)
// 以下为收获服务(harvester)内嵌字段
mem map[string][]storage.IndexEntry // 内存索引聚合器:关键词 → [权重, URL] 条目
memMu sync.Mutex // 保护内存索引的并发写入
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
}
// New 创建一个 search Server。
// harvesterURL 为收获服务器的地址,用于代理刷盘和状态查询。
func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer, harvesterURL string) *Server {
return &Server{
db: db,
infoSvc: infoSvc,
analyzer: a,
harvesterURL: harvesterURL,
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)
func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
s := &Server{
db: db,
infoSvc: infoSvc,
analyzer: a,
mem: make(map[string][]storage.IndexEntry),
httpCli: &http.Client{
Timeout: time.Duration(config.OnlineSnippetTimeout) * time.Second,
},
}
// 启动定期刷盘 goroutine
go s.runPeriodicFlush()
return s
}
// Handler 返回 HTTP 路由处理器
// runPeriodicFlush triggers a flush every config.FlushIntervalSeconds seconds.
// It loops forever and is meant to run on its own goroutine.
func (s *Server) runPeriodicFlush() {
	interval := time.Duration(config.FlushIntervalSeconds) * time.Second
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		<-tick.C
		s.Flush()
	}
}
// Flush is the exported flush entry point used by the periodic flush loop and
// by the HTTP handlers; it simply delegates to flush.
func (s *Server) Flush() { s.flush() }
// Handler builds the HTTP router for the single shared port, which serves the
// search endpoint, the harvester (crawler ingestion) endpoints and the admin
// endpoints.
func (s *Server) Handler() http.Handler {
	routes := []struct {
		pattern string
		serve   http.HandlerFunc
	}{
		// Search.
		{"/search", s.handleSearch},

		// Harvester: data written by crawlers.
		{"/l", s.handleIngest},    // receives keyword index data from crawlers
		{"/flush", s.handleFlush}, // forces a flush to disk

		// Admin endpoints.
		{"/admin/recent", s.handleAdminRecent},
		{"/admin/stats", s.handleAdminStats},
		{"/admin/priority", s.handleAdminPriority},
		{"/admin/flush", s.handleAdminFlush},
		{"/admin/pending", s.handleAdminPending},
	}

	mux := http.NewServeMux()
	for _, rt := range routes {
		mux.HandleFunc(rt.pattern, rt.serve)
	}
	return mux
}
@@ -203,18 +231,7 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
"total_words": totalWords,
"domains": domainsMap,
"languages": langsMap,
}
// 从 harvester 代理获取未刷盘数据条数
if s.harvesterURL != "" {
req, _ := http.NewRequest(http.MethodGet, s.harvesterURL+"/admin/pending", nil)
if proxyResp, err := s.httpCli.Do(req); err == nil {
defer proxyResp.Body.Close()
var pendingResp map[string]int64
if err := json.NewDecoder(proxyResp.Body).Decode(&pendingResp); err == nil {
resp["pending"] = pendingResp["pending"]
}
}
"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
}
json.NewEncoder(w).Encode(resp)
@@ -300,31 +317,23 @@ func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) {
}
}
// handleAdminFlush 代理到 harvester 的 /flush 接口,执行刷盘。
// handleAdminFlush forces a flush of the in-memory index. Only GET and POST
// are accepted; any other method gets a 405 with a JSON error body.
func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) {
	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Content-Type", "application/json; charset=utf-8")

	switch r.Method {
	case http.MethodGet, http.MethodPost:
		s.Flush()
		w.Write([]byte("flushed"))
	default:
		http.Error(w, `{"error":"method not allowed"}`, 405)
	}
}
// 代理请求到 harvester
proxyURL := s.harvesterURL + "/flush"
req, err := http.NewRequest(http.MethodGet, proxyURL, nil)
if err != nil {
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
return
}
resp, err := s.httpCli.Do(req)
if err != nil {
http.Error(w, `{"error":"harvester unreachable: `+err.Error()+`"}`, 502)
return
}
defer resp.Body.Close()
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
// handleAdminPending reports, as JSON, how many index entries are currently
// buffered in memory and not yet flushed to disk.
func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) {
	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Content-Type", "application/json; charset=utf-8")

	body := map[string]int64{"pending": atomic.LoadInt64(&s.rowCount)}
	json.NewEncoder(w).Encode(body)
}
// ---- 搜索处理器 ----
@@ -1022,3 +1031,294 @@ func min(a, b int) int {
}
return b
}
// ---- 以下为内嵌的收获服务(harvester)逻辑 ----
// ingestPayload is the JSON request body sent by crawlers (the format used by
// the Go crawler).
type ingestPayload struct {
	URL      string `json:"url"` // final URL of the indexed page
	Keywords []struct {
		Word   string  `json:"word"`   // keyword
		Weight float32 `json:"weight"` // weight of this URL under the keyword
	} `json:"keywords"`
}

// ingestPayloadLegacy is the older array format sent by the Python crawler:
// [url, [[word, weight], ...]].
type ingestPayloadLegacy []any

// parsePayload decodes a crawler request body, accepting both wire formats.
//
//	New format (Go crawler):        {"url": "...", "keywords": [{"word": "...", "weight": 0.0}]}
//	Legacy format (Python crawler): ["url", [["word", weight], ...]]
//
// At most 1 MiB of the body is read. In both formats, keywords with an empty
// word are dropped; in the legacy format, pairs that are malformed (not an
// array, fewer than two elements, non-string word, non-numeric weight) are
// skipped as well. An error is returned when the body matches neither format
// or when the URL is missing or empty.
func parsePayload(r *http.Request) (*ingestPayload, error) {
	body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
	if err != nil {
		return nil, err
	}

	// Try the new object format first.
	var modern ingestPayload
	if err := json.Unmarshal(body, &modern); err == nil && modern.URL != "" {
		// Filter in place so both formats agree on ignoring empty words.
		kept := modern.Keywords[:0]
		for _, kw := range modern.Keywords {
			if kw.Word != "" {
				kept = append(kept, kw)
			}
		}
		modern.Keywords = kept
		return &modern, nil
	}

	// Fall back to the legacy array format: [url, [[word, weight], ...]].
	var legacy ingestPayloadLegacy
	if err := json.Unmarshal(body, &legacy); err != nil {
		return nil, fmt.Errorf("invalid payload: %w", err)
	}
	if len(legacy) < 2 {
		return nil, fmt.Errorf("invalid legacy payload: too few elements")
	}
	// Named rawURL rather than url so the net/url package is not shadowed.
	rawURL, ok := legacy[0].(string)
	if !ok {
		return nil, fmt.Errorf("invalid url type")
	}
	pairs, ok := legacy[1].([]any)
	if !ok {
		return nil, fmt.Errorf("invalid keywords type")
	}

	payload := &ingestPayload{URL: rawURL}
	for _, raw := range pairs {
		pair, ok := raw.([]any)
		if !ok || len(pair) < 2 {
			continue
		}
		word, ok := pair[0].(string)
		if !ok || word == "" {
			continue
		}
		// encoding/json decodes every JSON number into float64 when the
		// target is any; anything else is a malformed weight and is skipped
		// instead of being indexed with weight 0.
		weight, ok := pair[1].(float64)
		if !ok {
			continue
		}
		payload.Keywords = append(payload.Keywords, struct {
			Word   string  `json:"word"`
			Weight float32 `json:"weight"`
		}{word, float32(weight)})
	}

	if payload.URL == "" {
		return nil, fmt.Errorf("empty url after parsing")
	}
	return payload, nil
}
// handleIngest handles the POST request sent by a crawler and appends the
// page's keyword entries to the in-memory index.
//
// Non-POST requests get 405; an unparsable payload or an empty URL gets 400.
// Newlines are stripped from the URL before it is stored. Once the number of
// buffered entries exceeds config.BigCleanThreshold a flush is started in the
// background. On success the response body is "ok".
func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	payload, err := parsePayload(r)
	if err != nil {
		http.Error(w, "bad payload: "+err.Error(), http.StatusBadRequest)
		return
	}

	payload.URL = strings.ReplaceAll(payload.URL, "\n", "")
	if payload.URL == "" {
		http.Error(w, "empty url", http.StatusBadRequest)
		return
	}

	s.bufferPayload(payload)

	if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold) {
		go s.Flush()
	}

	w.Write([]byte("ok"))
}

// bufferPayload appends the payload's keyword entries to s.mem under memMu.
//
// The mutex is released with defer: net/http recovers panics raised inside a
// handler, so unlocking manually after the loop would leave memMu locked
// forever if anything in the loop (for example the storage read performed by
// lowThreshold) panicked, blocking every later ingest and flush.
func (s *Server) bufferPayload(payload *ingestPayload) {
	// A keyword must already have more than this many buffered entries
	// before new entries are checked against lowThreshold.
	const thresholdCheckMinEntries = 15

	s.memMu.Lock()
	defer s.memMu.Unlock()

	for _, kw := range payload.Keywords {
		key := kw.Word
		entries := s.mem[key]
		if len(entries) > thresholdCheckMinEntries {
			// Drop entries whose weight is below the keyword's threshold.
			if float64(kw.Weight) < s.lowThreshold(key) {
				continue
			}
		}
		s.mem[key] = append(entries, storage.IndexEntry{
			Weight: kw.Weight,
			URL:    payload.URL,
		})
		atomic.AddInt64(&s.rowCount, 1)
	}
}
// handleFlush serves the /flush route: it calls Flush to write the in-memory
// index to disk and then replies with the body "flushed".
// NOTE(review): no HTTP method check is performed here, so any method triggers
// the flush; flush returns immediately when another flush is already running,
// and "flushed" is still written in that case.
func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) {
s.Flush()
w.Write([]byte("flushed"))
}
// lowThreshold returns the minimum weight a new entry for key must have once
// the keyword already holds many entries.
//
// When fewer than config.MaxURLsPerKey entries are stored for the key it
// returns -1, so every weight passes. Otherwise it returns 5% of the
// MaxURLsPerKey-th largest stored weight. The error from the storage read is
// ignored.
func (s *Server) lowThreshold(key string) float64 {
	stored, _ := s.db.GetIndex(key)
	if len(stored) < config.MaxURLsPerKey {
		return -1
	}

	ws := make([]float64, 0, len(stored))
	for _, entry := range stored {
		ws = append(ws, float64(entry.Weight))
	}

	cutoff := nthLargest(ws, config.MaxURLsPerKey-1)
	return cutoff * 0.05
}
// flush merges the in-memory index into the on-disk index in one batch and
// empties the in-memory buffer.
//
// Only one flush runs at a time: if another flush holds flushMu this call
// returns immediately without doing anything. The buffer is swapped for a
// fresh map under memMu, so ingestion can continue while the snapshot is
// merged. Keys are merged by up to 8 concurrent goroutines and written with a
// single BatchSetIndex call.
func (s *Server) flush() {
	if !s.flushMu.TryLock() {
		return
	}
	defer s.flushMu.Unlock()

	// Take ownership of the current buffer and reset the pending counter.
	s.memMu.Lock()
	snapshot := s.mem
	s.mem = make(map[string][]storage.IndexEntry)
	atomic.StoreInt64(&s.rowCount, 0)
	s.memMu.Unlock()

	log.Printf("[harvester] flushing %d keys", len(snapshot))

	// keyed pairs a keyword with its entries; it is used both for the work
	// items and for the merge results.
	type keyed struct {
		key     string
		entries []storage.IndexEntry
	}

	items := make([]keyed, 0, len(snapshot))
	for k, v := range snapshot {
		items = append(items, keyed{k, v})
	}
	rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] })

	// results is buffered for every item so workers never block on send;
	// sem bounds the number of concurrent merges.
	results := make(chan keyed, len(items))
	sem := make(chan struct{}, 8)
	for _, item := range items {
		sem <- struct{}{}
		go func(it keyed) {
			defer func() { <-sem }()
			results <- keyed{it.key, s.mergeKey(it.key, it.entries)}
		}(item)
	}

	// NOTE(review): mergeKey can return nil for a key; such keys are still
	// put into the batch with a nil slice — confirm BatchSetIndex treats
	// that as intended.
	batch := make(map[string][]storage.IndexEntry, len(items))
	for range items {
		r := <-results
		batch[r.key] = r.entries
	}

	if err := s.db.BatchSetIndex(batch); err != nil {
		// Do not fall through to the success message: the write failed.
		log.Printf("[harvester] flush write error: %v", err)
		return
	}
	log.Printf("[harvester] flush done, %d keys written", len(batch))
}
// mergeKey combines the freshly collected entries for key with the entries
// already stored for it and returns the resulting list.
//
// A key with nothing stored and fewer than config.MinURLsForNewKey new
// entries yields nil. The combined list is de-duplicated by exact URL; with
// probability 0.02 it is additionally de-duplicated by normalised URL. It is
// trimmed when it exceeds 1.1 × config.MaxURLsPerKey, or otherwise with
// probability 0.02. The error from the storage read is ignored.
func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry {
	stored, _ := s.db.GetIndex(key)
	isNewKey := len(stored) == 0
	if isNewKey && len(newEntries) < config.MinURLsForNewKey {
		return nil
	}

	// New entries come first, so they win over stored ones in dedup.
	combined := dedup(append(newEntries, stored...))

	if rand.Float64() < 0.02 {
		combined = dedupNormalised(combined)
	}

	overfull := float64(len(combined)) > float64(config.MaxURLsPerKey)*1.1
	if overfull || rand.Float64() < 0.02 {
		combined = trim(combined, s.infoSvc, config.MaxURLsPerKey, config.MaxSameDomainPerKey)
	}
	return combined
}
// ---- 收获服务辅助函数 ----
// dedup returns the entries with duplicate URLs removed, keeping the first
// occurrence of each URL and preserving the input order. The input slice is
// not modified.
func dedup(entries []storage.IndexEntry) []storage.IndexEntry {
	unique := make([]storage.IndexEntry, 0, len(entries))
	visited := make(map[string]struct{}, len(entries))
	for i := range entries {
		u := entries[i].URL
		if _, dup := visited[u]; dup {
			continue
		}
		visited[u] = struct{}{}
		unique = append(unique, entries[i])
	}
	return unique
}
// dedupNormalised removes entries whose URLs are equal after normaliseURL
// (scheme and trailing slashes stripped).
//
// Entries are first ordered by URL length, longest first, so that among
// variants of the same page the longest spelling is the one kept. The result
// is in that length-descending order; entries with equal URL length keep
// their input order. The input slice is not modified.
func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry {
	byLength := make([]storage.IndexEntry, len(entries))
	copy(byLength, entries)
	// A stable library sort replaces the previous quadratic exchange sort.
	sort.SliceStable(byLength, func(i, j int) bool {
		return len(byLength[i].URL) > len(byLength[j].URL)
	})

	seen := make(map[string]bool, len(byLength))
	out := make([]storage.IndexEntry, 0, len(byLength))
	for _, e := range byLength {
		k := normaliseURL(e.URL)
		if seen[k] {
			continue
		}
		seen[k] = true
		out = append(out, e)
	}
	return out
}
// normaliseURL strips one leading "https://" or "http://" scheme (checked in
// that order) and all trailing slashes, giving a key under which scheme and
// trailing-slash variants of a URL compare equal.
func normaliseURL(u string) string {
	for _, scheme := range [...]string{"https://", "http://"} {
		if strings.HasPrefix(u, scheme) {
			u = u[len(scheme):]
			break // only a single scheme prefix is removed
		}
	}
	return strings.TrimRight(u, "/")
}
// trim ranks entries by weight × (1 + infoSvc.Prosper(url)), highest first,
// and returns at most limit of them while allowing at most sameDomainLimit
// entries per host.
//
// Homepage URLs (see isHomepage) are exempt from the per-host cap but still
// count towards it. The host is taken from netloc, falling back to the whole
// URL when netloc returns "", and is compared case-insensitively. A
// non-positive limit or empty input yields an empty result. The input slice
// is not modified.
func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry {
	if limit <= 0 || len(entries) == 0 {
		return []storage.IndexEntry{}
	}

	// Score each entry exactly once; calling Prosper inside the comparison
	// of a quadratic sort repeated the lookup for every pair of entries.
	type ranked struct {
		entry storage.IndexEntry
		score float64
	}
	ranking := make([]ranked, len(entries))
	for i, e := range entries {
		ranking[i] = ranked{
			entry: e,
			score: float64(e.Weight) * (1 + infoSvc.Prosper(e.URL)),
		}
	}
	sort.SliceStable(ranking, func(i, j int) bool {
		return ranking[i].score > ranking[j].score
	})

	capacity := limit
	if len(ranking) < capacity {
		capacity = len(ranking)
	}
	out := make([]storage.IndexEntry, 0, capacity)
	domainCount := make(map[string]int)
	for _, r := range ranking {
		e := r.entry
		host := netloc(e.URL)
		if host == "" {
			host = e.URL
		}
		host = strings.ToLower(host)

		if !isHomepage(e.URL) && domainCount[host] >= sameDomainLimit {
			continue
		}
		domainCount[host]++
		out = append(out, e)
		if len(out) >= limit {
			break
		}
	}
	return out
}
// isHomepage reports whether u has no path component: after removing a
// leading "https://" and then a leading "http://", and trimming trailing
// slashes, no "/" may remain.
func isHomepage(u string) bool {
	rest := u
	for _, scheme := range [...]string{"https://", "http://"} {
		rest = strings.TrimPrefix(rest, scheme)
	}
	return !strings.Contains(strings.TrimRight(rest, "/"), "/")
}
// nthLargest returns the element of values at 0-based rank n in descending
// order (n == 0 is the maximum). It returns 0 when n is out of range, i.e.
// negative or not smaller than len(values). The input slice is not modified.
func nthLargest(values []float64, n int) float64 {
	if n < 0 || n >= len(values) {
		return 0
	}
	// Sort a copy in descending order with the standard library instead of a
	// hand-rolled partial selection sort.
	sorted := make([]float64, len(values))
	copy(sorted, values)
	sort.Sort(sort.Reverse(sort.Float64Slice(sorted)))
	return sorted[n]
}