From 079a4c62910008dd13ffa096a3c6dd10e8a8e53f Mon Sep 17 00:00:00 2001 From: kevin Date: Sat, 11 Apr 2026 23:12:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E9=9B=85=E9=80=80=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crawler/crawler.go | 93 +++++++++++++++++++++++++++++++++++++++------- main.go | 15 +++++++- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/crawler/crawler.go b/crawler/crawler.go index f987767..782c389 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -87,6 +87,11 @@ type Crawler struct { // ---- 爬取状态暴露(供前端监控) ---- crawlStatusMu sync.RWMutex crawlStatus CrawlStatus // 当前轮次状态 + + // ---- 优雅关闭 ---- + stopCh chan struct{} // 关闭时表示收到停止信号 + stopOnce sync.Once // 确保只关闭一次 + priorityDone chan struct{} // priority worker 已完全停止 } // CrawlStatus 暴露给前端监控的爬取状态 @@ -170,6 +175,8 @@ func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *C priorityChildCh: make(chan string, priorityQueueSize), prioritySem: make(chan struct{}, priorityMaxWorkers), normalChildCh: make(chan URLWeight, 50000), // 孙链接 channel,大 buffer 避免丢 + stopCh: make(chan struct{}), + priorityDone: make(chan struct{}), } // 启动 Priority Worker(独立 goroutine,不受主 workers 限制) go c.runPriorityWorker() @@ -246,18 +253,42 @@ func (c *Crawler) markVisited(url string) { // 两级队列共享同一线程池:一级队列(手动 URL)优先处理完,再处理二级队列(子 URL)。 // - 一级队列:visitURLUnlimited,无限爬子链接,子链接进入二级队列 // - 二级队列:visitURL,有限爬子链接,子链接进入普通 BFS +// 收到 stopCh 关闭信号后,等待当前批次完成然后退出。 func (c *Crawler) runPriorityWorker() { + defer close(c.priorityDone) // 通知 WaitUntilStopped + for { + // 检查停止信号 + select { + case <-c.stopCh: + log.Println("[priority] stop signal received, waiting for pending work...") + // 不直接退出,等待当前正在运行的 goroutine 完成 + // 但不再接受新任务,关闭 channel 防止新任务入队 + close(c.priorityCh) + c.priorityWg.Wait() + log.Println("[priority] all pending work done, exiting") + return + default: + } + // ---- 第一阶段:drain 一级队列全部取出 ---- level1Batch := []string{} - // 先阻塞等第一个一级 URL - url := <-c.priorityCh - level1Batch = append(level1Batch, url) + var item string + // 先阻塞等第一个一级 URL(可被 stopCh 中断) + select { + case item = <-c.priorityCh: + level1Batch = append(level1Batch, item) + case <-c.stopCh: + // stopCh 刚关闭,从头重走退出流程 + close(c.priorityCh) + c.priorityWg.Wait() + return + } // drain 剩余一级 URL(非阻塞) for { select { - case url = <-c.priorityCh: - level1Batch = append(level1Batch, url) + case item = <-c.priorityCh: + level1Batch = append(level1Batch, item) default: goto processLevel1 } @@ -458,6 +489,17 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) { c.startRecrawlTicker() for ep := 0; ep < maxEpoch; ep++ { + // 检查停止信号 + select { + case <-c.stopCh: + log.Println("[crawler] stop signal received, exiting run loop") + c.updateCrawlStatus(func(cs *CrawlStatus) { + cs.IsRunning = false + }) + return + default: + } + // 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整 workers := config.CrawlerWorkers() @@ -583,18 +625,25 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) { }) // 空循环等 normalChildCh,新数据到达后立即从 epoch 0 重新开始 for { - gc, ok := <-c.normalChildCh - if !ok { + select { + case gc, ok := <-c.normalChildCh: + if !ok { + // channel 已关闭(正常情况或 stop 时关闭) + return + } + newLinks = append(newLinks, gc) + log.Printf("[crawler] new URLs detected, restarting from epoch 0 (%d in pool)", len(newLinks)) + c.updateCrawlStatus(func(cs *CrawlStatus) { + cs.IsRunning = true + }) + ep = -1 // continue 后 ep++ 变成 0 + goto restartEpochLoop // 退出空循环,进入正常队列处理 + case <-c.stopCh: + // 收到停止信号 return } - newLinks = append(newLinks, gc) - log.Printf("[crawler] new URLs detected, restarting from epoch 0 (%d in pool)", len(newLinks)) - c.updateCrawlStatus(func(cs *CrawlStatus) { - cs.IsRunning = true - }) - ep = -1 // continue 后 ep++ 变成 0 - break // 退出空循环,进入正常队列处理 } + restartEpochLoop: } // 调度算法:从候选 URL 中选出下一轮要抓取的队列 @@ -625,6 +674,8 @@ func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text // 使用 sync.WaitGroup + select 实现硬超时包装器, // 确保即使 http.Client.Timout 被某些底层操作忽略,goroutine 也不会永久阻塞。 + // 注意:优雅关闭时,如果恰好在抓取过程中收到停止信号, + // goroutine 会等待最多 35s(fetchTimeout + 5s)后强制退出。 fetchTimeout := 30 * time.Second var res *FetchResult var fetchErr error @@ -1273,6 +1324,20 @@ func (c *Crawler) GetCrawlStatus() CrawlStatus { return c.crawlStatus } +// Stop 触发优雅关闭:通知所有 goroutine 停止。 +// 已运行的任务会等待完成后再退出。 +func (c *Crawler) Stop() { + c.stopOnce.Do(func() { + log.Println("[crawler] stop signal received, initiating graceful shutdown...") + close(c.stopCh) // 关闭通知 + }) +} + +// WaitUntilStopped 等待爬虫完全停止(Run 和 priority worker 都退出)。 +func (c *Crawler) WaitUntilStopped() { + <-c.priorityDone +} + // updateCrawlStatus 更新爬取状态(内部使用)。 func (c *Crawler) updateCrawlStatus(fn func(*CrawlStatus)) { c.crawlStatusMu.Lock() diff --git a/main.go b/main.go index bb6c7db..51d711a 100644 --- a/main.go +++ b/main.go @@ -185,6 +185,17 @@ func main() { quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) <-quit - log.Println("shutdown signal received, flushing index...") - searchSrv.Flush() // 退出前刷盘,不丢数据 + log.Println("shutdown signal received, initiating graceful shutdown...") + + // 通知爬虫停止(不阻塞,等待爬虫内部协调) + crawl.Stop() + + // 等待爬虫完全停止(包括 priority worker) + crawl.WaitUntilStopped() + log.Println("crawler stopped") + + // 最后刷盘,确保数据不丢失 + log.Println("flushing index...") + searchSrv.Flush() + log.Println("shutdown complete") }