fix 分页bug,加入手动刷盘
This commit is contained in:
+83
-4
@@ -7,6 +7,8 @@ package harvester
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json" // JSON 反序列化(解析爬虫请求)
|
"encoding/json" // JSON 反序列化(解析爬虫请求)
|
||||||
|
"fmt" // 错误格式化
|
||||||
|
"io" // 读取请求体
|
||||||
"log" // 日志输出
|
"log" // 日志输出
|
||||||
"math/rand" // 随机数(打乱合并顺序、触发概率性操作)
|
"math/rand" // 随机数(打乱合并顺序、触发概率性操作)
|
||||||
"net/http" // HTTP 服务端
|
"net/http" // HTTP 服务端
|
||||||
@@ -42,7 +44,7 @@ func New(db *storage.DB, infoSvc *info.Service) *Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ingestPayload 是爬虫发送的 JSON 请求体结构。
|
// ingestPayload 是爬虫发送的 JSON 请求体结构(Go 爬虫用)。
|
||||||
type ingestPayload struct {
|
type ingestPayload struct {
|
||||||
URL string `json:"url"` // 被索引页面的最终 URL
|
URL string `json:"url"` // 被索引页面的最终 URL
|
||||||
Keywords []struct {
|
Keywords []struct {
|
||||||
@@ -51,22 +53,99 @@ type ingestPayload struct {
|
|||||||
} `json:"keywords"`
|
} `json:"keywords"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ingestPayloadLegacy 是 Python 爬虫发送的旧格式:[url, [[word, weight], ...]]
|
||||||
|
// 兼容处理:Python 端发的是数组,Go 期望的是对象。
|
||||||
|
type ingestPayloadLegacy []any
|
||||||
|
|
||||||
// Handler 返回 HTTP 路由处理器。
|
// Handler 返回 HTTP 路由处理器。
|
||||||
func (s *Server) Handler() http.Handler {
|
func (s *Server) Handler() http.Handler {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.HandleFunc("/l", s.handleIngest) // /l 端点:接收爬虫数据
|
mux.HandleFunc("/l", s.handleIngest) // /l 端点:接收爬虫数据
|
||||||
|
mux.HandleFunc("/flush", s.handleFlush) // /flush:强制刷盘(用于手动触发或调试)
|
||||||
|
mux.HandleFunc("/admin/pending", s.handleAdminPending) // /admin/pending:返回未刷盘数据条数
|
||||||
return mux
|
return mux
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleAdminPending 返回内存中未刷盘的索引条目数量。
|
||||||
|
func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||||
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
count := atomic.LoadInt64(&s.rowCount)
|
||||||
|
json.NewEncoder(w).Encode(map[string]int64{"pending": count})
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleFlush 处理 GET /flush 请求,强制将内存索引刷到磁盘。
|
||||||
|
func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.flush()
|
||||||
|
w.Write([]byte("flushed"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush 公开的刷盘方法,供外部调用(定时刷盘、退出前刷盘)。
|
||||||
|
func (s *Server) Flush() { s.flush() }
|
||||||
|
|
||||||
|
// parsePayload 解析爬虫请求体,兼容新旧两种格式。
|
||||||
|
// 新格式(Go 爬虫):{"url": "...", "keywords": [{"word": "...", "weight": 0.0}]}
|
||||||
|
// 旧格式(Python 爬虫):["url", [["word", weight], ...]]
|
||||||
|
func parsePayload(r *http.Request) (*ingestPayload, error) {
|
||||||
|
// 读取 body 为 bytes(支持重读)
|
||||||
|
body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 先尝试新格式(Go 爬虫)
|
||||||
|
var modern ingestPayload
|
||||||
|
if err := json.Unmarshal(body, &modern); err == nil && modern.URL != "" {
|
||||||
|
return &modern, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 尝试旧格式(Python 爬虫):[url, [[word, weight], ...]]
|
||||||
|
var legacy ingestPayloadLegacy
|
||||||
|
if err := json.Unmarshal(body, &legacy); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid payload: %w", err)
|
||||||
|
}
|
||||||
|
if len(legacy) < 2 {
|
||||||
|
return nil, fmt.Errorf("invalid legacy payload: too few elements")
|
||||||
|
}
|
||||||
|
url, ok := legacy[0].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid url type")
|
||||||
|
}
|
||||||
|
kwsRaw, ok := legacy[1].([]any)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid keywords type")
|
||||||
|
}
|
||||||
|
payload := &ingestPayload{URL: url}
|
||||||
|
for _, kw := range kwsRaw {
|
||||||
|
kwSlice, ok := kw.([]any)
|
||||||
|
if !ok || len(kwSlice) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
word, _ := kwSlice[0].(string)
|
||||||
|
weight, _ := kwSlice[1].(float64)
|
||||||
|
if word == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
payload.Keywords = append(payload.Keywords, struct {
|
||||||
|
Word string `json:"word"`
|
||||||
|
Weight float32 `json:"weight"`
|
||||||
|
}{word, float32(weight)})
|
||||||
|
}
|
||||||
|
if payload.URL == "" {
|
||||||
|
return nil, fmt.Errorf("empty url after parsing")
|
||||||
|
}
|
||||||
|
return payload, nil
|
||||||
|
}
|
||||||
|
|
||||||
// handleIngest 处理爬虫发来的 POST 请求,将关键词数据写入内存索引。
|
// handleIngest 处理爬虫发来的 POST 请求,将关键词数据写入内存索引。
|
||||||
func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) {
|
||||||
if r.Method != http.MethodPost {
|
if r.Method != http.MethodPost {
|
||||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var payload ingestPayload
|
payload, err := parsePayload(r)
|
||||||
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
if err != nil {
|
||||||
http.Error(w, "bad json: "+err.Error(), http.StatusBadRequest)
|
http.Error(w, "bad payload: "+err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
"os" // 操作系统信号
|
"os" // 操作系统信号
|
||||||
"os/signal" // 信号捕获
|
"os/signal" // 信号捕获
|
||||||
"syscall" // 系统调用(SIGTERM)
|
"syscall" // 系统调用(SIGTERM)
|
||||||
|
"time" // 定期刷盘定时器
|
||||||
|
|
||||||
"sese-engine/analyzer" // 文本分析和关键词提取
|
"sese-engine/analyzer" // 文本分析和关键词提取
|
||||||
"sese-engine/backlink" // 反向链接(繁荣值)计算
|
"sese-engine/backlink" // 反向链接(繁荣值)计算
|
||||||
@@ -67,8 +68,17 @@ func main() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// ---- 4b. 定期刷盘:每 60 秒强制将内存索引写入磁盘,确保搜索实时可用 ----
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(60 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range ticker.C {
|
||||||
|
harvSrv.Flush()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// ---- 5. 搜索服务器(默认 :80):对外提供搜索 API ----
|
// ---- 5. 搜索服务器(默认 :80):对外提供搜索 API ----
|
||||||
searchSrv := search.New(db, infoSvc, anal)
|
searchSrv := search.New(db, infoSvc, anal, "http://localhost:5000")
|
||||||
go func() {
|
go func() {
|
||||||
addr := fmt.Sprintf(":%d", config.SearchServerPort)
|
addr := fmt.Sprintf(":%d", config.SearchServerPort)
|
||||||
if err := searchSrv.ListenAndServe(addr); err != nil {
|
if err := searchSrv.ListenAndServe(addr); err != nil {
|
||||||
@@ -93,5 +103,6 @@ func main() {
|
|||||||
quit := make(chan os.Signal, 1)
|
quit := make(chan os.Signal, 1)
|
||||||
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
|
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
|
||||||
<-quit
|
<-quit
|
||||||
log.Println("shutdown signal received, exiting...")
|
log.Println("shutdown signal received, flushing index...")
|
||||||
|
harvSrv.Flush() // 退出前刷盘,不丢数据
|
||||||
}
|
}
|
||||||
|
|||||||
+71
-7
@@ -5,6 +5,7 @@ package search
|
|||||||
import (
|
import (
|
||||||
"container/heap" // 堆结构(域名交错排序)
|
"container/heap" // 堆结构(域名交错排序)
|
||||||
"encoding/json" // JSON 序列化(响应输出)
|
"encoding/json" // JSON 序列化(响应输出)
|
||||||
|
"io" // 代理响应复制
|
||||||
"log" // 日志
|
"log" // 日志
|
||||||
"math" // 数学运算(Log、幂)
|
"math" // 数学运算(Log、幂)
|
||||||
"net/http" // HTTP 服务端
|
"net/http" // HTTP 服务端
|
||||||
@@ -29,14 +30,17 @@ type Server struct {
|
|||||||
infoSvc *info.Service
|
infoSvc *info.Service
|
||||||
analyzer *analyzer.Analyzer
|
analyzer *analyzer.Analyzer
|
||||||
httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查)
|
httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查)
|
||||||
|
harvesterURL string // 收获服务器地址(如 "http://localhost:5000")
|
||||||
}
|
}
|
||||||
|
|
||||||
// New 创建一个 search Server。
|
// New 创建一个 search Server。
|
||||||
func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
|
// harvesterURL 为收获服务器的地址,用于代理刷盘和状态查询。
|
||||||
|
func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer, harvesterURL string) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
db: db,
|
db: db,
|
||||||
infoSvc: infoSvc,
|
infoSvc: infoSvc,
|
||||||
analyzer: a,
|
analyzer: a,
|
||||||
|
harvesterURL: harvesterURL,
|
||||||
httpCli: &http.Client{
|
httpCli: &http.Client{
|
||||||
Timeout: time.Duration(config.OnlineSnippetTimeout) * time.Second,
|
Timeout: time.Duration(config.OnlineSnippetTimeout) * time.Second,
|
||||||
},
|
},
|
||||||
@@ -50,6 +54,7 @@ func (s *Server) Handler() http.Handler {
|
|||||||
mux.HandleFunc("/admin/recent", s.handleAdminRecent)
|
mux.HandleFunc("/admin/recent", s.handleAdminRecent)
|
||||||
mux.HandleFunc("/admin/stats", s.handleAdminStats)
|
mux.HandleFunc("/admin/stats", s.handleAdminStats)
|
||||||
mux.HandleFunc("/admin/priority", s.handleAdminPriority)
|
mux.HandleFunc("/admin/priority", s.handleAdminPriority)
|
||||||
|
mux.HandleFunc("/admin/flush", s.handleAdminFlush)
|
||||||
return mux
|
return mux
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -199,6 +204,19 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
|||||||
"domains": domainsMap,
|
"domains": domainsMap,
|
||||||
"languages": langsMap,
|
"languages": langsMap,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 从 harvester 代理获取未刷盘数据条数
|
||||||
|
if s.harvesterURL != "" {
|
||||||
|
req, _ := http.NewRequest(http.MethodGet, s.harvesterURL+"/admin/pending", nil)
|
||||||
|
if proxyResp, err := s.httpCli.Do(req); err == nil {
|
||||||
|
defer proxyResp.Body.Close()
|
||||||
|
var pendingResp map[string]int64
|
||||||
|
if err := json.NewDecoder(proxyResp.Body).Decode(&pendingResp); err == nil {
|
||||||
|
resp["pending"] = pendingResp["pending"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(resp)
|
json.NewEncoder(w).Encode(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -282,6 +300,33 @@ func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleAdminFlush 代理到 harvester 的 /flush 接口,执行刷盘。
|
||||||
|
func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||||
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
|
||||||
|
if r.Method != http.MethodGet && r.Method != http.MethodPost {
|
||||||
|
http.Error(w, `{"error":"method not allowed"}`, 405)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 代理请求到 harvester
|
||||||
|
proxyURL := s.harvesterURL + "/flush"
|
||||||
|
req, err := http.NewRequest(http.MethodGet, proxyURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp, err := s.httpCli.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, `{"error":"harvester unreachable: `+err.Error()+`"}`, 502)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
w.WriteHeader(resp.StatusCode)
|
||||||
|
io.Copy(w, resp.Body)
|
||||||
|
}
|
||||||
|
|
||||||
// ---- 搜索处理器 ----
|
// ---- 搜索处理器 ----
|
||||||
|
|
||||||
// searchResponse 是搜索 API 的 JSON 响应结构。
|
// searchResponse 是搜索 API 的 JSON 响应结构。
|
||||||
@@ -874,13 +919,28 @@ func rerank(candidates []candidate, from, to int) []candidate {
|
|||||||
heap.Push(h, rerankItem{top.scoreVec[0], top.url, domainMul[domain], top.scoreVec})
|
heap.Push(h, rerankItem{top.scoreVec[0], top.url, domainMul[domain], top.scoreVec})
|
||||||
}
|
}
|
||||||
|
|
||||||
// 从堆中依次弹出得分最高的条目(受域名衰减影响),直到取够
|
// 从堆中依次弹出得分最高的条目(受域名衰减影响),直到取够 to 条,
|
||||||
|
// 然后截取 [from:to] 段返回。
|
||||||
var result []candidate
|
var result []candidate
|
||||||
for h.Len() > 0 && len(result) < to {
|
for len(result) < to {
|
||||||
item := heap.Pop(h).(rerankItem)
|
if h.Len() == 0 {
|
||||||
if len(result) >= from {
|
// 堆为空时,将所有域名剩余条目依次推入堆(每域一条)
|
||||||
result = append(result, candidate{url: item.url, scoreVec: item.vec})
|
anyPushed := false
|
||||||
|
for domain, items := range domainItems {
|
||||||
|
if len(items) == 0 {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
next := items[len(items)-1]
|
||||||
|
domainItems[domain] = items[:len(items)-1]
|
||||||
|
heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec})
|
||||||
|
anyPushed = true
|
||||||
|
}
|
||||||
|
if !anyPushed {
|
||||||
|
break // 所有域名都没有剩余条目,结束
|
||||||
|
}
|
||||||
|
}
|
||||||
|
item := heap.Pop(h).(rerankItem)
|
||||||
|
result = append(result, candidate{url: item.url, scoreVec: item.vec})
|
||||||
domain := netloc(item.url)
|
domain := netloc(item.url)
|
||||||
domainMul[domain] /= 8 // 该域名的下一次出现衰减到 1/8
|
domainMul[domain] /= 8 // 该域名的下一次出现衰减到 1/8
|
||||||
remaining := domainItems[domain]
|
remaining := domainItems[domain]
|
||||||
@@ -890,7 +950,11 @@ func rerank(candidates []candidate, from, to int) []candidate {
|
|||||||
heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec})
|
heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result
|
// 截取分页段
|
||||||
|
if from >= len(result) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return result[from:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- 杂项辅助函数 ----
|
// ---- 杂项辅助函数 ----
|
||||||
|
|||||||
Reference in New Issue
Block a user