From 2e5876004b81e0d985bf0ae4891424b5fa1d29f2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=90=B4=E6=96=87=E5=B3=B0?=
Date: Thu, 9 Apr 2026 12:52:33 +0800
Subject: [PATCH] =?UTF-8?q?=E5=8A=A8=E6=80=81=E4=BF=AE=E6=94=B9=E7=BA=BF?=
 =?UTF-8?q?=E7=A8=8B=E6=95=B0=E9=87=8F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review notes (text between the "---" line and the first diff is ignored by git am):
* config.CrawlerWorkers now reads under configMu.RLock; previously only the
  writer took the lock, so reads raced with SetCrawlerWorkers.
* Crawler.Run clamps the per-epoch worker count to at least 1 before sizing
  the semaphore channel.
* handleAdminWorkers writes JSON errors itself instead of calling http.Error
  (which resets Content-Type to text/plain), caps the POST body size, and
  lists OPTIONS in the Allow header.
* The post-image blob hashes on the "index" lines predate these revisions.

 config/config.go   | 26 +++++++++++++++++++++-
 crawler/crawler.go | 17 +++++++++----
 search/server.go   | 61 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 99 insertions(+), 5 deletions(-)

diff --git a/config/config.go b/config/config.go
index d060cf5..ee60685 100644
--- a/config/config.go
+++ b/config/config.go
@@ -6,10 +6,14 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync"
 
 	"gopkg.in/yaml.v3"
 )
 
+// configMu guards runtime modifications of Global (dynamic tuning).
+var configMu sync.RWMutex
+
 // Config 是完整的配置结构体
 type Config struct {
 	Index IndexConfig `yaml:"index"`
@@ -171,6 +175,26 @@ func CrawlerCooldown() int { return Global.Crawler.Cooldown }
 // CrawlerWorkers 返回配置值
-func CrawlerWorkers() int { return Global.Crawler.Workers }
+func CrawlerWorkers() int {
+	// Take the read lock so this does not race with SetCrawlerWorkers.
+	configMu.RLock()
+	defer configMu.RUnlock()
+	return Global.Crawler.Workers
+}
 
+// SetCrawlerWorkers changes the crawler concurrency at runtime. Values
+// outside [1, 500] are clamped to that range. The write happens under
+// configMu, the same lock CrawlerWorkers takes for reading.
+func SetCrawlerWorkers(n int) {
+	if n < 1 {
+		n = 1
+	}
+	if n > 500 {
+		n = 500
+	}
+	configMu.Lock()
+	Global.Crawler.Workers = n
+	configMu.Unlock()
+}
+
 // CrawlFocus 返回配置值
 func CrawlFocus() float64 { return Global.Crawler.CrawlFocus }
 
diff --git a/crawler/crawler.go b/crawler/crawler.go
index a7f71e1..f8bfdd8 100644
--- a/crawler/crawler.go
+++ b/crawler/crawler.go
@@ -104,12 +104,21 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
 	queue := []string{entryURL} // 当前轮次的待抓取队列
 
 	for ep := 0; ep < maxEpoch; ep++ {
+		// Read the worker count once per epoch so that a value stored through
+		// config.SetCrawlerWorkers is picked up at the next epoch boundary.
+		workers := config.CrawlerWorkers()
+		if workers < 1 {
+			// The value sizes the semaphore channel below: zero capacity leaves
+			// no slot to acquire and a negative capacity makes make() panic.
+			workers = 1
+		}
+
 		// 每轮开始前:拉取 priority URLs,插入队列前端
 		priorityAdded := c.fetchAndApplyPriorityURLs(visited, &queue)
 		if priorityAdded > 0 {
-			log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority)", ep+1, maxEpoch, len(queue), priorityAdded)
+			log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority) workers=%d", ep+1, maxEpoch, len(queue), priorityAdded, workers)
 		} else {
-			log.Printf("[crawler] epoch %d/%d queue=%d", ep+1, maxEpoch, len(queue))
+			log.Printf("[crawler] epoch %d/%d queue=%d workers=%d", ep+1, maxEpoch, len(queue), workers)
 		}
 		// 将本轮所有 URL 标记为已访问(防止下一轮重复入队)
 		for _, u := range queue {
@@ -123,8 +132,8 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
 			wg sync.WaitGroup
 		)
 
-		// 信号量:限制同时并发数不超过配置的工作线程数
-		sem := make(chan struct{}, config.CrawlerWorkers())
+		// Semaphore: caps concurrency at the workers value read above.
+		sem := make(chan struct{}, workers)
 		for _, u := range queue {
 			wg.Add(1)
 			sem <- struct{}{} // 获取一个令牌(阻塞直到有空闲槽位)
diff --git a/search/server.go b/search/server.go
index 68b475a..207b05f 100644
--- a/search/server.go
+++ b/search/server.go
@@ -89,6 +89,7 @@ func (s *Server) Handler() http.Handler {
 	mux.HandleFunc("/admin/priority", s.handleAdminPriority)
 	mux.HandleFunc("/admin/flush", s.handleAdminFlush)
 	mux.HandleFunc("/admin/pending", s.handleAdminPending)
+	mux.HandleFunc("/admin/workers", s.handleAdminWorkers)
 	// 静态文件(SPA fallback)
 	mux.Handle("/", spaHandler{dist: "dist"})
 	return mux
@@ -408,6 +409,66 @@ func (s *Server) handleAdminPending(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(map[string]int64{"pending": count})
 }
 
+// handleAdminWorkers reports and adjusts the crawler worker count.
+//
+//	GET                  responds with {"workers": N}
+//	POST {"workers": N}  stores N (1..500); the crawler reads it at the start
+//	                     of its next epoch
+//	OPTIONS              answers the CORS preflight
+//
+// NOTE(review): this handler performs no authentication and allows any CORS
+// origin; confirm that /admin/* is protected elsewhere.
+func (s *Server) handleAdminWorkers(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.Header().Set("Content-Type", "application/json; charset=utf-8")
+
+	// writeErr sends a JSON error body. http.Error is avoided because it
+	// replaces Content-Type with text/plain.
+	writeErr := func(status int, msg string) {
+		w.WriteHeader(status)
+		json.NewEncoder(w).Encode(map[string]string{"error": msg})
+	}
+
+	switch r.Method {
+	case http.MethodOptions:
+		w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
+		w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
+		w.WriteHeader(http.StatusNoContent)
+
+	case http.MethodGet:
+		json.NewEncoder(w).Encode(map[string]int{"workers": config.CrawlerWorkers()})
+
+	case http.MethodPost:
+		var body struct {
+			Workers int `json:"workers"`
+		}
+		// The payload is one small object; cap the read so a client cannot
+		// stream an arbitrarily large body.
+		r.Body = http.MaxBytesReader(w, r.Body, 1<<10)
+		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+			writeErr(http.StatusBadRequest, "invalid json")
+			return
+		}
+		if body.Workers < 1 || body.Workers > 500 {
+			writeErr(http.StatusBadRequest, "workers must be between 1 and 500")
+			return
+		}
+		old := config.CrawlerWorkers()
+		config.SetCrawlerWorkers(body.Workers)
+		current := config.CrawlerWorkers()
+		log.Printf("[admin] workers changed: %d → %d (takes effect next epoch)", old, current)
+		json.NewEncoder(w).Encode(map[string]interface{}{
+			"status":  "ok",
+			"old":     old,
+			"current": current,
+		})
+
+	default:
+		w.Header().Set("Allow", "GET, POST, OPTIONS")
+		writeErr(http.StatusMethodNotAllowed, "method not allowed")
+	}
+}
+
 // ---- 搜索处理器 ----
 
 // searchResponse 是搜索 API 的 JSON 响应结构。