动态修改线程数量
This commit is contained in:
@@ -6,10 +6,14 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// configMu guards runtime modification of Global (the dynamic tuning case).
// SetCrawlerWorkers holds its write lock while updating Global.Crawler.Workers.
|
||||||
|
var configMu sync.RWMutex
|
||||||
|
|
||||||
// Config 是完整的配置结构体
|
// Config 是完整的配置结构体
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Index IndexConfig `yaml:"index"`
|
Index IndexConfig `yaml:"index"`
|
||||||
@@ -171,6 +175,19 @@ func CrawlerCooldown() int { return Global.Crawler.Cooldown }
|
|||||||
// CrawlerWorkers 返回配置值
|
// CrawlerWorkers 返回配置值
|
||||||
func CrawlerWorkers() int { return Global.Crawler.Workers }
|
func CrawlerWorkers() int { return Global.Crawler.Workers }
|
||||||
|
|
||||||
|
// SetCrawlerWorkers 在运行时动态修改爬虫并发数(线程安全)。
|
||||||
|
func SetCrawlerWorkers(n int) {
|
||||||
|
if n < 1 {
|
||||||
|
n = 1
|
||||||
|
}
|
||||||
|
if n > 500 {
|
||||||
|
n = 500
|
||||||
|
}
|
||||||
|
configMu.Lock()
|
||||||
|
Global.Crawler.Workers = n
|
||||||
|
configMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// CrawlFocus 返回配置值
|
// CrawlFocus 返回配置值
|
||||||
func CrawlFocus() float64 { return Global.Crawler.CrawlFocus }
|
func CrawlFocus() float64 { return Global.Crawler.CrawlFocus }
|
||||||
|
|
||||||
|
|||||||
+7
-4
@@ -104,12 +104,15 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
|||||||
queue := []string{entryURL} // 当前轮次的待抓取队列
|
queue := []string{entryURL} // 当前轮次的待抓取队列
|
||||||
|
|
||||||
for ep := 0; ep < maxEpoch; ep++ {
|
for ep := 0; ep < maxEpoch; ep++ {
|
||||||
|
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
|
||||||
|
workers := config.CrawlerWorkers()
|
||||||
|
|
||||||
// 每轮开始前:拉取 priority URLs,插入队列前端
|
// 每轮开始前:拉取 priority URLs,插入队列前端
|
||||||
priorityAdded := c.fetchAndApplyPriorityURLs(visited, &queue)
|
priorityAdded := c.fetchAndApplyPriorityURLs(visited, &queue)
|
||||||
if priorityAdded > 0 {
|
if priorityAdded > 0 {
|
||||||
log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority)", ep+1, maxEpoch, len(queue), priorityAdded)
|
log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority) workers=%d", ep+1, maxEpoch, len(queue), priorityAdded, workers)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("[crawler] epoch %d/%d queue=%d", ep+1, maxEpoch, len(queue))
|
log.Printf("[crawler] epoch %d/%d queue=%d workers=%d", ep+1, maxEpoch, len(queue), workers)
|
||||||
}
|
}
|
||||||
// 将本轮所有 URL 标记为已访问(防止下一轮重复入队)
|
// 将本轮所有 URL 标记为已访问(防止下一轮重复入队)
|
||||||
for _, u := range queue {
|
for _, u := range queue {
|
||||||
@@ -123,8 +126,8 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
|||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
)
|
)
|
||||||
|
|
||||||
// 信号量:限制同时并发数不超过配置的工作线程数
|
// 信号量:限制同时并发数(使用上方读取的 workers 值)
|
||||||
sem := make(chan struct{}, config.CrawlerWorkers())
|
sem := make(chan struct{}, workers)
|
||||||
for _, u := range queue {
|
for _, u := range queue {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
sem <- struct{}{} // 获取一个令牌(阻塞直到有空闲槽位)
|
sem <- struct{}{} // 获取一个令牌(阻塞直到有空闲槽位)
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ func (s *Server) Handler() http.Handler {
|
|||||||
mux.HandleFunc("/admin/priority", s.handleAdminPriority)
|
mux.HandleFunc("/admin/priority", s.handleAdminPriority)
|
||||||
mux.HandleFunc("/admin/flush", s.handleAdminFlush)
|
mux.HandleFunc("/admin/flush", s.handleAdminFlush)
|
||||||
mux.HandleFunc("/admin/pending", s.handleAdminPending)
|
mux.HandleFunc("/admin/pending", s.handleAdminPending)
|
||||||
|
mux.HandleFunc("/admin/workers", s.handleAdminWorkers)
|
||||||
// 静态文件(SPA fallback)
|
// 静态文件(SPA fallback)
|
||||||
mux.Handle("/", spaHandler{dist: "dist"})
|
mux.Handle("/", spaHandler{dist: "dist"})
|
||||||
return mux
|
return mux
|
||||||
@@ -408,6 +409,51 @@ func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) {
|
|||||||
json.NewEncoder(w).Encode(map[string]int64{"pending": count})
|
json.NewEncoder(w).Encode(map[string]int64{"pending": count})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleAdminWorkers 查看和动态调整爬虫并发线程数。
|
||||||
|
// GET 返回当前 workers 值
|
||||||
|
// POST {"workers": N} 动态修改(范围 1~500),下一轮 epoch 立即生效
|
||||||
|
func (s *Server) handleAdminWorkers(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||||
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
|
||||||
|
switch r.Method {
|
||||||
|
case http.MethodOptions:
|
||||||
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||||
|
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
|
||||||
|
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
|
||||||
|
w.WriteHeader(204)
|
||||||
|
return
|
||||||
|
|
||||||
|
case http.MethodGet:
|
||||||
|
json.NewEncoder(w).Encode(map[string]int{"workers": config.CrawlerWorkers()})
|
||||||
|
|
||||||
|
case http.MethodPost:
|
||||||
|
var body struct {
|
||||||
|
Workers int `json:"workers"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||||
|
http.Error(w, `{"error":"invalid json"}`, 400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if body.Workers < 1 || body.Workers > 500 {
|
||||||
|
http.Error(w, `{"error":"workers must be between 1 and 500"}`, 400)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
old := config.CrawlerWorkers()
|
||||||
|
config.SetCrawlerWorkers(body.Workers)
|
||||||
|
log.Printf("[admin] workers changed: %d → %d (takes effect next epoch)", old, body.Workers)
|
||||||
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||||
|
"status": "ok",
|
||||||
|
"old": old,
|
||||||
|
"current": config.CrawlerWorkers(),
|
||||||
|
})
|
||||||
|
|
||||||
|
default:
|
||||||
|
w.Header().Set("Allow", "GET,POST")
|
||||||
|
http.Error(w, `{"error":"method not allowed"}`, 405)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---- 搜索处理器 ----
|
// ---- 搜索处理器 ----
|
||||||
|
|
||||||
// searchResponse 是搜索 API 的 JSON 响应结构。
|
// searchResponse 是搜索 API 的 JSON 响应结构。
|
||||||
|
|||||||
Reference in New Issue
Block a user