防御一些爬虫陷阱
This commit is contained in:
+42
-8
@@ -50,6 +50,10 @@ type Crawler struct {
|
||||
prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值")
|
||||
stats Stats // 原子计数器
|
||||
|
||||
// visited 记录已访问的 URL 集合(跨 epoch 持久,启动时从 DB 预热)
|
||||
visited map[string]bool
|
||||
visitedMu sync.RWMutex // 保护 visited 的并发读写
|
||||
|
||||
// 熔断器(全用 atomic,无 mutex,无慢 I/O 时持有锁的风险)
|
||||
circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen
|
||||
circuitFailures int32 // 连续失败计数(atomic)
|
||||
@@ -76,18 +80,48 @@ func GlobalActiveWorkers() int64 {
|
||||
// New 创建一个 Crawler 实例。
|
||||
// prosperMap 由 info 模块加载,传入域名繁荣值用于调度优先级计算。
|
||||
func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler {
|
||||
return &Crawler{
|
||||
c := &Crawler{
|
||||
fetcher: NewFetcher(config.SpiderName(), time.Duration(config.CrawlerCooldown())*time.Second),
|
||||
db: db,
|
||||
analyzer: a,
|
||||
prosperMap: prosperMap,
|
||||
visited: make(map[string]bool),
|
||||
}
|
||||
// 启动时从 gate bucket 预热已爬取的 URL 集合(程序重启后不会重复爬取)
|
||||
c.warmVisited()
|
||||
return c
|
||||
}
|
||||
|
||||
// warmVisited 从 DB 的 gate bucket 加载所有已缓存的 URL 到 visited set。
|
||||
func (c *Crawler) warmVisited() {
|
||||
count := 0
|
||||
_ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error {
|
||||
c.visited[u] = true
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
log.Printf("[crawler] visited set warmed: %d URLs loaded", count)
|
||||
}
|
||||
|
||||
// markVisited 将 URL 标记为已访问(线程安全)。
|
||||
func (c *Crawler) markVisited(url string) {
|
||||
c.visitedMu.Lock()
|
||||
c.visited[url] = true
|
||||
c.visitedMu.Unlock()
|
||||
}
|
||||
|
||||
// isVisited 检查 URL 是否已访问(线程安全)。
|
||||
func (c *Crawler) isVisited(url string) bool {
|
||||
c.visitedMu.RLock()
|
||||
v := c.visited[url]
|
||||
c.visitedMu.RUnlock()
|
||||
return v
|
||||
}
|
||||
|
||||
// fetchAndApplyPriorityURLs 从数据库读取用户插入的 priority URLs,
|
||||
// 将未访问的插入队列前端(prepend),已爬取的条目从存储中清除。
|
||||
// 返回本次插入队列的 URL 数量。
|
||||
func (c *Crawler) fetchAndApplyPriorityURLs(visited map[string]bool, queue *[]string) int {
|
||||
func (c *Crawler) fetchAndApplyPriorityURLs(queue *[]string) int {
|
||||
entries, err := c.db.GetPriorityURLs()
|
||||
if err != nil || len(entries) == 0 {
|
||||
return 0
|
||||
@@ -95,7 +129,7 @@ func (c *Crawler) fetchAndApplyPriorityURLs(visited map[string]bool, queue *[]st
|
||||
|
||||
added := 0
|
||||
for _, e := range entries {
|
||||
if visited[e.URL] {
|
||||
if c.isVisited(e.URL) {
|
||||
_ = c.db.RemovePriorityURL(e.URL)
|
||||
continue
|
||||
}
|
||||
@@ -117,15 +151,15 @@ type URLWeight struct {
|
||||
// 各轮之间是串行的,每轮内并发抓取,按调度算法选择下一轮 URL。
|
||||
// 每轮开始前会检查 priority 队列,优先爬取用户插入的 URL。
|
||||
func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
||||
visited := make(map[string]bool) // 已访问 URL 集合(防止重复抓取)
|
||||
queue := []string{entryURL} // 当前轮次的待抓取队列
|
||||
c.markVisited(entryURL)
|
||||
queue := []string{entryURL}
|
||||
|
||||
for ep := 0; ep < maxEpoch; ep++ {
|
||||
// 每轮 epoch 从 config 读取最新 workers 值,支持运行时动态调整
|
||||
workers := config.CrawlerWorkers()
|
||||
|
||||
// 每轮开始前:拉取 priority URLs,插入队列前端
|
||||
priorityAdded := c.fetchAndApplyPriorityURLs(visited, &queue)
|
||||
priorityAdded := c.fetchAndApplyPriorityURLs(&queue)
|
||||
if priorityAdded > 0 {
|
||||
log.Printf("[crawler] epoch %d/%d queue=%d (+%d priority) workers=%d", ep+1, maxEpoch, len(queue), priorityAdded, workers)
|
||||
} else {
|
||||
@@ -133,7 +167,7 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
||||
}
|
||||
// 将本轮所有 URL 标记为已访问(防止下一轮重复入队)
|
||||
for _, u := range queue {
|
||||
visited[u] = true
|
||||
c.markVisited(u)
|
||||
}
|
||||
|
||||
// 并发抓取本轮所有 URL
|
||||
@@ -164,7 +198,7 @@ func (c *Crawler) Run(entryURL string, maxEpoch int) {
|
||||
w := 1.0 / float64(n)
|
||||
mu.Lock()
|
||||
for _, h := range hrefs {
|
||||
if !visited[h] {
|
||||
if !c.isVisited(h) {
|
||||
newLinks = append(newLinks, URLWeight{URL: h, Weight: w})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user