1668 lines
49 KiB
Go
1668 lines
49 KiB
Go
// Package search implements the user-facing search HTTP server. It accepts
// query requests and returns results ranked by multiple factors.
|
||
package search
|
||
|
||
import (
|
||
"container/heap" // 堆结构(域名交错排序)
|
||
"encoding/json" // JSON 序列化(响应输出)
|
||
"fmt" // 错误格式化
|
||
"io" // 读取请求体
|
||
"log" // 日志
|
||
"math" // 数学运算(Log、幂)
|
||
"math/rand" // 随机数(刷盘时打乱顺序、概率性去重/裁剪)
|
||
"net/http" // HTTP 服务端
|
||
"net/url" // URL 解析
|
||
"os" // 文件系统(静态文件读取)
|
||
"regexp" // 正则表达式(site: 过滤语法)
|
||
"sort" // 排序
|
||
"strconv" // 字符串转整数
|
||
"strings" // 字符串操作(URL 清洗)
|
||
"sync" // 互斥锁(保护内存索引、并发切片写入)
|
||
"sync/atomic" // 原子操作(计数器)
|
||
"time" // 时间戳
|
||
|
||
"sese-engine/analyzer" // 分词和语言检测
|
||
"sese-engine/config" // 排序权重配置
|
||
"sese-engine/crawler" // 爬虫(读取活跃线程数)
|
||
"sese-engine/info" // info 服务
|
||
"sese-engine/parser" // HTML 解析(在线摘要)
|
||
"sese-engine/storage" // 持久化存储
|
||
)
|
||
|
||
// Server 是搜索 HTTP 服务器,同时内嵌收获服务(统一在同一端口)。
|
||
type Server struct {
|
||
db *storage.DB
|
||
infoSvc *info.Service
|
||
analyzer *analyzer.Analyzer
|
||
httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查)
|
||
|
||
// 以下为收获服务(harvester)内嵌字段
|
||
mem map[string][]storage.IndexEntry // 内存索引聚合器:关键词 → [权重, URL] 条目
|
||
memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
|
||
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
|
||
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
|
||
|
||
// flush 期间的索引读缓存:避免 mergeKey 对每个 key 单独开读事务
|
||
indexCache map[string][]storage.IndexEntry
|
||
indexCacheMu sync.RWMutex
|
||
indexCacheHits int64 // 缓存命中计数(原子)
|
||
|
||
// backlinkRunner 反向链接计算器(可为 nil,仅用于 admin 手动触发)
|
||
backlinkRunner interface {
|
||
Status() map[string]interface{}
|
||
RunNow() error
|
||
}
|
||
}
|
||
|
||
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。
|
||
func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
|
||
s := &Server{
|
||
db: db,
|
||
infoSvc: infoSvc,
|
||
analyzer: a,
|
||
mem: make(map[string][]storage.IndexEntry),
|
||
httpCli: &http.Client{
|
||
Timeout: time.Duration(config.OnlineSnippetTimeout()) * time.Second,
|
||
},
|
||
}
|
||
// 启动定期刷盘 goroutine
|
||
go s.runPeriodicFlush()
|
||
return s
|
||
}
|
||
|
||
// SetBacklinkRunner 注入反向链接计算器(用于 admin 手动触发)。
|
||
func (s *Server) SetBacklinkRunner(r interface {
|
||
Status() map[string]interface{}
|
||
RunNow() error
|
||
}) {
|
||
s.backlinkRunner = r
|
||
}
|
||
|
||
// runPeriodicFlush 每隔 FlushIntervalSeconds 秒触发一次刷盘。
|
||
func (s *Server) runPeriodicFlush() {
|
||
ticker := time.NewTicker(time.Duration(config.FlushIntervalSeconds()) * time.Second)
|
||
defer ticker.Stop()
|
||
for range ticker.C {
|
||
s.Flush()
|
||
}
|
||
}
|
||
|
||
// Flush 公开的刷盘方法,供定时任务和外部调用。
|
||
func (s *Server) Flush() { s.flush() }
|
||
|
||
// Handler 返回 HTTP 路由处理器(统一端口,同时提供搜索、收获和静态文件服务)。
|
||
func (s *Server) Handler() http.Handler {
|
||
mux := http.NewServeMux()
|
||
// 搜索路由
|
||
mux.HandleFunc("/search", s.handleSearch)
|
||
// 收获服务路由(爬虫数据写入)
|
||
mux.HandleFunc("/l", s.handleIngest) // /l:接收爬虫关键词索引数据
|
||
mux.HandleFunc("/flush", s.handleFlush) // /flush:强制刷盘
|
||
// 管理接口
|
||
mux.HandleFunc("/admin/recent", s.handleAdminRecent)
|
||
mux.HandleFunc("/admin/stats", s.handleAdminStats)
|
||
mux.HandleFunc("/admin/priority", s.handleAdminPriority)
|
||
mux.HandleFunc("/admin/flush", s.handleAdminFlush)
|
||
mux.HandleFunc("/admin/pending", s.handleAdminPending)
|
||
mux.HandleFunc("/admin/workers", s.handleAdminWorkers)
|
||
mux.HandleFunc("/admin/backlink", s.handleAdminBacklink)
|
||
// 静态文件(SPA fallback)
|
||
mux.Handle("/", spaHandler{dist: "dist"})
|
||
return mux
|
||
}
|
||
|
||
// spaHandler serves static files from the dist directory. For extension-less
// paths that do not match a file (for example /admin), it falls back to
// dist/index.html so the single-page app can route on the client side.
type spaHandler struct {
	dist string // directory holding the built front-end assets
}

// ServeHTTP serves dist/<path> when that file exists.
//
// Paths inside the API namespaces get a plain 404, so that API typos are not
// masked by the SPA. A missing path whose last segment has an extension also
// gets a 404. Any other missing path is answered with index.html.
func (h spaHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	urlPath := r.URL.Path

	// The exact API endpoints are registered on the mux, so anything in
	// these namespaces that reaches this handler is an unknown API path.
	// Match whole path segments only: a bare "/l" prefix test also swallowed
	// static assets and SPA routes such as /logo.svg or /login. "/admin"
	// itself is left to the SPA (the admin page); only "/admin/..." is API.
	if urlPath == "/search" || strings.HasPrefix(urlPath, "/search/") ||
		urlPath == "/l" || strings.HasPrefix(urlPath, "/l/") ||
		strings.HasPrefix(urlPath, "/admin/") ||
		urlPath == "/flush" {
		http.NotFound(w, r)
		return
	}

	staticPath := strings.TrimPrefix(urlPath, "/")
	if staticPath == "" {
		staticPath = "index.html"
	}

	// Defence in depth: ServeMux already cleans ".." out of request paths,
	// but never read outside dist if this handler is mounted some other way.
	for _, seg := range strings.FieldsFunc(staticPath, func(c rune) bool { return c == '/' || c == '\\' }) {
		if seg == ".." {
			http.NotFound(w, r)
			return
		}
	}

	// Take the extension from the last path segment of the served file, not
	// the raw URL path (for "/" the raw path has no extension at all).
	base := staticPath[strings.LastIndex(staticPath, "/")+1:]
	hasExt := strings.Contains(base, ".")
	ext := ""
	if i := strings.LastIndex(base, "."); i >= 0 {
		ext = strings.ToLower(base[i+1:])
	}

	// Serve the file directly if it exists.
	if data, err := os.ReadFile(h.dist + "/" + staticPath); err == nil {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		switch ext {
		case "js":
			w.Header().Set("Content-Type", "application/javascript")
		case "css":
			w.Header().Set("Content-Type", "text/css")
		case "svg":
			w.Header().Set("Content-Type", "image/svg+xml")
		case "html":
			w.Header().Set("Content-Type", "text/html; charset=utf-8")
		case "jpg", "jpeg":
			w.Header().Set("Content-Type", "image/jpeg") // "image/jpg" is not a registered type
		case "ico":
			w.Header().Set("Content-Type", "image/x-icon")
		case "png", "gif", "webp":
			w.Header().Set("Content-Type", "image/"+ext)
		}
		w.Write(data)
		return
	}

	// The path names a file (it has an extension) that does not exist.
	if hasExt {
		http.NotFound(w, r)
		return
	}

	// SPA fallback: return index.html.
	if data, err := os.ReadFile(h.dist + "/index.html"); err == nil {
		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Write(data)
		return
	}

	http.NotFound(w, r)
}
|
||
|
||
// ListenAndServe 启动搜索服务器。
|
||
func (s *Server) ListenAndServe(addr string) error {
|
||
log.Printf("[search] listening on %s", addr)
|
||
return http.ListenAndServe(addr, s.Handler())
|
||
}
|
||
|
||
// ---- Admin API ----

// recentItem is a single record returned by the /admin/recent endpoint.
type recentItem struct {
	URL         string             `json:"url"`
	Title       string             `json:"title"`
	Description string             `json:"description"` // snippet description, shortened by the handler
	Domain      string             `json:"domain"`      // host part of URL
	Language    map[string]float64 `json:"language"`    // language ratios of the URL's site (empty when unknown)
	WordCount   int                `json:"word_count"`  // NOTE(review): filled with len(snippet.Text), i.e. a byte count of the cached text, not a word count
	CrawledAt   int64              `json:"crawled_at"`  // snippet timestamp (Unix seconds)
}
|
||
|
||
// handleAdminRecent 返回最近爬取的条目列表,按爬取时间倒序。
|
||
// 参数:limit(默认50,最大200)。
|
||
func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
|
||
limit := 50
|
||
if l := r.URL.Query().Get("limit"); l != "" {
|
||
if v, err := strconv.Atoi(l); err == nil && v > 0 {
|
||
limit = v
|
||
}
|
||
}
|
||
if limit > 200 {
|
||
limit = 200
|
||
}
|
||
|
||
type entry struct {
|
||
url string
|
||
snippet *storage.SnippetEntry
|
||
siteInfo *storage.SiteInfo
|
||
}
|
||
|
||
var items []entry
|
||
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
|
||
siteInfo, _ := s.db.GetSiteInfo(netloc(url))
|
||
items = append(items, entry{url, snippet, siteInfo})
|
||
return nil
|
||
})
|
||
|
||
// 按时间倒序
|
||
sort.Slice(items, func(i, j int) bool {
|
||
return items[i].snippet.Timestamp > items[j].snippet.Timestamp
|
||
})
|
||
|
||
if len(items) > limit {
|
||
items = items[:limit]
|
||
}
|
||
|
||
result := make([]recentItem, 0, len(items))
|
||
for _, e := range items {
|
||
lang := e.siteInfo.Languages
|
||
if lang == nil {
|
||
lang = make(map[string]float64)
|
||
}
|
||
desc := e.snippet.Description
|
||
if len(desc) > 200 {
|
||
desc = desc[:200]
|
||
}
|
||
result = append(result, recentItem{
|
||
URL: e.url,
|
||
Title: e.snippet.Title,
|
||
Description: desc,
|
||
Domain: netloc(e.url),
|
||
Language: lang,
|
||
WordCount: len(e.snippet.Text),
|
||
CrawledAt: e.snippet.Timestamp,
|
||
})
|
||
}
|
||
|
||
resp := map[string]any{
|
||
"items": result,
|
||
"total": len(items),
|
||
}
|
||
json.NewEncoder(w).Encode(resp)
|
||
}
|
||
|
||
// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。
|
||
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
|
||
domainCount := make(map[string]int)
|
||
langCount := make(map[string]int)
|
||
totalWords := 0
|
||
total := 0
|
||
recrawlEligible := 0
|
||
now := time.Now().Unix()
|
||
maxAge := int64(config.RecrawlMaxAge())
|
||
|
||
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
|
||
total++
|
||
domain := netloc(url)
|
||
domainCount[domain]++
|
||
totalWords += len(snippet.Text)
|
||
if now-snippet.Timestamp >= maxAge {
|
||
recrawlEligible++
|
||
}
|
||
|
||
siteInfo, _ := s.db.GetSiteInfo(domain)
|
||
if siteInfo != nil {
|
||
for lang, ratio := range siteInfo.Languages {
|
||
if ratio > 0.1 {
|
||
langCount[lang]++
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
})
|
||
|
||
// 排序取 Top
|
||
type kv struct {
|
||
k string
|
||
v int
|
||
}
|
||
topDomains := make([]kv, 0, len(domainCount))
|
||
for k, v := range domainCount {
|
||
topDomains = append(topDomains, kv{k, v})
|
||
}
|
||
sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v })
|
||
if len(topDomains) > 20 {
|
||
topDomains = topDomains[:20]
|
||
}
|
||
topLangs := make([]kv, 0, len(langCount))
|
||
for k, v := range langCount {
|
||
topLangs = append(topLangs, kv{k, v})
|
||
}
|
||
sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v })
|
||
if len(topLangs) > 10 {
|
||
topLangs = topLangs[:10]
|
||
}
|
||
|
||
domainsMap := make(map[string]int)
|
||
for _, kv := range topDomains {
|
||
domainsMap[kv.k] = kv.v
|
||
}
|
||
langsMap := make(map[string]int)
|
||
for _, kv := range topLangs {
|
||
langsMap[kv.k] = kv.v
|
||
}
|
||
|
||
resp := map[string]any{
|
||
"total_urls": total,
|
||
"total_words": totalWords,
|
||
"total_domains": len(domainCount), // 真实的域名总数(非Top 20)
|
||
"domains": domainsMap,
|
||
"languages": langsMap,
|
||
"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
|
||
"recrawl_eligible": recrawlEligible, // 已过期、可被重爬的 URL 数量
|
||
}
|
||
|
||
json.NewEncoder(w).Encode(resp)
|
||
}
|
||
|
||
// handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。
|
||
// GET: 返回所有未访问的 priority 条目列表
|
||
// POST: 添加一条 URL 或域名(body: {url: "..."})
|
||
// DELETE: 删除指定 URL(query: ?url=...)
|
||
func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
|
||
switch r.Method {
|
||
case http.MethodOptions:
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,DELETE,OPTIONS")
|
||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
|
||
w.WriteHeader(204)
|
||
return
|
||
|
||
case http.MethodGet:
|
||
entries, err := s.db.GetPriorityURLs()
|
||
if err != nil {
|
||
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
|
||
return
|
||
}
|
||
if entries == nil {
|
||
entries = []storage.PriorityEntry{}
|
||
}
|
||
json.NewEncoder(w).Encode(map[string]any{"items": entries, "count": len(entries)})
|
||
|
||
case http.MethodPost:
|
||
var body struct {
|
||
URL string `json:"url"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.URL) == "" {
|
||
http.Error(w, `{"error":"missing url field"}`, 400)
|
||
return
|
||
}
|
||
|
||
raw := strings.TrimSpace(body.URL)
|
||
entry := storage.PriorityEntry{AddedAt: time.Now().Unix()}
|
||
|
||
// 判断是完整 URL 还是纯域名
|
||
if strings.Contains(raw, "/") || strings.HasPrefix(raw, "http") {
|
||
u, err := url.Parse(raw)
|
||
if err != nil || u.Host == "" {
|
||
http.Error(w, `{"error":"invalid url"}`, 400)
|
||
return
|
||
}
|
||
if u.Scheme == "" {
|
||
u.Scheme = "https"
|
||
}
|
||
entry.URL = u.String()
|
||
entry.IsDomain = false
|
||
} else {
|
||
entry.URL = "https://" + raw
|
||
entry.IsDomain = true
|
||
}
|
||
|
||
if err := s.db.AddPriorityURL(entry); err != nil {
|
||
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
|
||
return
|
||
}
|
||
json.NewEncoder(w).Encode(map[string]string{"status": "added", "url": entry.URL})
|
||
|
||
case http.MethodDelete:
|
||
urlParam := r.URL.Query().Get("url")
|
||
if urlParam == "" {
|
||
http.Error(w, `{"error":"missing url parameter"}`, 400)
|
||
return
|
||
}
|
||
if err := s.db.RemovePriorityURL(urlParam); err != nil {
|
||
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
|
||
return
|
||
}
|
||
json.NewEncoder(w).Encode(map[string]string{"status": "removed"})
|
||
|
||
default:
|
||
w.Header().Set("Allow", "GET,POST,DELETE")
|
||
http.Error(w, `{"error":"method not allowed"}`, 405)
|
||
}
|
||
}
|
||
|
||
// handleAdminFlush 强制刷盘。
|
||
func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
if r.Method != http.MethodGet && r.Method != http.MethodPost {
|
||
http.Error(w, `{"error":"method not allowed"}`, 405)
|
||
return
|
||
}
|
||
s.Flush()
|
||
w.Write([]byte("flushed"))
|
||
}
|
||
|
||
// handleAdminPending 返回内存中未刷盘的索引条目数量。
|
||
func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
count := atomic.LoadInt64(&s.rowCount)
|
||
json.NewEncoder(w).Encode(map[string]int64{"pending": count})
|
||
}
|
||
|
||
// handleAdminWorkers 查看和动态调整爬虫并发线程数。
|
||
// GET 返回 configured(设定值)和 active(实际运行中的 goroutine 数)
|
||
// POST {"workers": N} 动态修改(范围 1~500),下一轮 epoch 立即生效
|
||
func (s *Server) handleAdminWorkers(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
|
||
switch r.Method {
|
||
case http.MethodOptions:
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
|
||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
|
||
w.WriteHeader(204)
|
||
return
|
||
|
||
case http.MethodGet:
|
||
json.NewEncoder(w).Encode(map[string]int64{
|
||
"configured": int64(config.CrawlerWorkers()),
|
||
"active": crawler.GlobalActiveWorkers(),
|
||
})
|
||
|
||
case http.MethodPost:
|
||
var body struct {
|
||
Workers int `json:"workers"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||
http.Error(w, `{"error":"invalid json"}`, 400)
|
||
return
|
||
}
|
||
if body.Workers < 1 || body.Workers > 500 {
|
||
http.Error(w, `{"error":"workers must be between 1 and 500"}`, 400)
|
||
return
|
||
}
|
||
old := config.CrawlerWorkers()
|
||
config.SetCrawlerWorkers(body.Workers)
|
||
log.Printf("[admin] workers changed: %d → %d (takes effect next epoch)", old, body.Workers)
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"status": "ok",
|
||
"old": old,
|
||
"current": config.CrawlerWorkers(),
|
||
})
|
||
|
||
default:
|
||
w.Header().Set("Allow", "GET,POST")
|
||
http.Error(w, `{"error":"method not allowed"}`, 405)
|
||
}
|
||
}
|
||
|
||
// handleAdminBacklink 查看反链计算状态和手动触发。
|
||
// GET 返回 running(是否计算中)、next_run(下次执行时间)、last_run(上次完成时间)
|
||
// POST 触发立即执行一次反链计算
|
||
func (s *Server) handleAdminBacklink(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
|
||
if s.backlinkRunner == nil {
|
||
http.Error(w, `{"error":"backlink runner not available"}`, 503)
|
||
return
|
||
}
|
||
|
||
switch r.Method {
|
||
case http.MethodOptions:
|
||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
|
||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
|
||
w.WriteHeader(204)
|
||
return
|
||
|
||
case http.MethodGet:
|
||
json.NewEncoder(w).Encode(s.backlinkRunner.Status())
|
||
|
||
case http.MethodPost:
|
||
go s.backlinkRunner.RunNow() // 异步执行,避免阻塞 HTTP 请求
|
||
json.NewEncoder(w).Encode(map[string]string{"status": "started"})
|
||
|
||
default:
|
||
w.Header().Set("Allow", "GET,POST")
|
||
http.Error(w, `{"error":"method not allowed"}`, 405)
|
||
}
|
||
}
|
||
|
||
// ---- Search handler ----

// searchResponse is the JSON body returned by /search.
type searchResponse struct {
	Tokens  []string       `json:"tokens"`  // query tokens after segmentation, blocked-word filtering and substring deduplication
	Counts  map[string]int `json:"counts"`  // per token: number of entries in the on-disk index (unflushed in-memory entries are not counted)
	Results []searchResult `json:"results"` // ranked results for the requested slice
	Total   int            `json:"total"`   // number of candidate URLs; with a site: filter this is counted AFTER filtering
}

// searchResult is a single search hit.
type searchResult struct {
	Score       float64            `json:"score"`             // final combined ranking score
	URL         string             `json:"url"`               // page URL, passed through unescapeURL for display
	Snippet     *snippetInfo       `json:"snippet,omitempty"` // title/description/text; omitted when unavailable
	Relevance   map[string]float64 `json:"relevance"`         // per token: this URL's raw index weight (0 when the token is absent)
	DomainCount int                `json:"domain_count"`      // intended: number of candidates from the same domain. NOTE(review): query currently always sets 0
	Factors     map[string]float64 `json:"factors,omitempty"` // breakdown of the individual ranking factors
}

// snippetInfo holds a page's title, description and a body-text excerpt.
type snippetInfo struct {
	Title       string `json:"title"`       // page title
	Description string `json:"description"` // meta description
	Text        string `json:"text"`        // start of the body text; intended as the first 256 characters (truncate() is defined elsewhere — confirm bytes vs runes)
}

// siteRe matches a whole query term of the form site:<host>, for example
// site:example.com; capture group 1 is the host. handleSearch applies it to
// each whitespace-separated term of the query.
var siteRe = regexp.MustCompile(`^site:(.+)$`)
|
||
|
||
// handleSearch 处理 GET /search 请求。
|
||
// 参数:q(查询词),qh(URL 编码的查询词),slice(分页范围,格式 "from:to")。
|
||
func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Access-Control-Allow-Origin", "*") // 允许跨域
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
|
||
// 获取查询词
|
||
q := r.URL.Query().Get("q")
|
||
if q == "" {
|
||
// qh:URL 编码的查询词(用于含特殊字符的查询)
|
||
if qh := r.URL.Query().Get("qh"); qh != "" {
|
||
decoded, err := url.PathUnescape(qh)
|
||
if err == nil {
|
||
q = decoded
|
||
}
|
||
}
|
||
}
|
||
|
||
// 解析分页参数(格式 "0:10")
|
||
sliceStr := r.URL.Query().Get("slice")
|
||
sliceFrom, sliceTo := 0, 10
|
||
if sliceStr != "" {
|
||
parts := strings.SplitN(sliceStr, ":", 2)
|
||
if len(parts) == 2 {
|
||
a := atoi(parts[0])
|
||
b := atoi(parts[1])
|
||
if a >= 0 && b > a && b-a <= 20 {
|
||
sliceFrom, sliceTo = a, b
|
||
}
|
||
}
|
||
}
|
||
|
||
// 解析查询分词,并提取 site: 过滤条件
|
||
var tokens []string
|
||
var siteFilter string
|
||
for _, part := range strings.Fields(q) {
|
||
if m := siteRe.FindStringSubmatch(part); len(m) > 1 {
|
||
siteFilter = m[1] // site:example.com 提取目标主机名
|
||
} else {
|
||
// 搜索模式分词(CutForSearch):更细粒度,"气象局" → ["气象", "局", "气象局"]
|
||
segs := s.analyzer.Segment(part, true)
|
||
for _, t := range segs {
|
||
if !s.infoSvc.IsBlocked(t) { // 过滤屏蔽词
|
||
tokens = append(tokens, t)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 智能去重:当一个词是另一个词的子串时,保留两者但标记子词关系。
|
||
// 这样评分时可以用长词(精确匹配)加分,同时不因缺少短词而过度惩罚。
|
||
// 算法:对于每一对词 (a, b),如果 a 是 b 的子串且 a != b,则 a 是 b 的子词。
|
||
tokens = deduplicateSubstrings(tokens)
|
||
|
||
// 最多保留 20 个词(避免查询过于宽泛)
|
||
if len(tokens) > 20 {
|
||
tokens = tokens[:20]
|
||
}
|
||
|
||
results, total := s.query(tokens, sliceFrom, sliceTo, siteFilter)
|
||
|
||
// 统计每个词命中的 URL 数量(供前端展示)
|
||
counts := make(map[string]int, len(tokens))
|
||
for _, t := range tokens {
|
||
entries, _ := s.db.GetIndex(t)
|
||
counts[t] = len(entries)
|
||
}
|
||
|
||
resp := searchResponse{
|
||
Tokens: tokens,
|
||
Counts: counts,
|
||
Results: results,
|
||
Total: total,
|
||
}
|
||
json.NewEncoder(w).Encode(resp)
|
||
}
|
||
|
||
// query 执行多关键词搜索,返回排序后的结果列表。
|
||
// 搜索流程:加载倒排索引 → 构建 URL 候选集 → 多因子评分 → 域名交错重排 → 截取分页。
|
||
func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]searchResult, int) {
|
||
if len(tokens) == 0 {
|
||
return nil, 0
|
||
}
|
||
|
||
// 加载每个词对应的倒排索引条目(磁盘 + 内存)
|
||
type tokenIndex struct {
|
||
token string
|
||
entries []storage.IndexEntry
|
||
defVal float64 // 缺省权重(词在索引中条目已满时使用)
|
||
}
|
||
tokenIndexes := make([]tokenIndex, 0, len(tokens))
|
||
maxURLsPerKey := config.MaxURLsPerKey()
|
||
|
||
// 读锁保护内存索引访问(与刷盘互斥,但多个搜索可并发)
|
||
s.memMu.RLock()
|
||
for _, t := range tokens {
|
||
// 1. 从磁盘加载
|
||
diskEntries, _ := s.db.GetIndex(t)
|
||
// 2. 从内存加载(尚未刷盘的数据)
|
||
memEntries := s.mem[t]
|
||
// 3. 合并(内存数据优先,因为更新)
|
||
entries := make([]storage.IndexEntry, 0, len(diskEntries)+len(memEntries))
|
||
seen := make(map[string]bool, len(diskEntries)+len(memEntries))
|
||
for _, e := range memEntries {
|
||
if !seen[e.URL] {
|
||
entries = append(entries, e)
|
||
seen[e.URL] = true
|
||
}
|
||
}
|
||
for _, e := range diskEntries {
|
||
if !seen[e.URL] {
|
||
entries = append(entries, e)
|
||
seen[e.URL] = true
|
||
}
|
||
}
|
||
// 计算缺省权重:当条目数达到上限时,权重低于第 MaxURLsPerKey 名的条目使用缺省权重
|
||
defVal := 1.0 / 10000 * float64(max(100, len(entries))) / float64(maxURLsPerKey)
|
||
if len(entries) >= maxURLsPerKey {
|
||
weights := make([]float64, len(entries))
|
||
for i, e := range entries {
|
||
weights[i] = float64(e.Weight)
|
||
}
|
||
sort.Sort(sort.Reverse(sort.Float64Slice(weights)))
|
||
defVal = math.Max(1.0/10000, weights[maxURLsPerKey-1]/2)
|
||
}
|
||
tokenIndexes = append(tokenIndexes, tokenIndex{t, entries, defVal})
|
||
}
|
||
s.memMu.RUnlock()
|
||
|
||
// 构建 URL → (词 → 权重) 映射,收集所有候选 URL
|
||
urlWeights := make(map[string]map[string]float64)
|
||
for _, ti := range tokenIndexes {
|
||
for _, e := range ti.entries {
|
||
if urlWeights[e.URL] == nil {
|
||
urlWeights[e.URL] = make(map[string]float64)
|
||
}
|
||
urlWeights[e.URL][ti.token] = float64(e.Weight)
|
||
}
|
||
}
|
||
|
||
// site: 过滤
|
||
total := len(urlWeights)
|
||
if siteFilter != "" {
|
||
filtered := make(map[string]map[string]float64)
|
||
for u, vs := range urlWeights {
|
||
h := netloc(u)
|
||
if matchSite(h, siteFilter) {
|
||
filtered[u] = vs
|
||
}
|
||
}
|
||
urlWeights = filtered
|
||
total = len(urlWeights)
|
||
}
|
||
|
||
// 构建每个词对应的缺省权重 map
|
||
defVals := make(map[string]float64, len(tokenIndexes))
|
||
for _, ti := range tokenIndexes {
|
||
defVals[ti.token] = ti.defVal
|
||
}
|
||
|
||
// 计算每个 URL 的相关性和初始分数
|
||
// 评分策略:部分匹配加权和 + 缺词软惩罚(替代原来的全词乘积)
|
||
// 全词乘积问题:一个 URL 只要缺少任何一个查询词,rel 就接近 0,
|
||
// 导致"气象局"拆成 ["气象局","局"] 后,只有"气象"没有"气象局"的页面被淹没。
|
||
missPenalty := config.MissPenalty()
|
||
candidates := make([]candidate, 0, len(urlWeights))
|
||
for u, vs := range urlWeights {
|
||
// 统计实际匹配的词数和权重总和
|
||
matchedCount := 0
|
||
sumWeight := 0.0
|
||
for _, ti := range tokenIndexes {
|
||
vp := vs[ti.token]
|
||
if vp == 0 {
|
||
vp = defVals[ti.token]
|
||
}
|
||
if vp > 0.06 {
|
||
vp = math.Log((vp-0.06)*40+1)/40 + 0.06
|
||
}
|
||
sumWeight += vp
|
||
// 只有权重超过默认值才算真正匹配(排除了 defVal 填充的假匹配)
|
||
if vs[ti.token] > 0 {
|
||
matchedCount++
|
||
}
|
||
}
|
||
totalTokens := len(tokenIndexes)
|
||
// 部分匹配相关性 = 加权平均 × 匹配覆盖率加成
|
||
// matchRatio:匹配词占比,全部匹配=1,全部缺失=0
|
||
matchRatio := float64(matchedCount) / float64(totalTokens)
|
||
// avgWeight:匹配词的平均权重
|
||
avgWeight := sumWeight / float64(totalTokens)
|
||
// rel = 平均权重 × (匹配率 + 未匹配部分的软惩罚)
|
||
// missPenalty 控制未匹配词的惩罚力度:
|
||
// 0 = 完全不惩罚(纯加权和)
|
||
// 1 = 缺词的权重取 0(等同于全词乘积的极端情况)
|
||
rel := avgWeight * (matchRatio + (1-matchRatio)*missPenalty)
|
||
// 反向链接繁荣加分
|
||
prosper := 1 + s.infoSvc.Prosper(u)*config.BacklinkWeight()
|
||
bad := badURL(u)
|
||
adjust := s.infoSvc.Adjust(netloc(u))
|
||
// 基础分数 = 相关性 × 繁荣值 × URL质量 × 人工调整
|
||
score := rel * prosper * (1 - bad) * adjust * 0.1
|
||
|
||
// 12 维分数向量:分别记录各项因子,供后续多阶段调整
|
||
var vec [12]float64
|
||
vec[0] = score // 0: 综合分数
|
||
vec[1] = rel // 1: 相关性
|
||
vec[2] = prosper // 2: 繁荣值
|
||
vec[3] = 1 - bad // 3: URL 质量
|
||
vec[4] = 1 // 4: 语种倍数(待填充)
|
||
vec[5] = 1 // 5: 重复惩罚(待填充)
|
||
vec[6] = adjust // 6: 人工调整
|
||
vec[7] = 1 // 7: 网站时间衰减(待填充)
|
||
vec[8] = 1 // 8: 连续词加成(待填充)
|
||
vec[9] = 1 // 9: 关键词内容(预留)
|
||
vec[10] = 1 // 10: URL 时间衰减(待填充)
|
||
vec[11] = 0.1 // 11: 常数因子
|
||
|
||
candidates = append(candidates, candidate{u, rel, vec})
|
||
}
|
||
|
||
// 初步排序
|
||
sort.Slice(candidates, func(i, j int) bool {
|
||
return candidates[i].scoreVec[0] > candidates[j].scoreVec[0]
|
||
})
|
||
|
||
// 阶段一:加载网站信息,计算语种倍数和时间衰减(Top 256 并发)
|
||
now := time.Now().Unix()
|
||
limit256 := 256
|
||
if len(candidates) < 256 {
|
||
limit256 = len(candidates)
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
for i := 0; i < limit256; i++ {
|
||
wg.Add(1)
|
||
go func(idx int) {
|
||
defer wg.Done()
|
||
c := &candidates[idx]
|
||
h := netloc(c.url)
|
||
siteInfo, _ := s.db.GetSiteInfo(h)
|
||
langMul := languageMultiplier(siteInfo)
|
||
timeMul := timeMul(siteInfo, now)
|
||
urlTimeMul := urlTimeMul(s.db, c.url, now)
|
||
|
||
// 更新综合分数和各项因子
|
||
c.scoreVec[0] = c.scoreVec[0] * 10 * langMul * timeMul * urlTimeMul
|
||
c.scoreVec[4] = langMul
|
||
c.scoreVec[7] = timeMul
|
||
c.scoreVec[10] = urlTimeMul
|
||
}(i)
|
||
}
|
||
wg.Wait()
|
||
|
||
sort.Slice(candidates, func(i, j int) bool {
|
||
return candidates[i].scoreVec[0] > candidates[j].scoreVec[0]
|
||
})
|
||
|
||
// 阶段二:连续词加成和标题重复惩罚(Top 80)
|
||
limit80 := 80
|
||
if len(candidates) < 80 {
|
||
limit80 = len(candidates)
|
||
}
|
||
|
||
titles := make([]string, limit80)
|
||
for i := 0; i < limit80; i++ {
|
||
if snippet, err := s.db.GetSnippet(candidates[i].url); err == nil {
|
||
titles[i] = snippet.Title
|
||
}
|
||
}
|
||
|
||
// 重复惩罚:与前序结果标题相似度过高则降权
|
||
for i := 0; i < limit80; i++ {
|
||
h := repetitionSimilarity(titles, i)
|
||
consecutive := consecutiveCount(titles[i], tokens)
|
||
repMul := 1.0
|
||
if h > 0.5 {
|
||
repMul = 1 - (h - 0.5)
|
||
}
|
||
// 连续词出现越多,乘以 config.ConsecutiveKeyWeight(>1)加成
|
||
consMul := math.Pow(config.ConsecutiveKeyWeight(), float64(consecutive))
|
||
candidates[i].scoreVec[0] *= repMul * consMul
|
||
candidates[i].scoreVec[5] = repMul
|
||
candidates[i].scoreVec[8] = consMul
|
||
}
|
||
|
||
sort.Slice(candidates, func(i, j int) bool {
|
||
return candidates[i].scoreVec[0] > candidates[j].scoreVec[0]
|
||
})
|
||
|
||
// 阶段三:域名交错重排(使结果更丰富多样)
|
||
reranked := rerank(candidates, from, to)
|
||
|
||
// 并发获取摘要
|
||
results := make([]searchResult, 0, len(reranked))
|
||
var snippetMu sync.Mutex
|
||
var snippetWg sync.WaitGroup
|
||
|
||
for _, c := range reranked {
|
||
snippetWg.Add(1)
|
||
go func(cand candidate) {
|
||
defer snippetWg.Done()
|
||
snip := s.getSnippet(cand.url)
|
||
r := searchResult{
|
||
Score: cand.scoreVec[0],
|
||
URL: unescapeURL(cand.url),
|
||
Snippet: snip,
|
||
Relevance: make(map[string]float64),
|
||
DomainCount: 0,
|
||
Factors: map[string]float64{
|
||
"relevance": cand.scoreVec[1],
|
||
"backlink": cand.scoreVec[2],
|
||
"url_quality": cand.scoreVec[3],
|
||
"language": cand.scoreVec[4],
|
||
"repetition": cand.scoreVec[5],
|
||
"adjust": cand.scoreVec[6],
|
||
"site_time": cand.scoreVec[7],
|
||
"consecutive": cand.scoreVec[8],
|
||
"url_time": cand.scoreVec[10],
|
||
},
|
||
}
|
||
for _, ti := range tokenIndexes {
|
||
r.Relevance[ti.token] = urlWeights[cand.url][ti.token]
|
||
}
|
||
snippetMu.Lock()
|
||
results = append(results, r)
|
||
snippetMu.Unlock()
|
||
}(c)
|
||
}
|
||
snippetWg.Wait()
|
||
|
||
// 保持 rerank 的原始顺序(并发写入打乱了顺序)
|
||
urlOrder := make(map[string]int)
|
||
for i, c := range reranked {
|
||
urlOrder[c.url] = i
|
||
}
|
||
sort.Slice(results, func(i, j int) bool {
|
||
return urlOrder[results[i].URL] < urlOrder[results[j].URL]
|
||
})
|
||
|
||
return results, total
|
||
}
|
||
|
||
// getSnippet 获取某 URL 的摘要,优先从缓存读取,缓存未命中则在线抓取。
|
||
func (s *Server) getSnippet(rawURL string) *snippetInfo {
|
||
// 优先读缓存
|
||
if entry, err := s.db.GetSnippet(rawURL); err == nil {
|
||
snip := buildSnippet(entry)
|
||
return snip
|
||
}
|
||
if !config.UseOnlineSnippet() {
|
||
return nil
|
||
}
|
||
// 在线抓取(不使用 robots.txt,适用于搜索摘要场景)
|
||
req, err := http.NewRequest("GET", rawURL, nil)
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
req.Header.Set("User-Agent", config.SpiderName())
|
||
resp, err := s.httpCli.Do(req)
|
||
if err != nil || resp.StatusCode != 200 {
|
||
return nil
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
ct := resp.Header.Get("Content-Type")
|
||
if !strings.Contains(ct, "text/html") {
|
||
return nil
|
||
}
|
||
body := readBodyLimited(resp, 60000)
|
||
title, desc, text, _ := parser.ParseHTML(body, resp.Request.URL.String())
|
||
entry := &storage.SnippetEntry{
|
||
Title: title,
|
||
Description: truncate(desc, 256),
|
||
Text: truncate(text, 256),
|
||
Timestamp: time.Now().Unix(),
|
||
}
|
||
_ = s.db.SetSnippet(rawURL, entry)
|
||
return buildSnippet(entry)
|
||
}
|
||
|
||
func buildSnippet(entry *storage.SnippetEntry) *snippetInfo {
|
||
if entry == nil || (entry.Title == "" && entry.Description == "" && entry.Text == "") {
|
||
return nil
|
||
}
|
||
return &snippetInfo{
|
||
Title: entry.Title,
|
||
Description: entry.Description,
|
||
Text: entry.Text,
|
||
}
|
||
}
|
||
|
||
// ---- 评分辅助函数 ----
|
||
|
||
// languageMultiplier 根据网站的语种分布计算语种倍数。
|
||
// 中文占比越高且无关语言占比越低,则倍数越高(加分);否则降权。
|
||
func languageMultiplier(si *storage.SiteInfo) float64 {
|
||
if si == nil || len(si.Languages) == 0 {
|
||
return 1.0
|
||
}
|
||
total := 0.0
|
||
for _, v := range si.Languages {
|
||
total += v
|
||
}
|
||
chinese := si.Languages["zh"] / total
|
||
weird := (total - si.Languages["zh"] - si.Languages["en"] - si.Languages["ja"]) / total
|
||
languageWeight := config.LanguageWeight()
|
||
return 1 + chinese*languageWeight - weird*languageWeight
|
||
}
|
||
|
||
// timeMul 根据网站最后访问时间计算时间衰减倍数(越久远衰减越多)。
|
||
func timeMul(si *storage.SiteInfo, now int64) float64 {
|
||
if si == nil {
|
||
return 1.0
|
||
}
|
||
t := si.LastVisitTime
|
||
if t == 0 {
|
||
t = 1648000000 // 默认时间戳(2022 年初)
|
||
}
|
||
days := (now - t) / (3600 * 24)
|
||
if days < 0 {
|
||
days = 0
|
||
}
|
||
if days > 180 {
|
||
days = 180 // 最多衰减到约半年前
|
||
}
|
||
if days > 0 {
|
||
days-- // 跳过第一天
|
||
}
|
||
return math.Pow(config.WeightDailyDecay(), float64(days))
|
||
}
|
||
|
||
// urlTimeMul 根据该 URL 的摘要抓取时间计算时间衰减倍数(30 天内不衰减)。
|
||
func urlTimeMul(db *storage.DB, rawURL string, now int64) float64 {
|
||
entry, err := db.GetSnippet(rawURL)
|
||
if err != nil || entry == nil {
|
||
return 1.0
|
||
}
|
||
days := (now - entry.Timestamp) / (3600 * 24)
|
||
if days <= 30 {
|
||
return 1.0
|
||
}
|
||
return math.Pow((2+config.WeightDailyDecay())/3, float64(days))
|
||
}
|
||
|
||
// badURL rates how "low quality" a URL looks, from 0 (clean) up to at most 0.9.
//
// Length beyond 30 bytes adds a base score. Each heuristic that fires then
// moves the score a fixed fraction of the remaining way toward 1:
//   - 0.3 for a .htm/.php page;
//   - 0.1 for more than two slashes, ignoring trailing ones;
//   - 0.3 for a URL shorter than 5 bytes or with ':' at index 4, i.e. a
//     four-letter scheme such as plain "http:".
func badURL(u string) float64 {
	score := math.Max(0, float64(len(u)-30)/200.0)
	penalize := func(fraction float64) { score += (1 - score) * fraction }

	if strings.Contains(u, ".htm") || strings.Contains(u, ".php") {
		penalize(0.3)
	}
	if slashes := strings.Count(strings.TrimRight(u, "/"), "/"); slashes > 2 {
		penalize(0.1)
	}
	if len(u) < 5 || u[4] == ':' {
		penalize(0.3)
	}
	return math.Min(score, 0.9)
}
|
||
|
||
// deduplicateSubstrings removes duplicate tokens and every token that is a
// strict substring of a longer kept token, so the most specific terms drive
// matching. For example, ["气象", "局", "气象局"] -> ["气象局"]: both shorter
// tokens are contained in "气象局".
//
// The result is ordered by byte length (longest first), with ties in lexical
// order. The input slice is not modified; inputs of length 0 or 1 are
// returned as-is.
func deduplicateSubstrings(tokens []string) []string {
	if len(tokens) <= 1 {
		return tokens
	}
	// Sort a copy: sorting in place used to reorder the caller's slice.
	sorted := append([]string(nil), tokens...)
	sort.Slice(sorted, func(i, j int) bool {
		if len(sorted[i]) != len(sorted[j]) {
			return len(sorted[i]) > len(sorted[j])
		}
		return sorted[i] < sorted[j]
	})

	result := make([]string, 0, len(sorted))
	for i, t := range sorted {
		// Equal strings are adjacent after sorting.
		if i > 0 && t == sorted[i-1] {
			continue
		}
		// Every kept token is at least as long as t. Only a strictly longer
		// one can contain it.
		contained := false
		for _, kept := range result {
			if len(kept) > len(t) && strings.Contains(kept, t) {
				contained = true
				break
			}
		}
		if !contained {
			result = append(result, t)
		}
	}
	return result
}
|
||
|
||
// netloc extracts the host (including any port) from an http:// or https://
// URL. Any other string is returned unchanged.
func netloc(rawURL string) string {
	for _, scheme := range [...]string{"http://", "https://"} {
		if rest, ok := strings.CutPrefix(rawURL, scheme); ok {
			host, _, _ := strings.Cut(rest, "/")
			return host
		}
	}
	return rawURL
}
|
||
|
||
// matchSite reports whether host equals pattern or is a subdomain of it
// (host ends with "." + pattern).
func matchSite(host, pattern string) bool {
	return host == pattern || strings.HasSuffix(host, "."+pattern)
}
|
||
|
||
// consecutiveCount counts how many adjacent token pairs appear concatenated in
// title (used for the consecutive-token bonus).
func consecutiveCount(title string, tokens []string) int {
	hits := 0
	for i := 1; i < len(tokens); i++ {
		if strings.Contains(title, tokens[i-1]+tokens[i]) {
			hits++
		}
	}
	return hits
}
|
||
|
||
// repetitionSimilarity 计算标题与前序所有标题的最大相似度(基于编辑距离)。
|
||
// 相似度 > 0.5 的结果将被降权。
|
||
func repetitionSimilarity(titles []string, idx int) float64 {
|
||
if idx == 0 {
|
||
return 0
|
||
}
|
||
t := titles[idx]
|
||
if t == "" {
|
||
return 0
|
||
}
|
||
best := 0.0
|
||
for _, prev := range titles[:idx] {
|
||
if prev == "" {
|
||
continue
|
||
}
|
||
sim := 1 - float64(levenshtein(t, prev))/float64(max(len(t), len(prev)))
|
||
if sim > best {
|
||
best = sim
|
||
}
|
||
}
|
||
return best
|
||
}
|
||
|
||
// levenshtein returns the edit distance between a and b, compared rune by
// rune. Insertions, deletions and substitutions each cost 1. Only two DP rows
// are kept, so extra memory is proportional to the rune length of b.
func levenshtein(a, b string) int {
	src, dst := []rune(a), []rune(b)
	if len(src) == 0 {
		return len(dst)
	}
	if len(dst) == 0 {
		return len(src)
	}

	// above holds the distances from the prefix src[:i] to every dst[:j];
	// row is filled with the distances for src[:i+1].
	above := make([]int, len(dst)+1)
	row := make([]int, len(dst)+1)
	for j := range above {
		above[j] = j
	}
	for i, sc := range src {
		row[0] = i + 1
		for j, dc := range dst {
			best := above[j] // match or substitution
			if sc != dc {
				best++
			}
			if del := above[j+1] + 1; del < best {
				best = del
			}
			if ins := row[j] + 1; ins < best {
				best = ins
			}
			row[j+1] = best
		}
		above, row = row, above
	}
	return above[len(dst)]
}
|
||
|
||
// min3 returns the smallest of three ints.
func min3(a, b, c int) int {
	smallest := a
	if b < smallest {
		smallest = b
	}
	if c < smallest {
		smallest = c
	}
	return smallest
}
|
||
|
||
// ---- 域名交错重排 ----
|
||
|
||
// domainHeap is a max-heap (via container/heap) of rerankItem ordered by the
// decayed score score*domainMul. rerank uses it to interleave results from
// different domains.
type domainHeap []rerankItem

// rerankItem is one entry of domainHeap. Field order matters: entries are
// built with positional composite literals.
type rerankItem struct {
	score     float64     // raw ranking score (the candidate's scoreVec[0])
	url       string      // candidate URL
	domainMul float64     // domain decay factor captured when the item was queued
	vec       [12]float64 // candidate's full score vector, carried through unchanged
}

// Len implements sort.Interface.
func (dh domainHeap) Len() int { return len(dh) }

// Less ranks the higher decayed score first, which makes the heap a max-heap.
func (dh domainHeap) Less(i, j int) bool {
	left := dh[i].score * dh[i].domainMul
	right := dh[j].score * dh[j].domainMul
	return left > right
}

// Swap implements sort.Interface.
func (dh domainHeap) Swap(i, j int) { dh[i], dh[j] = dh[j], dh[i] }

// Push implements heap.Interface; x must be a rerankItem.
func (dh *domainHeap) Push(x interface{}) {
	*dh = append(*dh, x.(rerankItem))
}

// Pop implements heap.Interface by removing and returning the last element.
func (dh *domainHeap) Pop() interface{} {
	last := len(*dh) - 1
	item := (*dh)[last]
	*dh = (*dh)[:last]
	return item
}
|
||
|
||
// candidate is the internal representation of one search hit while it is
// being scored and reranked.
type candidate struct {
	url string // result URL
	// relevance is not read by rerank.
	// NOTE(review): confirm its role at the call sites.
	relevance float64
	scoreVec  [12]float64 // score components; rerank orders on scoreVec[0]
}
|
||
|
||
// rerank 对候选列表进行域名交错重排,返回分页范围内的结果。
|
||
func rerank(candidates []candidate, from, to int) []candidate {
|
||
// 按域名分组
|
||
domainItems := make(map[string][]candidate)
|
||
for _, c := range candidates {
|
||
h := netloc(c.url)
|
||
domainItems[h] = append(domainItems[h], c)
|
||
}
|
||
|
||
// 每个域名的 URL 列表取最后一个(分数最高)放入堆,其余保留
|
||
h := &domainHeap{}
|
||
heap.Init(h)
|
||
domainMul := make(map[string]float64)
|
||
|
||
for domain, items := range domainItems {
|
||
domainMul[domain] = 1.0
|
||
sort.Slice(items, func(i, j int) bool {
|
||
return items[i].scoreVec[0] < items[j].scoreVec[0]
|
||
})
|
||
top := items[len(items)-1]
|
||
domainItems[domain] = items[:len(items)-1]
|
||
heap.Push(h, rerankItem{top.scoreVec[0], top.url, domainMul[domain], top.scoreVec})
|
||
}
|
||
|
||
// 从堆中依次弹出得分最高的条目(受域名衰减影响),直到取够 to 条,
|
||
// 然后截取 [from:to] 段返回。
|
||
var result []candidate
|
||
for len(result) < to {
|
||
if h.Len() == 0 {
|
||
// 堆为空时,将所有域名剩余条目依次推入堆(每域一条)
|
||
anyPushed := false
|
||
for domain, items := range domainItems {
|
||
if len(items) == 0 {
|
||
continue
|
||
}
|
||
next := items[len(items)-1]
|
||
domainItems[domain] = items[:len(items)-1]
|
||
heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec})
|
||
anyPushed = true
|
||
}
|
||
if !anyPushed {
|
||
break // 所有域名都没有剩余条目,结束
|
||
}
|
||
}
|
||
item := heap.Pop(h).(rerankItem)
|
||
result = append(result, candidate{url: item.url, scoreVec: item.vec})
|
||
domain := netloc(item.url)
|
||
domainMul[domain] /= 8 // 该域名的下一次出现衰减到 1/8
|
||
remaining := domainItems[domain]
|
||
if len(remaining) > 0 {
|
||
next := remaining[len(remaining)-1]
|
||
domainItems[domain] = remaining[:len(remaining)-1]
|
||
heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec})
|
||
}
|
||
}
|
||
// 截取分页段
|
||
if from >= len(result) {
|
||
return nil
|
||
}
|
||
return result[from:]
|
||
}
|
||
|
||
// ---- 杂项辅助函数 ----
|
||
|
||
// readBodyLimited reads at most limit bytes from resp.Body and returns them as
// a string; it is used to cap the size of online snippet fetches. Read errors
// are ignored, and whatever was read before the error is returned. A limit <= 0
// yields "". The body is not closed here.
func readBodyLimited(resp *http.Response, limit int64) string {
	// io.LimitReader enforces the cap exactly, even when a single Read would
	// return more than the remaining budget. Buffering whole chunks and
	// checking afterwards overshoots by up to a chunk.
	data, _ := io.ReadAll(io.LimitReader(resp.Body, limit))
	return string(data)
}
|
||
|
||
// truncate shortens s to at most n characters (Unicode code points), never
// splitting a multi-byte UTF-8 sequence. Byte slicing would cut CJK text
// mid-character and emit invalid UTF-8. n <= 0 yields "".
func truncate(s string, n int) string {
	if n <= 0 {
		return ""
	}
	// Fast path: a string of at most n bytes has at most n runes.
	if len(s) <= n {
		return s
	}
	count := 0
	for i := range s { // i steps over rune start offsets
		if count == n {
			return s[:i]
		}
		count++
	}
	return s
}
|
||
|
||
// unescapeURL percent-decodes u using path-segment rules (url.PathUnescape),
// so '+' is kept literally. If u contains a malformed escape, it is returned
// unchanged.
func unescapeURL(u string) string {
	if decoded, err := url.PathUnescape(u); err == nil {
		return decoded
	}
	return u
}
|
||
|
||
// atoi parses the leading run of ASCII decimal digits in s and returns its
// value.
//   - It stops at the first non-digit, so "12a3" → 12.
//   - It returns 0 when s does not start with a digit, including "" and "-5".
//   - Values that would overflow int saturate at math.MaxInt instead of
//     wrapping around to a negative number. If fed from request parameters,
//     a wrapped negative value could otherwise reach slice bounds.
func atoi(s string) int {
	n := 0
	for _, c := range s {
		if c < '0' || c > '9' {
			break
		}
		d := int(c - '0')
		// n*10 + d > MaxInt  ⟺  n > (MaxInt-d)/10
		if n > (math.MaxInt-d)/10 {
			return math.MaxInt
		}
		n = n*10 + d
	}
	return n
}
|
||
|
||
// max returns the larger of two ints. Being declared at package level, it
// shadows Go's built-in max within this package and accepts exactly two ints.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
|
||
|
||
// min returns the smaller of two ints. Being declared at package level, it
// shadows Go's built-in min within this package and accepts exactly two ints.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
|
||
|
||
// ---- 以下为内嵌的收获服务(harvester)逻辑 ----
|
||
|
||
// ingestPayload is the JSON body posted by the Go crawler:
//
//	{"url": "...", "keywords": [{"word": "...", "weight": 0.0}]}
//
// The Keywords element is an anonymous struct. parsePayload spells out the
// same struct type, tags included, when appending, so these field names and
// tags must stay identical.
type ingestPayload struct {
	// URL is the final URL of the indexed page.
	URL string `json:"url"`
	// Keywords lists each keyword together with this URL's weight for it.
	Keywords []struct {
		Word   string  `json:"word"`   // keyword
		Weight float32 `json:"weight"` // weight of URL under Word
	} `json:"keywords"`
}

// ingestPayloadLegacy is the older body sent by the Python crawler,
// [url, [[word, weight], ...]], decoded generically and unpacked by
// parsePayload.
type ingestPayloadLegacy []any
|
||
|
||
// parsePayload 解析爬虫请求体,兼容新旧两种格式。
|
||
// 新格式(Go 爬虫):{"url": "...", "keywords": [{"word": "...", "weight": 0.0}]}
|
||
// 旧格式(Python 爬虫):["url", [["word", weight], ...]]
|
||
func parsePayload(r *http.Request) (*ingestPayload, error) {
|
||
body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
// 先尝试新格式(Go 爬虫)
|
||
var modern ingestPayload
|
||
if err := json.Unmarshal(body, &modern); err == nil && modern.URL != "" {
|
||
return &modern, nil
|
||
}
|
||
// 尝试旧格式(Python 爬虫):[url, [[word, weight], ...]]
|
||
var legacy ingestPayloadLegacy
|
||
if err := json.Unmarshal(body, &legacy); err != nil {
|
||
return nil, fmt.Errorf("invalid payload: %w", err)
|
||
}
|
||
if len(legacy) < 2 {
|
||
return nil, fmt.Errorf("invalid legacy payload: too few elements")
|
||
}
|
||
url, ok := legacy[0].(string)
|
||
if !ok {
|
||
return nil, fmt.Errorf("invalid url type")
|
||
}
|
||
kwsRaw, ok := legacy[1].([]any)
|
||
if !ok {
|
||
return nil, fmt.Errorf("invalid keywords type")
|
||
}
|
||
payload := &ingestPayload{URL: url}
|
||
for _, kw := range kwsRaw {
|
||
kwSlice, ok := kw.([]any)
|
||
if !ok || len(kwSlice) < 2 {
|
||
continue
|
||
}
|
||
word, _ := kwSlice[0].(string)
|
||
weight, _ := kwSlice[1].(float64)
|
||
if word == "" {
|
||
continue
|
||
}
|
||
payload.Keywords = append(payload.Keywords, struct {
|
||
Word string `json:"word"`
|
||
Weight float32 `json:"weight"`
|
||
}{word, float32(weight)})
|
||
}
|
||
if payload.URL == "" {
|
||
return nil, fmt.Errorf("empty url after parsing")
|
||
}
|
||
return payload, nil
|
||
}
|
||
|
||
// handleIngest 处理爬虫发来的 POST 请求,将关键词数据写入内存索引。
|
||
func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
payload, err := parsePayload(r)
|
||
if err != nil {
|
||
http.Error(w, "bad payload: "+err.Error(), http.StatusBadRequest)
|
||
return
|
||
}
|
||
payload.URL = strings.ReplaceAll(payload.URL, "\n", "")
|
||
if payload.URL == "" {
|
||
http.Error(w, "empty url", http.StatusBadRequest)
|
||
return
|
||
}
|
||
s.memMu.Lock()
|
||
// 阶段 1:收集需要阈值检查的 key,快速释放锁
|
||
type keyCheck struct {
|
||
word string
|
||
weight float32
|
||
memLen int
|
||
entries []storage.IndexEntry
|
||
}
|
||
toCheck := make([]keyCheck, 0, len(payload.Keywords))
|
||
for _, kw := range payload.Keywords {
|
||
entries := s.mem[kw.Word]
|
||
if len(entries) > 15 {
|
||
toCheck = append(toCheck, keyCheck{kw.Word, kw.Weight, len(entries), entries})
|
||
} else {
|
||
s.mem[kw.Word] = append(entries, storage.IndexEntry{
|
||
Weight: kw.Weight,
|
||
URL: payload.URL,
|
||
})
|
||
atomic.AddInt64(&s.rowCount, 1)
|
||
}
|
||
}
|
||
s.memMu.Unlock()
|
||
|
||
// 阶段 2:锁外读 db 做阈值检查(避免持锁时做慢 I/O)
|
||
if len(toCheck) > 0 {
|
||
s.memMu.Lock()
|
||
for _, kc := range toCheck {
|
||
// 重新读取最新长度(可能被其他请求修改)
|
||
current := s.mem[kc.word]
|
||
if len(current) > kc.memLen {
|
||
// 条目被其他请求增加了,重新检查
|
||
low := s.lowThreshold(kc.word)
|
||
if float64(kc.weight) < low {
|
||
continue
|
||
}
|
||
}
|
||
low := s.lowThreshold(kc.word)
|
||
if float64(kc.weight) < low {
|
||
continue
|
||
}
|
||
s.mem[kc.word] = append(s.mem[kc.word], storage.IndexEntry{
|
||
Weight: kc.weight,
|
||
URL: payload.URL,
|
||
})
|
||
atomic.AddInt64(&s.rowCount, 1)
|
||
}
|
||
s.memMu.Unlock()
|
||
}
|
||
if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold()) {
|
||
go s.Flush()
|
||
}
|
||
w.Write([]byte("ok"))
|
||
}
|
||
|
||
// handleFlush 处理 GET /flush 请求,强制将内存索引刷到磁盘。
|
||
func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) {
|
||
s.Flush()
|
||
w.Write([]byte("flushed"))
|
||
}
|
||
|
||
// lowThreshold 返回某关键词在已有大量条目时,新条目所需的最低权重阈值。
|
||
func (s *Server) lowThreshold(key string) float64 {
|
||
existing := s.getCachedIndex(key)
|
||
maxURLsPerKey := config.MaxURLsPerKey()
|
||
if len(existing) < maxURLsPerKey {
|
||
return -1
|
||
}
|
||
weights := make([]float64, len(existing))
|
||
for i, e := range existing {
|
||
weights[i] = float64(e.Weight)
|
||
}
|
||
return nthLargest(weights, maxURLsPerKey-1) * 0.05
|
||
}
|
||
|
||
// flush 将内存中的索引批量合并写入磁盘,然后清空内存。
|
||
func (s *Server) flush() {
|
||
if !s.flushMu.TryLock() {
|
||
return
|
||
}
|
||
defer func() {
|
||
s.flushMu.Unlock()
|
||
// 清除索引读缓存
|
||
s.indexCacheMu.Lock()
|
||
s.indexCache = nil
|
||
s.indexCacheMu.Unlock()
|
||
}()
|
||
s.memMu.Lock()
|
||
snapshot := s.mem
|
||
s.mem = make(map[string][]storage.IndexEntry)
|
||
atomic.StoreInt64(&s.rowCount, 0)
|
||
s.memMu.Unlock()
|
||
totalKeys := len(snapshot)
|
||
log.Printf("[harvester] flushing %d keys", totalKeys)
|
||
|
||
// 预热索引读缓存:一次 ForEachIndex 读取全部索引,避免 mergeKey 逐个读事务
|
||
indexCache := make(map[string][]storage.IndexEntry, totalKeys)
|
||
s.db.ForEachIndex(func(keyword string, entries []storage.IndexEntry) error {
|
||
indexCache[keyword] = entries
|
||
return nil
|
||
})
|
||
s.indexCacheMu.Lock()
|
||
s.indexCache = indexCache
|
||
s.indexCacheMu.Unlock()
|
||
log.Printf("[harvester] index cache warmed: %d keys loaded", len(indexCache))
|
||
items := make([]struct {
|
||
key string
|
||
entries []storage.IndexEntry
|
||
}, 0, totalKeys)
|
||
for k, v := range snapshot {
|
||
items = append(items, struct {
|
||
key string
|
||
entries []storage.IndexEntry
|
||
}{k, v})
|
||
}
|
||
rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] })
|
||
type result struct {
|
||
key string
|
||
entries []storage.IndexEntry
|
||
}
|
||
results := make(chan result, len(items))
|
||
sem := make(chan struct{}, 8)
|
||
processed := int64(0)
|
||
progressInterval := 1000
|
||
if totalKeys < 10000 {
|
||
progressInterval = totalKeys / 10
|
||
}
|
||
if progressInterval < 1 {
|
||
progressInterval = 1
|
||
}
|
||
for _, item := range items {
|
||
sem <- struct{}{}
|
||
go func(k string, newEntries []storage.IndexEntry) {
|
||
defer func() { <-sem }()
|
||
merged := s.mergeKey(k, newEntries)
|
||
results <- result{k, merged}
|
||
}(item.key, item.entries)
|
||
}
|
||
batch := make(map[string][]storage.IndexEntry, len(items))
|
||
for range items {
|
||
r := <-results
|
||
batch[r.key] = r.entries
|
||
current := atomic.AddInt64(&processed, 1)
|
||
if int(current)%progressInterval == 0 || int(current) == totalKeys {
|
||
percent := float64(current) * 100 / float64(totalKeys)
|
||
log.Printf("[harvester] flush progress: %d/%d (%.1f%%)", current, totalKeys, percent)
|
||
}
|
||
}
|
||
if err := s.db.BatchSetIndex(batch); err != nil {
|
||
log.Printf("[harvester] flush write error: %v", err)
|
||
}
|
||
log.Printf("[harvester] flush done, %d keys written", len(batch))
|
||
}
|
||
|
||
// getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。
|
||
func (s *Server) getCachedIndex(key string) []storage.IndexEntry {
|
||
s.indexCacheMu.RLock()
|
||
if s.indexCache != nil {
|
||
if entries, ok := s.indexCache[key]; ok {
|
||
s.indexCacheMu.RUnlock()
|
||
atomic.AddInt64(&s.indexCacheHits, 1)
|
||
return entries
|
||
}
|
||
}
|
||
s.indexCacheMu.RUnlock()
|
||
entries, _ := s.db.GetIndex(key)
|
||
return entries
|
||
}
|
||
|
||
// mergeKey 将新条目和磁盘已有条目合并后返回最终列表。
|
||
func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry {
|
||
existing := s.getCachedIndex(key)
|
||
if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey() {
|
||
return nil
|
||
}
|
||
merged := dedup(append(newEntries, existing...))
|
||
if rand.Float64() < 0.02 {
|
||
merged = dedupNormalised(merged)
|
||
}
|
||
maxURLsPerKey := config.MaxURLsPerKey()
|
||
if float64(len(merged)) > float64(maxURLsPerKey)*1.1 || rand.Float64() < 0.02 {
|
||
merged = trim(merged, s.infoSvc, maxURLsPerKey, config.MaxSameDomainPerKey())
|
||
}
|
||
return merged
|
||
}
|
||
|
||
// ---- 收获服务辅助函数 ----
|
||
|
||
func dedup(entries []storage.IndexEntry) []storage.IndexEntry {
|
||
seen := make(map[string]bool, len(entries))
|
||
out := make([]storage.IndexEntry, 0, len(entries))
|
||
for _, e := range entries {
|
||
if seen[e.URL] {
|
||
continue
|
||
}
|
||
seen[e.URL] = true
|
||
out = append(out, e)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry {
|
||
sorted := make([]storage.IndexEntry, len(entries))
|
||
copy(sorted, entries)
|
||
for i := 0; i < len(sorted)-1; i++ {
|
||
for j := i + 1; j < len(sorted); j++ {
|
||
if len(sorted[j].URL) > len(sorted[i].URL) {
|
||
sorted[i], sorted[j] = sorted[j], sorted[i]
|
||
}
|
||
}
|
||
}
|
||
seen := make(map[string]bool)
|
||
out := make([]storage.IndexEntry, 0, len(sorted))
|
||
for _, e := range sorted {
|
||
k := normaliseURL(e.URL)
|
||
if seen[k] {
|
||
continue
|
||
}
|
||
seen[k] = true
|
||
out = append(out, e)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// normaliseURL builds a duplicate-detection key for u. It strips at most one
// leading "https://" or "http://" (checked in that order) and then all
// trailing slashes.
func normaliseURL(u string) string {
	for _, scheme := range [...]string{"https://", "http://"} {
		if strings.HasPrefix(u, scheme) {
			u = u[len(scheme):]
			break
		}
	}
	return strings.TrimRight(u, "/")
}
|
||
|
||
func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry {
|
||
scored := make([]storage.IndexEntry, len(entries))
|
||
copy(scored, entries)
|
||
for i := 0; i < len(scored)-1; i++ {
|
||
for j := i + 1; j < len(scored); j++ {
|
||
si := float64(scored[i].Weight) * (1 + infoSvc.Prosper(scored[i].URL))
|
||
sj := float64(scored[j].Weight) * (1 + infoSvc.Prosper(scored[j].URL))
|
||
if sj > si {
|
||
scored[i], scored[j] = scored[j], scored[i]
|
||
}
|
||
}
|
||
}
|
||
domainCount := make(map[string]int)
|
||
out := make([]storage.IndexEntry, 0, limit)
|
||
for _, e := range scored {
|
||
host := netloc(e.URL)
|
||
if host == "" {
|
||
host = e.URL
|
||
}
|
||
host = strings.ToLower(host)
|
||
isHome := isHomepage(e.URL)
|
||
if !isHome && domainCount[host] >= sameDomainLimit {
|
||
continue
|
||
}
|
||
domainCount[host]++
|
||
out = append(out, e)
|
||
if len(out) >= limit {
|
||
break
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
// isHomepage reports whether u has no path below the root. It strips a
// leading "https://" and then a leading "http://" (each at most once) and
// trailing slashes, then checks that no '/' remains. For example,
// "https://a.com/" is a homepage and "https://a.com/x" is not.
func isHomepage(u string) bool {
	rest := strings.TrimPrefix(strings.TrimPrefix(u, "https://"), "http://")
	return !strings.Contains(strings.TrimRight(rest, "/"), "/")
}
|
||
|
||
// nthLargest returns the n-th largest value in values, 0-based (n = 0 is the
// maximum), counting duplicates separately. It returns 0 when n is out of
// range (negative, or >= len(values)). values is not modified.
func nthLargest(values []float64, n int) float64 {
	if n < 0 || n >= len(values) {
		return 0
	}
	// Sorting a copy costs O(len·log len). A partial selection sort costs
	// O(len·n), which is quadratic for lowThreshold's n ≈ len on the ingest
	// path.
	sorted := make([]float64, len(values))
	copy(sorted, values)
	sort.Float64s(sorted)
	return sorted[len(sorted)-1-n]
}
|