增加搜索功能

This commit is contained in:
2026-04-08 19:04:15 +08:00
parent 1d3570a505
commit 6637dff254
3 changed files with 190 additions and 7 deletions
+31 -1
View File
@@ -66,6 +66,29 @@ func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *C
} }
} }
// fetchAndApplyPriorityURLs 从数据库读取用户插入的 priority URLs
// 将未访问的插入队列前端(prepend),已爬取的条目从存储中清除。
// 返回本次插入队列的 URL 数量。
func (c *Crawler) fetchAndApplyPriorityURLs(visited map[string]bool, queue *[]string) int {
entries, err := c.db.GetPriorityURLs()
if err != nil || len(entries) == 0 {
return 0
}
added := 0
for _, e := range entries {
if visited[e.URL] {
_ = c.db.RemovePriorityURL(e.URL)
continue
}
*queue = append([]string{e.URL}, *queue...)
added++
}
_ = c.db.ClearVisitedPriorityURLs()
return added
}
// URLWeight 将 URL 和发现权重打包在一起,用于调度决策。 // URLWeight 将 URL 和发现权重打包在一起,用于调度决策。
type URLWeight struct { type URLWeight struct {
URL string // 待访问的 URL URL string // 待访问的 URL
@@ -74,12 +97,19 @@ type URLWeight struct {
// Run 启动 BFS 爬取,从 entryURL 开始,执行最多 maxEpoch 轮。 // Run 启动 BFS 爬取,从 entryURL 开始,执行最多 maxEpoch 轮。
// 各轮之间是串行的,每轮内并发抓取,按调度算法选择下一轮 URL。 // 各轮之间是串行的,每轮内并发抓取,按调度算法选择下一轮 URL。
// 每轮开始前会检查 priority 队列,优先爬取用户插入的 URL。
func (c *Crawler) Run(entryURL string, maxEpoch int) { func (c *Crawler) Run(entryURL string, maxEpoch int) {
visited := make(map[string]bool) // 已访问 URL 集合(防止重复抓取) visited := make(map[string]bool) // 已访问 URL 集合(防止重复抓取)
queue := []string{entryURL} // 当前轮次的待抓取队列 queue := []string{entryURL} // 当前轮次的待抓取队列
for ep := 0; ep < maxEpoch; ep++ { for ep := 0; ep < maxEpoch; ep++ {
log.Printf("[crawler] epoch %d/%d queue=%d", ep+1, maxEpoch, len(queue)) // 每轮开始前:拉取 priority URLs,插入队列前端
priorityAdded := c.fetchAndApplyPriorityURLs(visited, &queue)
if priorityAdded > 0 {
log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority)", ep+1, maxEpoch, len(queue), priorityAdded)
} else {
log.Printf("[crawler] epoch %d/%d queue=%d", ep+1, maxEpoch, len(queue))
}
// 将本轮所有 URL 标记为已访问(防止下一轮重复入队) // 将本轮所有 URL 标记为已访问(防止下一轮重复入队)
for _, u := range queue { for _, u := range queue {
visited[u] = true visited[u] = true
+81
View File
@@ -49,6 +49,7 @@ func (s *Server) Handler() http.Handler {
mux.HandleFunc("/search", s.handleSearch) mux.HandleFunc("/search", s.handleSearch)
mux.HandleFunc("/admin/recent", s.handleAdminRecent) mux.HandleFunc("/admin/recent", s.handleAdminRecent)
mux.HandleFunc("/admin/stats", s.handleAdminStats) mux.HandleFunc("/admin/stats", s.handleAdminStats)
mux.HandleFunc("/admin/priority", s.handleAdminPriority)
return mux return mux
} }
@@ -201,6 +202,86 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(resp) json.NewEncoder(w).Encode(resp)
} }
// handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。
// GET: 返回所有未访问的 priority 条目列表
// POST: 添加一条 URL 或域名(body: {url: "..."}
// DELETE: 删除指定 URLquery: ?url=...
func (s *Server) handleAdminPriority(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
switch r.Method {
case http.MethodOptions:
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,DELETE,OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.WriteHeader(204)
return
case http.MethodGet:
entries, err := s.db.GetPriorityURLs()
if err != nil {
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
return
}
if entries == nil {
entries = []storage.PriorityEntry{}
}
json.NewEncoder(w).Encode(map[string]any{"items": entries, "count": len(entries)})
case http.MethodPost:
var body struct {
URL string `json:"url"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.URL) == "" {
http.Error(w, `{"error":"missing url field"}`, 400)
return
}
raw := strings.TrimSpace(body.URL)
entry := storage.PriorityEntry{AddedAt: time.Now().Unix()}
// 判断是完整 URL 还是纯域名
if strings.Contains(raw, "/") || strings.HasPrefix(raw, "http") {
u, err := url.Parse(raw)
if err != nil || u.Host == "" {
http.Error(w, `{"error":"invalid url"}`, 400)
return
}
if u.Scheme == "" {
u.Scheme = "https"
}
entry.URL = u.String()
entry.IsDomain = false
} else {
entry.URL = "https://" + raw
entry.IsDomain = true
}
if err := s.db.AddPriorityURL(entry); err != nil {
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
return
}
json.NewEncoder(w).Encode(map[string]string{"status": "added", "url": entry.URL})
case http.MethodDelete:
urlParam := r.URL.Query().Get("url")
if urlParam == "" {
http.Error(w, `{"error":"missing url parameter"}`, 400)
return
}
if err := s.db.RemovePriorityURL(urlParam); err != nil {
http.Error(w, `{"error":"`+err.Error()+`"}`, 500)
return
}
json.NewEncoder(w).Encode(map[string]string{"status": "removed"})
default:
w.Header().Set("Allow", "GET,POST,DELETE")
http.Error(w, `{"error":"method not allowed"}`, 405)
}
}
// ---- 搜索处理器 ---- // ---- 搜索处理器 ----
// searchResponse 是搜索 API 的 JSON 响应结构。 // searchResponse 是搜索 API 的 JSON 响应结构。
+78 -6
View File
@@ -35,11 +35,12 @@ type SnippetEntry struct {
Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳 Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳
} }
// 个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节) // 个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节)
var ( var (
bucketIndex = []byte("index") // 倒排索引 bucket bucketIndex = []byte("index") // 倒排索引 bucket
bucketGate = []byte("gate") // URL 摘要缓存 bucket bucketGate = []byte("gate") // URL 摘要缓存 bucket
bucketSiteGate = []byte("site_gate") // 网站元信息 bucket bucketSiteGate = []byte("site_gate") // 网站元信息 bucket
bucketPriority = []byte("priority") // 优先爬取 URL bucket
) )
// DB 封装一个 bbolt 数据库,提供类型化的存取接口。 // DB 封装一个 bbolt 数据库,提供类型化的存取接口。
@@ -62,9 +63,9 @@ func Open(dir string) (*DB, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("storage.Open bolt: %w", err) return nil, fmt.Errorf("storage.Open bolt: %w", err)
} }
// 启动时确保个 bucket 都存在(不存在则创建) // 启动时确保个 bucket 都存在(不存在则创建)
err = db.Update(func(tx *bolt.Tx) error { err = db.Update(func(tx *bolt.Tx) error {
for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate} { for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate, bucketPriority} {
if _, err := tx.CreateBucketIfNotExists(b); err != nil { if _, err := tx.CreateBucketIfNotExists(b); err != nil {
return err return err
} }
@@ -328,3 +329,74 @@ func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) erro
}) })
}) })
} }
// ---- 优先爬取队列(Priority Queue)相关方法 ----
// PriorityEntry 记录一条待优先爬取的 URL 或域名。
type PriorityEntry struct {
URL string `json:"url"` // 用户提交的 URL 或域名(会自动规范化为带 scheme 的 URL)
IsDomain bool `json:"domain"` // 是否为纯域名(true=仅域名,false=完整 URL
AddedAt int64 `json:"added_at"` // 添加时的 Unix 时间戳
Visited bool `json:"visited"` // 是否已爬取(crawler 爬完后标记)
}
// GetPriorityURLs 返回所有未访问的 priority 条目(按添加时间升序)。
func (d *DB) GetPriorityURLs() ([]PriorityEntry, error) {
var entries []PriorityEntry
err := d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error {
var e PriorityEntry
if err := decompressUnmarshal(v, &e); err != nil {
return nil // 跳过损坏条目
}
if !e.Visited {
entries = append(entries, e)
}
return nil
})
})
return entries, err
}
// AddPriorityURL 添加一条 priority 条目(key = URLvalue = PriorityEntry)。
// 若已存在(且未访问)则忽略。
func (d *DB) AddPriorityURL(entry PriorityEntry) error {
return d.db.Update(func(tx *bolt.Tx) error {
k := []byte(entry.URL)
existing := tx.Bucket(bucketPriority).Get(k)
if existing != nil {
var e PriorityEntry
if err := decompressUnmarshal(existing, &e); err == nil && !e.Visited {
return nil // 已存在且未访问,忽略
}
}
data, err := marshalCompress(entry)
if err != nil {
return err
}
return tx.Bucket(bucketPriority).Put(k, data)
})
}
// RemovePriorityURL 删除指定 URL 的 priority 条目。
func (d *DB) RemovePriorityURL(url string) error {
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).Delete([]byte(url))
})
}
// ClearVisitedPriorityURLs 批量删除所有已标记为 visited 的条目(crawler 爬完后调用)。
func (d *DB) ClearVisitedPriorityURLs() error {
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error {
var e PriorityEntry
if err := decompressUnmarshal(v, &e); err != nil {
return nil
}
if e.Visited {
return tx.Bucket(bucketPriority).Delete(k)
}
return nil
})
})
}