Files
sese-engine-go/crawler/crawler.go
T
2026-04-10 14:28:04 +08:00

1072 lines
34 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// crawler.go — BFS crawl loop, URL scheduling, and site-info updating.
// crawler 包的主逻辑:BFS 爬取循环、URL 调度算法、网站元信息更新。
package crawler
import (
"bytes" // 字节缓冲(构造 HTTP POST 请求体)
"context" // context 超时控制
"encoding/json" // JSON 序列化(发送关键词数据到收获服务)
"fmt" // 格式化(构造目标地址)
"hash/fnv" // FNV 哈希(内容变化检测)
"log" // 日志输出
"math" // 数学运算(指数衰减、质量评分)
"math/rand" // 随机数(加权采样、队列打乱)
"net/http" // HTTP 客户端(POST 数据到收获服务)
"net/url" // URL 解析
"strings" // 字符串操作
"sync" // 互斥锁(保护并发收集结果)
"sync/atomic" // 原子操作(计数器,无锁并发更新)
"time" // 时间戳
"sese-engine/analyzer" // 文本分析和关键词提取
"sese-engine/config" // 全局配置常量
"sese-engine/parser" // HTML 解析(提取标题、正文、链接)
"sese-engine/storage" // 持久化存储
)
// Stats 存放爬虫实时统计计数器(使用 atomic 原子读取)。
type Stats struct {
VisitedURLs int64 // 已访问的 URL 总数(含失败)
SuccessURLs int64 // 成功抓取(HTTP 200)的 URL 数
KeywordsFetched int64 // 累计提取的关键词总数
}
// 熔断器状态(用 atomic int32 代替 mutex,避免持有锁时的慢 I/O)。
const (
circuitClosed int32 = iota // 正常:所有请求都发往 harvester
circuitOpen // 断开:连续失败 N 次后,冷却时间内跳过所有请求
circuitHalfOpen // 半开:冷却结束,尝试放行一次请求试探
)
const (
circuitFailureThreshold = 5 // 连续失败多少次后触发熔断
circuitCooldownSeconds = 30 // 熔断持续时间(秒)
)
// Priority Worker 配置
const (
priorityMaxWorkers = 50 // Priority 独立 goroutine 上限(突破主 workers
priorityQueueSize = 100 // Priority 任务队列缓冲大小
)
// Crawler 编排整个 BFS 爬取流程。
type Crawler struct {
fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流)
db *storage.DB // 持久化数据库
analyzer *analyzer.Analyzer // 分词和关键词分析
prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值")
stats Stats // 原子计数器
// visited 记录已访问的 URL 集合(跨 epoch 持久,启动时从 DB 预热)
visited map[string]bool
visitedMu sync.RWMutex // 保护 visited 的并发读写
// 熔断器(全用 atomic,无 mutex,无慢 I/O 时持有锁的风险)
circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen
circuitFailures int32 // 连续失败计数(atomic
circuitExpiry int64 // 熔断/半开截止 Unix 时间戳(秒)
// 运行时活跃线程计数(atomic,每轮 epoch 自动归零前重新开始计数)
activeWorkers int64
// ---- Priority Worker(独立 goroutine,不受主 workers 限制)----
priorityCh chan string // Priority URL 任务队列
prioritySem chan struct{} // Priority 信号量(上限 priorityMaxWorkers
priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束
priorityMu sync.RWMutex // 保护 priorityStats
priorityStats struct {
pending int64 // 待处理的 Priority URL 数量(入队但未开始)
active int64 // 正在处理的 Priority URL 数量
}
// ---- Priority 子链接优先队列(来自 priority worker 的子链接会优先爬取)----
priorityChildrenMu sync.Mutex
priorityChildren []string // Priority URL 产生的子链接(优先处理)
}
// 全局活跃线程计数器(跨包可读,无需持有 Crawler 引用)
var globalActiveWorkers int64
// ActiveWorkers 返回当前正在运行的爬虫 goroutine 数量。
// 也可通过包级函数 GlobalActiveWorkers() 读取(供 search 等外部包使用)。
func (c *Crawler) ActiveWorkers() int64 {
return atomic.LoadInt64(&c.activeWorkers)
}
// GlobalActiveWorkers 返回当前全局活跃爬虫 goroutine 数量(包级,外部包可直接调用)。
func GlobalActiveWorkers() int64 {
return atomic.LoadInt64(&globalActiveWorkers)
}
// 全局 Priority Status 快照(跨 Crawler 实例共享,用于外部监控)
var globalPriorityStatus struct {
pending int64
active int64
}
// GlobalPriorityStatus 返回当前全局 Priority Worker 状态。
func GlobalPriorityStatus() map[string]interface{} {
return map[string]interface{}{
"pending": atomic.LoadInt64(&globalPriorityStatus.pending),
"active": atomic.LoadInt64(&globalPriorityStatus.active),
"max_workers": priorityMaxWorkers,
"children_queue": atomic.LoadInt64(&globalPriorityChildren),
}
}
// 全局 Priority 子链接队列长度(跨 Crawler 实例共享)
var globalPriorityChildren int64
// IncrementPriorityChildren 增加 priorityChildren 计数。
func IncrementPriorityChildren(n int64) {
atomic.AddInt64(&globalPriorityChildren, n)
}
// DecrementPriorityChildren 减少 priorityChildren 计数。
func DecrementPriorityChildren(n int64) {
atomic.AddInt64(&globalPriorityChildren, -n)
}
// New 创建一个 Crawler 实例。
// prosperMap 由 info 模块加载,传入域名繁荣值用于调度优先级计算。
func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler {
c := &Crawler{
fetcher: NewFetcher(config.SpiderName(), time.Duration(config.CrawlerCooldown())*time.Second),
db: db,
analyzer: a,
prosperMap: prosperMap,
visited: make(map[string]bool),
priorityCh: make(chan string, priorityQueueSize),
prioritySem: make(chan struct{}, priorityMaxWorkers),
}
// 启动 Priority Worker(独立 goroutine,不受主 workers 限制)
go c.runPriorityWorker()
// 启动时从 gate bucket 预热已爬取的 URL 集合(程序重启后不会重复爬取)
c.warmVisited()
return c
}
// warmVisited 从 DB 的 gate bucket 加载所有已缓存的 URL 到 visited set。
// 超过 RecrawlMaxAge 的 URL 不加入 visited,使其可以被重新爬取。
func (c *Crawler) warmVisited() {
count := 0
expired := 0
maxAge := int64(config.RecrawlMaxAge())
now := time.Now().Unix()
_ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error {
if now-entry.Timestamp < maxAge {
c.visited[u] = true // 未过期,仍然跳过
count++
} else {
expired++
}
return nil
})
log.Printf("[crawler] visited set warmed: %d active, %d expired (eligible for recrawl)", count, expired)
}
// startRecrawlTicker 启动后台定时任务,定期扫描并释放过期 URL 回到候选池。
// 已过期的 URL 从 visited map 中移除,使其可以在后续 BFS 轮次中被重新发现和爬取。
func (c *Crawler) startRecrawlTicker() {
interval := config.RecrawlCheckInterval()
if interval <= 0 {
return // 未配置或禁用
}
go func() {
ticker := time.NewTicker(time.Duration(interval) * time.Second)
defer ticker.Stop()
for range ticker.C {
maxAge := int64(config.RecrawlMaxAge())
batchSize := config.RecrawlBatchSize()
now := time.Now().Unix()
removed := 0
c.visitedMu.Lock()
_ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error {
if removed >= batchSize {
return fmt.Errorf("batch full") // 提前终止遍历
}
if now-entry.Timestamp >= maxAge && c.visited[u] {
delete(c.visited, u)
removed++
}
return nil
})
c.visitedMu.Unlock()
if removed > 0 {
log.Printf("[crawler] recrawl ticker: released %d expired URLs back to pool", removed)
}
}
}()
}
// markVisited 将 URL 标记为已访问(线程安全)。
func (c *Crawler) markVisited(url string) {
c.visitedMu.Lock()
c.visited[url] = true
c.visitedMu.Unlock()
}
// ---- Priority Worker(突破 workers 上限,立即爬取高优先级 URL)----
// runPriorityWorker 独立处理高优先级 URL,不受主 workers 限制。
// 当用户插入 Priority URL 时,立即触发爬取(不等待 epoch 调度)。
func (c *Crawler) runPriorityWorker() {
for url := range c.priorityCh {
c.prioritySem <- struct{}{} // 获取令牌(阻塞直到有空闲槽位)
c.priorityMu.Lock()
c.priorityStats.active++
c.priorityStats.pending--
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.pending, -1)
atomic.AddInt64(&globalPriorityStatus.active, 1)
c.priorityWg.Add(1)
go func(rawURL string) {
defer c.priorityWg.Done()
defer func() { <-c.prioritySem }() // 释放令牌
defer func() {
c.priorityMu.Lock()
c.priorityStats.active--
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.active, -1)
}()
// 关键:强制移除 visited 标记(即使未过期也要重新爬取)
c.visitedMu.Lock()
delete(c.visited, rawURL)
c.visitedMu.Unlock()
log.Printf("[crawler] priority crawl started: %s", rawURL)
// 直接调用 visitURL,绕过队列调度
hrefs := c.visitURL(rawURL)
// 将子链接加入优先队列(保持优先级)
if len(hrefs) > 0 {
c.priorityChildrenMu.Lock()
for _, child := range hrefs {
if !c.isVisited(child) {
c.priorityChildren = append(c.priorityChildren, child)
}
}
IncrementPriorityChildren(int64(len(hrefs)))
c.priorityChildrenMu.Unlock()
}
log.Printf("[crawler] priority crawl done: %s (%d child links)", rawURL, len(hrefs))
// 清理已访问的 priority URL(防止重复爬取)
_ = c.db.RemovePriorityURL(rawURL)
}(url)
}
}
// TriggerPriorityCrawl 立即触发高优先级爬取(突破 workers 上限)。
// 适合用户手动插入 URL 时立即响应。
func (c *Crawler) TriggerPriorityCrawl(url string) {
select {
case c.priorityCh <- url:
c.priorityMu.Lock()
c.priorityStats.pending++
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.pending, 1)
log.Printf("[crawler] priority crawl triggered: %s", url)
default:
// 队列满了,降级到正常处理
log.Printf("[crawler] priority queue full, deferring to normal: %s", url)
}
}
// GetPriorityStatus 返回 Priority Worker 的实时状态。
func (c *Crawler) GetPriorityStatus() map[string]interface{} {
c.priorityMu.RLock()
defer c.priorityMu.RUnlock()
return map[string]interface{}{
"pending": c.priorityStats.pending,
"active": c.priorityStats.active,
"max_workers": priorityMaxWorkers,
}
}
// isVisited 检查 URL 是否已访问(线程安全)。
func (c *Crawler) isVisited(url string) bool {
c.visitedMu.RLock()
v := c.visited[url]
c.visitedMu.RUnlock()
return v
}
// fetchAndApplyPriorityURLs 从数据库读取用户插入的 priority URLs
// 将未访问的插入队列前端(prepend),已爬取的条目从存储中清除。
// 返回本次插入队列的 URL 数量。
func (c *Crawler) fetchAndApplyPriorityURLs(queue *[]string) int {
entries, err := c.db.GetPriorityURLs()
if err != nil || len(entries) == 0 {
return 0
}
added := 0
for _, e := range entries {
if c.isVisited(e.URL) {
_ = c.db.RemovePriorityURL(e.URL)
continue
}
*queue = append([]string{e.URL}, *queue...)
added++
}
_ = c.db.ClearVisitedPriorityURLs()
return added
}
// URLWeight 将 URL 和发现权重打包在一起,用于调度决策。
type URLWeight struct {
URL string // 待访问的 URL
Weight float64 // 发现权重(从父页面分得的"关注度",页面链接越多则每个分得越少)
}
// Run 启动 BFS 爬取,从 entryURL 开始,执行最多 maxEpoch 轮。
// 各轮之间是串行的,每轮内并发抓取,按调度算法选择下一轮 URL。
// 每轮开始前会检查 priority 队列,优先爬取用户插入的 URL。
func (c *Crawler) Run(entryURL string, maxEpoch int) {
c.markVisited(entryURL)
queue := []string{entryURL}
// 启动后台重爬定时器:定期释放过期 URL 到候选池
c.startRecrawlTicker()
for ep := 0; ep < maxEpoch; ep++ {
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
workers := config.CrawlerWorkers()
// ---- 优先处理 priorityChildren 队列(来自 priority worker 的子链接)----
var priorityQueue []string
c.priorityChildrenMu.Lock()
if len(c.priorityChildren) > 0 {
priorityQueue = c.priorityChildren
// 更新全局计数器:这些 URL 即将被处理
DecrementPriorityChildren(int64(len(priorityQueue)))
log.Printf("[crawler] epoch %d/%d processing %d priority children first", ep+1, maxEpoch, len(priorityQueue))
}
c.priorityChildrenMu.Unlock()
// 每轮开始前:拉取 priority URLs,插入队列前端
priorityAdded := c.fetchAndApplyPriorityURLs(&queue)
if priorityAdded > 0 {
log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority) workers=%d", ep+1, maxEpoch, len(queue), priorityAdded, workers)
} else {
log.Printf("[crawler] epoch %d/%d queue=%d workers=%d", ep+1, maxEpoch, len(queue), workers)
}
// 将本轮所有 URL 标记为已访问(防止下一轮重复入队)
for _, u := range queue {
c.markVisited(u)
}
// ---- 并发抓取本轮所有 URL ----
var (
newLinks []URLWeight // 收集下一轮候选 URL
mu sync.Mutex // 保护 newLinks 的并发写入
wg sync.WaitGroup
)
// 信号量:限制同时并发数(使用上方读取的 workers 值)
sem := make(chan struct{}, workers)
for _, u := range queue {
wg.Add(1)
sem <- struct{}{} // 获取一个令牌(阻塞直到有空闲槽位)
atomic.AddInt64(&c.activeWorkers, 1)
atomic.AddInt64(&globalActiveWorkers, 1)
go func(rawURL string) {
defer wg.Done()
defer func() { <-sem }() // 释放令牌
defer atomic.AddInt64(&c.activeWorkers, -1)
defer atomic.AddInt64(&globalActiveWorkers, -1)
// 抓取单个 URL,返回发现的子链接
hrefs := c.visitURL(rawURL)
n := len(hrefs)
if n == 0 {
return
}
// 收集未访问的子链接
var children []string
for _, h := range hrefs {
if !c.isVisited(h) {
children = append(children, h)
}
}
if len(children) == 0 {
return
}
// 分配权重
w := 1.0 / float64(n)
// 孙链接(来自 priorityChildren)爬取后,子链接进入正常 BFS 队列(不再优先传递)
// 所有子链接统一进入 newLinks,经过 schedule() 调度
mu.Lock()
for _, h := range children {
newLinks = append(newLinks, URLWeight{URL: h, Weight: w})
}
mu.Unlock()
}(u)
}
wg.Wait()
// ---- 清空本轮已处理的 priorityQueue ----
c.priorityChildrenMu.Lock()
if len(priorityQueue) > 0 {
c.priorityChildren = c.priorityChildren[len(priorityQueue):]
// 计数器已在提取时扣除,这里不需要额外操作
}
c.priorityChildrenMu.Unlock()
// 本轮没有发现新链接,爬取结束
if len(newLinks) == 0 {
log.Println("[crawler] empty queue — stopping")
return
}
// 调度算法:从候选 URL 中选出下一轮要抓取的队列
queue = c.schedule(newLinks)
}
}
// visitURL 抓取一个 URL,提取关键词、缓存摘要、更新网站元信息,返回页面中发现的子链接。
func (c *Crawler) visitURL(rawURL string) (hrefs []string) {
// recover 保护:防止任何模块(analyzer/storage/parser)的 panic 杀死 goroutine
defer func() {
if r := recover(); r != nil {
log.Printf("[crawler] visitURL panic recovered: url=%s error=%v", rawURL, r)
}
}()
atomic.AddInt64(&c.stats.VisitedURLs, 1) // 计数器 +1
// 使用 sync.WaitGroup + select 实现硬超时包装器,
// 确保即使 http.Client.Timout 被某些底层操作忽略,goroutine 也不会永久阻塞。
fetchTimeout := 30 * time.Second
var res *FetchResult
var fetchErr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// 礼貌模式抓取(遵守 robots.txt + 限流),限制页面大小防止内存爆炸
res, fetchErr = c.fetcher.fetchWithHistory(rawURL, true, fetchTimeout, config.MaxPageSize())
}()
waitCh := make(chan struct{})
go func() {
wg.Wait()
close(waitCh)
}()
select {
case <-waitCh:
// fetch 正常返回(成功或错误)
case <-time.After(fetchTimeout + 5*time.Second):
log.Printf("[crawler] fetch timeout: %s", rawURL)
}
if fetchErr != nil || res == nil {
c.updateSiteFailure(rawURL) // 记录失败,更新该网站成功率
return
}
atomic.AddInt64(&c.stats.SuccessURLs, 1) // 成功计数器 +1
// 解析 HTML:提取标题、描述、正文和所有超链接
title, desc, text, hrefs := parser.ParseHTML(res.Body, res.FinalURL)
// 计算正文内容哈希(FNV-1a),用于增量重爬检测
contentHash := fnvHash(text)
// 增量重爬检测:查询上次爬取的哈希,内容未变则跳过关键词提取
isRecrawl := false
oldEntry, _ := c.db.GetSnippet(res.FinalURL)
if oldEntry != nil && oldEntry.ContentHash != "" && oldEntry.ContentHash == contentHash {
isRecrawl = true
//log.Printf("[crawler] unchanged (recrawl skip): %s", res.FinalURL)
}
// 缓存 URL 摘要(仅对短 URL 缓存,防止超长 URL 浪费空间)
if len(res.FinalURL) < 250 {
_ = c.db.SetSnippet(res.FinalURL, &storage.SnippetEntry{
Title: title,
Description: truncate(desc, 256),
Text: truncate(text, 256),
Timestamp: time.Now().Unix(),
ContentHash: contentHash,
})
}
// 关键词提取:将标题/描述/正文交给 analyzer 计算关键词权重
// 增量优化:如果内容未变化(重爬),跳过关键词提取和索引更新
if !isRecrawl {
kws := c.analyzer.Analyze(title, desc, text)
if len(kws) > 0 {
// 限制每个页面最多发送的关键词数量
maxKws := config.MaxKeywordsPerPage()
if len(kws) > maxKws {
kws = kws[:maxKws]
}
atomic.AddInt64(&c.stats.KeywordsFetched, int64(len(kws)))
// 异步发送到收获服务器写入倒排索引(不阻塞爬取流程)
go c.sendToHarvester(res.FinalURL, kws)
}
}
// 更新网站元信息(成功访问)
host := netloc(res.FinalURL)
c.updateSiteSuccess(host, res, title, desc, text, hrefs)
// 处理永久重定向:更新源主机名下的重定向映射
for from, to := range res.Redirects {
fromHost := netloc(from)
if fromHost == "" {
continue
}
_ = c.db.UpdateSiteInfo(fromHost, func(info *storage.SiteInfo) {
if info.Redirects == nil {
info.Redirects = make(map[string]string)
}
info.Redirects[from] = to
if len(info.Redirects) > 50 {
info.Redirects = truncateMap(info.Redirects, 40)
}
})
}
// 限制返回的链接数,防止下一轮队列爆炸
if len(hrefs) > 100 {
hrefs = sampleStrings(hrefs, 100)
}
return hrefs
}
// updateSiteFailure 当某 URL 抓取失败时,更新该网站的访问成功率(指数衰减)。
func (c *Crawler) updateSiteFailure(rawURL string) {
host := netloc(rawURL)
if host == "" {
return
}
_ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) {
if info.SuccessRate == nil {
zero := 0.0
info.SuccessRate = &zero
}
// 成功率每次失败乘以 0.99(无限趋近 0)
*info.SuccessRate *= 0.99
})
}
// updateSiteSuccess 当某 URL 抓取成功时,更新网站的完整元信息。
// 使用 UpdateSiteInfo 原子读-改-写,避免并发 goroutine 对同一 host 的 SiteInfo 更新丢失。
func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, text string, hrefs []string) {
now := time.Now().Unix()
httpsAvailable := strings.HasPrefix(res.FinalURL, "https://")
serverType := res.ServerType
// 语言检测(CPU 密集,在锁外执行)
var detectedLang string
// 检测条件在 UpdateSiteInfo 回调内判断,这里预先计算好
detectedLang = c.analyzer.DetectLanguage(title + " " + desc + " " + text)
// 收集外链(跨顶级域名的链接)
superHost := superNetloc(res.FinalURL)
var external []string
for _, h := range hrefs {
if superNetloc(h) != superHost {
external = append(external, h)
}
}
sampled := sampleStrings(external, 10)
_ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) {
// 访问计数 +1,更新最后访问时间
info.VisitCount++
info.LastVisitTime = now
// 成功率更新:EWM(指数加权移动)平滑,每次 +0.01
one := 1.0
if info.SuccessRate == nil {
info.SuccessRate = &one
}
*info.SuccessRate = *info.SuccessRate*0.99 + 0.01
// 记录是否支持 HTTPS
if httpsAvailable {
t := true
info.HTTPSAvailable = &t
}
// 记录 HTTP Server 类型(去重,保留最近 5 个)
if serverType != "" {
found := false
for _, s := range info.ServerTypes {
if s == serverType {
found = true
break
}
}
if !found {
info.ServerTypes = append(info.ServerTypes, serverType)
if len(info.ServerTypes) > 5 {
info.ServerTypes = info.ServerTypes[len(info.ServerTypes)-5:]
}
}
}
// 语言检测和出站链接收集(仅在前 10 次访问或 10% 概率下触发,减少开销)
if info.VisitCount < 10 || rand.Float64() < 0.1 {
if detectedLang != "" {
if info.Languages == nil {
info.Languages = make(map[string]float64)
}
// 首次访问强度高,随访问次数增加强度衰减
intensity := math.Min(0.2, 1/math.Sqrt(float64(info.VisitCount+1)))
for k := range info.Languages {
info.Languages[k] *= (1 - intensity) // 旧语种按 intensity 衰减
}
info.Languages[detectedLang] += intensity // 新语种增加
}
// 外链
info.OutLinks = append(info.OutLinks, sampled...)
if len(info.OutLinks) > 250 {
info.OutLinks = sampleStrings(info.OutLinks, 200)
}
}
})
}
// sendToHarvester 将关键词索引数据通过 HTTP POST 发送到搜索服务器(/l 端点)。
// 熔断器基于 atomic 实现(无 mutex,不在持有锁时做慢 I/O),确保 goroutine 不会因收获服务故障而堆积。
func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) {
now := time.Now().Unix()
// ---- 熔断检查(atomic,无锁) ----
state := atomic.LoadInt32(&c.circuitState)
expiry := atomic.LoadInt64(&c.circuitExpiry)
switch state {
case circuitOpen:
if now < expiry {
return // 熔断中,直接跳过
}
// 冷却结束,切换到半开,放行一个试探请求
atomic.StoreInt32(&c.circuitState, circuitHalfOpen)
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
case circuitHalfOpen:
if now < expiry {
return // 半开冷却中,只放行第一个,其余跳过
}
// 半开超时,重新进入半开状态
atomic.StoreInt32(&c.circuitState, circuitHalfOpen)
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
}
type payload struct {
URL string `json:"url"`
Keywords []analyzer.Keyword `json:"keywords"`
}
p := payload{URL: finalURL, Keywords: kws}
data, err := json.Marshal(p)
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d/l", config.SearchServerPort()), bytes.NewReader(data))
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
// ---- HTTP 请求(此时没有任何锁) ----
resp, err := http.DefaultClient.Do(req)
// ---- 结果处理(atomic,无锁) ----
if err != nil {
failures := atomic.AddInt32(&c.circuitFailures, 1)
if failures >= circuitFailureThreshold {
atomic.StoreInt32(&c.circuitState, circuitOpen)
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
//log.Printf("[crawler] circuit OPEN: harvest endpoint unreachable (%d failures), cooling for %ds",failures, circuitCooldownSeconds)
}
return
}
resp.Body.Close()
// ---- 成功:重置熔断器 ----
atomic.StoreInt32(&c.circuitFailures, 0)
atomic.StoreInt32(&c.circuitState, circuitClosed)
}
// schedule 从候选 URL 集合中选出下一轮 BFS 队列。
// 包含:域名集中度过滤、HTTP/HTTPS 比例控制、繁荣 URL 占比控制、加权随机采样。
func (c *Crawler) schedule(links []URLWeight) []string {
// 候选过多时先随机采样到 10 万条,防止内存爆炸
if len(links) > 100000 {
links = sampleURLWeights(links, 100000)
}
// 预加载所有涉及的网站信息(加速后续评分计算)
domains := make(map[string]bool)
for _, lw := range links {
if h := netloc(lw.URL); h != "" {
domains[h] = true
}
if h := superNetloc(lw.URL); h != "" {
domains[h] = true
}
}
siteCache := make(map[string]*storage.SiteInfo, len(domains))
var mu sync.Mutex
var wg sync.WaitGroup
for d := range domains {
wg.Add(1)
go func(host string) {
defer wg.Done()
info, _ := c.db.GetSiteInfo(host)
mu.Lock()
siteCache[host] = info
mu.Unlock()
}(d)
}
wg.Wait()
// 对所有候选 URL 逐一计算调度优先级分数
scored_list := make([]scoredURL, len(links))
for i, lw := range links {
scored_list[i] = scoredURL{url: lw.URL, score: c.scoreURL(lw, siteCache)}
}
// 加权随机采样:从高分到低分按权重概率抽取最多 k 条
k := min(45000, len(scored_list)/3+250)
selected := weightedSample(scored_list, k)
// 域名集中度过滤:限制每个域名被选中的数量,防止被少数网站垄断
selected = concentrationFilter(selected, config.CrawlFocus())
// 分离 HTTPS 和 HTTP 链接,HTTP 最多占 HTTPS 的 1/4
var httpsURLs, httpURLs []string
for _, s := range selected {
if strings.HasPrefix(s, "https://") {
httpsURLs = append(httpsURLs, s)
} else {
httpURLs = append(httpURLs, s)
}
}
maxHTTP := len(httpsURLs) / 4
if len(httpURLs) > maxHTTP {
httpURLs = sampleStrings(httpURLs, maxHTTP)
}
// 分离繁荣(高反向链接)域名和普通域名,按比例控制繁荣 URL 占比
var prosperURLs, otherURLs []string
for _, u := range append(httpsURLs, httpURLs...) {
if c.prosperMap[netloc(u)] > 0 {
prosperURLs = append(prosperURLs, u)
} else {
otherURLs = append(otherURLs, u)
}
}
// 根据目标繁荣占比计算普通 URL 应保留数量
expectedProsperRatio := config.ExpectedProsperRatio()
n := int(float64(len(prosperURLs)) * (1 - expectedProsperRatio) / expectedProsperRatio)
if len(otherURLs) > n {
keep := max(len(otherURLs)-len(selected)/10, n)
if keep < len(otherURLs) {
otherURLs = sampleStrings(otherURLs, keep)
}
}
// 合并并随机打乱(使繁荣 URL 和普通 URL 混合)
result := append(prosperURLs, otherURLs...)
rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] })
return result
}
// scoreURL 计算单个 URL 的调度优先级分数。
// 综合考虑:中文语种权重、域名访问历史衰减、网站质量评分、繁荣值、URL 本身质量。
func (c *Crawler) scoreURL(lw URLWeight, siteCache map[string]*storage.SiteInfo) float64 {
host := netloc(lw.URL)
super := superNetloc(lw.URL)
info := siteCache[host]
if info == nil {
info = &storage.SiteInfo{}
}
// 中文倾向性:该网站中文内容占比
var chineseness float64 = 0.5
if len(info.Languages) > 0 {
total := 0.0
for _, v := range info.Languages {
total += v
}
if total > 0 {
chineseness = info.Languages["zh"] / total
}
}
// 兴趣衰减:基于访问次数的指数衰减,繁荣域名可访问更多次
prosper := math.Min(62, c.prosperMap[host])
limit := prosper*500 + 50
b := math.Pow(0.1, 1/limit)
interest := math.Pow(b, float64(info.VisitCount))
// 同理对顶级域名计算衰减(二级域名不够用时看顶级域名)
var interest2 float64 = 1.0
if super != host {
superInfo := siteCache[super]
if superInfo != nil {
limit2 := math.Min(62, c.prosperMap[super])*500 + 50
b2 := math.Pow(0.1, 1/limit2)
interest2 = math.Pow(b2, float64(superInfo.VisitCount))
}
}
// 网站质量评分
quality := 1.0
if info.Quality != nil {
quality = *info.Quality
}
// 繁荣值加分(log 变换平滑)
prosperity := prosper
if prosperity > 0 {
prosperity += 0.5
}
prosperity = math.Log2(2+prosperity) + 1
// URL 本身的质量惩罚(过长、路径过深、使用 .php/.htm 等)
bad := badURL(lw.URL)
return (0.1 + chineseness) * math.Min(0.05+interest, 0.05+interest2) * quality * (1 - bad) * lw.Weight * prosperity
}
// ---- 辅助函数 ----
// netloc 从原始 URL 字符串提取主机名(不含路径)。
// 支持 http:// 和 https:// 前缀,自动处理 URL 解析异常。
func netloc(rawURL string) string {
parts := strings.SplitN(rawURL, "/", 4)
if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" {
return parts[2]
}
u, err := url.Parse(rawURL)
if err != nil {
return ""
}
return u.Host
}
// superNetloc 返回顶级域名(去除子域名),例如 "www.example.com" → "example.com"。
// 用于识别跨子域名但同主站的情况。
func superNetloc(rawURL string) string {
host := netloc(rawURL)
parts := strings.Split(host, ".")
if len(parts) >= 2 {
return strings.Join(parts[len(parts)-2:], ".")
}
return host
}
// badURL 返回 URL 的"劣质"评分(0~0.9),基于长度、路径深度、文件扩展名等特征。
func badURL(u string) float64 {
// URL 过长惩罚
s := math.Max(0, float64(len(u)-30)/200.0)
// 使用 .htm/.php 等动态页面惩罚
if strings.Contains(u, ".htm") || strings.Contains(u, ".php") {
s += (1 - s) * 0.3
}
// 路径层级过深惩罚(超过 2 层斜杠)
if strings.Count(strings.TrimRight(u, "/"), "/") > 2 {
s += (1 - s) * 0.1
}
// 极短 URL 或协议后冒号(如 ftp:)惩罚
if len(u) < 5 || u[4] == ':' {
s += (1 - s) * 0.3
}
return math.Min(s, 0.9)
}
// truncate 将字符串截断到最多 n 个字符。
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n]
}
// fnvHash 使用 FNV-1a 算法计算字符串的哈希值(十六进制字符串)。
// 用于增量重爬时检测页面正文是否发生变化。
func fnvHash(s string) string {
h := fnv.New128a()
h.Write([]byte(s))
return fmt.Sprintf("%x", h.Sum(nil))
}
// sampleStrings 从字符串切片中随机不重复抽取 n 条。
func sampleStrings(s []string, n int) []string {
if len(s) <= n {
return s
}
perm := rand.Perm(len(s))
out := make([]string, n)
for i := range out {
out[i] = s[perm[i]]
}
return out
}
// sampleURLWeights 与 sampleStrings 相同,但处理 URLWeight 切片。
func sampleURLWeights(s []URLWeight, n int) []URLWeight {
if len(s) <= n {
return s
}
perm := rand.Perm(len(s))
out := make([]URLWeight, n)
for i := range out {
out[i] = s[perm[i]]
}
return out
}
// scoredURL 内部用结构体,存储 URL 和对应调度分数。
type scoredURL struct {
url string
score float64
}
// weightedSample 加权随机采样(不放回):从 scoredURL 列表中按权重概率抽取最多 k 条。
// 使用累积概率法近似 alias method(适合中等规模数据)。
func weightedSample(items []scoredURL, k int) []string {
if k >= len(items) {
out := make([]string, len(items))
for i, s := range items {
out[i] = s.url
}
return out
}
totalWeight := 0.0
for _, s := range items {
totalWeight += s.score
}
selected := make(map[int]bool)
out := make([]string, 0, k)
for len(out) < k && len(selected) < len(items) {
// 随机取 [0, totalWeight) 区间的一个点
r := rand.Float64() * totalWeight
cum := 0.0
for i, s := range items {
if selected[i] {
continue
}
cum += s.score
if cum >= r {
selected[i] = true
out = append(out, s.url)
totalWeight -= s.score // 被选中后从总权重中移除(不放回)
break
}
}
}
return out
}
// concentrationFilter 域名集中度过滤。
// 按 CrawlFocus 因子限制每个顶级域名被选中的 URL 数量,防止爬取过于集中在少数网站。
func concentrationFilter(urls []string, k float64) []string {
// 按顶级域名分组
domainGroups := make(map[string][]string)
shuffled := make([]string, len(urls))
copy(shuffled, urls)
rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
for _, u := range shuffled {
d := superNetloc(u)
domainGroups[d] = append(domainGroups[d], u)
}
// 计算每组保留上限:域名规模越大允许越多,但按 k 次幂压制
limit := 10
if len(domainGroups) > 1 {
sizes := make([]int, 0, len(domainGroups))
for _, g := range domainGroups {
sizes = append(sizes, int(math.Pow(float64(len(g)), k)))
}
// 升序排列,去除最大一项,用其余项总和的 60% 作为全局上限
for i := 0; i < len(sizes)-1; i++ {
for j := i + 1; j < len(sizes)-1; j++ {
if sizes[j] < sizes[i] {
sizes[i], sizes[j] = sizes[j], sizes[i]
}
}
}
total := 0
for _, s := range sizes[:len(sizes)-1] {
total += s
}
limit = max(10, int(float64(total)*0.6))
}
// 从每组中按计算的上限采样
var result []string
for _, g := range domainGroups {
sn := 1 + min(limit, int(math.Pow(float64(len(g)), k)))
if sn > len(g) {
sn = len(g)
}
result = append(result, g[:sn]...)
}
rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] })
return result
}
// truncateMap 将 map 裁剪到最多 n 条(取前 n 条,无特定顺序)。
func truncateMap(m map[string]string, n int) map[string]string {
if len(m) <= n {
return m
}
out := make(map[string]string, n)
i := 0
for k, v := range m {
if i >= n {
break
}
out[k] = v
i++
}
return out
}
// min 返回两个整数中的较小值。
func min(a, b int) int {
if a < b {
return a
}
return b
}
// max 返回两个整数中的较大值。
func max(a, b int) int {
if a > b {
return a
}
return b
}
// GetStats 返回当前爬虫统计快照(用于监控)。
func (c *Crawler) GetStats() Stats {
return Stats{
VisitedURLs: atomic.LoadInt64(&c.stats.VisitedURLs),
SuccessURLs: atomic.LoadInt64(&c.stats.SuccessURLs),
KeywordsFetched: atomic.LoadInt64(&c.stats.KeywordsFetched),
}
}