修复一个卡死问题

This commit is contained in:
2026-04-08 18:44:51 +08:00
parent c154abf410
commit 1d3570a505
3 changed files with 231 additions and 7 deletions
+72 -7
View File
@@ -4,6 +4,7 @@ package crawler
import (
"bytes" // 字节缓冲(构造 HTTP POST 请求体)
"context" // context 超时控制
"encoding/json" // JSON 序列化(发送关键词数据到 harvester)
"log" // 日志输出
"math" // 数学运算(指数衰减、质量评分)
@@ -28,13 +29,30 @@ type Stats struct {
KeywordsFetched int64 // 累计提取的关键词总数
}
// 熔断器状态(用 atomic int32 代替 mutex,避免持有锁时的慢 I/O)。
const (
circuitClosed int32 = iota // 正常:所有请求都发往 harvester
circuitOpen // 断开:连续失败 N 次后,冷却时间内跳过所有请求
circuitHalfOpen // 半开:冷却结束,尝试放行一次请求试探
)
const (
circuitFailureThreshold = 5 // 连续失败多少次后触发熔断
circuitCooldownSeconds = 30 // 熔断持续时间(秒)
)
// Crawler 编排整个 BFS 爬取流程。
type Crawler struct {
fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流)
db *storage.DB // 持久化数据库
analyzer *analyzer.Analyzer // 分词和关键词分析
prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值")
stats Stats // 原子计数器
fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流)
db *storage.DB // 持久化数据库
analyzer *analyzer.Analyzer // 分词和关键词分析
prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值")
stats Stats // 原子计数器
// 熔断器(全用 atomic,无 mutex,无慢 I/O 时持有锁的风险)
circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen
circuitFailures int32 // 连续失败计数(atomic
circuitExpiry int64 // 熔断/半开截止 Unix 时间戳(秒)
}
// New 创建一个 Crawler 实例。
@@ -269,7 +287,32 @@ func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc,
}
// sendToHarvester 将关键词索引数据通过 HTTP POST 发送到收获服务器(:5000/l 端点)。
// 熔断器基于 atomic 实现(无 mutex,不在持有锁时做慢 I/O),确保 goroutine 不会因 harvester 故障而堆积。
func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) {
now := time.Now().Unix()
// ---- 熔断检查(atomic,无锁) ----
state := atomic.LoadInt32(&c.circuitState)
expiry := atomic.LoadInt64(&c.circuitExpiry)
switch state {
case circuitOpen:
if now < expiry {
return // 熔断中,直接跳过
}
// 冷却结束,切换到半开,放行一个试探请求
atomic.StoreInt32(&c.circuitState, circuitHalfOpen)
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
log.Println("[crawler] circuit: half-open, probing harvester")
case circuitHalfOpen:
if now < expiry {
return // 半开冷却中,只放行第一个,其余跳过
}
// 半开超时,重新进入半开状态
atomic.StoreInt32(&c.circuitState, circuitHalfOpen)
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
}
type payload struct {
URL string `json:"url"`
Keywords []analyzer.Keyword `json:"keywords"`
@@ -279,12 +322,34 @@ func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) {
if err != nil {
return
}
resp, err := http.Post(config.HarvesterAddr+"/l", "application/json", bytes.NewReader(data))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", config.HarvesterAddr+"/l", bytes.NewReader(data))
if err != nil {
log.Printf("[crawler] harvester post failed: %v", err)
return
}
req.Header.Set("Content-Type", "application/json")
// ---- HTTP 请求(此时没有任何锁) ----
resp, err := http.DefaultClient.Do(req)
// ---- 结果处理(atomic,无锁) ----
if err != nil {
failures := atomic.AddInt32(&c.circuitFailures, 1)
if failures >= circuitFailureThreshold {
atomic.StoreInt32(&c.circuitState, circuitOpen)
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
log.Printf("[crawler] circuit OPEN: harvester unreachable (%d failures), cooling for %ds",
failures, circuitCooldownSeconds)
}
return
}
resp.Body.Close()
// ---- 成功:重置熔断器 ----
atomic.StoreInt32(&c.circuitFailures, 0)
atomic.StoreInt32(&c.circuitState, circuitClosed)
}
// schedule 从候选 URL 集合中选出下一轮 BFS 队列。
+146
View File
@@ -11,6 +11,7 @@ import (
"net/url" // URL 解析
"regexp" // 正则表达式(site: 过滤语法)
"sort" // 排序
"strconv" // 字符串转整数
"strings" // 字符串操作
"sync" // 互斥锁(保护并发切片写入)
"time" // 时间戳
@@ -46,6 +47,8 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/search", s.handleSearch)
mux.HandleFunc("/admin/recent", s.handleAdminRecent)
mux.HandleFunc("/admin/stats", s.handleAdminStats)
return mux
}
@@ -55,6 +58,149 @@ func (s *Server) ListenAndServe(addr string) error {
return http.ListenAndServe(addr, s.Handler())
}
// ---- Admin 接口 ----
// recentItem 是 /admin/recent 接口返回的单条记录。
type recentItem struct {
URL string `json:"url"`
Title string `json:"title"`
Description string `json:"description"`
Domain string `json:"domain"`
Language map[string]float64 `json:"language"`
WordCount int `json:"word_count"`
CrawledAt int64 `json:"crawled_at"`
}
// handleAdminRecent 返回最近爬取的条目列表,按爬取时间倒序。
// 参数:limit(默认50,最大200)。
func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
limit := 50
if l := r.URL.Query().Get("limit"); l != "" {
if v, err := strconv.Atoi(l); err == nil && v > 0 {
limit = v
}
}
if limit > 200 {
limit = 200
}
type entry struct {
url string
snippet *storage.SnippetEntry
siteInfo *storage.SiteInfo
}
var items []entry
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
siteInfo, _ := s.db.GetSiteInfo(netloc(url))
items = append(items, entry{url, snippet, siteInfo})
return nil
})
// 按时间倒序
sort.Slice(items, func(i, j int) bool {
return items[i].snippet.Timestamp > items[j].snippet.Timestamp
})
if len(items) > limit {
items = items[:limit]
}
result := make([]recentItem, 0, len(items))
for _, e := range items {
lang := e.siteInfo.Languages
if lang == nil {
lang = make(map[string]float64)
}
desc := e.snippet.Description
if len(desc) > 200 {
desc = desc[:200]
}
result = append(result, recentItem{
URL: e.url,
Title: e.snippet.Title,
Description: desc,
Domain: netloc(e.url),
Language: lang,
WordCount: len(e.snippet.Text),
CrawledAt: e.snippet.Timestamp,
})
}
resp := map[string]any{
"items": result,
"total": len(items),
}
json.NewEncoder(w).Encode(resp)
}
// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
domainCount := make(map[string]int)
langCount := make(map[string]int)
totalWords := 0
total := 0
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
total++
domain := netloc(url)
domainCount[domain]++
totalWords += len(snippet.Text)
siteInfo, _ := s.db.GetSiteInfo(domain)
if siteInfo != nil {
for lang, ratio := range siteInfo.Languages {
if ratio > 0.1 {
langCount[lang]++
}
}
}
return nil
})
// 排序取 Top
type kv struct{ k string; v int }
topDomains := make([]kv, 0, len(domainCount))
for k, v := range domainCount {
topDomains = append(topDomains, kv{k, v})
}
sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v })
if len(topDomains) > 20 {
topDomains = topDomains[:20]
}
topLangs := make([]kv, 0, len(langCount))
for k, v := range langCount {
topLangs = append(topLangs, kv{k, v})
}
sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v })
if len(topLangs) > 10 {
topLangs = topLangs[:10]
}
domainsMap := make(map[string]int)
for _, kv := range topDomains {
domainsMap[kv.k] = kv.v
}
langsMap := make(map[string]int)
for _, kv := range topLangs {
langsMap[kv.k] = kv.v
}
resp := map[string]any{
"total_urls": total,
"total_words": totalWords,
"domains": domainsMap,
"languages": langsMap,
}
json.NewEncoder(w).Encode(resp)
}
// ---- 搜索处理器 ----
// searchResponse 是搜索 API 的 JSON 响应结构。
+13
View File
@@ -315,3 +315,16 @@ func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error {
})
})
}
// ForEachSnippet 遍历所有 URL 摘要条目,对每个条目调用 fn。
func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error {
return d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketGate).ForEach(func(k, v []byte) error {
var entry SnippetEntry
if err := decompressUnmarshal(v, &entry); err != nil {
return nil // 跳过损坏条目
}
return fn(string(k), &entry)
})
})
}