增加爬取状态api

This commit is contained in:
2026-04-10 18:40:40 +08:00
parent 65e6547d54
commit fd827cbde3
7 changed files with 104 additions and 7 deletions
+64
View File
@@ -82,6 +82,20 @@ type Crawler struct {
// ---- Priority 子链接优先队列(来自 priority worker 的子链接会优先爬取)----
priorityChildrenMu sync.Mutex
priorityChildren []string // Priority URL 产生的子链接(优先处理)
// ---- 爬取状态暴露(供前端监控) ----
crawlStatusMu sync.RWMutex
crawlStatus CrawlStatus // 当前轮次状态
}
// CrawlStatus 暴露给前端监控的爬取状态
type CrawlStatus struct {
CurrentEpoch int `json:"current_epoch"` // 当前轮次(从1开始)
MaxEpoch int `json:"max_epoch"` // 总轮数上限
QueueLength int `json:"queue_length"` // 本轮队列长度
CompletedCount int `json:"completed_count"` // 本轮已完成的 URL 数
VisitedTotal int `json:"visited_total"` // 已收录 URL 总数
IsRunning bool `json:"is_running"` // 是否正在运行
}
// 全局活跃线程计数器(跨包可读,无需持有 Crawler 引用)
@@ -336,6 +350,12 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
c.markVisited(entryURL)
queue := []string{entryURL}
// 初始化爬取状态
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.MaxEpoch = maxEpoch
cs.IsRunning = true
})
// 启动后台重爬定时器:定期释放过期 URL 到候选池
c.startRecrawlTicker()
@@ -343,6 +363,13 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
workers := config.CrawlerWorkers()
// 更新爬取状态:新一轮开始
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.CurrentEpoch = ep + 1
cs.QueueLength = len(queue)
cs.CompletedCount = 0
})
// ---- 优先处理 priorityChildren 队列(来自 priority worker 的子链接)----
var priorityQueue []string
c.priorityChildrenMu.Lock()
@@ -371,6 +398,7 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
newLinks []URLWeight // 收集下一轮候选 URL
mu sync.Mutex // 保护 newLinks 的并发写入
wg sync.WaitGroup
completed int64 // 本轮已完成的计数(atomic
)
// 信号量:限制同时并发数(使用上方读取的 workers 值)
@@ -389,6 +417,13 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
// 抓取单个 URL,返回发现的子链接
hrefs := c.visitURL(rawURL)
// 更新完成计数
currentCompleted := atomic.AddInt64(&completed, 1)
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.CompletedCount = int(currentCompleted)
})
n := len(hrefs)
if n == 0 {
return
@@ -427,15 +462,30 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
}
c.priorityChildrenMu.Unlock()
// 更新已收录总数
c.visitedMu.RLock()
visitedTotal := len(c.visited)
c.visitedMu.RUnlock()
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.VisitedTotal = visitedTotal
})
// 本轮没有发现新链接,爬取结束
if len(newLinks) == 0 {
log.Println("[crawler] empty queue — stopping")
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.IsRunning = false
})
return
}
// 调度算法:从候选 URL 中选出下一轮要抓取的队列
queue = c.schedule(newLinks)
}
// 所有轮次完成,更新状态
c.updateCrawlStatus(func(cs *CrawlStatus) {
cs.IsRunning = false
})
}
// visitURLRaw 抓取 URL 的核心逻辑,提取标题、描述、正文、子链接。
@@ -1089,3 +1139,17 @@ func (c *Crawler) GetStats() Stats {
KeywordsFetched: atomic.LoadInt64(&c.stats.KeywordsFetched),
}
}
// GetCrawlStatus 返回当前爬取状态(供前端监控)。
func (c *Crawler) GetCrawlStatus() CrawlStatus {
c.crawlStatusMu.RLock()
defer c.crawlStatusMu.RUnlock()
return c.crawlStatus
}
// updateCrawlStatus 更新爬取状态(内部使用)。
func (c *Crawler) updateCrawlStatus(fn func(*CrawlStatus)) {
c.crawlStatusMu.Lock()
defer c.crawlStatusMu.Unlock()
fn(&c.crawlStatus)
}