优先爬取的队列立即执行

This commit is contained in:
2026-04-10 13:14:12 +08:00
parent c5da660c7f
commit 5b8b256b35
8 changed files with 150 additions and 6 deletions
+106
View File
@@ -43,6 +43,12 @@ const (
circuitCooldownSeconds = 30 // 熔断持续时间(秒)
)
// Priority Worker 配置
const (
priorityMaxWorkers = 50 // Priority 独立 goroutine 上限(突破主 workers
priorityQueueSize = 100 // Priority 任务队列缓冲大小
)
// Crawler 编排整个 BFS 爬取流程。
type Crawler struct {
fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流)
@@ -62,6 +68,16 @@ type Crawler struct {
// 运行时活跃线程计数(atomic,每轮 epoch 自动归零前重新开始计数)
activeWorkers int64
// ---- Priority Worker(独立 goroutine,不受主 workers 限制)----
priorityCh chan string // Priority URL 任务队列
prioritySem chan struct{} // Priority 信号量(上限 priorityMaxWorkers
priorityWg sync.WaitGroup // 等待所有 Priority goroutine 结束
priorityMu sync.RWMutex // 保护 priorityStats
priorityStats struct {
pending int64 // 待处理的 Priority URL 数量(入队但未开始)
active int64 // 正在处理的 Priority URL 数量
}
}
// 全局活跃线程计数器(跨包可读,无需持有 Crawler 引用)
@@ -78,6 +94,21 @@ func GlobalActiveWorkers() int64 {
return atomic.LoadInt64(&globalActiveWorkers)
}
// 全局 Priority Status 快照(跨 Crawler 实例共享,用于外部监控)
var globalPriorityStatus struct {
pending int64
active int64
}
// GlobalPriorityStatus 返回当前全局 Priority Worker 状态。
func GlobalPriorityStatus() map[string]interface{} {
return map[string]interface{}{
"pending": atomic.LoadInt64(&globalPriorityStatus.pending),
"active": atomic.LoadInt64(&globalPriorityStatus.active),
"max_workers": priorityMaxWorkers,
}
}
// New 创建一个 Crawler 实例。
// prosperMap 由 info 模块加载,传入域名繁荣值用于调度优先级计算。
func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler {
@@ -87,7 +118,11 @@ func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *C
analyzer: a,
prosperMap: prosperMap,
visited: make(map[string]bool),
priorityCh: make(chan string, priorityQueueSize),
prioritySem: make(chan struct{}, priorityMaxWorkers),
}
// 启动 Priority Worker(独立 goroutine,不受主 workers 限制)
go c.runPriorityWorker()
// 启动时从 gate bucket 预热已爬取的 URL 集合(程序重启后不会重复爬取)
c.warmVisited()
return c
@@ -155,6 +190,77 @@ func (c *Crawler) markVisited(url string) {
c.visitedMu.Unlock()
}
// ---- Priority Worker(突破 workers 上限,立即爬取高优先级 URL)----
// runPriorityWorker 独立处理高优先级 URL,不受主 workers 限制。
// 当用户插入 Priority URL 时,立即触发爬取(不等待 epoch 调度)。
func (c *Crawler) runPriorityWorker() {
for url := range c.priorityCh {
c.prioritySem <- struct{}{} // 获取令牌(阻塞直到有空闲槽位)
c.priorityMu.Lock()
c.priorityStats.active++
c.priorityStats.pending--
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.pending, -1)
atomic.AddInt64(&globalPriorityStatus.active, 1)
c.priorityWg.Add(1)
go func(rawURL string) {
defer c.priorityWg.Done()
defer func() { <-c.prioritySem }() // 释放令牌
defer func() {
c.priorityMu.Lock()
c.priorityStats.active--
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.active, -1)
}()
// 关键:强制移除 visited 标记(即使未过期也要重新爬取)
c.visitedMu.Lock()
delete(c.visited, rawURL)
c.visitedMu.Unlock()
log.Printf("[crawler] priority crawl started: %s", rawURL)
// 直接调用 visitURL,绕过队列调度
hrefs := c.visitURL(rawURL)
// 收集的子链接正常进入 BFS 队列(由调用方处理,这里只负责爬取本身)
log.Printf("[crawler] priority crawl done: %s (%d child links)", rawURL, len(hrefs))
// 清理已访问的 priority URL(防止重复爬取)
_ = c.db.RemovePriorityURL(rawURL)
}(url)
}
}
// TriggerPriorityCrawl 立即触发高优先级爬取(突破 workers 上限)。
// 适合用户手动插入 URL 时立即响应。
func (c *Crawler) TriggerPriorityCrawl(url string) {
select {
case c.priorityCh <- url:
c.priorityMu.Lock()
c.priorityStats.pending++
c.priorityMu.Unlock()
atomic.AddInt64(&globalPriorityStatus.pending, 1)
log.Printf("[crawler] priority crawl triggered: %s", url)
default:
// 队列满了,降级到正常处理
log.Printf("[crawler] priority queue full, deferring to normal: %s", url)
}
}
// GetPriorityStatus 返回 Priority Worker 的实时状态。
func (c *Crawler) GetPriorityStatus() map[string]interface{} {
c.priorityMu.RLock()
defer c.priorityMu.RUnlock()
return map[string]interface{}{
"pending": c.priorityStats.pending,
"active": c.priorityStats.active,
"max_workers": priorityMaxWorkers,
}
}
// isVisited 检查 URL 是否已访问(线程安全)。
func (c *Crawler) isVisited(url string) bool {
c.visitedMu.RLock()
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+2 -2
View File
@@ -5,8 +5,8 @@
<link rel="icon" type="image/svg+xml" href="/vite.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SESE 爬取管理</title>
<script type="module" crossorigin src="/assets/index-BtgFYOR8.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-B024bVSr.css">
<script type="module" crossorigin src="/assets/index-ClaCiNQl.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-Dr22_wUg.css">
</head>
<body>
<div id="app"></div>
+1
View File
@@ -175,6 +175,7 @@ func main() {
// 从 info 服务获取繁荣表快照,用于调度优先级决策
prosperMap := infoSvc.ProsperMap()
crawl := crawler.New(db, anal, prosperMap)
searchSrv.SetCrawler(crawl) // 注入爬虫用于 Priority URL 立即触发
go crawl.Run(*entryURL, config.MaxEpoch())
log.Println("all modules started — press Ctrl-C to stop")
+37
View File
@@ -61,6 +61,11 @@ type Server struct {
Status() map[string]interface{}
RunNow() error
}
// crawler 爬虫实例(用于 Priority URL 添加时立即触发爬取)
crawler interface {
TriggerPriorityCrawl(url string)
}
}
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。
@@ -89,6 +94,13 @@ func (s *Server) SetBacklinkRunner(r interface {
s.backlinkRunner = r
}
// SetCrawler 注入爬虫实例(用于 Priority URL 添加时立即触发爬取)。
func (s *Server) SetCrawler(c interface {
TriggerPriorityCrawl(url string)
}) {
s.crawler = c
}
// runPeriodicFlush 每隔 FlushIntervalSeconds 秒触发一次刷盘。
func (s *Server) runPeriodicFlush() {
ticker := time.NewTicker(time.Duration(config.FlushIntervalSeconds()) * time.Second)
@@ -113,6 +125,7 @@ func (s *Server) Handler() http.Handler {
mux.HandleFunc("/admin/recent", s.handleAdminRecent)
mux.HandleFunc("/admin/stats", s.handleAdminStats)
mux.HandleFunc("/admin/priority", s.handleAdminPriority)
mux.HandleFunc("/admin/priority/status", s.handleAdminPriorityStatus)
mux.HandleFunc("/admin/flush", s.handleAdminFlush)
mux.HandleFunc("/admin/pending", s.handleAdminPending)
mux.HandleFunc("/admin/workers", s.handleAdminWorkers)
@@ -513,6 +526,12 @@ func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) {
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
return
}
// 关键:立即触发 priority crawl(突破 workers 上限)
if s.crawler != nil {
s.crawler.TriggerPriorityCrawl(entry.URL)
}
json.NewEncoder(w).Encode(map[string]string{"status": "added", "url": entry.URL})
case http.MethodDelete:
@@ -533,6 +552,24 @@ func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) {
}
}
// handleAdminPriorityStatus 返回 Priority Worker 的实时状态。
// GET: 返回 pending(待处理)、active(正在处理)、max_workers(独立上限)
func (s *Server) handleAdminPriorityStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if r.Method != http.MethodGet && r.Method != http.MethodOptions {
http.Error(w, `{"error":"method not allowed"}`, 405)
return
}
if r.Method == http.MethodOptions {
w.WriteHeader(204)
return
}
json.NewEncoder(w).Encode(crawler.GlobalPriorityStatus())
}
// handleAdminFlush 强制刷盘。
func (s *Server) handleAdminFlush(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
Submodule sese-engine-ui updated: 60e897f110...2407ce0d05