优化手动增加的逻辑

This commit is contained in:
2026-04-11 21:38:04 +08:00
parent 0ce381b8a7
commit 5abe9271fe
7 changed files with 57 additions and 29 deletions
+49 -22
View File
@@ -73,15 +73,14 @@ type Crawler struct {
priorityCh chan string // Priority URL 任务队列(用户手动添加)
priorityChildCh chan string // Priority 子链接队列(子 URL 继续由 priority worker 爬取)
prioritySem chan struct{} // Priority 信号量(上限 priorityMaxWorkers
priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束
priorityMu sync.RWMutex // 保护 priorityStats
priorityStats struct {
priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束
priorityMu sync.RWMutex // 保护 priorityStats 和 priorityChildLinks
priorityStats struct {
pending int64 // 待处理的 Priority URL 数量(入队但未开始)
active int64 // 正在处理的 Priority URL 数量
}
// 孙链接(子 URL 的子链接)进入普通 BFS 队列
normalChildCh chan URLWeight // 孙链接 channel,由 Run 循环消费
// ---- 爬取状态暴露(供前端监控) ----
crawlStatusMu sync.RWMutex
crawlStatus CrawlStatus // 当前轮次状态
@@ -94,6 +93,7 @@ type CrawlStatus struct {
QueueLength int `json:"queue_length"` // 本轮队列长度
CompletedCount int `json:"completed_count"` // 本轮已完成的 URL 数
VisitedTotal int `json:"visited_total"` // 已收录 URL 总数
NextPoolSize int `json:"next_pool_size"` // 下一轮链接池大小(newLinks 调度后的队列长度)
IsRunning bool `json:"is_running"` // 是否正在运行
}
@@ -157,15 +157,15 @@ func DecrementPriorityLevel2Inflight(n int64) {
// prosperMap 由 info 模块加载,传入域名繁荣值用于调度优先级计算。
func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler {
c := &Crawler{
fetcher: NewFetcher(config.SpiderName(), time.Duration(config.CrawlerCooldown())*time.Second),
db: db,
analyzer: a,
prosperMap: prosperMap,
visited: make(map[string]bool),
priorityCh: make(chan string, priorityQueueSize),
fetcher: NewFetcher(config.SpiderName(), time.Duration(config.CrawlerCooldown())*time.Second),
db: db,
analyzer: a,
prosperMap: prosperMap,
visited: make(map[string]bool),
priorityCh: make(chan string, priorityQueueSize),
priorityChildCh: make(chan string, priorityQueueSize),
prioritySem: make(chan struct{}, priorityMaxWorkers),
normalChildCh: make(chan URLWeight, priorityQueueSize),
prioritySem: make(chan struct{}, priorityMaxWorkers),
normalChildCh: make(chan URLWeight, 50000), // 孙链接 channel,大 buffer 避免丢
}
// 启动 Priority Worker(独立 goroutine,不受主 workers 限制)
go c.runPriorityWorker()
@@ -320,8 +320,8 @@ func (c *Crawler) runPriorityWorker() {
// priorityCrawlLoop 爬取单个 URL:
//
// level=1(一级,手动 URL):visitURLUnlimited 无限爬子链接 → 二级队列
// level=2(二级,子 URL):visitURLUnlimited 无限爬子链接 → 普通 BFS
// level=1(一级,手动 URL):visitURLUnlimited 无限爬子链接 → 二级队列priorityChildCh
// level=2(二级,子 URL):visitURLUnlimited 无限爬子链接 → 孙链接(normalChildCh
func (c *Crawler) priorityCrawlLoop(rawURL string, level int) {
defer c.priorityWg.Done()
defer func() { <-c.prioritySem }()
@@ -341,13 +341,13 @@ func (c *Crawler) priorityCrawlLoop(rawURL string, level int) {
// 两级都不限制子链接数量
children := c.visitURLUnlimited(rawURL)
log.Printf("[crawler] priority[%d] crawl done: %s (%d child links)", level, rawURL, len(children))
//log.Printf("[crawler] priority[%d] crawl done: %s (%d child links)", level, rawURL, len(children))
if len(children) == 0 {
return
}
// 一级:子链接进二级队列;二级:子链接进普通 BFS
// 一级:子链接进二级队列;二级:子链接直接加入 newLinks(同步)
for _, child := range children {
if level == 1 {
select {
@@ -357,9 +357,11 @@ func (c *Crawler) priorityCrawlLoop(rawURL string, level int) {
// 二级队列满,丢弃
}
} else {
// 二级:子链接进孙链接 channel,由 Run 永久 drain 直到全部到达
select {
case c.normalChildCh <- URLWeight{URL: child, Weight: 1.0}:
default:
// channel 满(很少发生),孙链接丢弃
}
}
}
@@ -368,6 +370,12 @@ func (c *Crawler) priorityCrawlLoop(rawURL string, level int) {
// TriggerPriorityCrawl 立即触发高优先级爬取(突破 workers 上限)。
// 适合用户手动插入 URL 时立即响应。
func (c *Crawler) TriggerPriorityCrawl(url string) {
defer func() {
if r := recover(); r != nil {
// priorityCh 已关闭(Run 已退出),忽略
log.Printf("[crawler] priority crawl ignored (crawler stopped): %s", url)
}
}()
select {
case c.priorityCh <- url:
c.priorityMu.Lock()
@@ -529,9 +537,9 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
}
wg.Wait()
// 消费孙链接 channel(孙链接来自 priority 爬取的子链接,进入普通 BFS 队列)
// 孙链接爬取是异步的,使用 timeout 确保全部到达后再调度
timeout := time.After(5 * time.Second)
// drain 孙链接 channel(孙链接来自 priority 爬取的子链接,进入普通 BFS 队列)
// runPriorityWorker 发送完毕后等待足够时间,确保 channel 中最后几条到达
timeout := time.After(10 * time.Second)
drained := false
for !drained {
select {
@@ -550,17 +558,36 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
cs.VisitedTotal = visitedTotal
})
// 本轮没有发现新链接,爬取结束
// 本轮没有发现新链接,报告停止,等待新 Priority URL 触发重启
if len(newLinks) == 0 {
log.Println("[crawler] empty queue — stopping")
log.Println("[crawler] empty — stopped, waiting for new URLs")
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.IsRunning = false
})
return
// 空循环等 normalChildCh,新数据到达后立即从 epoch 0 重新开始
for {
gc, ok := <-c.normalChildCh
if !ok {
return
}
newLinks = append(newLinks, gc)
log.Printf("[crawler] new URLs detected, restarting from epoch 0 (%d in pool)", len(newLinks))
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.IsRunning = true
})
ep = -1 // continue 后 ep++ 变成 0
break // 退出空循环,进入正常队列处理
}
}
// 调度算法:从候选 URL 中选出下一轮要抓取的队列
nextPoolSize := len(newLinks)
queue = c.schedule(newLinks)
// 更新下一轮链接池大小
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.NextPoolSize = nextPoolSize
})
}
// 所有轮次完成,更新状态
c.updateCrawlStatus(func(cs *CrawlStatus) {