// crawler.go — BFS crawl loop, URL scheduling, and site-info updating. // crawler 包的主逻辑:BFS 爬取循环、URL 调度算法、网站元信息更新。 package crawler import ( "bytes" // 字节缓冲(构造 HTTP POST 请求体) "context" // context 超时控制 "encoding/json" // JSON 序列化(发送关键词数据到收获服务) "fmt" // 格式化(构造目标地址) "hash/fnv" // FNV 哈希(内容变化检测) "log" // 日志输出 "math" // 数学运算(指数衰减、质量评分) "math/rand" // 随机数(加权采样、队列打乱) "net/http" // HTTP 客户端(POST 数据到收获服务) "net/url" // URL 解析 "strings" // 字符串操作 "sync" // 互斥锁(保护并发收集结果) "sync/atomic" // 原子操作(计数器,无锁并发更新) "time" // 时间戳 "sese-engine/analyzer" // 文本分析和关键词提取 "sese-engine/config" // 全局配置常量 "sese-engine/parser" // HTML 解析(提取标题、正文、链接) "sese-engine/storage" // 持久化存储 ) // Stats 存放爬虫实时统计计数器(使用 atomic 原子读取)。 type Stats struct { VisitedURLs int64 // 已访问的 URL 总数(含失败) SuccessURLs int64 // 成功抓取(HTTP 200)的 URL 数 KeywordsFetched int64 // 累计提取的关键词总数 } // 熔断器状态(用 atomic int32 代替 mutex,避免持有锁时的慢 I/O)。 const ( circuitClosed int32 = iota // 正常:所有请求都发往 harvester circuitOpen // 断开:连续失败 N 次后,冷却时间内跳过所有请求 circuitHalfOpen // 半开:冷却结束,尝试放行一次请求试探 ) const ( circuitFailureThreshold = 5 // 连续失败多少次后触发熔断 circuitCooldownSeconds = 30 // 熔断持续时间(秒) ) // Priority Worker 配置 const ( priorityMaxWorkers = 50 // Priority 独立 goroutine 上限(突破主 workers) priorityQueueSize = 100 // Priority 任务队列缓冲大小 ) // Crawler 编排整个 BFS 爬取流程。 type Crawler struct { fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流) db *storage.DB // 持久化数据库 analyzer *analyzer.Analyzer // 分词和关键词分析 prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值") stats Stats // 原子计数器 // visited 记录已访问的 URL 集合(跨 epoch 持久,启动时从 DB 预热) visited map[string]bool visitedMu sync.RWMutex // 保护 visited 的并发读写 // 熔断器(全用 atomic,无 mutex,无慢 I/O 时持有锁的风险) circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen circuitFailures int32 // 连续失败计数(atomic) circuitExpiry int64 // 熔断/半开截止 Unix 时间戳(秒) // 运行时活跃线程计数(atomic,每轮 epoch 自动归零前重新开始计数) activeWorkers int64 // 本轮发现的新链接计数(atomic,供前端实时监控) newLinksCount int64 // ---- Priority Worker(独立 goroutine,不受主 workers 限制)---- priorityCh chan string // Priority URL 任务队列(用户手动添加) priorityChildCh chan string // Priority 子链接队列(子 URL 继续由 priority worker 爬取) prioritySem chan struct{} // Priority 信号量(上限 priorityMaxWorkers) priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束 priorityMu sync.RWMutex // 保护 priorityStats 和 priorityChildLinks priorityStats struct { pending int64 // 待处理的 Priority URL 数量(入队但未开始) active int64 // 正在处理的 Priority URL 数量 } // 孙链接(子 URL 的子链接)进入普通 BFS 队列 normalChildCh chan URLWeight // 孙链接 channel,由 Run 循环消费 // ---- 爬取状态暴露(供前端监控) ---- crawlStatusMu sync.RWMutex crawlStatus CrawlStatus // 当前轮次状态 } // CrawlStatus 暴露给前端监控的爬取状态 type CrawlStatus struct { CurrentEpoch int `json:"current_epoch"` // 当前轮次(从1开始) MaxEpoch int `json:"max_epoch"` // 总轮数上限 QueueLength int `json:"queue_length"` // 本轮队列长度 CompletedCount int `json:"completed_count"` // 本轮已完成的 URL 数 VisitedTotal int `json:"visited_total"` // 已收录 URL 总数 NewLinksCount int64 `json:"new_links_count"` // 本轮已发现的新链接数(实时更新) NextPoolSize int `json:"next_pool_size"` // 下一轮链接池大小(调度后的队列长度) IsRunning bool `json:"is_running"` // 是否正在运行 } // 全局活跃线程计数器(跨包可读,无需持有 Crawler 引用) var globalActiveWorkers int64 // ActiveWorkers 返回当前正在运行的爬虫 goroutine 数量。 // 也可通过包级函数 GlobalActiveWorkers() 读取(供 search 等外部包使用)。 func (c *Crawler) ActiveWorkers() int64 { return atomic.LoadInt64(&c.activeWorkers) } // GlobalActiveWorkers 返回当前全局活跃爬虫 goroutine 数量(包级,外部包可直接调用)。 func GlobalActiveWorkers() int64 { return atomic.LoadInt64(&globalActiveWorkers) } // 全局 Priority Status 快照(跨 Crawler 实例共享,用于外部监控) var globalPriorityStatus struct { pending int64 active int64 } // 全局二级优先队列待处理计数(goroutine 异步发送,无法用 len() 直接获取) var globalPriorityLevel2 int64 // GlobalPriorityStatus 返回当前全局 Priority Worker 状态。 func GlobalPriorityStatus() map[string]interface{} { return map[string]interface{}{ "active": atomic.LoadInt64(&globalPriorityStatus.active), "max_workers": priorityMaxWorkers, "level1": atomic.LoadInt64(&globalPriorityStatus.pending), "level2": atomic.LoadInt64(&globalPriorityLevel2), "level2_queue": atomic.LoadInt64(&globalPriorityLevel2), // 待爬取 "level2_inflight": atomic.LoadInt64(&globalPriorityLevel2Inflight), // 正在爬取 } } // IncrementPriorityLevel2 增加二级队列计数。 func IncrementPriorityLevel2(n int64) { atomic.AddInt64(&globalPriorityLevel2, n) } // DecrementPriorityLevel2 减少二级队列计数。 func DecrementPriorityLevel2(n int64) { atomic.AddInt64(&globalPriorityLevel2, -n) } // ---- 二级正在爬取的计数(drain 时启动,goroutine 结束时扣减)---- var globalPriorityLevel2Inflight int64 func IncrementPriorityLevel2Inflight(n int64) { atomic.AddInt64(&globalPriorityLevel2Inflight, n) } func DecrementPriorityLevel2Inflight(n int64) { atomic.AddInt64(&globalPriorityLevel2Inflight, -n) } // New 创建一个 Crawler 实例。 // prosperMap 由 info 模块加载,传入域名繁荣值用于调度优先级计算。 func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler { c := &Crawler{ fetcher: NewFetcher(config.SpiderName(), time.Duration(config.CrawlerCooldown())*time.Second), db: db, analyzer: a, prosperMap: prosperMap, visited: make(map[string]bool), priorityCh: make(chan string, priorityQueueSize), priorityChildCh: make(chan string, priorityQueueSize), prioritySem: make(chan struct{}, priorityMaxWorkers), normalChildCh: make(chan URLWeight, 50000), // 孙链接 channel,大 buffer 避免丢 } // 启动 Priority Worker(独立 goroutine,不受主 workers 限制) go c.runPriorityWorker() // 启动时从 gate bucket 预热已爬取的 URL 集合(程序重启后不会重复爬取) c.warmVisited() return c } // warmVisited 从 DB 的 gate bucket 加载所有已缓存的 URL 到 visited set。 // 超过 RecrawlMaxAge 的 URL 不加入 visited,使其可以被重新爬取。 func (c *Crawler) warmVisited() { count := 0 expired := 0 maxAge := int64(config.RecrawlMaxAge()) now := time.Now().Unix() _ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error { if now-entry.Timestamp < maxAge { c.visited[u] = true // 未过期,仍然跳过 count++ } else { expired++ } return nil }) log.Printf("[crawler] visited set warmed: %d active, %d expired (eligible for recrawl)", count, expired) } // startRecrawlTicker 启动后台定时任务,定期扫描并释放过期 URL 回到候选池。 // 已过期的 URL 从 visited map 中移除,使其可以在后续 BFS 轮次中被重新发现和爬取。 func (c *Crawler) startRecrawlTicker() { interval := config.RecrawlCheckInterval() if interval <= 0 { return // 未配置或禁用 } go func() { ticker := time.NewTicker(time.Duration(interval) * time.Second) defer ticker.Stop() for range ticker.C { maxAge := int64(config.RecrawlMaxAge()) batchSize := config.RecrawlBatchSize() now := time.Now().Unix() removed := 0 c.visitedMu.Lock() _ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error { if removed >= batchSize { return fmt.Errorf("batch full") // 提前终止遍历 } if now-entry.Timestamp >= maxAge && c.visited[u] { delete(c.visited, u) removed++ } return nil }) c.visitedMu.Unlock() if removed > 0 { log.Printf("[crawler] recrawl ticker: released %d expired URLs back to pool", removed) } } }() } // markVisited 将 URL 标记为已访问(线程安全)。 func (c *Crawler) markVisited(url string) { c.visitedMu.Lock() c.visited[url] = true c.visitedMu.Unlock() } // ---- Priority Worker(突破 workers 上限,立即爬取高优先级 URL)---- // runPriorityWorker 独立处理高优先级 URL,不受主 workers 限制。 // 两级队列共享同一线程池:一级队列(手动 URL)优先处理完,再处理二级队列(子 URL)。 // - 一级队列:visitURLUnlimited,无限爬子链接,子链接进入二级队列 // - 二级队列:visitURL,有限爬子链接,子链接进入普通 BFS func (c *Crawler) runPriorityWorker() { for { // ---- 第一阶段:drain 一级队列全部取出 ---- level1Batch := []string{} // 先阻塞等第一个一级 URL url := <-c.priorityCh level1Batch = append(level1Batch, url) // drain 剩余一级 URL(非阻塞) for { select { case url = <-c.priorityCh: level1Batch = append(level1Batch, url) default: goto processLevel1 } } processLevel1: // batch 收集完成后才扣减 pending,此时 pending = len(level1Batch) + len(priorityCh) // 这样 level1 能正确反映队列中待处理的 URL 总数 log.Printf("[priority] drain level1 batch: count=%d, pending_before=%d, priorityCh_len=%d", len(level1Batch), atomic.LoadInt64(&globalPriorityStatus.pending), len(c.priorityCh)) atomic.AddInt64(&globalPriorityStatus.pending, -int64(len(level1Batch))) // ---- 启动全部一级 goroutine ---- for _, u := range level1Batch { c.prioritySem <- struct{}{} c.priorityMu.Lock() c.priorityStats.active++ c.priorityMu.Unlock() atomic.AddInt64(&globalPriorityStatus.active, 1) c.priorityWg.Add(1) go c.priorityCrawlLoop(u, 1) } // 等待一级 goroutine 全部完成 c.priorityWg.Wait() // ---- 第二阶段:drain 二级队列全部取出 ---- level2Batch := []string{} drained := int64(0) for { select { case child := <-c.priorityChildCh: level2Batch = append(level2Batch, child) drained++ // 实时扣减 level2,使 API 在 drain 期间仍能看到剩余未处理的量 atomic.AddInt64(&globalPriorityLevel2, -1) default: goto processLevel2 } } processLevel2: // drained 项已在 drain 循环中逐个从 level2 扣减 // 启动 goroutine 后这些 URL 进入"正在爬取"状态 IncrementPriorityLevel2Inflight(int64(len(level2Batch))) log.Printf("[priority] drain level2 done: drained=%d, inflight_before_spawn=%d", len(level2Batch), atomic.LoadInt64(&globalPriorityLevel2Inflight)) // ---- 启动全部二级 goroutine ---- for _, u := range level2Batch { c.prioritySem <- struct{}{} c.priorityMu.Lock() c.priorityStats.active++ c.priorityMu.Unlock() atomic.AddInt64(&globalPriorityStatus.active, 1) c.priorityWg.Add(1) go c.priorityCrawlLoop(u, 2) } // 等待二级 goroutine 全部完成,才进入下一轮(再次处理一级队列) c.priorityWg.Wait() } } // priorityCrawlLoop 爬取单个 URL: // // level=1(一级,手动 URL):visitURLUnlimited 无限爬子链接 → 二级队列(priorityChildCh) // level=2(二级,子 URL):visitURLUnlimited 无限爬子链接 → 孙链接(normalChildCh) func (c *Crawler) priorityCrawlLoop(rawURL string, level int) { defer c.priorityWg.Done() defer func() { <-c.prioritySem }() defer func() { c.priorityMu.Lock() c.priorityStats.active-- c.priorityMu.Unlock() atomic.AddInt64(&globalPriorityStatus.active, -1) if level == 2 { DecrementPriorityLevel2Inflight(1) } }() // 标记 DB 中该 URL 为已访问,防止重启后再次被调度 _ = c.db.MarkPriorityURLVisited(rawURL) // 两级都不限制子链接数量 children := c.visitURLUnlimited(rawURL) //log.Printf("[crawler] priority[%d] crawl done: %s (%d child links)", level, rawURL, len(children)) if len(children) == 0 { return } // 一级:子链接进二级队列;二级:子链接直接加入 newLinks(同步) for _, child := range children { if level == 1 { select { case c.priorityChildCh <- child: IncrementPriorityLevel2(1) default: // 二级队列满,丢弃 } } else { // 二级:子链接进孙链接 channel,由 Run 永久 drain 直到全部到达 select { case c.normalChildCh <- URLWeight{URL: child, Weight: 1.0}: default: // channel 满(很少发生),孙链接丢弃 } } } } // TriggerPriorityCrawl 立即触发高优先级爬取(突破 workers 上限)。 // 适合用户手动插入 URL 时立即响应。 func (c *Crawler) TriggerPriorityCrawl(url string) { defer func() { if r := recover(); r != nil { // priorityCh 已关闭(Run 已退出),忽略 log.Printf("[crawler] priority crawl ignored (crawler stopped): %s", url) } }() select { case c.priorityCh <- url: c.priorityMu.Lock() c.priorityStats.pending++ c.priorityMu.Unlock() atomic.AddInt64(&globalPriorityStatus.pending, 1) log.Printf("[crawler] priority crawl triggered: %s", url) default: // 队列满了,降级到正常处理 log.Printf("[crawler] priority queue full, deferring to normal: %s", url) } } // GetPriorityStatus 返回 Priority Worker 的实时状态。 func (c *Crawler) GetPriorityStatus() map[string]interface{} { c.priorityMu.RLock() defer c.priorityMu.RUnlock() return map[string]interface{}{ "pending": c.priorityStats.pending, "active": c.priorityStats.active, "max_workers": priorityMaxWorkers, } } // isVisited 检查 URL 是否已访问(线程安全)。 func (c *Crawler) isVisited(url string) bool { c.visitedMu.RLock() v := c.visited[url] c.visitedMu.RUnlock() return v } // fetchAndApplyPriorityURLs 从数据库读取用户插入的 priority URLs, // 将未访问的插入队列前端(prepend),已爬取的条目从存储中清除。 // 返回本次插入队列的 URL 数量。 func (c *Crawler) fetchAndApplyPriorityURLs(queue *[]string) int { entries, err := c.db.GetPriorityURLs() if err != nil || len(entries) == 0 { return 0 } added := 0 for _, e := range entries { if c.isVisited(e.URL) { _ = c.db.RemovePriorityURL(e.URL) continue } *queue = append([]string{e.URL}, *queue...) added++ } _ = c.db.ClearVisitedPriorityURLs() return added } // URLWeight 将 URL 和发现权重打包在一起,用于调度决策。 type URLWeight struct { URL string // 待访问的 URL Weight float64 // 发现权重(从父页面分得的"关注度",页面链接越多则每个分得越少) } // Run 启动 BFS 爬取,从 entryURL 开始,执行最多 maxEpoch 轮。 // 各轮之间是串行的,每轮内并发抓取,按调度算法选择下一轮 URL。 // 每轮开始前会检查 priority 队列,优先爬取用户插入的 URL。 func (c *Crawler) Run(entryURL string, maxEpoch int) { c.markVisited(entryURL) queue := []string{entryURL} // 初始化爬取状态 c.updateCrawlStatus(func(cs *CrawlStatus) { cs.MaxEpoch = maxEpoch cs.IsRunning = true }) // 启动后台重爬定时器:定期释放过期 URL 到候选池 c.startRecrawlTicker() for ep := 0; ep < maxEpoch; ep++ { // 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整 workers := config.CrawlerWorkers() // 重置新链接计数器 atomic.StoreInt64(&c.newLinksCount, 0) // 更新爬取状态:新一轮开始 c.updateCrawlStatus(func(cs *CrawlStatus) { cs.CurrentEpoch = ep + 1 cs.QueueLength = len(queue) cs.CompletedCount = 0 cs.NewLinksCount = 0 }) // 每轮开始前:拉取 priority URLs,插入队列前端 priorityAdded := c.fetchAndApplyPriorityURLs(&queue) if priorityAdded > 0 { log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority) workers=%d", ep+1, maxEpoch, len(queue), priorityAdded, workers) } else { log.Printf("[crawler] epoch %d/%d queue=%d workers=%d", ep+1, maxEpoch, len(queue), workers) } // 将本轮所有 URL 标记为已访问(防止下一轮重复入队) for _, u := range queue { c.markVisited(u) } // ---- 并发抓取本轮所有 URL ---- var ( newLinks []URLWeight // 收集下一轮候选 URL mu sync.Mutex // 保护 newLinks 的并发写入 wg sync.WaitGroup completed int64 // 本轮已完成的计数(atomic) ) // 信号量:限制同时并发数(使用上方读取的 workers 值) sem := make(chan struct{}, workers) for _, u := range queue { wg.Add(1) sem <- struct{}{} // 获取一个令牌(阻塞直到有空闲槽位) atomic.AddInt64(&c.activeWorkers, 1) atomic.AddInt64(&globalActiveWorkers, 1) go func(rawURL string) { defer wg.Done() defer func() { <-sem }() // 释放令牌 defer atomic.AddInt64(&c.activeWorkers, -1) defer atomic.AddInt64(&globalActiveWorkers, -1) // 抓取单个 URL,返回发现的子链接 hrefs := c.visitURL(rawURL) // 更新完成计数 currentCompleted := atomic.AddInt64(&completed, 1) c.updateCrawlStatus(func(cs *CrawlStatus) { cs.CompletedCount = int(currentCompleted) }) n := len(hrefs) if n == 0 { return } // 收集未访问的子链接 var children []string for _, h := range hrefs { if !c.isVisited(h) { children = append(children, h) } } if len(children) == 0 { return } // 分配权重 w := 1.0 / float64(n) // 子 URL 进入 newLinks 调度,由普通 BFS 下一轮处理。 // (priority worker 通过 priorityChildCh 独立爬取子 URL,两者互不干扰) mu.Lock() for _, h := range children { newLinks = append(newLinks, URLWeight{URL: h, Weight: w}) // 实时自增计数(供前端监控) atomic.AddInt64(&c.newLinksCount, 1) c.updateCrawlStatus(func(cs *CrawlStatus) { cs.NewLinksCount = atomic.LoadInt64(&c.newLinksCount) }) } mu.Unlock() }(u) } wg.Wait() // drain 孙链接 channel(孙链接来自 priority 爬取的子链接,进入普通 BFS 队列) // runPriorityWorker 发送完毕后等待足够时间,确保 channel 中最后几条到达 timeout := time.After(10 * time.Second) drained := false for !drained { select { case gc := <-c.normalChildCh: newLinks = append(newLinks, gc) atomic.AddInt64(&c.newLinksCount, 1) // 孙链接也要计入 c.updateCrawlStatus(func(cs *CrawlStatus) { cs.NewLinksCount = atomic.LoadInt64(&c.newLinksCount) }) case <-timeout: drained = true } } // 更新已收录总数 c.visitedMu.RLock() visitedTotal := len(c.visited) c.visitedMu.RUnlock() c.updateCrawlStatus(func(cs *CrawlStatus) { cs.VisitedTotal = visitedTotal }) // 本轮没有发现新链接,报告停止,等待新 Priority URL 触发重启 if len(newLinks) == 0 { log.Println("[crawler] empty — stopped, waiting for new URLs") c.updateCrawlStatus(func(cs *CrawlStatus) { cs.IsRunning = false }) // 空循环等 normalChildCh,新数据到达后立即从 epoch 0 重新开始 for { gc, ok := <-c.normalChildCh if !ok { return } newLinks = append(newLinks, gc) log.Printf("[crawler] new URLs detected, restarting from epoch 0 (%d in pool)", len(newLinks)) c.updateCrawlStatus(func(cs *CrawlStatus) { cs.IsRunning = true }) ep = -1 // continue 后 ep++ 变成 0 break // 退出空循环,进入正常队列处理 } } // 调度算法:从候选 URL 中选出下一轮要抓取的队列 nextPoolSize := len(newLinks) queue = c.schedule(newLinks) // 更新下一轮链接池大小 c.updateCrawlStatus(func(cs *CrawlStatus) { cs.NextPoolSize = nextPoolSize }) } // 所有轮次完成,更新状态 c.updateCrawlStatus(func(cs *CrawlStatus) { cs.IsRunning = false }) } // visitURLRaw 抓取 URL 的核心逻辑,提取标题、描述、正文、子链接。 // 不包含链接数限制,用于优先级爬取。 // forceIndex=true 时强制跳过增量重爬检测,直接提取关键词写入索引(用于 Priority URL)。 func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text string, hrefs []string) { // recover 保护:防止任何模块(analyzer/storage/parser)的 panic 杀死 goroutine defer func() { if r := recover(); r != nil { log.Printf("[crawler] visitURL panic recovered: url=%s error=%v", rawURL, r) } }() // 使用 sync.WaitGroup + select 实现硬超时包装器, // 确保即使 http.Client.Timout 被某些底层操作忽略,goroutine 也不会永久阻塞。 fetchTimeout := 30 * time.Second var res *FetchResult var fetchErr error var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() // 礼貌模式抓取(遵守 robots.txt + 限流),限制页面大小防止内存爆炸 res, fetchErr = c.fetcher.fetchWithHistory(rawURL, true, fetchTimeout, config.MaxPageSize()) }() waitCh := make(chan struct{}) go func() { wg.Wait() close(waitCh) }() select { case <-waitCh: // fetch 正常返回(成功或错误) case <-time.After(fetchTimeout + 5*time.Second): log.Printf("[crawler] fetch timeout: %s", rawURL) } if fetchErr != nil || res == nil { c.updateSiteFailure(rawURL) // 记录失败,更新该网站成功率 return } atomic.AddInt64(&c.stats.SuccessURLs, 1) // 成功计数器 +1 // 解析 HTML:提取标题、描述、正文和所有超链接 title, desc, text, hrefs = parser.ParseHTML(res.Body, res.FinalURL) // 计算正文内容哈希(FNV-1a),用于增量重爬检测 contentHash := fnvHash(text) // 增量重爬检测:查询上次爬取的哈希,内容未变则跳过关键词提取 isRecrawl := false oldEntry, _ := c.db.GetSnippet(res.FinalURL) if !forceIndex && oldEntry != nil && oldEntry.ContentHash != "" && oldEntry.ContentHash == contentHash { isRecrawl = true //log.Printf("[crawler] unchanged (recrawl skip): %s", res.FinalURL) } // 缓存 URL 摘要(仅对短 URL 缓存,防止超长 URL 浪费空间) if len(res.FinalURL) < 250 { _ = c.db.SetSnippet(res.FinalURL, &storage.SnippetEntry{ Title: title, Description: truncate(desc, 256), Text: truncate(text, 256), Timestamp: time.Now().Unix(), ContentHash: contentHash, }) } // 关键词提取:将标题/描述/正文交给 analyzer 计算关键词权重 // 增量优化:如果内容未变化(重爬),跳过关键词提取和索引更新 if !isRecrawl { kws := c.analyzer.Analyze(title, desc, text) if len(kws) > 0 { // 限制每个页面最多发送的关键词数量 maxKws := config.MaxKeywordsPerPage() if len(kws) > maxKws { kws = kws[:maxKws] } atomic.AddInt64(&c.stats.KeywordsFetched, int64(len(kws))) // 异步发送到收获服务器写入倒排索引(不阻塞爬取流程) go c.sendToHarvester(res.FinalURL, kws) } } // 更新网站元信息(成功访问) host := netloc(res.FinalURL) c.updateSiteSuccess(host, res, title, desc, text, hrefs) // 处理永久重定向:更新源主机名下的重定向映射 for from, to := range res.Redirects { fromHost := netloc(from) if fromHost == "" { continue } _ = c.db.UpdateSiteInfo(fromHost, func(info *storage.SiteInfo) { if info.Redirects == nil { info.Redirects = make(map[string]string) } info.Redirects[from] = to if len(info.Redirects) > 50 { info.Redirects = truncateMap(info.Redirects, 40) } }) } return } // visitURL 抓取一个 URL,提取关键词、缓存摘要、更新网站元信息,返回页面中发现的子链接。 // 限制返回的链接数,防止下一轮队列爆炸。 func (c *Crawler) visitURL(rawURL string) (hrefs []string) { atomic.AddInt64(&c.stats.VisitedURLs, 1) // 计数器 +1 _, _, _, hrefs = c.visitURLRaw(rawURL, false) // 限制返回的链接数,防止下一轮队列爆炸 maxLinks := config.MaxPriorityChildren() if maxLinks > 0 && len(hrefs) > maxLinks { hrefs = sampleStrings(hrefs, maxLinks) } return hrefs } // visitURLUnlimited 抓取一个 URL,返回页面中发现的子链接(无链接数限制)。 // 用于手动添加的 Priority URL,保留全部子链接,并强制重建索引(跳过增量检测)。 func (c *Crawler) visitURLUnlimited(rawURL string) (hrefs []string) { atomic.AddInt64(&c.stats.VisitedURLs, 1) // 计数器 +1 // forceIndex=true:Priority URL 强制重新提取关键词,不受内容未变化影响 _, _, _, hrefs = c.visitURLRaw(rawURL, true) return hrefs } // updateSiteFailure 当某 URL 抓取失败时,更新该网站的访问成功率(指数衰减)。 func (c *Crawler) updateSiteFailure(rawURL string) { host := netloc(rawURL) if host == "" { return } _ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) { if info.SuccessRate == nil { zero := 0.0 info.SuccessRate = &zero } // 成功率每次失败乘以 0.99(无限趋近 0) *info.SuccessRate *= 0.99 }) } // updateSiteSuccess 当某 URL 抓取成功时,更新网站的完整元信息。 // 使用 UpdateSiteInfo 原子读-改-写,避免并发 goroutine 对同一 host 的 SiteInfo 更新丢失。 func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, text string, hrefs []string) { now := time.Now().Unix() httpsAvailable := strings.HasPrefix(res.FinalURL, "https://") serverType := res.ServerType // 语言检测(CPU 密集,在锁外执行) var detectedLang string // 检测条件在 UpdateSiteInfo 回调内判断,这里预先计算好 detectedLang = c.analyzer.DetectLanguage(title + " " + desc + " " + text) // 收集外链(跨顶级域名的链接) superHost := superNetloc(res.FinalURL) var external []string for _, h := range hrefs { if superNetloc(h) != superHost { external = append(external, h) } } sampled := sampleStrings(external, 10) _ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) { // 访问计数 +1,更新最后访问时间 info.VisitCount++ info.LastVisitTime = now // 成功率更新:EWM(指数加权移动)平滑,每次 +0.01 one := 1.0 if info.SuccessRate == nil { info.SuccessRate = &one } *info.SuccessRate = *info.SuccessRate*0.99 + 0.01 // 记录是否支持 HTTPS if httpsAvailable { t := true info.HTTPSAvailable = &t } // 记录 HTTP Server 类型(去重,保留最近 5 个) if serverType != "" { found := false for _, s := range info.ServerTypes { if s == serverType { found = true break } } if !found { info.ServerTypes = append(info.ServerTypes, serverType) if len(info.ServerTypes) > 5 { info.ServerTypes = info.ServerTypes[len(info.ServerTypes)-5:] } } } // 语言检测和出站链接收集(仅在前 10 次访问或 10% 概率下触发,减少开销) if info.VisitCount < 10 || rand.Float64() < 0.1 { if detectedLang != "" { if info.Languages == nil { info.Languages = make(map[string]float64) } // 首次访问强度高,随访问次数增加强度衰减 intensity := math.Min(0.2, 1/math.Sqrt(float64(info.VisitCount+1))) for k := range info.Languages { info.Languages[k] *= (1 - intensity) // 旧语种按 intensity 衰减 } info.Languages[detectedLang] += intensity // 新语种增加 } // 外链 info.OutLinks = append(info.OutLinks, sampled...) if len(info.OutLinks) > 250 { info.OutLinks = sampleStrings(info.OutLinks, 200) } } }) } // sendToHarvester 将关键词索引数据通过 HTTP POST 发送到搜索服务器(/l 端点)。 // 熔断器基于 atomic 实现(无 mutex,不在持有锁时做慢 I/O),确保 goroutine 不会因收获服务故障而堆积。 func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) { now := time.Now().Unix() // ---- 熔断检查(atomic,无锁) ---- state := atomic.LoadInt32(&c.circuitState) expiry := atomic.LoadInt64(&c.circuitExpiry) switch state { case circuitOpen: if now < expiry { return // 熔断中,直接跳过 } // 冷却结束,切换到半开,放行一个试探请求 atomic.StoreInt32(&c.circuitState, circuitHalfOpen) atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds)) case circuitHalfOpen: if now < expiry { return // 半开冷却中,只放行第一个,其余跳过 } // 半开超时,重新进入半开状态 atomic.StoreInt32(&c.circuitState, circuitHalfOpen) atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds)) } type payload struct { URL string `json:"url"` Keywords []analyzer.Keyword `json:"keywords"` } p := payload{URL: finalURL, Keywords: kws} data, err := json.Marshal(p) if err != nil { return } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d/l", config.SearchServerPort()), bytes.NewReader(data)) if err != nil { return } req.Header.Set("Content-Type", "application/json") // ---- HTTP 请求(此时没有任何锁) ---- resp, err := http.DefaultClient.Do(req) // ---- 结果处理(atomic,无锁) ---- if err != nil { failures := atomic.AddInt32(&c.circuitFailures, 1) if failures >= circuitFailureThreshold { atomic.StoreInt32(&c.circuitState, circuitOpen) atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds)) //log.Printf("[crawler] circuit OPEN: harvest endpoint unreachable (%d failures), cooling for %ds",failures, circuitCooldownSeconds) } return } resp.Body.Close() // ---- 成功:重置熔断器 ---- atomic.StoreInt32(&c.circuitFailures, 0) atomic.StoreInt32(&c.circuitState, circuitClosed) } // schedule 从候选 URL 集合中选出下一轮 BFS 队列。 // 包含:域名集中度过滤、HTTP/HTTPS 比例控制、繁荣 URL 占比控制、加权随机采样。 func (c *Crawler) schedule(links []URLWeight) []string { // 候选过多时先随机采样到 10 万条,防止内存爆炸 if len(links) > 100000 { links = sampleURLWeights(links, 100000) } // 预加载所有涉及的网站信息(加速后续评分计算) domains := make(map[string]bool) for _, lw := range links { if h := netloc(lw.URL); h != "" { domains[h] = true } if h := superNetloc(lw.URL); h != "" { domains[h] = true } } siteCache := make(map[string]*storage.SiteInfo, len(domains)) var mu sync.Mutex var wg sync.WaitGroup for d := range domains { wg.Add(1) go func(host string) { defer wg.Done() info, _ := c.db.GetSiteInfo(host) mu.Lock() siteCache[host] = info mu.Unlock() }(d) } wg.Wait() // 对所有候选 URL 逐一计算调度优先级分数 scored_list := make([]scoredURL, len(links)) for i, lw := range links { scored_list[i] = scoredURL{url: lw.URL, score: c.scoreURL(lw, siteCache)} } // 加权随机采样:从高分到低分按权重概率抽取最多 k 条 k := min(45000, len(scored_list)/3+250) selected := weightedSample(scored_list, k) // 域名集中度过滤:限制每个域名被选中的数量,防止被少数网站垄断 selected = concentrationFilter(selected, config.CrawlFocus()) // 分离 HTTPS 和 HTTP 链接,HTTP 最多占 HTTPS 的 1/4 var httpsURLs, httpURLs []string for _, s := range selected { if strings.HasPrefix(s, "https://") { httpsURLs = append(httpsURLs, s) } else { httpURLs = append(httpURLs, s) } } maxHTTP := len(httpsURLs) / 4 if len(httpURLs) > maxHTTP { httpURLs = sampleStrings(httpURLs, maxHTTP) } // 分离繁荣(高反向链接)域名和普通域名,按比例控制繁荣 URL 占比 var prosperURLs, otherURLs []string for _, u := range append(httpsURLs, httpURLs...) { if c.prosperMap[netloc(u)] > 0 { prosperURLs = append(prosperURLs, u) } else { otherURLs = append(otherURLs, u) } } // 根据目标繁荣占比计算普通 URL 应保留数量 expectedProsperRatio := config.ExpectedProsperRatio() n := int(float64(len(prosperURLs)) * (1 - expectedProsperRatio) / expectedProsperRatio) if len(otherURLs) > n { keep := max(len(otherURLs)-len(selected)/10, n) if keep < len(otherURLs) { otherURLs = sampleStrings(otherURLs, keep) } } // 合并并随机打乱(使繁荣 URL 和普通 URL 混合) result := append(prosperURLs, otherURLs...) rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] }) return result } // scoreURL 计算单个 URL 的调度优先级分数。 // 综合考虑:中文语种权重、域名访问历史衰减、网站质量评分、繁荣值、URL 本身质量。 func (c *Crawler) scoreURL(lw URLWeight, siteCache map[string]*storage.SiteInfo) float64 { host := netloc(lw.URL) super := superNetloc(lw.URL) info := siteCache[host] if info == nil { info = &storage.SiteInfo{} } // 中文倾向性:该网站中文内容占比 var chineseness float64 = 0.5 if len(info.Languages) > 0 { total := 0.0 for _, v := range info.Languages { total += v } if total > 0 { chineseness = info.Languages["zh"] / total } } // 兴趣衰减:基于访问次数的指数衰减,繁荣域名可访问更多次 prosper := math.Min(62, c.prosperMap[host]) limit := prosper*500 + 50 b := math.Pow(0.1, 1/limit) interest := math.Pow(b, float64(info.VisitCount)) // 同理对顶级域名计算衰减(二级域名不够用时看顶级域名) var interest2 float64 = 1.0 if super != host { superInfo := siteCache[super] if superInfo != nil { limit2 := math.Min(62, c.prosperMap[super])*500 + 50 b2 := math.Pow(0.1, 1/limit2) interest2 = math.Pow(b2, float64(superInfo.VisitCount)) } } // 网站质量评分 quality := 1.0 if info.Quality != nil { quality = *info.Quality } // 繁荣值加分(log 变换平滑) prosperity := prosper if prosperity > 0 { prosperity += 0.5 } prosperity = math.Log2(2+prosperity) + 1 // URL 本身的质量惩罚(过长、路径过深、使用 .php/.htm 等) bad := badURL(lw.URL) return (0.1 + chineseness) * math.Min(0.05+interest, 0.05+interest2) * quality * (1 - bad) * lw.Weight * prosperity } // ---- 辅助函数 ---- // netloc 从原始 URL 字符串提取主机名(不含路径)。 // 支持 http:// 和 https:// 前缀,自动处理 URL 解析异常。 func netloc(rawURL string) string { parts := strings.SplitN(rawURL, "/", 4) if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" { return parts[2] } u, err := url.Parse(rawURL) if err != nil { return "" } return u.Host } // superNetloc 返回顶级域名(去除子域名),例如 "www.example.com" → "example.com"。 // 用于识别跨子域名但同主站的情况。 func superNetloc(rawURL string) string { host := netloc(rawURL) parts := strings.Split(host, ".") if len(parts) >= 2 { return strings.Join(parts[len(parts)-2:], ".") } return host } // badURL 返回 URL 的"劣质"评分(0~0.9),基于长度、路径深度、文件扩展名等特征。 func badURL(u string) float64 { // URL 过长惩罚 s := math.Max(0, float64(len(u)-30)/200.0) // 使用 .htm/.php 等动态页面惩罚 if strings.Contains(u, ".htm") || strings.Contains(u, ".php") { s += (1 - s) * 0.3 } // 路径层级过深惩罚(超过 2 层斜杠) if strings.Count(strings.TrimRight(u, "/"), "/") > 2 { s += (1 - s) * 0.1 } // 极短 URL 或协议后冒号(如 ftp:)惩罚 if len(u) < 5 || u[4] == ':' { s += (1 - s) * 0.3 } return math.Min(s, 0.9) } // truncate 将字符串截断到最多 n 个字符。 func truncate(s string, n int) string { if len(s) <= n { return s } return s[:n] } // fnvHash 使用 FNV-1a 算法计算字符串的哈希值(十六进制字符串)。 // 用于增量重爬时检测页面正文是否发生变化。 func fnvHash(s string) string { h := fnv.New128a() h.Write([]byte(s)) return fmt.Sprintf("%x", h.Sum(nil)) } // sampleStrings 从字符串切片中随机不重复抽取 n 条。 func sampleStrings(s []string, n int) []string { if len(s) <= n { return s } perm := rand.Perm(len(s)) out := make([]string, n) for i := range out { out[i] = s[perm[i]] } return out } // sampleURLWeights 与 sampleStrings 相同,但处理 URLWeight 切片。 func sampleURLWeights(s []URLWeight, n int) []URLWeight { if len(s) <= n { return s } perm := rand.Perm(len(s)) out := make([]URLWeight, n) for i := range out { out[i] = s[perm[i]] } return out } // scoredURL 内部用结构体,存储 URL 和对应调度分数。 type scoredURL struct { url string score float64 } // weightedSample 加权随机采样(不放回):从 scoredURL 列表中按权重概率抽取最多 k 条。 // 使用累积概率法近似 alias method(适合中等规模数据)。 func weightedSample(items []scoredURL, k int) []string { if k >= len(items) { out := make([]string, len(items)) for i, s := range items { out[i] = s.url } return out } totalWeight := 0.0 for _, s := range items { totalWeight += s.score } selected := make(map[int]bool) out := make([]string, 0, k) for len(out) < k && len(selected) < len(items) { // 随机取 [0, totalWeight) 区间的一个点 r := rand.Float64() * totalWeight cum := 0.0 for i, s := range items { if selected[i] { continue } cum += s.score if cum >= r { selected[i] = true out = append(out, s.url) totalWeight -= s.score // 被选中后从总权重中移除(不放回) break } } } return out } // concentrationFilter 域名集中度过滤。 // 按 CrawlFocus 因子限制每个顶级域名被选中的 URL 数量,防止爬取过于集中在少数网站。 func concentrationFilter(urls []string, k float64) []string { // 按顶级域名分组 domainGroups := make(map[string][]string) shuffled := make([]string, len(urls)) copy(shuffled, urls) rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) for _, u := range shuffled { d := superNetloc(u) domainGroups[d] = append(domainGroups[d], u) } // 计算每组保留上限:域名规模越大允许越多,但按 k 次幂压制 limit := 10 if len(domainGroups) > 1 { sizes := make([]int, 0, len(domainGroups)) for _, g := range domainGroups { sizes = append(sizes, int(math.Pow(float64(len(g)), k))) } // 升序排列,去除最大一项,用其余项总和的 60% 作为全局上限 for i := 0; i < len(sizes)-1; i++ { for j := i + 1; j < len(sizes)-1; j++ { if sizes[j] < sizes[i] { sizes[i], sizes[j] = sizes[j], sizes[i] } } } total := 0 for _, s := range sizes[:len(sizes)-1] { total += s } limit = max(10, int(float64(total)*0.6)) } // 从每组中按计算的上限采样 var result []string for _, g := range domainGroups { sn := 1 + min(limit, int(math.Pow(float64(len(g)), k))) if sn > len(g) { sn = len(g) } result = append(result, g[:sn]...) } rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] }) return result } // truncateMap 将 map 裁剪到最多 n 条(取前 n 条,无特定顺序)。 func truncateMap(m map[string]string, n int) map[string]string { if len(m) <= n { return m } out := make(map[string]string, n) i := 0 for k, v := range m { if i >= n { break } out[k] = v i++ } return out } // min 返回两个整数中的较小值。 func min(a, b int) int { if a < b { return a } return b } // max 返回两个整数中的较大值。 func max(a, b int) int { if a > b { return a } return b } // GetStats 返回当前爬虫统计快照(用于监控)。 func (c *Crawler) GetStats() Stats { return Stats{ VisitedURLs: atomic.LoadInt64(&c.stats.VisitedURLs), SuccessURLs: atomic.LoadInt64(&c.stats.SuccessURLs), KeywordsFetched: atomic.LoadInt64(&c.stats.KeywordsFetched), } } // GetCrawlStatus 返回当前爬取状态(供前端监控)。 func (c *Crawler) GetCrawlStatus() CrawlStatus { c.crawlStatusMu.RLock() defer c.crawlStatusMu.RUnlock() return c.crawlStatus } // updateCrawlStatus 更新爬取状态(内部使用)。 func (c *Crawler) updateCrawlStatus(fn func(*CrawlStatus)) { c.crawlStatusMu.Lock() defer c.crawlStatusMu.Unlock() fn(&c.crawlStatus) }