diff --git a/crawler/crawler.go b/crawler/crawler.go index 31a7e33..b47e2fd 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -258,38 +258,32 @@ func (c *Crawler) runPriorityWorker() { defer close(c.priorityDone) // 通知 WaitUntilStopped for { - // 检查停止信号 + // ---- 第一阶段:drain 一级队列全部取出 ---- + level1Batch := []string{} + + // 先阻塞等待第一个一级 URL select { case <-c.stopCh: log.Println("[priority] stop signal received, waiting for pending work...") - // 不直接退出,等待当前正在运行的 goroutine 完成 - // 但不再接受新任务,关闭 channel 防止新任务入队 close(c.priorityCh) c.priorityWg.Wait() log.Println("[priority] all pending work done, exiting") return - default: + case item := <-c.priorityCh: + level1Batch = append(level1Batch, item) } - // ---- 第一阶段:drain 一级队列全部取出 ---- - level1Batch := []string{} - var item string - // 先阻塞等第一个一级 URL(可被 stopCh 中断) - select { - case item = <-c.priorityCh: - level1Batch = append(level1Batch, item) - case <-c.stopCh: - // stopCh 刚关闭,从头重走退出流程 - close(c.priorityCh) - c.priorityWg.Wait() - return - } - // drain 剩余一级 URL(非阻塞) + // drain 剩余一级 URL(改为阻塞模式,避免 CPU 空转) for { select { - case item = <-c.priorityCh: + case <-c.stopCh: + close(c.priorityCh) + c.priorityWg.Wait() + return + case item := <-c.priorityCh: level1Batch = append(level1Batch, item) default: + // channel 暂时为空,跳出 drain 循环处理已收集的 batch goto processLevel1 } } @@ -489,7 +483,7 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) { c.startRecrawlTicker() for ep := 0; ep < maxEpoch; ep++ { - // 检查停止信号 + // 检查停止信号(阻塞等待,避免 CPU 空转) select { case <-c.stopCh: log.Println("[crawler] stop signal received, exiting run loop") @@ -498,6 +492,7 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) { }) return default: + // 正常情况下继续执行,无阻塞 } // 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整 diff --git a/sese-engine-ui b/sese-engine-ui index 60e897f..52c1b9d 160000 --- a/sese-engine-ui +++ b/sese-engine-ui @@ -1 +1 @@ -Subproject commit 60e897f110866903ab30d74eb16d8e61214aea47 +Subproject commit 52c1b9de996d12e63bc2774f502dad3cec08bc6c