diff --git a/crawler/crawler.go b/crawler/crawler.go index 3aea8c0..0bdb8b8 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -66,6 +66,29 @@ func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *C } } +// fetchAndApplyPriorityURLs 从数据库读取用户插入的 priority URLs, +// 将未访问的插入队列前端(prepend),已爬取的条目从存储中清除。 +// 返回本次插入队列的 URL 数量。 +func (c *Crawler) fetchAndApplyPriorityURLs(visited map[string]bool, queue *[]string) int { + entries, err := c.db.GetPriorityURLs() + if err != nil || len(entries) == 0 { + return 0 + } + + added := 0 + for _, e := range entries { + if visited[e.URL] { + _ = c.db.RemovePriorityURL(e.URL) + continue + } + *queue = append([]string{e.URL}, *queue...) + added++ + } + + _ = c.db.ClearVisitedPriorityURLs() + return added +} + // URLWeight 将 URL 和发现权重打包在一起,用于调度决策。 type URLWeight struct { URL string // 待访问的 URL @@ -74,12 +97,19 @@ type URLWeight struct { // Run 启动 BFS 爬取,从 entryURL 开始,执行最多 maxEpoch 轮。 // 各轮之间是串行的,每轮内并发抓取,按调度算法选择下一轮 URL。 +// 每轮开始前会检查 priority 队列,优先爬取用户插入的 URL。 func (c *Crawler) Run(entryURL string, maxEpoch int) { visited := make(map[string]bool) // 已访问 URL 集合(防止重复抓取) queue := []string{entryURL} // 当前轮次的待抓取队列 for ep := 0; ep < maxEpoch; ep++ { - log.Printf("[crawler] epoch %d/%d queue=%d", ep+1, maxEpoch, len(queue)) + // 每轮开始前:拉取 priority URLs,插入队列前端 + priorityAdded := c.fetchAndApplyPriorityURLs(visited, &queue) + if priorityAdded > 0 { + log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority)", ep+1, maxEpoch, len(queue), priorityAdded) + } else { + log.Printf("[crawler] epoch %d/%d queue=%d", ep+1, maxEpoch, len(queue)) + } // 将本轮所有 URL 标记为已访问(防止下一轮重复入队) for _, u := range queue { visited[u] = true diff --git a/search/server.go b/search/server.go index 70e3f74..40f1384 100644 --- a/search/server.go +++ b/search/server.go @@ -49,6 +49,7 @@ func (s *Server) Handler() http.Handler { mux.HandleFunc("/search", s.handleSearch) mux.HandleFunc("/admin/recent", s.handleAdminRecent) mux.HandleFunc("/admin/stats", s.handleAdminStats) + mux.HandleFunc("/admin/priority", s.handleAdminPriority) return mux } @@ -201,6 +202,86 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(resp) } +// handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。 +// GET: 返回所有未访问的 priority 条目列表 +// POST: 添加一条 URL 或域名(body: {url: "..."}) +// DELETE: 删除指定 URL(query: ?url=...) +func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + switch r.Method { + case http.MethodOptions: + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET,POST,DELETE,OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + w.WriteHeader(204) + return + + case http.MethodGet: + entries, err := s.db.GetPriorityURLs() + if err != nil { + http.Error(w, `{"error":"`+err.Error()+`"}`, 500) + return + } + if entries == nil { + entries = []storage.PriorityEntry{} + } + json.NewEncoder(w).Encode(map[string]any{"items": entries, "count": len(entries)}) + + case http.MethodPost: + var body struct { + URL string `json:"url"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.URL) == "" { + http.Error(w, `{"error":"missing url field"}`, 400) + return + } + + raw := strings.TrimSpace(body.URL) + entry := storage.PriorityEntry{AddedAt: time.Now().Unix()} + + // 判断是完整 URL 还是纯域名 + if strings.Contains(raw, "/") || strings.HasPrefix(raw, "http") { + u, err := url.Parse(raw) + if err != nil || u.Host == "" { + http.Error(w, `{"error":"invalid url"}`, 400) + return + } + if u.Scheme == "" { + u.Scheme = "https" + } + entry.URL = u.String() + entry.IsDomain = false + } else { + entry.URL = "https://" + raw + entry.IsDomain = true + } + + if err := s.db.AddPriorityURL(entry); err != nil { + http.Error(w, `{"error":"`+err.Error()+`"}`, 500) + return + } + json.NewEncoder(w).Encode(map[string]string{"status": "added", "url": entry.URL}) + + case http.MethodDelete: + urlParam := r.URL.Query().Get("url") + if urlParam == "" { + http.Error(w, `{"error":"missing url parameter"}`, 400) + return + } + if err := s.db.RemovePriorityURL(urlParam); err != nil { + http.Error(w, `{"error":"`+err.Error()+`"}`, 500) + return + } + json.NewEncoder(w).Encode(map[string]string{"status": "removed"}) + + default: + w.Header().Set("Allow", "GET,POST,DELETE") + http.Error(w, `{"error":"method not allowed"}`, 405) + } +} + // ---- 搜索处理器 ---- // searchResponse 是搜索 API 的 JSON 响应结构。 diff --git a/storage/storage.go b/storage/storage.go index edebe58..13f7fd6 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -35,11 +35,12 @@ type SnippetEntry struct { Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳 } -// 三个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节) +// 四个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节) var ( - bucketIndex = []byte("index") // 倒排索引 bucket - bucketGate = []byte("gate") // URL 摘要缓存 bucket - bucketSiteGate = []byte("site_gate") // 网站元信息 bucket + bucketIndex = []byte("index") // 倒排索引 bucket + bucketGate = []byte("gate") // URL 摘要缓存 bucket + bucketSiteGate = []byte("site_gate") // 网站元信息 bucket + bucketPriority = []byte("priority") // 优先爬取 URL bucket ) // DB 封装一个 bbolt 数据库,提供类型化的存取接口。 @@ -62,9 +63,9 @@ func Open(dir string) (*DB, error) { if err != nil { return nil, fmt.Errorf("storage.Open bolt: %w", err) } - // 启动时确保三个 bucket 都存在(不存在则创建) + // 启动时确保四个 bucket 都存在(不存在则创建) err = db.Update(func(tx *bolt.Tx) error { - for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate} { + for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate, bucketPriority} { if _, err := tx.CreateBucketIfNotExists(b); err != nil { return err } @@ -328,3 +329,74 @@ func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) erro }) }) } + +// ---- 优先爬取队列(Priority Queue)相关方法 ---- + +// PriorityEntry 记录一条待优先爬取的 URL 或域名。 +type PriorityEntry struct { + URL string `json:"url"` // 用户提交的 URL 或域名(会自动规范化为带 scheme 的 URL) + IsDomain bool `json:"domain"` // 是否为纯域名(true=仅域名,false=完整 URL) + AddedAt int64 `json:"added_at"` // 添加时的 Unix 时间戳 + Visited bool `json:"visited"` // 是否已爬取(crawler 爬完后标记) +} + +// GetPriorityURLs 返回所有未访问的 priority 条目(按添加时间升序)。 +func (d *DB) GetPriorityURLs() ([]PriorityEntry, error) { + var entries []PriorityEntry + err := d.db.View(func(tx *bolt.Tx) error { + return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error { + var e PriorityEntry + if err := decompressUnmarshal(v, &e); err != nil { + return nil // 跳过损坏条目 + } + if !e.Visited { + entries = append(entries, e) + } + return nil + }) + }) + return entries, err +} + +// AddPriorityURL 添加一条 priority 条目(key = URL,value = PriorityEntry)。 +// 若已存在(且未访问)则忽略。 +func (d *DB) AddPriorityURL(entry PriorityEntry) error { + return d.db.Update(func(tx *bolt.Tx) error { + k := []byte(entry.URL) + existing := tx.Bucket(bucketPriority).Get(k) + if existing != nil { + var e PriorityEntry + if err := decompressUnmarshal(existing, &e); err == nil && !e.Visited { + return nil // 已存在且未访问,忽略 + } + } + data, err := marshalCompress(entry) + if err != nil { + return err + } + return tx.Bucket(bucketPriority).Put(k, data) + }) +} + +// RemovePriorityURL 删除指定 URL 的 priority 条目。 +func (d *DB) RemovePriorityURL(url string) error { + return d.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(bucketPriority).Delete([]byte(url)) + }) +} + +// ClearVisitedPriorityURLs 批量删除所有已标记为 visited 的条目(crawler 爬完后调用)。 +func (d *DB) ClearVisitedPriorityURLs() error { + return d.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error { + var e PriorityEntry + if err := decompressUnmarshal(v, &e); err != nil { + return nil + } + if e.Visited { + return tx.Bucket(bucketPriority).Delete(k) + } + return nil + }) + }) +}