优雅退出

This commit is contained in:
2026-04-11 23:12:13 +08:00
parent 0ad7c866d6
commit 079a4c6291
2 changed files with 92 additions and 16 deletions
+79 -14
View File
@@ -87,6 +87,11 @@ type Crawler struct {
// ---- 爬取状态暴露(供前端监控) ---- // ---- 爬取状态暴露(供前端监控) ----
crawlStatusMu sync.RWMutex crawlStatusMu sync.RWMutex
crawlStatus CrawlStatus // 当前轮次状态 crawlStatus CrawlStatus // 当前轮次状态
// ---- 优雅关闭 ----
stopCh chan struct{} // 关闭时表示收到停止信号
stopOnce sync.Once // 确保只关闭一次
priorityDone chan struct{} // priority worker 已完全停止
} }
// CrawlStatus 暴露给前端监控的爬取状态 // CrawlStatus 暴露给前端监控的爬取状态
@@ -170,6 +175,8 @@ func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *C
priorityChildCh: make(chan string, priorityQueueSize), priorityChildCh: make(chan string, priorityQueueSize),
prioritySem: make(chan struct{}, priorityMaxWorkers), prioritySem: make(chan struct{}, priorityMaxWorkers),
normalChildCh: make(chan URLWeight, 50000), // 孙链接 channel,大 buffer 避免丢 normalChildCh: make(chan URLWeight, 50000), // 孙链接 channel,大 buffer 避免丢
stopCh: make(chan struct{}),
priorityDone: make(chan struct{}),
} }
// 启动 Priority Worker(独立 goroutine,不受主 workers 限制) // 启动 Priority Worker(独立 goroutine,不受主 workers 限制)
go c.runPriorityWorker() go c.runPriorityWorker()
@@ -246,18 +253,42 @@ func (c *Crawler) markVisited(url string) {
// 两级队列共享同一线程池:一级队列(手动 URL)优先处理完,再处理二级队列(子 URL)。 // 两级队列共享同一线程池:一级队列(手动 URL)优先处理完,再处理二级队列(子 URL)。
// - 一级队列:visitURLUnlimited,无限爬子链接,子链接进入二级队列 // - 一级队列:visitURLUnlimited,无限爬子链接,子链接进入二级队列
// - 二级队列:visitURL,有限爬子链接,子链接进入普通 BFS // - 二级队列:visitURL,有限爬子链接,子链接进入普通 BFS
// 收到 stopCh 关闭信号后,等待当前批次完成然后退出。
func (c *Crawler) runPriorityWorker() { func (c *Crawler) runPriorityWorker() {
defer close(c.priorityDone) // 通知 WaitUntilStopped
for { for {
// 检查停止信号
select {
case <-c.stopCh:
log.Println("[priority] stop signal received, waiting for pending work...")
// 不直接退出,等待当前正在运行的 goroutine 完成
// 但不再接受新任务,关闭 channel 防止新任务入队
close(c.priorityCh)
c.priorityWg.Wait()
log.Println("[priority] all pending work done, exiting")
return
default:
}
// ---- 第一阶段:drain 一级队列全部取出 ---- // ---- 第一阶段:drain 一级队列全部取出 ----
level1Batch := []string{} level1Batch := []string{}
// 先阻塞等第一个一级 URL var item string
url := <-c.priorityCh // 先阻塞等第一个一级 URL(可被 stopCh 中断)
level1Batch = append(level1Batch, url) select {
case item = <-c.priorityCh:
level1Batch = append(level1Batch, item)
case <-c.stopCh:
// stopCh 刚关闭,从头重走退出流程
close(c.priorityCh)
c.priorityWg.Wait()
return
}
// drain 剩余一级 URL(非阻塞) // drain 剩余一级 URL(非阻塞)
for { for {
select { select {
case url = <-c.priorityCh: case item = <-c.priorityCh:
level1Batch = append(level1Batch, url) level1Batch = append(level1Batch, item)
default: default:
goto processLevel1 goto processLevel1
} }
@@ -458,6 +489,17 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
c.startRecrawlTicker() c.startRecrawlTicker()
for ep := 0; ep < maxEpoch; ep++ { for ep := 0; ep < maxEpoch; ep++ {
// 检查停止信号
select {
case <-c.stopCh:
log.Println("[crawler] stop signal received, exiting run loop")
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.IsRunning = false
})
return
default:
}
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整 // 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
workers := config.CrawlerWorkers() workers := config.CrawlerWorkers()
@@ -583,18 +625,25 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
}) })
// 空循环等 normalChildCh,新数据到达后立即从 epoch 0 重新开始 // 空循环等 normalChildCh,新数据到达后立即从 epoch 0 重新开始
for { for {
gc, ok := <-c.normalChildCh select {
if !ok { case gc, ok := <-c.normalChildCh:
if !ok {
// channel 已关闭(正常情况或 stop 时关闭)
return
}
newLinks = append(newLinks, gc)
log.Printf("[crawler] new URLs detected, restarting from epoch 0 (%d in pool)", len(newLinks))
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.IsRunning = true
})
ep = -1 // continue 后 ep++ 变成 0
goto restartEpochLoop // 退出空循环,进入正常队列处理
case <-c.stopCh:
// 收到停止信号
return return
} }
newLinks = append(newLinks, gc)
log.Printf("[crawler] new URLs detected, restarting from epoch 0 (%d in pool)", len(newLinks))
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.IsRunning = true
})
ep = -1 // continue 后 ep++ 变成 0
break // 退出空循环,进入正常队列处理
} }
restartEpochLoop:
} }
// 调度算法:从候选 URL 中选出下一轮要抓取的队列 // 调度算法:从候选 URL 中选出下一轮要抓取的队列
@@ -625,6 +674,8 @@ func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text
// 使用 sync.WaitGroup + select 实现硬超时包装器, // 使用 sync.WaitGroup + select 实现硬超时包装器,
// 确保即使 http.Client.Timout 被某些底层操作忽略,goroutine 也不会永久阻塞。 // 确保即使 http.Client.Timout 被某些底层操作忽略,goroutine 也不会永久阻塞。
// 注意:优雅关闭时,如果恰好在抓取过程中收到停止信号,
// goroutine 会等待最多 35sfetchTimeout + 5s)后强制退出。
fetchTimeout := 30 * time.Second fetchTimeout := 30 * time.Second
var res *FetchResult var res *FetchResult
var fetchErr error var fetchErr error
@@ -1273,6 +1324,20 @@ func (c *Crawler) GetCrawlStatus() CrawlStatus {
return c.crawlStatus return c.crawlStatus
} }
// Stop 触发优雅关闭:通知所有 goroutine 停止。
// 已运行的任务会等待完成后再退出。
func (c *Crawler) Stop() {
c.stopOnce.Do(func() {
log.Println("[crawler] stop signal received, initiating graceful shutdown...")
close(c.stopCh) // 关闭通知
})
}
// WaitUntilStopped 等待爬虫完全停止(Run 和 priority worker 都退出)。
func (c *Crawler) WaitUntilStopped() {
<-c.priorityDone
}
// updateCrawlStatus 更新爬取状态(内部使用)。 // updateCrawlStatus 更新爬取状态(内部使用)。
func (c *Crawler) updateCrawlStatus(fn func(*CrawlStatus)) { func (c *Crawler) updateCrawlStatus(fn func(*CrawlStatus)) {
c.crawlStatusMu.Lock() c.crawlStatusMu.Lock()
+13 -2
View File
@@ -185,6 +185,17 @@ func main() {
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM) signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit <-quit
log.Println("shutdown signal received, flushing index...") log.Println("shutdown signal received, initiating graceful shutdown...")
searchSrv.Flush() // 退出前刷盘,不丢数据
// 通知爬虫停止(不阻塞,等待爬虫内部协调)
crawl.Stop()
// 等待爬虫完全停止(包括 priority worker
crawl.WaitUntilStopped()
log.Println("crawler stopped")
// 最后刷盘,确保数据不丢失
log.Println("flushing index...")
searchSrv.Flush()
log.Println("shutdown complete")
} }