Files
sese-engine-go/search/server.go
T

1388 lines
39 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package search implements the user-facing search HTTP server.
// search 包对外提供 HTTP 搜索服务,接收查询请求并返回按多因子排序的搜索结果。
package search
import (
"container/heap" // 堆结构(域名交错排序)
"encoding/json" // JSON 序列化(响应输出)
"fmt" // 错误格式化
"io" // 读取请求体
"log" // 日志
"math" // 数学运算(Log、幂)
"math/rand" // 随机数(刷盘时打乱顺序、概率性去重/裁剪)
"net/http" // HTTP 服务端
"net/url" // URL 解析
"os" // 文件系统(静态文件读取)
"regexp" // 正则表达式(site: 过滤语法)
"sort" // 排序
"strconv" // 字符串转整数
"strings" // 字符串操作(URL 清洗)
"sync" // 互斥锁(保护内存索引、并发切片写入)
"sync/atomic" // 原子操作(计数器)
"time" // 时间戳
"sese-engine/analyzer" // 分词和语言检测
"sese-engine/config" // 排序权重配置
"sese-engine/info" // info 服务
"sese-engine/parser" // HTML 解析(在线摘要)
"sese-engine/storage" // 持久化存储
)
// Server is the search HTTP server. It also embeds the harvester (crawler
// index-ingestion) service so both are served from the same port.
type Server struct {
	// rowCount is the number of index entries currently buffered in mem;
	// exceeding config.BigCleanThreshold triggers a flush. It is accessed with
	// 64-bit sync/atomic operations, so it must stay the FIRST field: on 386,
	// ARM and 32-bit MIPS only the first word of an allocated struct is
	// guaranteed to be 64-bit aligned.
	rowCount int64

	db       *storage.DB
	infoSvc  *info.Service
	analyzer *analyzer.Analyzer
	httpCli  *http.Client // online snippet fetching (no robots.txt check)

	// Harvester state.
	mem     map[string][]storage.IndexEntry // in-memory index buffer: keyword -> entries
	memMu   sync.Mutex                      // guards mem
	flushMu sync.Mutex                      // ensures at most one flush runs at a time
}
// New creates a search Server backed by db, infoSvc and a. The harvester
// (crawler ingestion) endpoints are served by the same Server.
//
// New also starts a background goroutine that flushes the in-memory index
// every config.FlushIntervalSeconds seconds; it has no stop mechanism and
// runs for the lifetime of the process.
func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
	snippetClient := &http.Client{
		Timeout: time.Duration(config.OnlineSnippetTimeout) * time.Second,
	}
	srv := &Server{
		db:       db,
		infoSvc:  infoSvc,
		analyzer: a,
		httpCli:  snippetClient,
		mem:      make(map[string][]storage.IndexEntry),
	}
	go srv.runPeriodicFlush()
	return srv
}
// runPeriodicFlush calls s.Flush every config.FlushIntervalSeconds seconds.
// The ticker channel is never closed, so this loop never returns.
func (s *Server) runPeriodicFlush() {
	interval := time.Duration(config.FlushIntervalSeconds) * time.Second
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		<-ticker.C
		s.Flush()
	}
}
// Flush merges the buffered in-memory index into storage. It is the exported
// entry point used by the periodic ticker, the HTTP flush endpoints and other
// packages; if a flush is already running the call returns immediately.
func (s *Server) Flush() {
	s.flush()
}
// Handler returns the HTTP router for the single shared port: search,
// harvester ingestion, admin endpoints, and a static-file / SPA fallback for
// everything else.
func (s *Server) Handler() http.Handler {
	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{"/search", s.handleSearch},             // search API
		{"/l", s.handleIngest},                  // crawler keyword-index ingestion
		{"/flush", s.handleFlush},               // force a flush
		{"/admin/recent", s.handleAdminRecent},  // admin: recently crawled pages
		{"/admin/stats", s.handleAdminStats},    // admin: global statistics
		{"/admin/priority", s.handleAdminPriority},
		{"/admin/flush", s.handleAdminFlush},
		{"/admin/pending", s.handleAdminPending},
	}

	mux := http.NewServeMux()
	for _, rt := range routes {
		mux.Handle(rt.pattern, rt.handler)
	}
	// Static files with SPA fallback.
	mux.Handle("/", spaHandler{dist: "dist"})
	return mux
}
// spaHandler serves static files from the dist directory. Extension-less
// paths that are not API endpoints (for example /admin) fall back to
// dist/index.html so a single-page app can route them client-side.
type spaHandler struct {
	dist string
}

// isAPIPath reports whether p belongs to one of the API endpoints. Such
// requests only reach spaHandler when no exact route matched, so they get a
// 404 rather than the SPA shell. Matching is done on whole path segments so
// that assets such as /logo.png or SPA routes such as /login and /admin are
// not mistaken for the /l or /admin/* endpoints.
func isAPIPath(p string) bool {
	switch p {
	case "/search", "/l", "/flush":
		return true
	}
	return strings.HasPrefix(p, "/search/") ||
		strings.HasPrefix(p, "/admin/") ||
		strings.HasPrefix(p, "/l/")
}

// staticContentType returns the Content-Type for a static file based on the
// extension of its last path element, or "" to let net/http sniff the body.
func staticContentType(name string) string {
	base := name[strings.LastIndex(name, "/")+1:]
	dot := strings.LastIndex(base, ".")
	if dot < 0 {
		return ""
	}
	switch ext := strings.ToLower(base[dot+1:]); ext {
	case "js":
		return "application/javascript"
	case "css":
		return "text/css"
	case "svg":
		return "image/svg+xml"
	case "html":
		return "text/html; charset=utf-8"
	case "jpg", "jpeg":
		return "image/jpeg"
	case "ico":
		return "image/x-icon"
	case "png", "gif", "webp":
		return "image/" + ext
	}
	return ""
}

// ServeHTTP serves dist/<path> if it exists. A missing path with an
// extension is a 404; any other missing path gets dist/index.html.
func (h spaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	reqPath := r.URL.Path
	if isAPIPath(reqPath) {
		http.NotFound(w, r)
		return
	}

	staticPath := strings.TrimPrefix(reqPath, "/")
	if staticPath == "" {
		staticPath = "index.html"
	}
	// http.ServeMux already redirects paths containing ".." to their clean
	// form, but refuse them here too so this handler can never read outside
	// dist if it is mounted some other way ('\\' covers Windows separators).
	for _, seg := range strings.FieldsFunc(staticPath, func(c rune) bool { return c == '/' || c == '\\' }) {
		if seg == ".." {
			http.NotFound(w, r)
			return
		}
	}

	// Serve the file directly when it exists.
	if data, err := os.ReadFile(h.dist + "/" + staticPath); err == nil {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		if ct := staticContentType(staticPath); ct != "" {
			w.Header().Set("Content-Type", ct)
		}
		w.Write(data)
		return
	}

	// A missing path that looks like a file (has an extension) is a real 404.
	if strings.Contains(staticPath, ".") {
		http.NotFound(w, r)
		return
	}

	// SPA fallback: hand the route to the client-side router.
	if data, err := os.ReadFile(h.dist + "/index.html"); err == nil {
		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Write(data)
		return
	}
	http.NotFound(w, r)
}
// ListenAndServe serves s.Handler() on addr and returns when the listener
// fails.
//
// ReadHeaderTimeout bounds how long a client may take to send its request
// headers, so idle or slow connections cannot pin goroutines indefinitely.
// No overall read/write timeout is set because some handlers (e.g. /flush,
// which runs a full flush synchronously) have no bounded duration.
func (s *Server) ListenAndServe(addr string) error {
	log.Printf("[search] listening on %s", addr)
	srv := &http.Server{
		Addr:              addr,
		Handler:           s.Handler(),
		ReadHeaderTimeout: 10 * time.Second,
	}
	return srv.ListenAndServe()
}
// ---- Admin 接口 ----
// recentItem is one entry in the /admin/recent JSON response.
// Field order determines JSON key order, so it is left unchanged.
type recentItem struct {
URL string `json:"url"` // page URL as stored
Title string `json:"title"` // snippet title
Description string `json:"description"` // snippet description, cut to at most 200 bytes by handleAdminRecent
Domain string `json:"domain"` // host part of URL (see netloc)
Language map[string]float64 `json:"language"` // the site's language distribution; empty when none is recorded
WordCount int `json:"word_count"` // len(snippet.Text): a byte count of the stored text, not a word count
CrawledAt int64 `json:"crawled_at"` // snippet timestamp (Unix seconds)
}
// handleAdminRecent returns the most recently crawled snippets, newest first.
//
// Query parameter limit: number of items (default 50, clamped to 200;
// non-positive or unparsable values use the default). The response is
// {"items": [...recentItem], "total": <number of items returned>}.
func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("Content-Type", "application/json; charset=utf-8")

	limit := 50
	if l := r.URL.Query().Get("limit"); l != "" {
		if v, err := strconv.Atoi(l); err == nil && v > 0 {
			limit = v
		}
	}
	if limit > 200 {
		limit = 200
	}

	type entry struct {
		url     string
		snippet *storage.SnippetEntry
	}
	var items []entry
	s.db.ForEachSnippet(func(u string, snippet *storage.SnippetEntry) error {
		items = append(items, entry{u, snippet})
		return nil
	})

	// Newest first.
	sort.Slice(items, func(i, j int) bool {
		return items[i].snippet.Timestamp > items[j].snippet.Timestamp
	})
	if len(items) > limit {
		items = items[:limit]
	}

	result := make([]recentItem, 0, len(items))
	for _, e := range items {
		domain := netloc(e.url)

		// Site info is looked up only for the items actually returned. It may
		// be missing (nil) for domains without a site record, so fall back to
		// an empty language map instead of dereferencing nil.
		lang := make(map[string]float64)
		if si, _ := s.db.GetSiteInfo(domain); si != nil && si.Languages != nil {
			lang = si.Languages
		}

		desc := e.snippet.Description
		if len(desc) > 200 {
			cut := 200
			// Back up to the first byte of a UTF-8 sequence (continuation
			// bytes look like 10xxxxxx) so multi-byte text such as Chinese is
			// never split mid-character.
			for cut > 0 && desc[cut]&0xC0 == 0x80 {
				cut--
			}
			desc = desc[:cut]
		}

		result = append(result, recentItem{
			URL:         e.url,
			Title:       e.snippet.Title,
			Description: desc,
			Domain:      domain,
			Language:    lang,
			WordCount:   len(e.snippet.Text),
			CrawledAt:   e.snippet.Timestamp,
		})
	}

	json.NewEncoder(w).Encode(map[string]any{
		"items": result,
		"total": len(items),
	})
}
// handleAdminStats returns global crawl statistics as JSON:
//   - total_urls:  number of stored snippets
//   - total_words: sum of len(snippet.Text) over all snippets (bytes)
//   - domains:     the 20 hosts with the most snippets
//   - languages:   the 10 languages counted most often, where a snippet counts
//                  toward every language with a site ratio above 0.1
//   - pending:     index entries buffered in memory and not yet flushed
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Content-Type", "application/json; charset=utf-8")

	var (
		totalURLs  int
		totalWords int
		perDomain  = map[string]int{}
		perLang    = map[string]int{}
	)
	s.db.ForEachSnippet(func(u string, snippet *storage.SnippetEntry) error {
		totalURLs++
		host := netloc(u)
		perDomain[host]++
		totalWords += len(snippet.Text)

		si, _ := s.db.GetSiteInfo(host)
		if si == nil {
			return nil
		}
		for lang, ratio := range si.Languages {
			if ratio > 0.1 {
				perLang[lang]++
			}
		}
		return nil
	})

	// keepTop returns the n keys of counts with the highest values.
	keepTop := func(counts map[string]int, n int) map[string]int {
		type pair struct {
			key   string
			count int
		}
		pairs := make([]pair, 0, len(counts))
		for k, c := range counts {
			pairs = append(pairs, pair{k, c})
		}
		sort.Slice(pairs, func(i, j int) bool { return pairs[i].count > pairs[j].count })
		if len(pairs) > n {
			pairs = pairs[:n]
		}
		top := make(map[string]int, len(pairs))
		for _, p := range pairs {
			top[p.key] = p.count
		}
		return top
	}

	json.NewEncoder(w).Encode(map[string]any{
		"total_urls":  totalURLs,
		"total_words": totalWords,
		"domains":     keepTop(perDomain, 20),
		"languages":   keepTop(perLang, 10),
		"pending":     atomic.LoadInt64(&s.rowCount), // buffered, not yet flushed
	})
}
// handleAdminPriority manages the priority crawl list at /admin/priority.
//
//	GET:     list all unvisited priority entries -> {"items": [...], "count": n}
//	POST:    add a URL or bare domain; body {"url": "..."}
//	DELETE:  remove an entry; query ?url=...
//	OPTIONS: CORS preflight (204)
//
// Errors are returned as JSON {"error": "..."} with the matching status code.
func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("Content-Type", "application/json; charset=utf-8")

	// fail writes a JSON error body. Unlike http.Error it keeps the JSON
	// Content-Type and properly escapes msg, so messages containing quotes
	// (e.g. storage errors) still yield valid JSON.
	fail := func(status int, msg string) {
		w.WriteHeader(status)
		json.NewEncoder(w).Encode(map[string]string{"error": msg})
	}

	switch r.Method {
	case http.MethodOptions:
		w.Header().Set("Access-Control-Allow-Methods", "GET,POST,DELETE,OPTIONS")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
		w.WriteHeader(http.StatusNoContent)

	case http.MethodGet:
		entries, err := s.db.GetPriorityURLs()
		if err != nil {
			fail(http.StatusInternalServerError, err.Error())
			return
		}
		if entries == nil {
			entries = []storage.PriorityEntry{} // encode as [] rather than null
		}
		json.NewEncoder(w).Encode(map[string]any{"items": entries, "count": len(entries)})

	case http.MethodPost:
		var body struct {
			URL string `json:"url"`
		}
		if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.URL) == "" {
			fail(http.StatusBadRequest, "missing url field")
			return
		}
		raw := strings.TrimSpace(body.URL)
		entry := storage.PriorityEntry{AddedAt: time.Now().Unix()}

		// Input containing a path separator is a URL; anything else is a bare
		// domain. A host that merely starts with "http" (e.g. "httpbin.org")
		// is a domain, not a URL.
		if strings.Contains(raw, "/") {
			u, err := url.Parse(raw)
			if err == nil && u.Scheme == "" && u.Host == "" {
				// Scheme-less input such as "example.com/page" parses as a
				// bare path; retry it as an https URL.
				u, err = url.Parse("https://" + raw)
			}
			if err != nil || u.Host == "" {
				fail(http.StatusBadRequest, "invalid url")
				return
			}
			if u.Scheme == "" {
				u.Scheme = "https" // protocol-relative "//host/path"
			}
			entry.URL = u.String()
			entry.IsDomain = false
		} else {
			entry.URL = "https://" + raw
			entry.IsDomain = true
		}

		if err := s.db.AddPriorityURL(entry); err != nil {
			fail(http.StatusInternalServerError, err.Error())
			return
		}
		json.NewEncoder(w).Encode(map[string]string{"status": "added", "url": entry.URL})

	case http.MethodDelete:
		target := r.URL.Query().Get("url")
		if target == "" {
			fail(http.StatusBadRequest, "missing url parameter")
			return
		}
		if err := s.db.RemovePriorityURL(target); err != nil {
			fail(http.StatusInternalServerError, err.Error())
			return
		}
		json.NewEncoder(w).Encode(map[string]string{"status": "removed"})

	default:
		w.Header().Set("Allow", "GET,POST,DELETE")
		fail(http.StatusMethodNotAllowed, "method not allowed")
	}
}
// handleAdminFlush synchronously flushes the in-memory index for GET or POST
// requests and replies "flushed"; other methods receive 405.
func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")
	switch r.Method {
	case http.MethodGet, http.MethodPost:
		s.Flush()
		w.Write([]byte("flushed"))
	default:
		http.Error(w, `{"error":"method not allowed"}`, http.StatusMethodNotAllowed)
	}
}
// handleAdminPending reports how many index entries are buffered in memory
// and not yet flushed, as {"pending": n}.
func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) {
	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Origin", "*")
	hdr.Set("Content-Type", "application/json; charset=utf-8")

	payload := map[string]int64{
		"pending": atomic.LoadInt64(&s.rowCount),
	}
	json.NewEncoder(w).Encode(payload)
}
// ---- 搜索处理器 ----
// searchResponse is the JSON body returned by /search.
// Field order determines JSON key order, so it is left unchanged.
type searchResponse struct {
Tokens []string `json:"tokens"` // query tokens after segmentation and block-list filtering (at most 20)
Counts map[string]int `json:"counts"` // number of stored index entries (URLs) for each token
Results []searchResult `json:"results"` // the requested page of ranked results
Total int `json:"total"` // number of candidate URLs, counted AFTER any site: filter is applied
}
// searchResult is one ranked hit inside a searchResponse.
// Field order determines JSON key order, so it is left unchanged.
type searchResult struct {
Score float64 `json:"score"` // final ranking score (the candidate's scoreVec[0])
URL string `json:"url"` // page URL, percent-decoded for display
Snippet *snippetInfo `json:"snippet,omitempty"` // title/description/text; omitted when no snippet is available
Relevance map[string]float64 `json:"relevance"` // per-token index weight of this URL (0 when the URL lacks the token)
DomainCount int `json:"domain_count"` // documented as the candidate count for this URL's domain; NOTE(review): confirm query populates it
Factors map[string]float64 `json:"factors,omitempty"` // individual ranking factors that make up Score
}
// snippetInfo is the API form of a page snippet (title, description and a
// body-text excerpt).
type snippetInfo struct {
Title string `json:"title"` // page title
Description string `json:"description"` // meta description
Text string `json:"text"` // body-text excerpt; snippets fetched online keep at most 256 bytes, cached ones as stored
}
var (
	// siteRe recognises a query term of the form "site:<host>" and captures
	// <host> (which must be non-empty) in group 1.
	siteRe = regexp.MustCompile(`^site:(.+)$`)
)
// handleSearch serves /search and writes a searchResponse as JSON.
//
// Query parameters:
//   - q:     the query string
//   - qh:    percent-encoded query, used only when q is empty (for queries
//            with special characters); undecodable values are ignored
//   - slice: result window "from:to" (default 0:10); accepted only when
//            from >= 0, to > from and to-from <= 20
//
// A term "site:<host>" restricts results to that host and its subdomains;
// all other terms are segmented and block-listed tokens are dropped. At most
// 20 tokens are kept.
func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) {
	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Origin", "*") // allow cross-origin callers
	hdr.Set("Content-Type", "application/json; charset=utf-8")
	params := r.URL.Query()

	q := params.Get("q")
	if qh := params.Get("qh"); q == "" && qh != "" {
		if decoded, err := url.PathUnescape(qh); err == nil {
			q = decoded
		}
	}

	from, to := 0, 10
	if lo, hi, found := strings.Cut(params.Get("slice"), ":"); found {
		a, b := atoi(lo), atoi(hi)
		if a >= 0 && b > a && b-a <= 20 {
			from, to = a, b
		}
	}

	var (
		tokens     []string
		siteFilter string
	)
	for _, field := range strings.Fields(q) {
		if m := siteRe.FindStringSubmatch(field); len(m) > 1 {
			siteFilter = m[1] // last site: term wins
			continue
		}
		for _, tok := range s.analyzer.Segment(field, false) {
			if s.infoSvc.IsBlocked(tok) {
				continue
			}
			tokens = append(tokens, tok)
		}
	}
	// Cap the token count so a query cannot become overly broad.
	if len(tokens) > 20 {
		tokens = tokens[:20]
	}

	results, total := s.query(tokens, from, to, siteFilter)

	// Per-token index sizes, shown by the front end.
	counts := make(map[string]int, len(tokens))
	for _, tok := range tokens {
		entries, _ := s.db.GetIndex(tok)
		counts[tok] = len(entries)
	}

	json.NewEncoder(w).Encode(searchResponse{
		Tokens:  tokens,
		Counts:  counts,
		Results: results,
		Total:   total,
	})
}
// query runs a multi-token search and returns the window [from, to) of the
// ranked results together with the number of candidate URLs (counted after
// any site: filter).
//
// Pipeline: load each token's inverted-index entries -> build the candidate
// URL set -> score candidates on several factors (three stages) -> interleave
// domains (rerank) -> fetch snippets for the requested window.
func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]searchResult, int) {
	if len(tokens) == 0 {
		return nil, 0
	}

	// Load the inverted-index entries for every token.
	type tokenIndex struct {
		token   string
		entries []storage.IndexEntry
		defVal  float64 // weight substituted when a candidate URL lacks this token
	}
	tokenIndexes := make([]tokenIndex, 0, len(tokens))
	for _, t := range tokens {
		entries, _ := s.db.GetIndex(t)
		// Default weight for URLs missing from this token's list. When the
		// list is full (>= MaxURLsPerKey entries) it becomes half of the
		// MaxURLsPerKey-th largest weight, floored at 1e-4.
		defVal := 1.0 / 10000 * float64(max(100, len(entries))) / float64(config.MaxURLsPerKey)
		if len(entries) >= config.MaxURLsPerKey {
			weights := make([]float64, len(entries))
			for i, e := range entries {
				weights[i] = float64(e.Weight)
			}
			sort.Sort(sort.Reverse(sort.Float64Slice(weights)))
			defVal = math.Max(1.0/10000, weights[config.MaxURLsPerKey-1]/2)
		}
		tokenIndexes = append(tokenIndexes, tokenIndex{t, entries, defVal})
	}

	// URL -> (token -> weight); its keys form the candidate set.
	urlWeights := make(map[string]map[string]float64)
	for _, ti := range tokenIndexes {
		for _, e := range ti.entries {
			if urlWeights[e.URL] == nil {
				urlWeights[e.URL] = make(map[string]float64)
			}
			urlWeights[e.URL][ti.token] = float64(e.Weight)
		}
	}

	// Optional site: filter; total is the candidate count after filtering.
	total := len(urlWeights)
	if siteFilter != "" {
		filtered := make(map[string]map[string]float64)
		for u, vs := range urlWeights {
			if matchSite(netloc(u), siteFilter) {
				filtered[u] = vs
			}
		}
		urlWeights = filtered
		total = len(urlWeights)
	}

	defVals := make(map[string]float64, len(tokenIndexes))
	for _, ti := range tokenIndexes {
		defVals[ti.token] = ti.defVal
	}

	// Initial relevance and score of every candidate.
	candidates := make([]candidate, 0, len(urlWeights))
	for u, vs := range urlWeights {
		// Multiply per-token weights (default weight for missing tokens);
		// weights above 0.06 are compressed logarithmically.
		rel := 1.0
		for _, ti := range tokenIndexes {
			vp := vs[ti.token]
			if vp == 0 {
				vp = defVals[ti.token]
			}
			if vp > 0.06 {
				vp = math.Log((vp-0.06)*40+1)/40 + 0.06
			}
			rel *= vp
		}
		prosper := 1 + s.infoSvc.Prosper(u)*config.BacklinkWeight // backlink boost
		bad := badURL(u)
		adjust := s.infoSvc.Adjust(netloc(u)) // manual per-site adjustment
		score := rel * prosper * (1 - bad) * adjust * 0.1

		// 12-slot factor vector, refined by the later stages and reported in
		// the "factors" field of each result.
		var vec [12]float64
		vec[0] = score   // combined score
		vec[1] = rel     // relevance
		vec[2] = prosper // backlink prosperity
		vec[3] = 1 - bad // URL quality
		vec[4] = 1       // language multiplier (stage one)
		vec[5] = 1       // repetition penalty (stage two)
		vec[6] = adjust  // manual adjustment
		vec[7] = 1       // site time decay (stage one)
		vec[8] = 1       // consecutive-token bonus (stage two)
		vec[9] = 1       // keyword content (reserved)
		vec[10] = 1      // URL time decay (stage one)
		vec[11] = 0.1    // constant factor
		candidates = append(candidates, candidate{u, rel, vec})
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].scoreVec[0] > candidates[j].scoreVec[0]
	})

	// Stage one (top 256, concurrently): language multiplier and time decays
	// from site info and snippet age. Each goroutine writes only its own
	// candidate.
	now := time.Now().Unix()
	limit256 := 256
	if len(candidates) < 256 {
		limit256 = len(candidates)
	}
	var wg sync.WaitGroup
	for i := 0; i < limit256; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			c := &candidates[idx]
			siteInfo, _ := s.db.GetSiteInfo(netloc(c.url))
			langMul := languageMultiplier(siteInfo)
			siteDecay := timeMul(siteInfo, now)
			urlDecay := urlTimeMul(s.db, c.url, now)
			c.scoreVec[0] = c.scoreVec[0] * 10 * langMul * siteDecay * urlDecay
			c.scoreVec[4] = langMul
			c.scoreVec[7] = siteDecay
			c.scoreVec[10] = urlDecay
		}(i)
	}
	wg.Wait()
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].scoreVec[0] > candidates[j].scoreVec[0]
	})

	// Stage two (top 80): consecutive-token bonus and title-repetition penalty.
	limit80 := 80
	if len(candidates) < 80 {
		limit80 = len(candidates)
	}
	titles := make([]string, limit80)
	for i := 0; i < limit80; i++ {
		if snippet, err := s.db.GetSnippet(candidates[i].url); err == nil {
			titles[i] = snippet.Title
		}
	}
	for i := 0; i < limit80; i++ {
		h := repetitionSimilarity(titles, i)
		consecutive := consecutiveCount(titles[i], tokens)
		repMul := 1.0
		if h > 0.5 {
			repMul = 1 - (h - 0.5) // too similar to a higher-ranked title
		}
		// Each adjacent token pair found in the title multiplies by
		// config.ConsecutiveKeyWeight.
		consMul := math.Pow(config.ConsecutiveKeyWeight, float64(consecutive))
		candidates[i].scoreVec[0] *= repMul * consMul
		candidates[i].scoreVec[5] = repMul
		candidates[i].scoreVec[8] = consMul
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].scoreVec[0] > candidates[j].scoreVec[0]
	})

	// Stage three: interleave domains for diversity and cut the window.
	reranked := rerank(candidates, from, to)

	// Candidates per domain, reported as DomainCount.
	domainCounts := make(map[string]int)
	for _, c := range candidates {
		domainCounts[netloc(c.url)]++
	}

	// Fetch snippets concurrently. Every goroutine writes only its own slot,
	// so results keep rerank's order without a mutex or a re-sort (a re-sort
	// keyed by the displayed, percent-decoded URL cannot find raw URLs that
	// contain escapes).
	results := make([]searchResult, len(reranked))
	var snippetWg sync.WaitGroup
	for i, c := range reranked {
		snippetWg.Add(1)
		go func(idx int, cand candidate) {
			defer snippetWg.Done()
			relevance := make(map[string]float64, len(tokenIndexes))
			for _, ti := range tokenIndexes {
				relevance[ti.token] = urlWeights[cand.url][ti.token]
			}
			results[idx] = searchResult{
				Score:       cand.scoreVec[0],
				URL:         unescapeURL(cand.url),
				Snippet:     s.getSnippet(cand.url),
				Relevance:   relevance,
				DomainCount: domainCounts[netloc(cand.url)],
				Factors: map[string]float64{
					"relevance":   cand.scoreVec[1],
					"backlink":    cand.scoreVec[2],
					"url_quality": cand.scoreVec[3],
					"language":    cand.scoreVec[4],
					"repetition":  cand.scoreVec[5],
					"adjust":      cand.scoreVec[6],
					"site_time":   cand.scoreVec[7],
					"consecutive": cand.scoreVec[8],
					"url_time":    cand.scoreVec[10],
				},
			}
		}(i, c)
	}
	snippetWg.Wait()
	return results, total
}
// getSnippet returns the snippet for rawURL. It reads the storage cache
// first. On a miss, and only when config.UseOnlineSnippet is set, it fetches
// the page (without a robots.txt check), parses up to 60000 bytes of HTML,
// caches the result and returns it. It returns nil when nothing usable is
// found.
func (s *Server) getSnippet(rawURL string) *snippetInfo {
	if entry, err := s.db.GetSnippet(rawURL); err == nil {
		return buildSnippet(entry)
	}
	if !config.UseOnlineSnippet {
		return nil
	}

	req, err := http.NewRequest(http.MethodGet, rawURL, nil)
	if err != nil {
		return nil
	}
	req.Header.Set("User-Agent", config.SpiderName)
	resp, err := s.httpCli.Do(req)
	if err != nil {
		return nil
	}
	// Close the body on every path, including non-200 responses, so the
	// connection and its buffers are released.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil
	}
	if !strings.Contains(resp.Header.Get("Content-Type"), "text/html") {
		return nil
	}

	body := readBodyLimited(resp, 60000)
	title, desc, text, _ := parser.ParseHTML(body, resp.Request.URL.String())
	entry := &storage.SnippetEntry{
		Title:       title,
		Description: truncate(desc, 256),
		Text:        truncate(text, 256),
		Timestamp:   time.Now().Unix(),
	}
	// Best-effort cache write: on failure the page is simply fetched again
	// next time.
	_ = s.db.SetSnippet(rawURL, entry)
	return buildSnippet(entry)
}
// buildSnippet converts a stored snippet into its API form. It returns nil
// when entry is nil or when its title, description and text are all empty.
func buildSnippet(entry *storage.SnippetEntry) *snippetInfo {
	if entry == nil {
		return nil
	}
	if entry.Title+entry.Description+entry.Text == "" {
		return nil
	}
	return &snippetInfo{Title: entry.Title, Description: entry.Description, Text: entry.Text}
}
// ---- 评分辅助函数 ----
// languageMultiplier derives a score multiplier from a site's language
// distribution: 1 + (share of "zh")*W - (share of languages other than
// zh/en/ja)*W, where W is config.LanguageWeight. Shares are each value
// divided by the sum of all values.
//
// It returns 1 when si is nil, has no languages, or the values do not sum to
// a positive number. That last guard prevents 0/0 = NaN, which would
// otherwise poison every later sort by score.
func languageMultiplier(si *storage.SiteInfo) float64 {
	if si == nil || len(si.Languages) == 0 {
		return 1.0
	}
	total := 0.0
	for _, v := range si.Languages {
		total += v
	}
	if !(total > 0) { // also catches NaN
		return 1.0
	}
	chinese := si.Languages["zh"] / total
	weird := (total - si.Languages["zh"] - si.Languages["en"] - si.Languages["ja"]) / total
	return 1 + chinese*config.LanguageWeight - weird*config.LanguageWeight
}
// timeMul returns config.WeightDailyDecay raised to the number of whole days
// since the site was last visited, minus one. The age is clamped to
// [0, 180] days before subtracting, so the exponent ranges over 0..179.
// A nil si yields 1. A zero LastVisitTime is treated as 1648000000
// (2022-03-23 UTC).
func timeMul(si *storage.SiteInfo, now int64) float64 {
	if si == nil {
		return 1.0
	}
	lastVisit := si.LastVisitTime
	if lastVisit == 0 {
		lastVisit = 1648000000 // fallback when no visit time is recorded
	}
	days := (now - lastVisit) / (3600 * 24)
	switch {
	case days <= 0:
		days = 0
	case days > 180:
		days = 179 // clamp to 180, then skip the first day
	default:
		days-- // the first day does not decay
	}
	return math.Pow(config.WeightDailyDecay, float64(days))
}
// urlTimeMul returns a decay multiplier based on the age of rawURL's cached
// snippet. Snippets up to 30 whole days old (or missing) give 1; older ones
// give ((2+WeightDailyDecay)/3)^ageDays, with the full age as exponent.
func urlTimeMul(db *storage.DB, rawURL string, now int64) float64 {
	snip, err := db.GetSnippet(rawURL)
	if err != nil || snip == nil {
		return 1.0
	}
	ageDays := (now - snip.Timestamp) / 86400
	if ageDays > 30 {
		return math.Pow((2+config.WeightDailyDecay)/3, float64(ageDays))
	}
	return 1.0
}
// badURL scores how "low quality" a URL looks, in [0, 0.9].
//
// The base score grows with length beyond 30 bytes (1 per 200 bytes). Each
// of these rules then moves the score part of the way toward 1:
// ".htm"/".php" (30%), more than two slashes after trimming trailing ones
// (10%), and a non-https "xxxx:" scheme or a string shorter than 5 bytes
// (30%).
func badURL(u string) float64 {
	penalize := func(score, fraction float64) float64 { return score + (1-score)*fraction }

	score := math.Max(0, float64(len(u)-30)/200.0)
	if strings.Contains(u, ".htm") || strings.Contains(u, ".php") {
		score = penalize(score, 0.3)
	}
	if slashes := strings.Count(strings.TrimRight(u, "/"), "/"); slashes > 2 {
		score = penalize(score, 0.1)
	}
	if shortOrHTTP := len(u) < 5 || u[4] == ':'; shortOrHTTP {
		score = penalize(score, 0.3)
	}
	return math.Min(score, 0.9)
}
// netloc 从 URL 提取主机名。
func netloc(rawURL string) string {
parts := strings.SplitN(rawURL, "/", 4)
if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" {
return parts[2]
}
return rawURL
}
// matchSite reports whether host equals pattern or is a subdomain of it
// (ends with "."+pattern). The comparison is case-sensitive.
func matchSite(host, pattern string) bool {
	return host == pattern || strings.HasSuffix(host, "."+pattern)
}
// consecutiveCount counts the adjacent token pairs (tokens[i-1]+tokens[i])
// that appear verbatim in title; each pair counts at most once.
func consecutiveCount(title string, tokens []string) int {
	hits := 0
	for i := 1; i < len(tokens); i++ {
		if strings.Contains(title, tokens[i-1]+tokens[i]) {
			hits++
		}
	}
	return hits
}
// repetitionSimilarity returns the highest similarity between titles[idx]
// and any earlier non-empty title, where similarity is
// 1 - editDistance / longerLength, measured in runes. It returns 0 for
// idx 0 or an empty title. query penalises results scoring above 0.5.
func repetitionSimilarity(titles []string, idx int) float64 {
	if idx == 0 {
		return 0
	}
	cur := titles[idx]
	if cur == "" {
		return 0
	}
	// levenshtein counts edits in runes, so the length must be in runes too.
	// Byte lengths overstate the similarity of multi-byte (e.g. Chinese)
	// titles: two unrelated 4-character Chinese titles would score 1-4/12.
	curLen := len([]rune(cur))
	best := 0.0
	for _, prev := range titles[:idx] {
		if prev == "" {
			continue
		}
		longest := curLen
		if n := len([]rune(prev)); n > longest {
			longest = n
		}
		sim := 1 - float64(levenshtein(cur, prev))/float64(longest)
		if sim > best {
			best = sim
		}
	}
	return best
}

// levenshtein returns the rune-level edit distance between a and b, using a
// single dynamic-programming row updated in place.
func levenshtein(a, b string) int {
	src, dst := []rune(a), []rune(b)
	if len(src) == 0 {
		return len(dst)
	}
	if len(dst) == 0 {
		return len(src)
	}
	// row[j] is the distance between the src prefix processed so far and dst[:j].
	row := make([]int, len(dst)+1)
	for j := range row {
		row[j] = j
	}
	for i, rs := range src {
		diag := row[0] // distance(src[:i], dst[:0]) before this row's update
		row[0] = i + 1
		for j, rd := range dst {
			above := row[j+1] // distance(src[:i], dst[:j+1])
			cost := 1
			if rs == rd {
				cost = 0
			}
			row[j+1] = min3(row[j]+1, above+1, diag+cost)
			diag = above
		}
	}
	return row[len(dst)]
}

// min3 returns the smallest of a, b and c.
func min3(a, b, c int) int {
	m := a
	if b < m {
		m = b
	}
	if c < m {
		m = c
	}
	return m
}
// ---- 域名交错重排 ----
// domainHeap is a max-heap of rerankItems for use with container/heap,
// ordered by score*domainMul (highest first). rerank uses it to interleave
// results from different domains.
type domainHeap []rerankItem

// rerankItem is one candidate waiting in a domainHeap. Field order is
// relied on by positional literals in rerank.
type rerankItem struct {
	score     float64     // candidate's combined score (scoreVec[0])
	url       string      // candidate URL
	domainMul float64     // domain decay multiplier captured when pushed
	vec       [12]float64 // full factor vector, carried through to the result
}

func (h domainHeap) Len() int { return len(h) }

func (h domainHeap) Less(i, j int) bool {
	left := h[i].score * h[i].domainMul
	right := h[j].score * h[j].domainMul
	return left > right
}

func (h domainHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *domainHeap) Push(x any) { *h = append(*h, x.(rerankItem)) }

func (h *domainHeap) Pop() any {
	last := len(*h) - 1
	item := (*h)[last]
	*h = (*h)[:last]
	return item
}
// candidate is the internal representation of a candidate URL while ranking.
// Field order matters: query builds candidates with a positional literal.
type candidate struct {
url string // raw URL as stored in the index
relevance float64 // product of per-token weights (same value as scoreVec[1])
scoreVec [12]float64 // factor vector; index 0 is the combined score (slot meanings are listed in query)
}
// rerank interleaves candidates by domain and returns the window [from, to)
// of the interleaved order. The window is shorter when there are not enough
// candidates, and nil when from is past the end.
//
// Each domain's best remaining candidate waits in a max-heap keyed by
// score x domain multiplier. Every time a domain's candidate is emitted, that
// domain's multiplier is divided by 8, so its next candidate competes at 1/8,
// then 1/64, ... of its own score.
func rerank(candidates []candidate, from, to int) []candidate {
	// Group by host. Each group is sorted ascending, so its best entry is last.
	byDomain := make(map[string][]candidate)
	for _, c := range candidates {
		host := netloc(c.url)
		byDomain[host] = append(byDomain[host], c)
	}

	mul := make(map[string]float64, len(byDomain))
	pq := &domainHeap{}
	heap.Init(pq)

	// enqueueNext moves the best remaining candidate of domain into the heap.
	enqueueNext := func(domain string) {
		rest := byDomain[domain]
		if len(rest) == 0 {
			return
		}
		best := rest[len(rest)-1]
		byDomain[domain] = rest[:len(rest)-1]
		heap.Push(pq, rerankItem{best.scoreVec[0], best.url, mul[domain], best.scoreVec})
	}

	for domain, group := range byDomain {
		sort.Slice(group, func(i, j int) bool {
			return group[i].scoreVec[0] < group[j].scoreVec[0]
		})
		mul[domain] = 1.0
		enqueueNext(domain)
	}

	// Every domain with candidates left always has exactly one item in the
	// heap, so an empty heap means every candidate has been emitted.
	var out []candidate
	for len(out) < to && pq.Len() > 0 {
		item := heap.Pop(pq).(rerankItem)
		out = append(out, candidate{url: item.url, scoreVec: item.vec})
		domain := netloc(item.url)
		mul[domain] /= 8
		enqueueNext(domain)
	}

	if from >= len(out) {
		return nil
	}
	return out[from:]
}
// ---- 杂项辅助函数 ----
// readBodyLimited 从 HTTP 响应体读取最多 limit 字节(用于限制在线摘要抓取大小)。
func readBodyLimited(resp *http.Response, limit int64) string {
data := make([]byte, 0, limit)
buf := make([]byte, 4096)
var total int64
for {
n, err := resp.Body.Read(buf)
if n > 0 {
data = append(data, buf[:n]...)
total += int64(n)
if total >= limit {
break
}
}
if err != nil {
break
}
}
return string(data)
}
// truncate shortens s to at most n bytes without splitting a UTF-8
// sequence. When byte n falls inside a multi-byte character, the cut moves
// back to that character's first byte, so the result may be a little shorter
// than n. A non-positive n yields "".
func truncate(s string, n int) string {
	if len(s) <= n {
		return s
	}
	if n <= 0 {
		return ""
	}
	cut := n
	// UTF-8 continuation bytes have the form 10xxxxxx.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut]
}
// unescapeURL decodes %XX escapes in u for display (path rules, so '+' is
// kept as-is). Malformed escapes leave u unchanged.
func unescapeURL(u string) string {
	if decoded, err := url.PathUnescape(u); err == nil {
		return decoded
	}
	return u
}
// atoi parses the leading run of ASCII digits in s and ignores everything
// from the first non-digit on. It returns 0 when s does not start with a
// digit, so there are no negative results. Very long digit strings
// overflow silently.
func atoi(s string) int {
	n := 0
	for i := 0; i < len(s); i++ {
		d := s[i]
		if d < '0' || d > '9' {
			break
		}
		n = n*10 + int(d-'0')
	}
	return n
}
// max returns the larger of a and b.
func max(a, b int) int {
	if b > a {
		return b
	}
	return a
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// ---- 以下为内嵌的收获服务(harvester)逻辑 ----
// ingestPayload is the JSON request body sent by the Go crawler.
type ingestPayload struct {
	URL      string `json:"url"` // final URL of the indexed page
	Keywords []struct {
		Word   string  `json:"word"`   // keyword
		Weight float32 `json:"weight"` // weight of this URL under the keyword
	} `json:"keywords"`
}

// ingestPayloadLegacy is the older format sent by the Python crawler:
// [url, [[word, weight], ...]].
type ingestPayloadLegacy []any

// parsePayload reads an ingestion request body (capped at 1 MiB) and decodes
// either supported format:
//
//	modern (Go crawler):     {"url": "...", "keywords": [{"word": "...", "weight": 0.0}]}
//	legacy (Python crawler): ["url", [["word", weight], ...]]
//
// The modern form is tried first and accepted only with a non-empty url.
// Legacy keyword pairs that are malformed or have an empty word are
// skipped. An error is returned when neither format yields a non-empty URL.
func parsePayload(r *http.Request) (*ingestPayload, error) {
	raw, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
	if err != nil {
		return nil, err
	}

	modern := new(ingestPayload)
	if json.Unmarshal(raw, modern) == nil && modern.URL != "" {
		return modern, nil
	}

	var legacy ingestPayloadLegacy
	if err := json.Unmarshal(raw, &legacy); err != nil {
		return nil, fmt.Errorf("invalid payload: %w", err)
	}
	if len(legacy) < 2 {
		return nil, fmt.Errorf("invalid legacy payload: too few elements")
	}
	pageURL, isString := legacy[0].(string)
	if !isString {
		return nil, fmt.Errorf("invalid url type")
	}
	pairs, isList := legacy[1].([]any)
	if !isList {
		return nil, fmt.Errorf("invalid keywords type")
	}
	if pageURL == "" {
		return nil, fmt.Errorf("empty url after parsing")
	}

	out := &ingestPayload{URL: pageURL}
	for _, p := range pairs {
		pair, ok := p.([]any)
		if !ok || len(pair) < 2 {
			continue
		}
		word, _ := pair[0].(string)
		if word == "" {
			continue
		}
		weight, _ := pair[1].(float64) // JSON numbers decode as float64
		out.Keywords = append(out.Keywords, struct {
			Word   string  `json:"word"`
			Weight float32 `json:"weight"`
		}{Word: word, Weight: float32(weight)})
	}
	return out, nil
}
// handleIngest accepts a crawler's POST at /l and buffers its keyword
// entries in the in-memory index.
//
// Newlines are stripped from the page URL. For a keyword that already has
// more than 15 buffered entries, a new entry is dropped when its weight is
// below s.lowThreshold(keyword). When the buffered entry count exceeds
// config.BigCleanThreshold, a flush is started in the background.
// NOTE(review): lowThreshold reads storage while memMu is held.
func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	payload, err := parsePayload(r)
	if err != nil {
		http.Error(w, "bad payload: "+err.Error(), http.StatusBadRequest)
		return
	}
	pageURL := strings.ReplaceAll(payload.URL, "\n", "")
	if pageURL == "" {
		http.Error(w, "empty url", http.StatusBadRequest)
		return
	}

	s.memMu.Lock()
	for _, kw := range payload.Keywords {
		buffered := s.mem[kw.Word]
		if len(buffered) > 15 && float64(kw.Weight) < s.lowThreshold(kw.Word) {
			continue
		}
		s.mem[kw.Word] = append(buffered, storage.IndexEntry{Weight: kw.Weight, URL: pageURL})
		atomic.AddInt64(&s.rowCount, 1)
	}
	s.memMu.Unlock()

	if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold) {
		go s.Flush()
	}
	w.Write([]byte("ok"))
}
// handleFlush serves /flush: it synchronously flushes the in-memory index to
// storage and replies "flushed". The HTTP method is not checked.
func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) {
	s.Flush()
	io.WriteString(w, "flushed")
}
// lowThreshold returns the minimum weight a new entry for key must reach
// once the stored index for key is full. While the stored list has fewer
// than config.MaxURLsPerKey entries it returns -1 (no real threshold).
// Otherwise it returns 5% of the MaxURLsPerKey-th largest stored weight.
func (s *Server) lowThreshold(key string) float64 {
	stored, _ := s.db.GetIndex(key)
	if len(stored) < config.MaxURLsPerKey {
		return -1
	}
	weights := make([]float64, 0, len(stored))
	for _, e := range stored {
		weights = append(weights, float64(e.Weight))
	}
	const fraction = 0.05
	return nthLargest(weights, config.MaxURLsPerKey-1) * fraction
}
// flush swaps out the in-memory index, merges each buffered key with its
// stored entries (at most 8 keys concurrently), and writes all merged keys to
// storage in one batch.
//
// It is non-blocking: if another flush holds flushMu, the call returns
// immediately. Keys that mergeKey rejects (it returns nil for a brand-new key
// with fewer than config.MinURLsForNewKey entries) are not written, so no
// empty index records are created. A batch write error is logged, and the
// swapped-out entries are not retried.
func (s *Server) flush() {
	if !s.flushMu.TryLock() {
		return
	}
	defer s.flushMu.Unlock()

	// Take the buffer so ingestion can continue while merging.
	s.memMu.Lock()
	snapshot := s.mem
	s.mem = make(map[string][]storage.IndexEntry)
	atomic.StoreInt64(&s.rowCount, 0)
	s.memMu.Unlock()

	log.Printf("[harvester] flushing %d keys", len(snapshot))

	type keyEntries struct {
		key     string
		entries []storage.IndexEntry
	}
	items := make([]keyEntries, 0, len(snapshot))
	for k, v := range snapshot {
		items = append(items, keyEntries{k, v})
	}
	rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] })

	results := make(chan keyEntries, len(items))
	sem := make(chan struct{}, 8) // bounds concurrent mergeKey calls
	for _, item := range items {
		sem <- struct{}{}
		go func(it keyEntries) {
			defer func() { <-sem }()
			results <- keyEntries{it.key, s.mergeKey(it.key, it.entries)}
		}(item)
	}

	batch := make(map[string][]storage.IndexEntry, len(items))
	for range items {
		r := <-results
		if r.entries == nil {
			continue // rejected new key: nothing to store
		}
		batch[r.key] = r.entries
	}
	if err := s.db.BatchSetIndex(batch); err != nil {
		log.Printf("[harvester] flush write error: %v", err)
	}
	log.Printf("[harvester] flush done, %d keys written", len(batch))
}
// mergeKey merges the buffered entries for key with the stored ones and
// returns the list to store.
//
// It returns nil when key has nothing stored yet and fewer than
// config.MinURLsForNewKey new entries. Duplicate URLs are removed, with the
// new entries taking precedence. With 2% probability the list is also
// deduplicated by normalised URL. It is trimmed when it exceeds 110% of
// config.MaxURLsPerKey, and otherwise with 2% probability.
func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry {
	existing, _ := s.db.GetIndex(key)
	isNewKey := len(existing) == 0
	if isNewKey && len(newEntries) < config.MinURLsForNewKey {
		return nil
	}

	// New entries come first, so dedup keeps their just-ingested weights.
	merged := dedup(append(newEntries, existing...))
	if rand.Float64() < 0.02 {
		merged = dedupNormalised(merged)
	}
	oversized := float64(len(merged)) > float64(config.MaxURLsPerKey)*1.1
	if oversized || rand.Float64() < 0.02 {
		merged = trim(merged, s.infoSvc, config.MaxURLsPerKey, config.MaxSameDomainPerKey)
	}
	return merged
}
// ---- 收获服务辅助函数 ----
// dedup drops entries whose URL already appeared earlier in entries. It keeps
// the first occurrence and preserves order.
func dedup(entries []storage.IndexEntry) []storage.IndexEntry {
	seen := make(map[string]struct{}, len(entries))
	unique := make([]storage.IndexEntry, 0, len(entries))
	for _, e := range entries {
		if _, dup := seen[e.URL]; !dup {
			seen[e.URL] = struct{}{}
			unique = append(unique, e)
		}
	}
	return unique
}
// dedupNormalised removes entries whose URLs are equal after normaliseURL
// (scheme and trailing slashes stripped), keeping the longest spelling of
// each. The result is ordered by URL length, longest first. The input slice
// is not modified.
func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry {
	byLen := make([]storage.IndexEntry, len(entries))
	copy(byLen, entries)
	// O(n log n) stable sort: equal-length URLs keep their input order.
	sort.SliceStable(byLen, func(i, j int) bool {
		return len(byLen[i].URL) > len(byLen[j].URL)
	})

	seen := make(map[string]struct{}, len(byLen))
	out := make([]storage.IndexEntry, 0, len(byLen))
	for _, e := range byLen {
		k := normaliseURL(e.URL)
		if _, dup := seen[k]; dup {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, e)
	}
	return out
}
// normaliseURL strips one leading "https://" or "http://" (https checked
// first) and all trailing slashes. It is used as a deduplication key.
func normaliseURL(u string) string {
	for _, scheme := range [...]string{"https://", "http://"} {
		if strings.HasPrefix(u, scheme) {
			u = u[len(scheme):]
			break
		}
	}
	return strings.TrimRight(u, "/")
}
func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry {
scored := make([]storage.IndexEntry, len(entries))
copy(scored, entries)
for i := 0; i < len(scored)-1; i++ {
for j := i + 1; j < len(scored); j++ {
si := float64(scored[i].Weight) * (1 + infoSvc.Prosper(scored[i].URL))
sj := float64(scored[j].Weight) * (1 + infoSvc.Prosper(scored[j].URL))
if sj > si {
scored[i], scored[j] = scored[j], scored[i]
}
}
}
domainCount := make(map[string]int)
out := make([]storage.IndexEntry, 0, limit)
for _, e := range scored {
host := netloc(e.URL)
if host == "" {
host = e.URL
}
host = strings.ToLower(host)
isHome := isHomepage(e.URL)
if !isHome && domainCount[host] >= sameDomainLimit {
continue
}
domainCount[host]++
out = append(out, e)
if len(out) >= limit {
break
}
}
return out
}
// isHomepage reports whether u has no path beyond an optional trailing
// slash. "https://" then "http://" prefixes are stripped first.
func isHomepage(u string) bool {
	rest := strings.TrimPrefix(strings.TrimPrefix(u, "https://"), "http://")
	return !strings.Contains(strings.TrimRight(rest, "/"), "/")
}
// nthLargest returns the n-th largest value of values (n = 0 is the
// maximum). It returns 0 when n is out of range. values is not modified.
//
// It sorts a copy, O(len log len). A partial selection sort costs
// O(len*n), which is quadratic for n close to len; lowThreshold calls this
// with n = MaxURLsPerKey-1 while the ingest lock is held.
func nthLargest(values []float64, n int) float64 {
	if n < 0 || n >= len(values) {
		return 0
	}
	sorted := make([]float64, len(values))
	copy(sorted, values)
	sort.Sort(sort.Reverse(sort.Float64Slice(sorted)))
	return sorted[n]
}