优化优先线程

This commit is contained in:
2026-04-11 19:44:51 +08:00
parent e3a6d18a8c
commit e372ef2295
7 changed files with 210 additions and 106 deletions
+2
View File
@@ -0,0 +1,2 @@
CC=C:\mingw64\bin\gcc.exe
CXX=C:\mingw64\bin\g++.exe
+1 -1
View File
@@ -124,7 +124,7 @@ func GetDefaultConfig() Config {
MaxKeywordsPerPage: 250,
MaxEpoch: 100,
ExpectedProsperRatio: 0.6,
EntryURL: "https://zh.wikipedia.org/",
EntryURL: "http://127.0.0.1",
MaxPageSize: 5 * 1024 * 1024,
RecrawlMaxAge: 30 * 86400, // 30 天
RecrawlCheckInterval: 3600, // 1 小时
+177 -92
View File
@@ -70,18 +70,17 @@ type Crawler struct {
activeWorkers int64
// ---- Priority Worker(独立 goroutine,不受主 workers 限制)----
priorityCh chan string // Priority URL 任务队列
prioritySem chan struct{} // Priority 信号量(上限 priorityMaxWorkers
priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束
priorityMu sync.RWMutex // 保护 priorityStats
priorityStats struct {
priorityCh chan string // Priority URL 任务队列(用户手动添加)
priorityChildCh chan string // Priority 子链接队列(子 URL 继续由 priority worker 爬取
prioritySem chan struct{} // Priority 信号量(上限 priorityMaxWorkers
priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束
priorityMu sync.RWMutex // 保护 priorityStats
priorityStats struct {
pending int64 // 待处理的 Priority URL 数量(入队但未开始)
active int64 // 正在处理的 Priority URL 数量
}
// ---- Priority 子链接优先队列(来自 priority worker 的子链接会优先爬取)----
priorityChildrenMu sync.Mutex
priorityChildren []string // Priority URL 产生的子链接(优先处理)
// 孙链接(子 URL 的子链接)进入普通 BFS 队列
normalChildCh chan URLWeight // 孙链接 channel,由 Run 循环消费
// ---- 爬取状态暴露(供前端监控) ----
crawlStatusMu sync.RWMutex
@@ -118,27 +117,40 @@ var globalPriorityStatus struct {
active int64
}
// 全局二级优先队列待处理计数(goroutine 异步发送,无法用 len() 直接获取)
var globalPriorityLevel2 int64
// GlobalPriorityStatus 返回当前全局 Priority Worker 状态。
func GlobalPriorityStatus() map[string]interface{} {
return map[string]interface{}{
"pending": atomic.LoadInt64(&globalPriorityStatus.pending),
"active": atomic.LoadInt64(&globalPriorityStatus.active),
"max_workers": priorityMaxWorkers,
"children_queue": atomic.LoadInt64(&globalPriorityChildren),
}
"active": atomic.LoadInt64(&globalPriorityStatus.active),
"max_workers": priorityMaxWorkers,
"level1": atomic.LoadInt64(&globalPriorityStatus.pending),
"level2": atomic.LoadInt64(&globalPriorityLevel2),
"level2_queue": atomic.LoadInt64(&globalPriorityLevel2), // 待爬取
"level2_inflight": atomic.LoadInt64(&globalPriorityLevel2Inflight), // 正在爬取
}
}
// 全局 Priority 子链接队列长度(跨 Crawler 实例共享)
var globalPriorityChildren int64
// IncrementPriorityChildren 增加 priorityChildren 计数。
func IncrementPriorityChildren(n int64) {
atomic.AddInt64(&globalPriorityChildren, n)
// IncrementPriorityLevel2 增加二级队列计数。
func IncrementPriorityLevel2(n int64) {
atomic.AddInt64(&globalPriorityLevel2, n)
}
// DecrementPriorityChildren 减少 priorityChildren 计数。
func DecrementPriorityChildren(n int64) {
atomic.AddInt64(&globalPriorityChildren, -n)
// DecrementPriorityLevel2 减少二级队列计数。
func DecrementPriorityLevel2(n int64) {
atomic.AddInt64(&globalPriorityLevel2, -n)
}
// ---- 二级正在爬取的计数(drain 时启动,goroutine 结束时扣减)----
var globalPriorityLevel2Inflight int64
func IncrementPriorityLevel2Inflight(n int64) {
atomic.AddInt64(&globalPriorityLevel2Inflight, n)
}
func DecrementPriorityLevel2Inflight(n int64) {
atomic.AddInt64(&globalPriorityLevel2Inflight, -n)
}
// New 创建一个 Crawler 实例。
@@ -150,8 +162,10 @@ func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *C
analyzer: a,
prosperMap: prosperMap,
visited: make(map[string]bool),
priorityCh: make(chan string, priorityQueueSize),
prioritySem: make(chan struct{}, priorityMaxWorkers),
priorityCh: make(chan string, priorityQueueSize),
priorityChildCh: make(chan string, priorityQueueSize),
prioritySem: make(chan struct{}, priorityMaxWorkers),
normalChildCh: make(chan URLWeight, priorityQueueSize),
}
// 启动 Priority Worker(独立 goroutine,不受主 workers 限制)
go c.runPriorityWorker()
@@ -225,54 +239,129 @@ func (c *Crawler) markVisited(url string) {
// ---- Priority Worker(突破 workers 上限,立即爬取高优先级 URL)----
// runPriorityWorker 独立处理高优先级 URL,不受主 workers 限制。
// 当用户插入 Priority URL 时,立即触发爬取(不等待 epoch 调度)。
// 两级队列共享同一线程池:一级队列(手动 URL)优先处理完,再处理二级队列(子 URL)。
// - 一级队列:visitURLUnlimited,无限爬子链接,子链接进入二级队列
// - 二级队列:visitURL,有限爬子链接,子链接进入普通 BFS
func (c *Crawler) runPriorityWorker() {
for url := range c.priorityCh {
c.prioritySem <- struct{}{} // 获取令牌(阻塞直到有空闲槽位)
c.priorityMu.Lock()
c.priorityStats.active++
c.priorityStats.pending--
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.pending, -1)
atomic.AddInt64(&globalPriorityStatus.active, 1)
c.priorityWg.Add(1)
go func(rawURL string) {
defer c.priorityWg.Done()
defer func() { <-c.prioritySem }() // 释放令牌
defer func() {
c.priorityMu.Lock()
c.priorityStats.active--
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.active, -1)
}()
// 关键:强制移除 visited 标记(即使未过期也要重新爬取)
c.visitedMu.Lock()
delete(c.visited, rawURL)
c.visitedMu.Unlock()
log.Printf("[crawler] priority crawl started: %s", rawURL)
// 直接调用 visitURLUnlimited,绕过队列调度和链接数限制
hrefs := c.visitURLUnlimited(rawURL)
// 将子链接加入优先队列(保持优先级)
// 注意:Priority URL 的子链接强制加入队列,即使已访问过也要重新爬取
if len(hrefs) > 0 {
c.priorityChildrenMu.Lock()
for _, child := range hrefs {
c.priorityChildren = append(c.priorityChildren, child)
for {
// ---- 第一阶段:drain 一级队列全部取出 ----
level1Batch := []string{}
// 先阻塞等第一个一级 URL
url := <-c.priorityCh
level1Batch = append(level1Batch, url)
// drain 剩余一级 URL(非阻塞)
for {
select {
case url = <-c.priorityCh:
level1Batch = append(level1Batch, url)
default:
goto processLevel1
}
IncrementPriorityChildren(int64(len(hrefs)))
c.priorityChildrenMu.Unlock()
}
log.Printf("[crawler] priority crawl done: %s (%d child links)", rawURL, len(hrefs))
processLevel1:
// batch 收集完成后才扣减 pending,此时 pending = len(level1Batch) + len(priorityCh)
// 这样 level1 能正确反映队列中待处理的 URL 总数
log.Printf("[priority] drain level1 batch: count=%d, pending_before=%d, priorityCh_len=%d",
len(level1Batch), atomic.LoadInt64(&globalPriorityStatus.pending), len(c.priorityCh))
atomic.AddInt64(&globalPriorityStatus.pending, -int64(len(level1Batch)))
// 清理已访问的 priority URL(防止重复爬取)
_ = c.db.RemovePriorityURL(rawURL)
}(url)
// ---- 启动全部一级 goroutine ----
for _, u := range level1Batch {
c.prioritySem <- struct{}{}
c.priorityMu.Lock()
c.priorityStats.active++
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.active, 1)
c.priorityWg.Add(1)
go c.priorityCrawlLoop(u, 1)
}
// 等待一级 goroutine 全部完成
c.priorityWg.Wait()
// ---- 第二阶段:drain 二级队列全部取出 ----
level2Batch := []string{}
drained := int64(0)
for {
select {
case child := <-c.priorityChildCh:
level2Batch = append(level2Batch, child)
drained++
// 实时扣减 level2,使 API 在 drain 期间仍能看到剩余未处理的量
atomic.AddInt64(&globalPriorityLevel2, -1)
default:
goto processLevel2
}
}
processLevel2:
// drained 项已在 drain 循环中逐个从 level2 扣减
// 启动 goroutine 后这些 URL 进入"正在爬取"状态
IncrementPriorityLevel2Inflight(int64(len(level2Batch)))
log.Printf("[priority] drain level2 done: drained=%d, inflight_before_spawn=%d",
len(level2Batch), atomic.LoadInt64(&globalPriorityLevel2Inflight))
// ---- 启动全部二级 goroutine ----
for _, u := range level2Batch {
c.prioritySem <- struct{}{}
c.priorityMu.Lock()
c.priorityStats.active++
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.active, 1)
c.priorityWg.Add(1)
go c.priorityCrawlLoop(u, 2)
}
// 等待二级 goroutine 全部完成,才进入下一轮(再次处理一级队列)
c.priorityWg.Wait()
}
}
// priorityCrawlLoop 爬取单个 URL:
//
// level=1(一级,手动 URL):visitURLUnlimited 无限爬子链接 → 二级队列
// level=2(二级,子 URL):visitURLUnlimited 无限爬子链接 → 普通 BFS
func (c *Crawler) priorityCrawlLoop(rawURL string, level int) {
defer c.priorityWg.Done()
defer func() { <-c.prioritySem }()
defer func() {
c.priorityMu.Lock()
c.priorityStats.active--
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.active, -1)
if level == 2 {
DecrementPriorityLevel2Inflight(1)
}
}()
// 标记 DB 中该 URL 为已访问,防止重启后再次被调度
_ = c.db.MarkPriorityURLVisited(rawURL)
// 两级都不限制子链接数量
children := c.visitURLUnlimited(rawURL)
log.Printf("[crawler] priority[%d] crawl done: %s (%d child links)", level, rawURL, len(children))
if len(children) == 0 {
return
}
// 一级:子链接进二级队列;二级:子链接进普通 BFS
for _, child := range children {
if level == 1 {
select {
case c.priorityChildCh <- child:
IncrementPriorityLevel2(1)
default:
// 二级队列满,丢弃
}
} else {
select {
case c.normalChildCh <- URLWeight{URL: child, Weight: 1.0}:
default:
}
}
}
}
@@ -367,17 +456,6 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
cs.CompletedCount = 0
})
// ---- 优先处理 priorityChildren 队列(来自 priority worker 的子链接)----
var priorityQueue []string
c.priorityChildrenMu.Lock()
if len(c.priorityChildren) > 0 {
priorityQueue = c.priorityChildren
// 更新全局计数器:这些 URL 即将被处理
DecrementPriorityChildren(int64(len(priorityQueue)))
log.Printf("[crawler] epoch %d/%d processing %d priority children first", ep+1, maxEpoch, len(priorityQueue))
}
c.priorityChildrenMu.Unlock()
// 每轮开始前:拉取 priority URLs,插入队列前端
priorityAdded := c.fetchAndApplyPriorityURLs(&queue)
if priorityAdded > 0 {
@@ -440,8 +518,8 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
// 分配权重
w := 1.0 / float64(n)
// 孙链接(来自 priorityChildren)爬取后,子链接进入正常 BFS 队列(不再优先传递)
// 所有子链接统一进入 newLinks,经过 schedule() 调度
// 子 URL 进入 newLinks 调度,由普通 BFS 下一轮处理。
// priority worker 通过 priorityChildCh 独立爬取子 URL,两者互不干扰)
mu.Lock()
for _, h := range children {
newLinks = append(newLinks, URLWeight{URL: h, Weight: w})
@@ -451,13 +529,18 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
}
wg.Wait()
// ---- 清空本轮已处理的 priorityQueue ----
c.priorityChildrenMu.Lock()
if len(priorityQueue) > 0 {
c.priorityChildren = c.priorityChildren[len(priorityQueue):]
// 计数器已在提取时扣除,这里不需要额外操作
// 消费孙链接 channel(孙链接来自 priority 爬取的子链接,进入普通 BFS 队列)
// 孙链接爬取是异步的,使用 timeout 确保全部到达后再调度
timeout := time.After(5 * time.Second)
drained := false
for !drained {
select {
case gc := <-c.normalChildCh:
newLinks = append(newLinks, gc)
case <-timeout:
drained = true
}
}
c.priorityChildrenMu.Unlock()
// 更新已收录总数
c.visitedMu.RLock()
@@ -487,7 +570,8 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
// visitURLRaw 抓取 URL 的核心逻辑,提取标题、描述、正文、子链接。
// 不包含链接数限制,用于优先级爬取。
func (c *Crawler) visitURLRaw(rawURL string) (title, desc, text string, hrefs []string) {
// forceIndex=true 时强制跳过增量重爬检测,直接提取关键词写入索引(用于 Priority URL)。
func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text string, hrefs []string) {
// recover 保护:防止任何模块(analyzer/storage/parser)的 panic 杀死 goroutine
defer func() {
if r := recover(); r != nil {
@@ -535,7 +619,7 @@ func (c *Crawler) visitURLRaw(rawURL string) (title, desc, text string, hrefs []
// 增量重爬检测:查询上次爬取的哈希,内容未变则跳过关键词提取
isRecrawl := false
oldEntry, _ := c.db.GetSnippet(res.FinalURL)
if oldEntry != nil && oldEntry.ContentHash != "" && oldEntry.ContentHash == contentHash {
if !forceIndex && oldEntry != nil && oldEntry.ContentHash != "" && oldEntry.ContentHash == contentHash {
isRecrawl = true
//log.Printf("[crawler] unchanged (recrawl skip): %s", res.FinalURL)
}
@@ -595,7 +679,7 @@ func (c *Crawler) visitURLRaw(rawURL string) (title, desc, text string, hrefs []
// 限制返回的链接数,防止下一轮队列爆炸。
func (c *Crawler) visitURL(rawURL string) (hrefs []string) {
atomic.AddInt64(&c.stats.VisitedURLs, 1) // 计数器 +1
_, _, _, hrefs = c.visitURLRaw(rawURL)
_, _, _, hrefs = c.visitURLRaw(rawURL, false)
// 限制返回的链接数,防止下一轮队列爆炸
maxLinks := config.MaxPriorityChildren()
@@ -606,10 +690,11 @@ func (c *Crawler) visitURL(rawURL string) (hrefs []string) {
}
// visitURLUnlimited 抓取一个 URL,返回页面中发现的子链接(无链接数限制)。
// 用于手动添加的 Priority URL,保留全部子链接。
// 用于手动添加的 Priority URL,保留全部子链接,并强制重建索引(跳过增量检测)
func (c *Crawler) visitURLUnlimited(rawURL string) (hrefs []string) {
atomic.AddInt64(&c.stats.VisitedURLs, 1) // 计数器 +1
_, _, _, hrefs = c.visitURLRaw(rawURL)
// forceIndex=truePriority URL 强制重新提取关键词,不受内容未变化影响
_, _, _, hrefs = c.visitURLRaw(rawURL, true)
return hrefs
}
File diff suppressed because one or more lines are too long
+1 -1
View File
@@ -5,7 +5,7 @@
<link rel="icon" type="image/svg+xml" href="/vite.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SESE 爬取管理</title>
<script type="module" crossorigin src="/assets/index-CYFclJJJ.js"></script>
<script type="module" crossorigin src="/assets/index-CJeEgrKF.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-D1EwdjSH.css">
</head>
<body>
+27 -10
View File
@@ -690,22 +690,14 @@ func (d *DB) GetPriorityURLs() ([]PriorityEntry, error) {
}
// AddPriorityURL 添加一条 priority 条目(key = URLvalue = PriorityEntry)。
// 若已存在(且未访问)则忽略
// 每次手动添加都会重新爬取,不做去重检查
func (d *DB) AddPriorityURL(entry PriorityEntry) error {
return d.db.Update(func(tx *bolt.Tx) error {
k := []byte(entry.URL)
existing := tx.Bucket(bucketPriority).Get(k)
if existing != nil {
var e PriorityEntry
if err := decompressUnmarshal(existing, &e); err == nil && !e.Visited {
return nil // 已存在且未访问,忽略
}
}
data, err := marshalCompress(entry)
if err != nil {
return err
}
return tx.Bucket(bucketPriority).Put(k, data)
return tx.Bucket(bucketPriority).Put([]byte(entry.URL), data)
})
}
@@ -716,6 +708,31 @@ func (d *DB) RemovePriorityURL(url string) error {
})
}
// MarkPriorityURLVisited 将指定 URL 的 priority 条目标记为已访问。
// 用于 priority 爬取完成后标记,避免 RemovePriorityURL 后同一 URL 被重复添加。
func (d *DB) MarkPriorityURLVisited(url string) error {
return d.db.Update(func(tx *bolt.Tx) error {
k := []byte(url)
v := tx.Bucket(bucketPriority).Get(k)
if v == nil {
return nil // 条目不存在,无需处理
}
var e PriorityEntry
if err := decompressUnmarshal(v, &e); err != nil {
return nil
}
if e.Visited {
return nil // 已是 visited 状态
}
e.Visited = true
data, err := marshalCompress(e)
if err != nil {
return err
}
return tx.Bucket(bucketPriority).Put(k, data)
})
}
// ClearVisitedPriorityURLs 批量删除所有已标记为 visited 的条目(crawler 爬完后调用)。
func (d *DB) ClearVisitedPriorityURLs() error {
return d.db.Update(func(tx *bolt.Tx) error {