1352 lines
43 KiB
Go
1352 lines
43 KiB
Go
// crawler.go — BFS crawl loop, URL scheduling, and site-info updating.
|
||
// crawler 包的主逻辑:BFS 爬取循环、URL 调度算法、网站元信息更新。
|
||
package crawler
|
||
|
||
import (
|
||
"bytes" // 字节缓冲(构造 HTTP POST 请求体)
|
||
"context" // context 超时控制
|
||
"encoding/json" // JSON 序列化(发送关键词数据到收获服务)
|
||
"fmt" // 格式化(构造目标地址)
|
||
"hash/fnv" // FNV 哈希(内容变化检测)
|
||
"log" // 日志输出
|
||
"math" // 数学运算(指数衰减、质量评分)
|
||
"math/rand" // 随机数(加权采样、队列打乱)
|
||
"net/http" // HTTP 客户端(POST 数据到收获服务)
|
||
"net/url" // URL 解析
|
||
"strings" // 字符串操作
|
||
"sync" // 互斥锁(保护并发收集结果)
|
||
"sync/atomic" // 原子操作(计数器,无锁并发更新)
|
||
"time" // 时间戳
|
||
|
||
"sese-engine/analyzer" // 文本分析和关键词提取
|
||
"sese-engine/config" // 全局配置常量
|
||
"sese-engine/parser" // HTML 解析(提取标题、正文、链接)
|
||
"sese-engine/storage" // 持久化存储
|
||
)
|
||
|
||
// Stats holds the crawler's live counters. Every field is updated with
// sync/atomic and should be read the same way.
type Stats struct {
	// VisitedURLs counts every URL handed to visitURL / visitURLUnlimited,
	// whether or not the fetch succeeded.
	VisitedURLs int64

	// SuccessURLs counts fetches that completed without error (the original
	// note describes these as HTTP 200 responses).
	SuccessURLs int64

	// KeywordsFetched is the running total of keywords extracted and queued
	// for the harvester.
	KeywordsFetched int64
}
|
||
|
||
// Circuit-breaker states for the harvester client. The state lives in an
// int32 and is accessed only through sync/atomic, so no mutex is ever held
// across the (slow) HTTP call.
const (
	// circuitClosed: normal operation, every request goes to the harvester.
	circuitClosed int32 = 0
	// circuitOpen: tripped after repeated failures; requests are skipped until
	// the cooldown deadline passes.
	circuitOpen int32 = 1
	// circuitHalfOpen: cooldown elapsed; a trial request is let through.
	circuitHalfOpen int32 = 2
)
|
||
|
||
// Circuit-breaker tuning.
const (
	// circuitFailureThreshold is the number of consecutive failures that
	// trips the breaker open.
	circuitFailureThreshold = 5
	// circuitCooldownSeconds is how long, in seconds, the breaker stays open.
	circuitCooldownSeconds = 30
)
|
||
|
||
// Priority-worker sizing.
const (
	// priorityMaxWorkers caps the number of concurrent priority goroutines.
	// This pool is separate from, and in addition to, the main BFS workers.
	priorityMaxWorkers = 50
	// priorityQueueSize is the buffer size of the priority task channels.
	priorityQueueSize = 100
)
|
||
|
||
// Crawler 编排整个 BFS 爬取流程。
|
||
type Crawler struct {
|
||
fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流)
|
||
db *storage.DB // 持久化数据库
|
||
analyzer *analyzer.Analyzer // 分词和关键词分析
|
||
prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值")
|
||
stats Stats // 原子计数器
|
||
|
||
// visited 记录已访问的 URL 集合(跨 epoch 持久,启动时从 DB 预热)
|
||
visited map[string]bool
|
||
visitedMu sync.RWMutex // 保护 visited 的并发读写
|
||
|
||
// 熔断器(全用 atomic,无 mutex,无慢 I/O 时持有锁的风险)
|
||
circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen
|
||
circuitFailures int32 // 连续失败计数(atomic)
|
||
circuitExpiry int64 // 熔断/半开截止 Unix 时间戳(秒)
|
||
|
||
// 运行时活跃线程计数(atomic,每轮 epoch 自动归零前重新开始计数)
|
||
activeWorkers int64
|
||
|
||
// 本轮发现的新链接计数(atomic,供前端实时监控)
|
||
newLinksCount int64
|
||
|
||
// ---- Priority Worker(独立 goroutine,不受主 workers 限制)----
|
||
priorityCh chan string // Priority URL 任务队列(用户手动添加)
|
||
priorityChildCh chan string // Priority 子链接队列(子 URL 继续由 priority worker 爬取)
|
||
prioritySem chan struct{} // Priority 信号量(上限 priorityMaxWorkers)
|
||
priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束
|
||
priorityMu sync.RWMutex // 保护 priorityStats 和 priorityChildLinks
|
||
priorityStats struct {
|
||
pending int64 // 待处理的 Priority URL 数量(入队但未开始)
|
||
active int64 // 正在处理的 Priority URL 数量
|
||
}
|
||
// 孙链接(子 URL 的子链接)进入普通 BFS 队列
|
||
normalChildCh chan URLWeight // 孙链接 channel,由 Run 循环消费
|
||
// ---- 爬取状态暴露(供前端监控) ----
|
||
crawlStatusMu sync.RWMutex
|
||
crawlStatus CrawlStatus // 当前轮次状态
|
||
|
||
// ---- 优雅关闭 ----
|
||
stopCh chan struct{} // 关闭时表示收到停止信号
|
||
stopOnce sync.Once // 确保只关闭一次
|
||
priorityDone chan struct{} // priority worker 已完全停止
|
||
}
|
||
|
||
// CrawlStatus is the crawl-progress snapshot exposed to the monitoring front
// end, serialized to JSON with the tags below.
type CrawlStatus struct {
	// CurrentEpoch is the current epoch, counted from 1.
	CurrentEpoch int `json:"current_epoch"`
	// MaxEpoch is the configured upper bound on epochs.
	MaxEpoch int `json:"max_epoch"`
	// QueueLength is the length of this epoch's queue.
	QueueLength int `json:"queue_length"`
	// CompletedCount is how many URLs of this epoch have finished.
	CompletedCount int `json:"completed_count"`
	// VisitedTotal is the size of the visited set.
	VisitedTotal int `json:"visited_total"`
	// NewLinksCount is the number of new links found so far this epoch.
	NewLinksCount int64 `json:"new_links_count"`
	// NextPoolSize is the size of the candidate pool fed to the scheduler
	// for the next epoch.
	NextPoolSize int `json:"next_pool_size"`
	// IsRunning reports whether the crawl loop is actively working.
	IsRunning bool `json:"is_running"`
}
|
||
|
||
// globalActiveWorkers is a package-level mirror of the BFS worker count, so
// other packages can read it through GlobalActiveWorkers without holding a
// *Crawler. Access it only through sync/atomic.
var globalActiveWorkers int64
|
||
|
||
// ActiveWorkers 返回当前正在运行的爬虫 goroutine 数量。
|
||
// 也可通过包级函数 GlobalActiveWorkers() 读取(供 search 等外部包使用)。
|
||
func (c *Crawler) ActiveWorkers() int64 {
|
||
return atomic.LoadInt64(&c.activeWorkers)
|
||
}
|
||
|
||
// GlobalActiveWorkers 返回当前全局活跃爬虫 goroutine 数量(包级,外部包可直接调用)。
|
||
func GlobalActiveWorkers() int64 {
|
||
return atomic.LoadInt64(&globalActiveWorkers)
|
||
}
|
||
|
||
// globalPriorityStatus holds package-level priority-worker gauges, shared by
// all Crawler instances and read through GlobalPriorityStatus. Both fields are
// accessed only through sync/atomic.
var globalPriorityStatus struct {
	pending int64 // level-1 URLs accepted but not yet taken by the worker
	active  int64 // priority goroutines currently crawling
}
|
||
|
||
// globalPriorityLevel2 counts level-2 (child) URLs waiting in the level-2
// queue. The counter is maintained by hand through IncrementPriorityLevel2 /
// DecrementPriorityLevel2 because producers enqueue from many goroutines.
// Access it only through sync/atomic.
var globalPriorityLevel2 int64
|
||
|
||
// GlobalPriorityStatus 返回当前全局 Priority Worker 状态。
|
||
func GlobalPriorityStatus() map[string]interface{} {
|
||
return map[string]interface{}{
|
||
"active": atomic.LoadInt64(&globalPriorityStatus.active),
|
||
"max_workers": priorityMaxWorkers,
|
||
"level1": atomic.LoadInt64(&globalPriorityStatus.pending),
|
||
"level2": atomic.LoadInt64(&globalPriorityLevel2),
|
||
"level2_queue": atomic.LoadInt64(&globalPriorityLevel2), // 待爬取
|
||
"level2_inflight": atomic.LoadInt64(&globalPriorityLevel2Inflight), // 正在爬取
|
||
}
|
||
}
|
||
|
||
// IncrementPriorityLevel2 增加二级队列计数。
|
||
func IncrementPriorityLevel2(n int64) {
|
||
atomic.AddInt64(&globalPriorityLevel2, n)
|
||
}
|
||
|
||
// DecrementPriorityLevel2 减少二级队列计数。
|
||
func DecrementPriorityLevel2(n int64) {
|
||
atomic.AddInt64(&globalPriorityLevel2, -n)
|
||
}
|
||
|
||
// ---- Level-2 in-flight gauge (raised when a batch is drained, lowered as each goroutine finishes) ----

// globalPriorityLevel2Inflight counts level-2 URLs that have been taken from
// the level-2 queue and handed to a goroutine that has not finished yet.
// Access it only through sync/atomic.
var globalPriorityLevel2Inflight int64
|
||
|
||
func IncrementPriorityLevel2Inflight(n int64) {
|
||
atomic.AddInt64(&globalPriorityLevel2Inflight, n)
|
||
}
|
||
|
||
func DecrementPriorityLevel2Inflight(n int64) {
|
||
atomic.AddInt64(&globalPriorityLevel2Inflight, -n)
|
||
}
|
||
|
||
// New 创建一个 Crawler 实例。
|
||
// prosperMap 由 info 模块加载,传入域名繁荣值用于调度优先级计算。
|
||
func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler {
|
||
c := &Crawler{
|
||
fetcher: NewFetcher(config.SpiderName(), time.Duration(config.CrawlerCooldown())*time.Second),
|
||
db: db,
|
||
analyzer: a,
|
||
prosperMap: prosperMap,
|
||
visited: make(map[string]bool),
|
||
priorityCh: make(chan string, priorityQueueSize),
|
||
priorityChildCh: make(chan string, priorityQueueSize),
|
||
prioritySem: make(chan struct{}, priorityMaxWorkers),
|
||
normalChildCh: make(chan URLWeight, 50000), // 孙链接 channel,大 buffer 避免丢
|
||
stopCh: make(chan struct{}),
|
||
priorityDone: make(chan struct{}),
|
||
}
|
||
// 启动 Priority Worker(独立 goroutine,不受主 workers 限制)
|
||
go c.runPriorityWorker()
|
||
// 启动时从 gate bucket 预热已爬取的 URL 集合(程序重启后不会重复爬取)
|
||
c.warmVisited()
|
||
return c
|
||
}
|
||
|
||
// warmVisited 从 DB 的 gate bucket 加载所有已缓存的 URL 到 visited set。
|
||
// 超过 RecrawlMaxAge 的 URL 不加入 visited,使其可以被重新爬取。
|
||
func (c *Crawler) warmVisited() {
|
||
count := 0
|
||
expired := 0
|
||
maxAge := int64(config.RecrawlMaxAge())
|
||
now := time.Now().Unix()
|
||
_ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error {
|
||
if now-entry.Timestamp < maxAge {
|
||
c.visited[u] = true // 未过期,仍然跳过
|
||
count++
|
||
} else {
|
||
expired++
|
||
}
|
||
return nil
|
||
})
|
||
log.Printf("[crawler] visited set warmed: %d active, %d expired (eligible for recrawl)", count, expired)
|
||
}
|
||
|
||
// startRecrawlTicker 启动后台定时任务,定期扫描并释放过期 URL 回到候选池。
|
||
// 已过期的 URL 从 visited map 中移除,使其可以在后续 BFS 轮次中被重新发现和爬取。
|
||
func (c *Crawler) startRecrawlTicker() {
|
||
interval := config.RecrawlCheckInterval()
|
||
if interval <= 0 {
|
||
return // 未配置或禁用
|
||
}
|
||
go func() {
|
||
ticker := time.NewTicker(time.Duration(interval) * time.Second)
|
||
defer ticker.Stop()
|
||
for range ticker.C {
|
||
maxAge := int64(config.RecrawlMaxAge())
|
||
batchSize := config.RecrawlBatchSize()
|
||
now := time.Now().Unix()
|
||
removed := 0
|
||
|
||
c.visitedMu.Lock()
|
||
_ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error {
|
||
if removed >= batchSize {
|
||
return fmt.Errorf("batch full") // 提前终止遍历
|
||
}
|
||
if now-entry.Timestamp >= maxAge && c.visited[u] {
|
||
delete(c.visited, u)
|
||
removed++
|
||
}
|
||
return nil
|
||
})
|
||
c.visitedMu.Unlock()
|
||
|
||
if removed > 0 {
|
||
log.Printf("[crawler] recrawl ticker: released %d expired URLs back to pool", removed)
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// markVisited 将 URL 标记为已访问(线程安全)。
|
||
func (c *Crawler) markVisited(url string) {
|
||
c.visitedMu.Lock()
|
||
c.visited[url] = true
|
||
c.visitedMu.Unlock()
|
||
}
|
||
|
||
// ---- Priority Worker(突破 workers 上限,立即爬取高优先级 URL)----
|
||
|
||
// runPriorityWorker 独立处理高优先级 URL,不受主 workers 限制。
|
||
// 两级队列共享同一线程池:一级队列(手动 URL)优先处理完,再处理二级队列(子 URL)。
|
||
// - 一级队列:visitURLUnlimited,无限爬子链接,子链接进入二级队列
|
||
// - 二级队列:visitURL,有限爬子链接,子链接进入普通 BFS
|
||
// 收到 stopCh 关闭信号后,等待当前批次完成然后退出。
|
||
func (c *Crawler) runPriorityWorker() {
|
||
defer close(c.priorityDone) // 通知 WaitUntilStopped
|
||
|
||
for {
|
||
// 检查停止信号
|
||
select {
|
||
case <-c.stopCh:
|
||
log.Println("[priority] stop signal received, waiting for pending work...")
|
||
// 不直接退出,等待当前正在运行的 goroutine 完成
|
||
// 但不再接受新任务,关闭 channel 防止新任务入队
|
||
close(c.priorityCh)
|
||
c.priorityWg.Wait()
|
||
log.Println("[priority] all pending work done, exiting")
|
||
return
|
||
default:
|
||
}
|
||
|
||
// ---- 第一阶段:drain 一级队列全部取出 ----
|
||
level1Batch := []string{}
|
||
var item string
|
||
// 先阻塞等第一个一级 URL(可被 stopCh 中断)
|
||
select {
|
||
case item = <-c.priorityCh:
|
||
level1Batch = append(level1Batch, item)
|
||
case <-c.stopCh:
|
||
// stopCh 刚关闭,从头重走退出流程
|
||
close(c.priorityCh)
|
||
c.priorityWg.Wait()
|
||
return
|
||
}
|
||
// drain 剩余一级 URL(非阻塞)
|
||
for {
|
||
select {
|
||
case item = <-c.priorityCh:
|
||
level1Batch = append(level1Batch, item)
|
||
default:
|
||
goto processLevel1
|
||
}
|
||
}
|
||
|
||
processLevel1:
|
||
// batch 收集完成后才扣减 pending,此时 pending = len(level1Batch) + len(priorityCh)
|
||
// 这样 level1 能正确反映队列中待处理的 URL 总数
|
||
log.Printf("[priority] drain level1 batch: count=%d, pending_before=%d, priorityCh_len=%d",
|
||
len(level1Batch), atomic.LoadInt64(&globalPriorityStatus.pending), len(c.priorityCh))
|
||
atomic.AddInt64(&globalPriorityStatus.pending, -int64(len(level1Batch)))
|
||
|
||
// ---- 启动全部一级 goroutine ----
|
||
for _, u := range level1Batch {
|
||
c.prioritySem <- struct{}{}
|
||
c.priorityMu.Lock()
|
||
c.priorityStats.active++
|
||
c.priorityMu.Unlock()
|
||
atomic.AddInt64(&globalPriorityStatus.active, 1)
|
||
c.priorityWg.Add(1)
|
||
go c.priorityCrawlLoop(u, 1)
|
||
}
|
||
|
||
// 等待一级 goroutine 全部完成
|
||
c.priorityWg.Wait()
|
||
|
||
// ---- 第二阶段:drain 二级队列全部取出 ----
|
||
level2Batch := []string{}
|
||
drained := int64(0)
|
||
for {
|
||
select {
|
||
case child := <-c.priorityChildCh:
|
||
level2Batch = append(level2Batch, child)
|
||
drained++
|
||
// 实时扣减 level2,使 API 在 drain 期间仍能看到剩余未处理的量
|
||
atomic.AddInt64(&globalPriorityLevel2, -1)
|
||
default:
|
||
goto processLevel2
|
||
}
|
||
}
|
||
|
||
processLevel2:
|
||
// drained 项已在 drain 循环中逐个从 level2 扣减
|
||
// 启动 goroutine 后这些 URL 进入"正在爬取"状态
|
||
IncrementPriorityLevel2Inflight(int64(len(level2Batch)))
|
||
log.Printf("[priority] drain level2 done: drained=%d, inflight_before_spawn=%d",
|
||
len(level2Batch), atomic.LoadInt64(&globalPriorityLevel2Inflight))
|
||
|
||
// ---- 启动全部二级 goroutine ----
|
||
for _, u := range level2Batch {
|
||
c.prioritySem <- struct{}{}
|
||
c.priorityMu.Lock()
|
||
c.priorityStats.active++
|
||
c.priorityMu.Unlock()
|
||
atomic.AddInt64(&globalPriorityStatus.active, 1)
|
||
c.priorityWg.Add(1)
|
||
go c.priorityCrawlLoop(u, 2)
|
||
}
|
||
|
||
// 等待二级 goroutine 全部完成,才进入下一轮(再次处理一级队列)
|
||
c.priorityWg.Wait()
|
||
}
|
||
}
|
||
|
||
// priorityCrawlLoop 爬取单个 URL:
|
||
//
|
||
// level=1(一级,手动 URL):visitURLUnlimited 无限爬子链接 → 二级队列(priorityChildCh)
|
||
// level=2(二级,子 URL):visitURLUnlimited 无限爬子链接 → 孙链接(normalChildCh)
|
||
func (c *Crawler) priorityCrawlLoop(rawURL string, level int) {
|
||
defer c.priorityWg.Done()
|
||
defer func() { <-c.prioritySem }()
|
||
defer func() {
|
||
c.priorityMu.Lock()
|
||
c.priorityStats.active--
|
||
c.priorityMu.Unlock()
|
||
atomic.AddInt64(&globalPriorityStatus.active, -1)
|
||
if level == 2 {
|
||
DecrementPriorityLevel2Inflight(1)
|
||
}
|
||
}()
|
||
|
||
// 标记 DB 中该 URL 为已访问,防止重启后再次被调度
|
||
_ = c.db.MarkPriorityURLVisited(rawURL)
|
||
|
||
// 两级都不限制子链接数量
|
||
children := c.visitURLUnlimited(rawURL)
|
||
|
||
//log.Printf("[crawler] priority[%d] crawl done: %s (%d child links)", level, rawURL, len(children))
|
||
|
||
if len(children) == 0 {
|
||
return
|
||
}
|
||
|
||
// 一级:子链接进二级队列;二级:子链接直接加入 newLinks(同步)
|
||
for _, child := range children {
|
||
if level == 1 {
|
||
select {
|
||
case c.priorityChildCh <- child:
|
||
IncrementPriorityLevel2(1)
|
||
default:
|
||
// 二级队列满,丢弃
|
||
}
|
||
} else {
|
||
// 二级:子链接进孙链接 channel,由 Run 永久 drain 直到全部到达
|
||
select {
|
||
case c.normalChildCh <- URLWeight{URL: child, Weight: 1.0}:
|
||
default:
|
||
// channel 满(很少发生),孙链接丢弃
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// TriggerPriorityCrawl 立即触发高优先级爬取(突破 workers 上限)。
|
||
// 适合用户手动插入 URL 时立即响应。
|
||
func (c *Crawler) TriggerPriorityCrawl(url string) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// priorityCh 已关闭(Run 已退出),忽略
|
||
log.Printf("[crawler] priority crawl ignored (crawler stopped): %s", url)
|
||
}
|
||
}()
|
||
select {
|
||
case c.priorityCh <- url:
|
||
c.priorityMu.Lock()
|
||
c.priorityStats.pending++
|
||
c.priorityMu.Unlock()
|
||
atomic.AddInt64(&globalPriorityStatus.pending, 1)
|
||
log.Printf("[crawler] priority crawl triggered: %s", url)
|
||
default:
|
||
// 队列满了,降级到正常处理
|
||
log.Printf("[crawler] priority queue full, deferring to normal: %s", url)
|
||
}
|
||
}
|
||
|
||
// GetPriorityStatus 返回 Priority Worker 的实时状态。
|
||
func (c *Crawler) GetPriorityStatus() map[string]interface{} {
|
||
c.priorityMu.RLock()
|
||
defer c.priorityMu.RUnlock()
|
||
return map[string]interface{}{
|
||
"pending": c.priorityStats.pending,
|
||
"active": c.priorityStats.active,
|
||
"max_workers": priorityMaxWorkers,
|
||
}
|
||
}
|
||
|
||
// isVisited 检查 URL 是否已访问(线程安全)。
|
||
func (c *Crawler) isVisited(url string) bool {
|
||
c.visitedMu.RLock()
|
||
v := c.visited[url]
|
||
c.visitedMu.RUnlock()
|
||
return v
|
||
}
|
||
|
||
// fetchAndApplyPriorityURLs 从数据库读取用户插入的 priority URLs,
|
||
// 将未访问的插入队列前端(prepend),已爬取的条目从存储中清除。
|
||
// 返回本次插入队列的 URL 数量。
|
||
func (c *Crawler) fetchAndApplyPriorityURLs(queue *[]string) int {
|
||
entries, err := c.db.GetPriorityURLs()
|
||
if err != nil || len(entries) == 0 {
|
||
return 0
|
||
}
|
||
|
||
added := 0
|
||
for _, e := range entries {
|
||
if c.isVisited(e.URL) {
|
||
_ = c.db.RemovePriorityURL(e.URL)
|
||
continue
|
||
}
|
||
*queue = append([]string{e.URL}, *queue...)
|
||
added++
|
||
}
|
||
|
||
_ = c.db.ClearVisitedPriorityURLs()
|
||
return added
|
||
}
|
||
|
||
// URLWeight pairs a candidate URL with its discovery weight, used by the
// scheduler.
type URLWeight struct {
	// URL is the address to visit.
	URL string
	// Weight is the share of "attention" inherited from the linking page: the
	// more links a page has, the smaller each one's share.
	Weight float64
}
|
||
|
||
// Run 启动 BFS 爬取,从 entryURL 开始,执行最多 maxEpoch 轮。
|
||
// 各轮之间是串行的,每轮内并发抓取,按调度算法选择下一轮 URL。
|
||
// 每轮开始前会检查 priority 队列,优先爬取用户插入的 URL。
|
||
func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
||
c.markVisited(entryURL)
|
||
queue := []string{entryURL}
|
||
|
||
// 初始化爬取状态
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.MaxEpoch = maxEpoch
|
||
cs.IsRunning = true
|
||
})
|
||
|
||
// 启动后台重爬定时器:定期释放过期 URL 到候选池
|
||
c.startRecrawlTicker()
|
||
|
||
for ep := 0; ep < maxEpoch; ep++ {
|
||
// 检查停止信号
|
||
select {
|
||
case <-c.stopCh:
|
||
log.Println("[crawler] stop signal received, exiting run loop")
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.IsRunning = false
|
||
})
|
||
return
|
||
default:
|
||
}
|
||
|
||
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
|
||
workers := config.CrawlerWorkers()
|
||
|
||
// 重置新链接计数器
|
||
atomic.StoreInt64(&c.newLinksCount, 0)
|
||
|
||
// 更新爬取状态:新一轮开始
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.CurrentEpoch = ep + 1
|
||
cs.QueueLength = len(queue)
|
||
cs.CompletedCount = 0
|
||
cs.NewLinksCount = 0
|
||
})
|
||
|
||
// 每轮开始前:拉取 priority URLs,插入队列前端
|
||
priorityAdded := c.fetchAndApplyPriorityURLs(&queue)
|
||
if priorityAdded > 0 {
|
||
log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority) workers=%d", ep+1, maxEpoch, len(queue), priorityAdded, workers)
|
||
} else {
|
||
log.Printf("[crawler] epoch %d/%d queue=%d workers=%d", ep+1, maxEpoch, len(queue), workers)
|
||
}
|
||
// 将本轮所有 URL 标记为已访问(防止下一轮重复入队)
|
||
for _, u := range queue {
|
||
c.markVisited(u)
|
||
}
|
||
|
||
// ---- 并发抓取本轮所有 URL ----
|
||
var (
|
||
newLinks []URLWeight // 收集下一轮候选 URL
|
||
mu sync.Mutex // 保护 newLinks 的并发写入
|
||
wg sync.WaitGroup
|
||
completed int64 // 本轮已完成的计数(atomic)
|
||
)
|
||
|
||
// 信号量:限制同时并发数(使用上方读取的 workers 值)
|
||
sem := make(chan struct{}, workers)
|
||
|
||
for _, u := range queue {
|
||
wg.Add(1)
|
||
sem <- struct{}{} // 获取一个令牌(阻塞直到有空闲槽位)
|
||
atomic.AddInt64(&c.activeWorkers, 1)
|
||
atomic.AddInt64(&globalActiveWorkers, 1)
|
||
go func(rawURL string) {
|
||
defer wg.Done()
|
||
defer func() { <-sem }() // 释放令牌
|
||
defer atomic.AddInt64(&c.activeWorkers, -1)
|
||
defer atomic.AddInt64(&globalActiveWorkers, -1)
|
||
|
||
// 抓取单个 URL,返回发现的子链接
|
||
hrefs := c.visitURL(rawURL)
|
||
|
||
// 更新完成计数
|
||
currentCompleted := atomic.AddInt64(&completed, 1)
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.CompletedCount = int(currentCompleted)
|
||
})
|
||
|
||
n := len(hrefs)
|
||
if n == 0 {
|
||
return
|
||
}
|
||
|
||
// 收集未访问的子链接
|
||
var children []string
|
||
for _, h := range hrefs {
|
||
if !c.isVisited(h) {
|
||
children = append(children, h)
|
||
}
|
||
}
|
||
if len(children) == 0 {
|
||
return
|
||
}
|
||
|
||
// 分配权重
|
||
w := 1.0 / float64(n)
|
||
|
||
// 子 URL 进入 newLinks 调度,由普通 BFS 下一轮处理。
|
||
// (priority worker 通过 priorityChildCh 独立爬取子 URL,两者互不干扰)
|
||
mu.Lock()
|
||
for _, h := range children {
|
||
newLinks = append(newLinks, URLWeight{URL: h, Weight: w})
|
||
// 实时自增计数(供前端监控)
|
||
atomic.AddInt64(&c.newLinksCount, 1)
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.NewLinksCount = atomic.LoadInt64(&c.newLinksCount)
|
||
})
|
||
}
|
||
mu.Unlock()
|
||
}(u)
|
||
}
|
||
wg.Wait()
|
||
|
||
// drain 孙链接 channel(孙链接来自 priority 爬取的子链接,进入普通 BFS 队列)
|
||
// runPriorityWorker 发送完毕后等待足够时间,确保 channel 中最后几条到达
|
||
timeout := time.After(10 * time.Second)
|
||
drained := false
|
||
for !drained {
|
||
select {
|
||
case gc := <-c.normalChildCh:
|
||
newLinks = append(newLinks, gc)
|
||
atomic.AddInt64(&c.newLinksCount, 1) // 孙链接也要计入
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.NewLinksCount = atomic.LoadInt64(&c.newLinksCount)
|
||
})
|
||
case <-timeout:
|
||
drained = true
|
||
}
|
||
}
|
||
|
||
// 更新已收录总数
|
||
c.visitedMu.RLock()
|
||
visitedTotal := len(c.visited)
|
||
c.visitedMu.RUnlock()
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.VisitedTotal = visitedTotal
|
||
})
|
||
|
||
// 本轮没有发现新链接,报告停止,等待新 Priority URL 触发重启
|
||
if len(newLinks) == 0 {
|
||
log.Println("[crawler] empty — stopped, waiting for new URLs")
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.IsRunning = false
|
||
})
|
||
// 空循环等 normalChildCh,新数据到达后立即从 epoch 0 重新开始
|
||
// 使用 ticker 避免循环内重复创建 timer
|
||
waitTicker := time.NewTicker(100 * time.Millisecond)
|
||
defer waitTicker.Stop()
|
||
for {
|
||
select {
|
||
case gc, ok := <-c.normalChildCh:
|
||
if !ok {
|
||
// channel 已关闭(正常情况或 stop 时关闭)
|
||
return
|
||
}
|
||
newLinks = append(newLinks, gc)
|
||
log.Printf("[crawler] new URLs detected, restarting from epoch 0 (%d in pool)", len(newLinks))
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.IsRunning = true
|
||
})
|
||
ep = -1 // continue 后 ep++ 变成 0
|
||
goto restartEpochLoop // 退出空循环,进入正常队列处理
|
||
case <-c.stopCh:
|
||
// 收到停止信号
|
||
return
|
||
case <-waitTicker.C:
|
||
// 每 100ms 检查一次,降低 CPU 占用
|
||
}
|
||
}
|
||
restartEpochLoop:
|
||
}
|
||
|
||
// 调度算法:从候选 URL 中选出下一轮要抓取的队列
|
||
nextPoolSize := len(newLinks)
|
||
queue = c.schedule(newLinks)
|
||
|
||
// 更新下一轮链接池大小
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.NextPoolSize = nextPoolSize
|
||
})
|
||
}
|
||
// 所有轮次完成,更新状态
|
||
c.updateCrawlStatus(func(cs *CrawlStatus) {
|
||
cs.IsRunning = false
|
||
})
|
||
}
|
||
|
||
// visitURLRaw 抓取 URL 的核心逻辑,提取标题、描述、正文、子链接。
|
||
// 不包含链接数限制,用于优先级爬取。
|
||
// forceIndex=true 时强制跳过增量重爬检测,直接提取关键词写入索引(用于 Priority URL)。
|
||
func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text string, hrefs []string) {
|
||
// recover 保护:防止任何模块(analyzer/storage/parser)的 panic 杀死 goroutine
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Printf("[crawler] visitURL panic recovered: url=%s error=%v", rawURL, r)
|
||
}
|
||
}()
|
||
|
||
// 使用 sync.WaitGroup + select 实现硬超时包装器,
|
||
// 确保即使 http.Client.Timout 被某些底层操作忽略,goroutine 也不会永久阻塞。
|
||
// 注意:优雅关闭时,如果恰好在抓取过程中收到停止信号,
|
||
// goroutine 会等待最多 35s(fetchTimeout + 5s)后强制退出。
|
||
fetchTimeout := 30 * time.Second
|
||
var res *FetchResult
|
||
var fetchErr error
|
||
var wg sync.WaitGroup
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
// 礼貌模式抓取(遵守 robots.txt + 限流),限制页面大小防止内存爆炸
|
||
res, fetchErr = c.fetcher.fetchWithHistory(rawURL, true, fetchTimeout, config.MaxPageSize())
|
||
}()
|
||
waitCh := make(chan struct{})
|
||
go func() {
|
||
wg.Wait()
|
||
close(waitCh)
|
||
}()
|
||
select {
|
||
case <-waitCh:
|
||
// fetch 正常返回(成功或错误)
|
||
case <-time.After(fetchTimeout + 5*time.Second):
|
||
log.Printf("[crawler] fetch timeout: %s", rawURL)
|
||
}
|
||
|
||
if fetchErr != nil || res == nil {
|
||
c.updateSiteFailure(rawURL) // 记录失败,更新该网站成功率
|
||
return
|
||
}
|
||
|
||
atomic.AddInt64(&c.stats.SuccessURLs, 1) // 成功计数器 +1
|
||
|
||
// 解析 HTML:提取标题、描述、正文和所有超链接
|
||
title, desc, text, hrefs = parser.ParseHTML(res.Body, res.FinalURL)
|
||
|
||
// 计算正文内容哈希(FNV-1a),用于增量重爬检测
|
||
contentHash := fnvHash(text)
|
||
|
||
// 增量重爬检测:查询上次爬取的哈希,内容未变则跳过关键词提取
|
||
isRecrawl := false
|
||
oldEntry, _ := c.db.GetSnippet(res.FinalURL)
|
||
if !forceIndex && oldEntry != nil && oldEntry.ContentHash != "" && oldEntry.ContentHash == contentHash {
|
||
isRecrawl = true
|
||
//log.Printf("[crawler] unchanged (recrawl skip): %s", res.FinalURL)
|
||
}
|
||
|
||
// 缓存 URL 摘要(仅对短 URL 缓存,防止超长 URL 浪费空间)
|
||
if len(res.FinalURL) < 250 {
|
||
_ = c.db.SetSnippet(res.FinalURL, &storage.SnippetEntry{
|
||
Title: title,
|
||
Description: truncate(desc, 256),
|
||
Text: truncate(text, 256),
|
||
Timestamp: time.Now().Unix(),
|
||
ContentHash: contentHash,
|
||
})
|
||
}
|
||
|
||
// 关键词提取:将标题/描述/正文交给 analyzer 计算关键词权重
|
||
// 增量优化:如果内容未变化(重爬),跳过关键词提取和索引更新
|
||
if !isRecrawl {
|
||
kws := c.analyzer.Analyze(title, desc, text)
|
||
if len(kws) > 0 {
|
||
// 限制每个页面最多发送的关键词数量
|
||
maxKws := config.MaxKeywordsPerPage()
|
||
if len(kws) > maxKws {
|
||
kws = kws[:maxKws]
|
||
}
|
||
atomic.AddInt64(&c.stats.KeywordsFetched, int64(len(kws)))
|
||
// 异步发送到收获服务器写入倒排索引(不阻塞爬取流程)
|
||
go c.sendToHarvester(res.FinalURL, kws)
|
||
}
|
||
}
|
||
|
||
// 更新网站元信息(成功访问)
|
||
host := netloc(res.FinalURL)
|
||
c.updateSiteSuccess(host, res, title, desc, text, hrefs)
|
||
|
||
// 处理永久重定向:更新源主机名下的重定向映射
|
||
for from, to := range res.Redirects {
|
||
fromHost := netloc(from)
|
||
if fromHost == "" {
|
||
continue
|
||
}
|
||
_ = c.db.UpdateSiteInfo(fromHost, func(info *storage.SiteInfo) {
|
||
if info.Redirects == nil {
|
||
info.Redirects = make(map[string]string)
|
||
}
|
||
info.Redirects[from] = to
|
||
if len(info.Redirects) > 50 {
|
||
info.Redirects = truncateMap(info.Redirects, 40)
|
||
}
|
||
})
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// visitURL 抓取一个 URL,提取关键词、缓存摘要、更新网站元信息,返回页面中发现的子链接。
|
||
// 限制返回的链接数,防止下一轮队列爆炸。
|
||
func (c *Crawler) visitURL(rawURL string) (hrefs []string) {
|
||
atomic.AddInt64(&c.stats.VisitedURLs, 1) // 计数器 +1
|
||
_, _, _, hrefs = c.visitURLRaw(rawURL, false)
|
||
|
||
// 限制返回的链接数,防止下一轮队列爆炸
|
||
maxLinks := config.MaxPriorityChildren()
|
||
if maxLinks > 0 && len(hrefs) > maxLinks {
|
||
hrefs = sampleStrings(hrefs, maxLinks)
|
||
}
|
||
return hrefs
|
||
}
|
||
|
||
// visitURLUnlimited 抓取一个 URL,返回页面中发现的子链接(无链接数限制)。
|
||
// 用于手动添加的 Priority URL,保留全部子链接,并强制重建索引(跳过增量检测)。
|
||
func (c *Crawler) visitURLUnlimited(rawURL string) (hrefs []string) {
|
||
atomic.AddInt64(&c.stats.VisitedURLs, 1) // 计数器 +1
|
||
// forceIndex=true:Priority URL 强制重新提取关键词,不受内容未变化影响
|
||
_, _, _, hrefs = c.visitURLRaw(rawURL, true)
|
||
return hrefs
|
||
}
|
||
|
||
// updateSiteFailure 当某 URL 抓取失败时,更新该网站的访问成功率(指数衰减)。
|
||
func (c *Crawler) updateSiteFailure(rawURL string) {
|
||
host := netloc(rawURL)
|
||
if host == "" {
|
||
return
|
||
}
|
||
_ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) {
|
||
if info.SuccessRate == nil {
|
||
zero := 0.0
|
||
info.SuccessRate = &zero
|
||
}
|
||
// 成功率每次失败乘以 0.99(无限趋近 0)
|
||
*info.SuccessRate *= 0.99
|
||
})
|
||
}
|
||
|
||
// updateSiteSuccess 当某 URL 抓取成功时,更新网站的完整元信息。
|
||
// 使用 UpdateSiteInfo 原子读-改-写,避免并发 goroutine 对同一 host 的 SiteInfo 更新丢失。
|
||
func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, text string, hrefs []string) {
|
||
now := time.Now().Unix()
|
||
httpsAvailable := strings.HasPrefix(res.FinalURL, "https://")
|
||
serverType := res.ServerType
|
||
|
||
// 语言检测(CPU 密集,在锁外执行)
|
||
var detectedLang string
|
||
// 检测条件在 UpdateSiteInfo 回调内判断,这里预先计算好
|
||
detectedLang = c.analyzer.DetectLanguage(title + " " + desc + " " + text)
|
||
|
||
// 收集外链(跨顶级域名的链接)
|
||
superHost := superNetloc(res.FinalURL)
|
||
var external []string
|
||
for _, h := range hrefs {
|
||
if superNetloc(h) != superHost {
|
||
external = append(external, h)
|
||
}
|
||
}
|
||
sampled := sampleStrings(external, 10)
|
||
|
||
_ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) {
|
||
// 访问计数 +1,更新最后访问时间
|
||
info.VisitCount++
|
||
info.LastVisitTime = now
|
||
|
||
// 成功率更新:EWM(指数加权移动)平滑,每次 +0.01
|
||
one := 1.0
|
||
if info.SuccessRate == nil {
|
||
info.SuccessRate = &one
|
||
}
|
||
*info.SuccessRate = *info.SuccessRate*0.99 + 0.01
|
||
|
||
// 记录是否支持 HTTPS
|
||
if httpsAvailable {
|
||
t := true
|
||
info.HTTPSAvailable = &t
|
||
}
|
||
|
||
// 记录 HTTP Server 类型(去重,保留最近 5 个)
|
||
if serverType != "" {
|
||
found := false
|
||
for _, s := range info.ServerTypes {
|
||
if s == serverType {
|
||
found = true
|
||
break
|
||
}
|
||
}
|
||
if !found {
|
||
info.ServerTypes = append(info.ServerTypes, serverType)
|
||
if len(info.ServerTypes) > 5 {
|
||
info.ServerTypes = info.ServerTypes[len(info.ServerTypes)-5:]
|
||
}
|
||
}
|
||
}
|
||
|
||
// 语言检测和出站链接收集(仅在前 10 次访问或 10% 概率下触发,减少开销)
|
||
if info.VisitCount < 10 || rand.Float64() < 0.1 {
|
||
if detectedLang != "" {
|
||
if info.Languages == nil {
|
||
info.Languages = make(map[string]float64)
|
||
}
|
||
// 首次访问强度高,随访问次数增加强度衰减
|
||
intensity := math.Min(0.2, 1/math.Sqrt(float64(info.VisitCount+1)))
|
||
for k := range info.Languages {
|
||
info.Languages[k] *= (1 - intensity) // 旧语种按 intensity 衰减
|
||
}
|
||
info.Languages[detectedLang] += intensity // 新语种增加
|
||
}
|
||
|
||
// 外链
|
||
info.OutLinks = append(info.OutLinks, sampled...)
|
||
if len(info.OutLinks) > 250 {
|
||
info.OutLinks = sampleStrings(info.OutLinks, 200)
|
||
}
|
||
}
|
||
})
|
||
}
|
||
|
||
// sendToHarvester 将关键词索引数据通过 HTTP POST 发送到搜索服务器(/l 端点)。
|
||
// 熔断器基于 atomic 实现(无 mutex,不在持有锁时做慢 I/O),确保 goroutine 不会因收获服务故障而堆积。
|
||
func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) {
|
||
now := time.Now().Unix()
|
||
|
||
// ---- 熔断检查(atomic,无锁) ----
|
||
state := atomic.LoadInt32(&c.circuitState)
|
||
expiry := atomic.LoadInt64(&c.circuitExpiry)
|
||
|
||
switch state {
|
||
case circuitOpen:
|
||
if now < expiry {
|
||
return // 熔断中,直接跳过
|
||
}
|
||
// 冷却结束,切换到半开,放行一个试探请求
|
||
atomic.StoreInt32(&c.circuitState, circuitHalfOpen)
|
||
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
|
||
case circuitHalfOpen:
|
||
if now < expiry {
|
||
return // 半开冷却中,只放行第一个,其余跳过
|
||
}
|
||
// 半开超时,重新进入半开状态
|
||
atomic.StoreInt32(&c.circuitState, circuitHalfOpen)
|
||
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
|
||
}
|
||
|
||
type payload struct {
|
||
URL string `json:"url"`
|
||
Keywords []analyzer.Keyword `json:"keywords"`
|
||
}
|
||
p := payload{URL: finalURL, Keywords: kws}
|
||
data, err := json.Marshal(p)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost:%d/l", config.SearchServerPort()), bytes.NewReader(data))
|
||
if err != nil {
|
||
return
|
||
}
|
||
req.Header.Set("Content-Type", "application/json")
|
||
|
||
// ---- HTTP 请求(此时没有任何锁) ----
|
||
resp, err := http.DefaultClient.Do(req)
|
||
|
||
// ---- 结果处理(atomic,无锁) ----
|
||
if err != nil {
|
||
failures := atomic.AddInt32(&c.circuitFailures, 1)
|
||
if failures >= circuitFailureThreshold {
|
||
atomic.StoreInt32(&c.circuitState, circuitOpen)
|
||
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
|
||
//log.Printf("[crawler] circuit OPEN: harvest endpoint unreachable (%d failures), cooling for %ds",failures, circuitCooldownSeconds)
|
||
}
|
||
return
|
||
}
|
||
resp.Body.Close()
|
||
|
||
// ---- 成功:重置熔断器 ----
|
||
atomic.StoreInt32(&c.circuitFailures, 0)
|
||
atomic.StoreInt32(&c.circuitState, circuitClosed)
|
||
}
|
||
|
||
// schedule 从候选 URL 集合中选出下一轮 BFS 队列。
|
||
// 包含:域名集中度过滤、HTTP/HTTPS 比例控制、繁荣 URL 占比控制、加权随机采样。
|
||
func (c *Crawler) schedule(links []URLWeight) []string {
|
||
// 候选过多时先随机采样到 10 万条,防止内存爆炸
|
||
if len(links) > 100000 {
|
||
links = sampleURLWeights(links, 100000)
|
||
}
|
||
|
||
// 预加载所有涉及的网站信息(加速后续评分计算)
|
||
domains := make(map[string]bool)
|
||
for _, lw := range links {
|
||
if h := netloc(lw.URL); h != "" {
|
||
domains[h] = true
|
||
}
|
||
if h := superNetloc(lw.URL); h != "" {
|
||
domains[h] = true
|
||
}
|
||
}
|
||
siteCache := make(map[string]*storage.SiteInfo, len(domains))
|
||
var mu sync.Mutex
|
||
var wg sync.WaitGroup
|
||
for d := range domains {
|
||
wg.Add(1)
|
||
go func(host string) {
|
||
defer wg.Done()
|
||
info, _ := c.db.GetSiteInfo(host)
|
||
mu.Lock()
|
||
siteCache[host] = info
|
||
mu.Unlock()
|
||
}(d)
|
||
}
|
||
wg.Wait()
|
||
|
||
// 对所有候选 URL 逐一计算调度优先级分数
|
||
scored_list := make([]scoredURL, len(links))
|
||
for i, lw := range links {
|
||
scored_list[i] = scoredURL{url: lw.URL, score: c.scoreURL(lw, siteCache)}
|
||
}
|
||
|
||
// 加权随机采样:从高分到低分按权重概率抽取最多 k 条
|
||
k := min(45000, len(scored_list)/3+250)
|
||
selected := weightedSample(scored_list, k)
|
||
|
||
// 域名集中度过滤:限制每个域名被选中的数量,防止被少数网站垄断
|
||
selected = concentrationFilter(selected, config.CrawlFocus())
|
||
|
||
// 分离 HTTPS 和 HTTP 链接,HTTP 最多占 HTTPS 的 1/4
|
||
var httpsURLs, httpURLs []string
|
||
for _, s := range selected {
|
||
if strings.HasPrefix(s, "https://") {
|
||
httpsURLs = append(httpsURLs, s)
|
||
} else {
|
||
httpURLs = append(httpURLs, s)
|
||
}
|
||
}
|
||
maxHTTP := len(httpsURLs) / 4
|
||
if len(httpURLs) > maxHTTP {
|
||
httpURLs = sampleStrings(httpURLs, maxHTTP)
|
||
}
|
||
|
||
// 分离繁荣(高反向链接)域名和普通域名,按比例控制繁荣 URL 占比
|
||
var prosperURLs, otherURLs []string
|
||
for _, u := range append(httpsURLs, httpURLs...) {
|
||
if c.prosperMap[netloc(u)] > 0 {
|
||
prosperURLs = append(prosperURLs, u)
|
||
} else {
|
||
otherURLs = append(otherURLs, u)
|
||
}
|
||
}
|
||
// 根据目标繁荣占比计算普通 URL 应保留数量
|
||
expectedProsperRatio := config.ExpectedProsperRatio()
|
||
n := int(float64(len(prosperURLs)) * (1 - expectedProsperRatio) / expectedProsperRatio)
|
||
if len(otherURLs) > n {
|
||
keep := max(len(otherURLs)-len(selected)/10, n)
|
||
if keep < len(otherURLs) {
|
||
otherURLs = sampleStrings(otherURLs, keep)
|
||
}
|
||
}
|
||
|
||
// 合并并随机打乱(使繁荣 URL 和普通 URL 混合)
|
||
result := append(prosperURLs, otherURLs...)
|
||
rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] })
|
||
return result
|
||
}
|
||
|
||
// scoreURL 计算单个 URL 的调度优先级分数。
|
||
// 综合考虑:中文语种权重、域名访问历史衰减、网站质量评分、繁荣值、URL 本身质量。
|
||
func (c *Crawler) scoreURL(lw URLWeight, siteCache map[string]*storage.SiteInfo) float64 {
|
||
host := netloc(lw.URL)
|
||
super := superNetloc(lw.URL)
|
||
|
||
info := siteCache[host]
|
||
if info == nil {
|
||
info = &storage.SiteInfo{}
|
||
}
|
||
|
||
// 中文倾向性:该网站中文内容占比
|
||
var chineseness float64 = 0.5
|
||
if len(info.Languages) > 0 {
|
||
total := 0.0
|
||
for _, v := range info.Languages {
|
||
total += v
|
||
}
|
||
if total > 0 {
|
||
chineseness = info.Languages["zh"] / total
|
||
}
|
||
}
|
||
|
||
// 兴趣衰减:基于访问次数的指数衰减,繁荣域名可访问更多次
|
||
prosper := math.Min(62, c.prosperMap[host])
|
||
limit := prosper*500 + 50
|
||
b := math.Pow(0.1, 1/limit)
|
||
interest := math.Pow(b, float64(info.VisitCount))
|
||
|
||
// 同理对顶级域名计算衰减(二级域名不够用时看顶级域名)
|
||
var interest2 float64 = 1.0
|
||
if super != host {
|
||
superInfo := siteCache[super]
|
||
if superInfo != nil {
|
||
limit2 := math.Min(62, c.prosperMap[super])*500 + 50
|
||
b2 := math.Pow(0.1, 1/limit2)
|
||
interest2 = math.Pow(b2, float64(superInfo.VisitCount))
|
||
}
|
||
}
|
||
|
||
// 网站质量评分
|
||
quality := 1.0
|
||
if info.Quality != nil {
|
||
quality = *info.Quality
|
||
}
|
||
|
||
// 繁荣值加分(log 变换平滑)
|
||
prosperity := prosper
|
||
if prosperity > 0 {
|
||
prosperity += 0.5
|
||
}
|
||
prosperity = math.Log2(2+prosperity) + 1
|
||
|
||
// URL 本身的质量惩罚(过长、路径过深、使用 .php/.htm 等)
|
||
bad := badURL(lw.URL)
|
||
return (0.1 + chineseness) * math.Min(0.05+interest, 0.05+interest2) * quality * (1 - bad) * lw.Weight * prosperity
|
||
}
|
||
|
||
// ---- 辅助函数 ----
|
||
|
||
// netloc extracts the host part (including any port) of rawURL.
//
// URLs that start with "http://" or "https://" take a fast path that slices
// the authority directly out of the string. The authority is cut at the first
// '?' or '#', so "https://a.com?x=1" yields "a.com". Any "user:pass@" userinfo
// is dropped. This matches what url.Parse reports in URL.Host.
//
// Every other input falls back to url.Parse. An empty string is returned when
// the URL cannot be parsed.
func netloc(rawURL string) string {
	parts := strings.SplitN(rawURL, "/", 4)
	if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" {
		authority := parts[2]
		// With no path slash, a query or fragment can directly follow the host.
		if i := strings.IndexAny(authority, "?#"); i >= 0 {
			authority = authority[:i]
		}
		// url.Parse excludes userinfo from URL.Host; do the same here.
		if i := strings.LastIndex(authority, "@"); i >= 0 {
			authority = authority[i+1:]
		}
		return authority
	}
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Host
}
|
||
|
||
// superNetloc 返回顶级域名(去除子域名),例如 "www.example.com" → "example.com"。
|
||
// 用于识别跨子域名但同主站的情况。
|
||
func superNetloc(rawURL string) string {
|
||
host := netloc(rawURL)
|
||
parts := strings.Split(host, ".")
|
||
if len(parts) >= 2 {
|
||
return strings.Join(parts[len(parts)-2:], ".")
|
||
}
|
||
return host
|
||
}
|
||
|
||
// badURL scores how low-quality a URL looks, from 0 (clean) up to a cap of 0.9.
//
// The score starts from a length penalty of (len(u)-30)/200, never below 0.
// Each matching heuristic then moves it a fraction w of the remaining way
// towards 1 (s += (1-s)*w):
//   - w=0.3 if the URL contains ".htm" or ".php" anywhere, including the host.
//   - w=0.1 if, after trimming trailing slashes, it has more than two '/'
//     characters, i.e. anything beyond a bare "scheme://host" root.
//   - w=0.3 if it is shorter than 5 bytes or its fifth byte is ':'. For real
//     URLs this means a plain "http:" scheme; "https:" has 's' there.
//
// URLs of 210 bytes or more always hit the cap.
func badURL(u string) float64 {
	score := math.Max(0, float64(len(u)-30)/200.0)
	nudge := func(weight float64) {
		score += (1 - score) * weight
	}

	if strings.Contains(u, ".htm") || strings.Contains(u, ".php") {
		nudge(0.3)
	}
	if trimmed := strings.TrimRight(u, "/"); strings.Count(trimmed, "/") > 2 {
		nudge(0.1)
	}
	if len(u) < 5 || u[4] == ':' {
		nudge(0.3)
	}
	return math.Min(score, 0.9)
}
|
||
|
||
// truncate shortens s to at most n bytes without splitting a UTF-8 sequence.
//
// If s already fits, it is returned unchanged. Otherwise the cut moves back to
// the start of the rune that straddles byte n. Multi-byte characters (e.g.
// CJK text, 3 bytes each) are therefore never split into invalid UTF-8, and
// the result may be up to 3 bytes shorter than n. A non-positive n yields "";
// a negative n used to panic.
func truncate(s string, n int) string {
	if len(s) <= n {
		return s
	}
	cut := 0
	// Ranging over a string visits the byte offset of every rune start; keep
	// the last one that does not exceed n.
	for i := range s {
		if i > n {
			break
		}
		cut = i
	}
	return s[:cut]
}
|
||
|
||
// fnvHash returns the 128-bit FNV-1a digest of s as a 32-character lowercase
// hex string. It is a cheap fingerprint used to detect whether page text
// changed between incremental re-crawls. FNV is non-cryptographic.
func fnvHash(s string) string {
	hasher := fnv.New128a()
	// hash.Hash's Write never returns an error.
	_, _ = hasher.Write([]byte(s))
	digest := hasher.Sum(nil)
	return fmt.Sprintf("%x", digest)
}
|
||
|
||
// sampleStrings returns n distinct elements of s, chosen uniformly at random
// without replacement and in random order.
//
// If s has at most n elements, s itself is returned (same backing array,
// original order). A non-positive n yields an empty, non-nil slice; a
// negative n used to panic inside make.
func sampleStrings(s []string, n int) []string {
	if len(s) <= n {
		return s
	}
	if n <= 0 {
		return []string{}
	}
	perm := rand.Perm(len(s))
	out := make([]string, n)
	for i := range out {
		out[i] = s[perm[i]]
	}
	return out
}
|
||
|
||
// sampleURLWeights 与 sampleStrings 相同,但处理 URLWeight 切片。
|
||
func sampleURLWeights(s []URLWeight, n int) []URLWeight {
|
||
if len(s) <= n {
|
||
return s
|
||
}
|
||
perm := rand.Perm(len(s))
|
||
out := make([]URLWeight, n)
|
||
for i := range out {
|
||
out[i] = s[perm[i]]
|
||
}
|
||
return out
|
||
}
|
||
|
||
// scoredURL pairs a candidate URL with its scheduling score. It is the
// element type that weightedSample draws from.
type scoredURL struct {
	url   string  // candidate URL
	score float64 // sampling weight, as produced by scoreURL
}
|
||
|
||
// weightedSample 加权随机采样(不放回):从 scoredURL 列表中按权重概率抽取最多 k 条。
|
||
// 使用累积概率法近似 alias method(适合中等规模数据)。
|
||
func weightedSample(items []scoredURL, k int) []string {
|
||
if k >= len(items) {
|
||
out := make([]string, len(items))
|
||
for i, s := range items {
|
||
out[i] = s.url
|
||
}
|
||
return out
|
||
}
|
||
totalWeight := 0.0
|
||
for _, s := range items {
|
||
totalWeight += s.score
|
||
}
|
||
selected := make(map[int]bool)
|
||
out := make([]string, 0, k)
|
||
for len(out) < k && len(selected) < len(items) {
|
||
// 随机取 [0, totalWeight) 区间的一个点
|
||
r := rand.Float64() * totalWeight
|
||
cum := 0.0
|
||
for i, s := range items {
|
||
if selected[i] {
|
||
continue
|
||
}
|
||
cum += s.score
|
||
if cum >= r {
|
||
selected[i] = true
|
||
out = append(out, s.url)
|
||
totalWeight -= s.score // 被选中后从总权重中移除(不放回)
|
||
break
|
||
}
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
// concentrationFilter 域名集中度过滤。
|
||
// 按 CrawlFocus 因子限制每个顶级域名被选中的 URL 数量,防止爬取过于集中在少数网站。
|
||
func concentrationFilter(urls []string, k float64) []string {
|
||
// 按顶级域名分组
|
||
domainGroups := make(map[string][]string)
|
||
shuffled := make([]string, len(urls))
|
||
copy(shuffled, urls)
|
||
rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
|
||
|
||
for _, u := range shuffled {
|
||
d := superNetloc(u)
|
||
domainGroups[d] = append(domainGroups[d], u)
|
||
}
|
||
|
||
// 计算每组保留上限:域名规模越大允许越多,但按 k 次幂压制
|
||
limit := 10
|
||
if len(domainGroups) > 1 {
|
||
sizes := make([]int, 0, len(domainGroups))
|
||
for _, g := range domainGroups {
|
||
sizes = append(sizes, int(math.Pow(float64(len(g)), k)))
|
||
}
|
||
// 升序排列,去除最大一项,用其余项总和的 60% 作为全局上限
|
||
for i := 0; i < len(sizes)-1; i++ {
|
||
for j := i + 1; j < len(sizes)-1; j++ {
|
||
if sizes[j] < sizes[i] {
|
||
sizes[i], sizes[j] = sizes[j], sizes[i]
|
||
}
|
||
}
|
||
}
|
||
total := 0
|
||
for _, s := range sizes[:len(sizes)-1] {
|
||
total += s
|
||
}
|
||
limit = max(10, int(float64(total)*0.6))
|
||
}
|
||
|
||
// 从每组中按计算的上限采样
|
||
var result []string
|
||
for _, g := range domainGroups {
|
||
sn := 1 + min(limit, int(math.Pow(float64(len(g)), k)))
|
||
if sn > len(g) {
|
||
sn = len(g)
|
||
}
|
||
result = append(result, g[:sn]...)
|
||
}
|
||
rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] })
|
||
return result
|
||
}
|
||
|
||
// truncateMap returns a map holding at most n entries taken from m.
//
// When m already has n or fewer entries, m itself is returned, not a copy.
// Otherwise a new map is filled with the first n entries produced by ranging
// over m. Go's map iteration order is unspecified, so which entries survive
// is arbitrary.
func truncateMap(m map[string]string, n int) map[string]string {
	if len(m) <= n {
		return m
	}
	kept := make(map[string]string, n)
	for key, val := range m {
		if len(kept) >= n {
			break
		}
		kept[key] = val
	}
	return kept
}
|
||
|
||
// min returns the smaller of two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
|
||
|
||
// max returns the larger of two ints.
func max(a, b int) int {
	if b > a {
		return b
	}
	return a
}
|
||
|
||
// GetStats 返回当前爬虫统计快照(用于监控)。
|
||
func (c *Crawler) GetStats() Stats {
|
||
return Stats{
|
||
VisitedURLs: atomic.LoadInt64(&c.stats.VisitedURLs),
|
||
SuccessURLs: atomic.LoadInt64(&c.stats.SuccessURLs),
|
||
KeywordsFetched: atomic.LoadInt64(&c.stats.KeywordsFetched),
|
||
}
|
||
}
|
||
|
||
// GetCrawlStatus 返回当前爬取状态(供前端监控)。
|
||
func (c *Crawler) GetCrawlStatus() CrawlStatus {
|
||
c.crawlStatusMu.RLock()
|
||
defer c.crawlStatusMu.RUnlock()
|
||
return c.crawlStatus
|
||
}
|
||
|
||
// Stop 触发优雅关闭:通知所有 goroutine 停止。
|
||
// 已运行的任务会等待完成后再退出。
|
||
func (c *Crawler) Stop() {
|
||
c.stopOnce.Do(func() {
|
||
log.Println("[crawler] stop signal received, initiating graceful shutdown...")
|
||
close(c.stopCh) // 关闭通知
|
||
})
|
||
}
|
||
|
||
// WaitUntilStopped 等待爬虫完全停止(Run 和 priority worker 都退出)。
|
||
func (c *Crawler) WaitUntilStopped() {
|
||
<-c.priorityDone
|
||
}
|
||
|
||
// updateCrawlStatus 更新爬取状态(内部使用)。
|
||
func (c *Crawler) updateCrawlStatus(fn func(*CrawlStatus)) {
|
||
c.crawlStatusMu.Lock()
|
||
defer c.crawlStatusMu.Unlock()
|
||
fn(&c.crawlStatus)
|
||
}
|