修复可能存在的cpu空转问题
This commit is contained in:
+15
-20
@@ -258,38 +258,32 @@ func (c *Crawler) runPriorityWorker() {
|
|||||||
defer close(c.priorityDone) // 通知 WaitUntilStopped
|
defer close(c.priorityDone) // 通知 WaitUntilStopped
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// 检查停止信号
|
// ---- 第一阶段:drain 一级队列全部取出 ----
|
||||||
|
level1Batch := []string{}
|
||||||
|
|
||||||
|
// 先阻塞等待第一个一级 URL
|
||||||
select {
|
select {
|
||||||
case <-c.stopCh:
|
case <-c.stopCh:
|
||||||
log.Println("[priority] stop signal received, waiting for pending work...")
|
log.Println("[priority] stop signal received, waiting for pending work...")
|
||||||
// 不直接退出,等待当前正在运行的 goroutine 完成
|
|
||||||
// 但不再接受新任务,关闭 channel 防止新任务入队
|
|
||||||
close(c.priorityCh)
|
close(c.priorityCh)
|
||||||
c.priorityWg.Wait()
|
c.priorityWg.Wait()
|
||||||
log.Println("[priority] all pending work done, exiting")
|
log.Println("[priority] all pending work done, exiting")
|
||||||
return
|
return
|
||||||
default:
|
case item := <-c.priorityCh:
|
||||||
|
level1Batch = append(level1Batch, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- 第一阶段:drain 一级队列全部取出 ----
|
// drain 剩余一级 URL(改为阻塞模式,避免 CPU 空转)
|
||||||
level1Batch := []string{}
|
|
||||||
var item string
|
|
||||||
// 先阻塞等第一个一级 URL(可被 stopCh 中断)
|
|
||||||
select {
|
|
||||||
case item = <-c.priorityCh:
|
|
||||||
level1Batch = append(level1Batch, item)
|
|
||||||
case <-c.stopCh:
|
|
||||||
// stopCh 刚关闭,从头重走退出流程
|
|
||||||
close(c.priorityCh)
|
|
||||||
c.priorityWg.Wait()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// drain 剩余一级 URL(非阻塞)
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case item = <-c.priorityCh:
|
case <-c.stopCh:
|
||||||
|
close(c.priorityCh)
|
||||||
|
c.priorityWg.Wait()
|
||||||
|
return
|
||||||
|
case item := <-c.priorityCh:
|
||||||
level1Batch = append(level1Batch, item)
|
level1Batch = append(level1Batch, item)
|
||||||
default:
|
default:
|
||||||
|
// channel 暂时为空,跳出 drain 循环处理已收集的 batch
|
||||||
goto processLevel1
|
goto processLevel1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -489,7 +483,7 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
|||||||
c.startRecrawlTicker()
|
c.startRecrawlTicker()
|
||||||
|
|
||||||
for ep := 0; ep < maxEpoch; ep++ {
|
for ep := 0; ep < maxEpoch; ep++ {
|
||||||
// 检查停止信号
|
// 检查停止信号(阻塞等待,避免 CPU 空转)
|
||||||
select {
|
select {
|
||||||
case <-c.stopCh:
|
case <-c.stopCh:
|
||||||
log.Println("[crawler] stop signal received, exiting run loop")
|
log.Println("[crawler] stop signal received, exiting run loop")
|
||||||
@@ -498,6 +492,7 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
|||||||
})
|
})
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
|
// 正常情况下继续执行,无阻塞
|
||||||
}
|
}
|
||||||
|
|
||||||
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
|
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
|
||||||
|
|||||||
+1
-1
Submodule sese-engine-ui updated: 60e897f110...52c1b9de99
Reference in New Issue
Block a user