diff --git a/crawler/crawler.go b/crawler/crawler.go index df7c3d2..3aea8c0 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -4,6 +4,7 @@ package crawler import ( "bytes" // 字节缓冲(构造 HTTP POST 请求体) + "context" // context 超时控制 "encoding/json" // JSON 序列化(发送关键词数据到 harvester) "log" // 日志输出 "math" // 数学运算(指数衰减、质量评分) @@ -28,13 +29,30 @@ type Stats struct { KeywordsFetched int64 // 累计提取的关键词总数 } +// 熔断器状态(用 atomic int32 代替 mutex,避免持有锁时的慢 I/O)。 +const ( + circuitClosed int32 = iota // 正常:所有请求都发往 harvester + circuitOpen // 断开:连续失败 N 次后,冷却时间内跳过所有请求 + circuitHalfOpen // 半开:冷却结束,尝试放行一次请求试探 +) + +const ( + circuitFailureThreshold = 5 // 连续失败多少次后触发熔断 + circuitCooldownSeconds = 30 // 熔断持续时间(秒) +) + // Crawler 编排整个 BFS 爬取流程。 type Crawler struct { - fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流) - db *storage.DB // 持久化数据库 - analyzer *analyzer.Analyzer // 分词和关键词分析 - prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值") - stats Stats // 原子计数器 + fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流) + db *storage.DB // 持久化数据库 + analyzer *analyzer.Analyzer // 分词和关键词分析 + prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值") + stats Stats // 原子计数器 + + // 熔断器(全用 atomic,无 mutex,无慢 I/O 时持有锁的风险) + circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen + circuitFailures int32 // 连续失败计数(atomic) + circuitExpiry int64 // 熔断/半开截止 Unix 时间戳(秒) } // New 创建一个 Crawler 实例。 @@ -269,7 +287,32 @@ func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, } // sendToHarvester 将关键词索引数据通过 HTTP POST 发送到收获服务器(:5000/l 端点)。 +// 熔断器基于 atomic 实现(无 mutex,不在持有锁时做慢 I/O),确保 goroutine 不会因 harvester 故障而堆积。 func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) { + now := time.Now().Unix() + + // ---- 熔断检查(atomic,无锁) ---- + state := atomic.LoadInt32(&c.circuitState) + expiry := atomic.LoadInt64(&c.circuitExpiry) + + switch state { + case circuitOpen: + if now < expiry { + return // 熔断中,直接跳过 + } + // 冷却结束,切换到半开,放行一个试探请求 + atomic.StoreInt32(&c.circuitState, circuitHalfOpen) + atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds)) + log.Println("[crawler] circuit: half-open, probing harvester") + case circuitHalfOpen: + if now < expiry { + return // 半开冷却中,只放行第一个,其余跳过 + } + // 半开超时,重新进入半开状态 + atomic.StoreInt32(&c.circuitState, circuitHalfOpen) + atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds)) + } + type payload struct { URL string `json:"url"` Keywords []analyzer.Keyword `json:"keywords"` @@ -279,12 +322,34 @@ func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) { if err != nil { return } - resp, err := http.Post(config.HarvesterAddr+"/l", "application/json", bytes.NewReader(data)) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", config.HarvesterAddr+"/l", bytes.NewReader(data)) if err != nil { - log.Printf("[crawler] harvester post failed: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + + // ---- HTTP 请求(此时没有任何锁) ---- + resp, err := http.DefaultClient.Do(req) + + // ---- 结果处理(atomic,无锁) ---- + if err != nil { + failures := atomic.AddInt32(&c.circuitFailures, 1) + if failures >= circuitFailureThreshold { + atomic.StoreInt32(&c.circuitState, circuitOpen) + atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds)) + log.Printf("[crawler] circuit OPEN: harvester unreachable (%d failures), cooling for %ds", + failures, circuitCooldownSeconds) + } return } resp.Body.Close() + + // ---- 成功:重置熔断器 ---- + atomic.StoreInt32(&c.circuitFailures, 0) + atomic.StoreInt32(&c.circuitState, circuitClosed) } // schedule 从候选 URL 集合中选出下一轮 BFS 队列。 diff --git a/search/server.go b/search/server.go index 7fcf9f2..70e3f74 100644 --- a/search/server.go +++ b/search/server.go @@ -11,6 +11,7 @@ import ( "net/url" // URL 解析 "regexp" // 正则表达式(site: 过滤语法) "sort" // 排序 + "strconv" // 字符串转整数 "strings" // 字符串操作 "sync" // 互斥锁(保护并发切片写入) "time" // 时间戳 @@ -46,6 +47,8 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { func (s *Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/search", s.handleSearch) + mux.HandleFunc("/admin/recent", s.handleAdminRecent) + mux.HandleFunc("/admin/stats", s.handleAdminStats) return mux } @@ -55,6 +58,149 @@ func (s *Server) ListenAndServe(addr string) error { return http.ListenAndServe(addr, s.Handler()) } +// ---- Admin 接口 ---- + +// recentItem 是 /admin/recent 接口返回的单条记录。 +type recentItem struct { + URL string `json:"url"` + Title string `json:"title"` + Description string `json:"description"` + Domain string `json:"domain"` + Language map[string]float64 `json:"language"` + WordCount int `json:"word_count"` + CrawledAt int64 `json:"crawled_at"` +} + +// handleAdminRecent 返回最近爬取的条目列表,按爬取时间倒序。 +// 参数:limit(默认50,最大200)。 +func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + limit := 50 + if l := r.URL.Query().Get("limit"); l != "" { + if v, err := strconv.Atoi(l); err == nil && v > 0 { + limit = v + } + } + if limit > 200 { + limit = 200 + } + + type entry struct { + url string + snippet *storage.SnippetEntry + siteInfo *storage.SiteInfo + } + + var items []entry + s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error { + siteInfo, _ := s.db.GetSiteInfo(netloc(url)) + items = append(items, entry{url, snippet, siteInfo}) + return nil + }) + + // 按时间倒序 + sort.Slice(items, func(i, j int) bool { + return items[i].snippet.Timestamp > items[j].snippet.Timestamp + }) + + if len(items) > limit { + items = items[:limit] + } + + result := make([]recentItem, 0, len(items)) + for _, e := range items { + lang := e.siteInfo.Languages + if lang == nil { + lang = make(map[string]float64) + } + desc := e.snippet.Description + if len(desc) > 200 { + desc = desc[:200] + } + result = append(result, recentItem{ + URL: e.url, + Title: e.snippet.Title, + Description: desc, + Domain: netloc(e.url), + Language: lang, + WordCount: len(e.snippet.Text), + CrawledAt: e.snippet.Timestamp, + }) + } + + resp := map[string]any{ + "items": result, + "total": len(items), + } + json.NewEncoder(w).Encode(resp) +} + +// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。 +func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + domainCount := make(map[string]int) + langCount := make(map[string]int) + totalWords := 0 + total := 0 + + s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error { + total++ + domain := netloc(url) + domainCount[domain]++ + totalWords += len(snippet.Text) + + siteInfo, _ := s.db.GetSiteInfo(domain) + if siteInfo != nil { + for lang, ratio := range siteInfo.Languages { + if ratio > 0.1 { + langCount[lang]++ + } + } + } + return nil + }) + + // 排序取 Top + type kv struct{ k string; v int } + topDomains := make([]kv, 0, len(domainCount)) + for k, v := range domainCount { + topDomains = append(topDomains, kv{k, v}) + } + sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v }) + if len(topDomains) > 20 { + topDomains = topDomains[:20] + } + topLangs := make([]kv, 0, len(langCount)) + for k, v := range langCount { + topLangs = append(topLangs, kv{k, v}) + } + sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v }) + if len(topLangs) > 10 { + topLangs = topLangs[:10] + } + + domainsMap := make(map[string]int) + for _, kv := range topDomains { + domainsMap[kv.k] = kv.v + } + langsMap := make(map[string]int) + for _, kv := range topLangs { + langsMap[kv.k] = kv.v + } + + resp := map[string]any{ + "total_urls": total, + "total_words": totalWords, + "domains": domainsMap, + "languages": langsMap, + } + json.NewEncoder(w).Encode(resp) +} + // ---- 搜索处理器 ---- // searchResponse 是搜索 API 的 JSON 响应结构。 diff --git a/storage/storage.go b/storage/storage.go index 270c207..edebe58 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -315,3 +315,16 @@ func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error { }) }) } + +// ForEachSnippet 遍历所有 URL 摘要条目,对每个条目调用 fn。 +func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error { + return d.db.View(func(tx *bolt.Tx) error { + return tx.Bucket(bucketGate).ForEach(func(k, v []byte) error { + var entry SnippetEntry + if err := decompressUnmarshal(v, &entry); err != nil { + return nil // 跳过损坏条目 + } + return fn(string(k), &entry) + }) + }) +}