This commit is contained in:
2026-04-09 00:14:55 +08:00
parent 223177b1dd
commit 439d0c1cb6
9 changed files with 88 additions and 46 deletions
+24 -25
View File
@@ -3,24 +3,24 @@
package crawler
import (
"bytes" // 字节缓冲(构造 HTTP POST 请求体)
"context" // context 超时控制
"encoding/json" // JSON 序列化(发送关键词数据到收获服务)
"fmt" // 格式化(构造目标地址)
"log" // 日志输出
"math" // 数学运算(指数衰减、质量评分)
"math/rand" // 随机数(加权采样、队列打乱)
"net/http" // HTTP 客户端(POST 数据到收获服务)
"net/url" // URL 解析
"strings" // 字符串操作
"sync" // 互斥锁(保护并发收集结果)
"sync/atomic" // 原子操作(计数器,无锁并发更新)
"time" // 时间戳
"bytes" // 字节缓冲(构造 HTTP POST 请求体)
"context" // context 超时控制
"encoding/json" // JSON 序列化(发送关键词数据到收获服务)
"fmt" // 格式化(构造目标地址)
"log" // 日志输出
"math" // 数学运算(指数衰减、质量评分)
"math/rand" // 随机数(加权采样、队列打乱)
"net/http" // HTTP 客户端(POST 数据到收获服务)
"net/url" // URL 解析
"strings" // 字符串操作
"sync" // 互斥锁(保护并发收集结果)
"sync/atomic" // 原子操作(计数器,无锁并发更新)
"time" // 时间戳
"sese-engine/analyzer" // 文本分析和关键词提取
"sese-engine/config" // 全局配置常量
"sese-engine/parser" // HTML 解析(提取标题、正文、链接)
"sese-engine/storage" // 持久化存储
"sese-engine/config" // 全局配置常量
"sese-engine/parser" // HTML 解析(提取标题、正文、链接)
"sese-engine/storage" // 持久化存储
)
// Stats 存放爬虫实时统计计数器(使用 atomic 原子读取)。
@@ -39,7 +39,7 @@ const (
const (
circuitFailureThreshold = 5 // 连续失败多少次后触发熔断
circuitCooldownSeconds = 30 // 熔断持续时间(秒)
circuitCooldownSeconds = 30 // 熔断持续时间(秒)
)
// Crawler 编排整个 BFS 爬取流程。
@@ -51,9 +51,9 @@ type Crawler struct {
stats Stats // 原子计数器
// 熔断器(全用 atomic,无 mutex,无慢 I/O 时持有锁的风险)
circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen
circuitFailures int32 // 连续失败计数(atomic
circuitExpiry int64 // 熔断/半开截止 Unix 时间戳(秒)
circuitState int32 // circuitClosed | circuitOpen | circuitHalfOpen
circuitFailures int32 // 连续失败计数(atomic
circuitExpiry int64 // 熔断/半开截止 Unix 时间戳(秒)
}
// New 创建一个 Crawler 实例。
@@ -92,8 +92,8 @@ func (c *Crawler) fetchAndApplyPriorityURLs(visited map[string]bool, queue *[]st
// URLWeight 将 URL 和发现权重打包在一起,用于调度决策。
type URLWeight struct {
URL string // 待访问的 URL
Weight float64 // 发现权重(从父页面分得的"关注度",页面链接越多则每个分得越少)
URL string // 待访问的 URL
Weight float64 // 发现权重(从父页面分得的"关注度",页面链接越多则每个分得越少)
}
// Run 启动 BFS 爬取,从 entryURL 开始,执行最多 maxEpoch 轮。
@@ -399,8 +399,7 @@ func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) {
if failures >= circuitFailureThreshold {
atomic.StoreInt32(&c.circuitState, circuitOpen)
atomic.StoreInt64(&c.circuitExpiry, now+int64(circuitCooldownSeconds))
log.Printf("[crawler] circuit OPEN: harvest endpoint unreachable (%d failures), cooling for %ds",
failures, circuitCooldownSeconds)
//log.Printf("[crawler] circuit OPEN: harvest endpoint unreachable (%d failures), cooling for %ds",failures, circuitCooldownSeconds)
}
return
}
@@ -482,7 +481,7 @@ func (c *Crawler) schedule(links []URLWeight) []string {
}
// 根据目标繁荣占比计算普通 URL 应保留数量
expectedProsperRatio := config.ExpectedProsperRatio()
n := int(float64(len(prosperURLs)) * (1-expectedProsperRatio) / expectedProsperRatio)
n := int(float64(len(prosperURLs)) * (1 - expectedProsperRatio) / expectedProsperRatio)
if len(otherURLs) > n {
keep := max(len(otherURLs)-len(selected)/10, n)
if keep < len(otherURLs) {
File diff suppressed because one or more lines are too long
+6
View File
File diff suppressed because one or more lines are too long
+6
View File
File diff suppressed because one or more lines are too long
+6
View File
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+6
View File
File diff suppressed because one or more lines are too long
+2 -2
View File
@@ -5,8 +5,8 @@
<link rel="icon" type="image/svg+xml" href="/vite.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SESE 爬取管理</title>
<script type="module" crossorigin src="/assets/index-KhES34ts.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-CO25_jVn.css">
<script type="module" crossorigin src="/assets/index-BomiJv32.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-Bj4UMEhQ.css">
</head>
<body>
<div id="app"></div>
+22 -7
View File
@@ -290,11 +290,12 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
}
resp := map[string]any{
"total_urls": total,
"total_words": totalWords,
"domains": domainsMap,
"languages": langsMap,
"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
"total_urls": total,
"total_words": totalWords,
"total_domains": len(domainCount), // 真实的域名总数(非Top 20
"domains": domainsMap,
"languages": langsMap,
"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
}
json.NewEncoder(w).Encode(resp)
@@ -1232,11 +1233,12 @@ func (s *Server) flush() {
s.mem = make(map[string][]storage.IndexEntry)
atomic.StoreInt64(&s.rowCount, 0)
s.memMu.Unlock()
log.Printf("[harvester] flushing %d keys", len(snapshot))
totalKeys := len(snapshot)
log.Printf("[harvester] flushing %d keys", totalKeys)
items := make([]struct {
key string
entries []storage.IndexEntry
}, 0, len(snapshot))
}, 0, totalKeys)
for k, v := range snapshot {
items = append(items, struct {
key string
@@ -1250,6 +1252,14 @@ func (s *Server) flush() {
}
results := make(chan result, len(items))
sem := make(chan struct{}, 8)
processed := int64(0)
progressInterval := 1000
if totalKeys < 10000 {
progressInterval = totalKeys / 10
}
if progressInterval < 1 {
progressInterval = 1
}
for _, item := range items {
sem <- struct{}{}
go func(k string, newEntries []storage.IndexEntry) {
@@ -1262,6 +1272,11 @@ func (s *Server) flush() {
for range items {
r := <-results
batch[r.key] = r.entries
current := atomic.AddInt64(&processed, 1)
if int(current)%progressInterval == 0 || int(current) == totalKeys {
percent := float64(current) * 100 / float64(totalKeys)
log.Printf("[harvester] flush progress: %d/%d (%.1f%%)", current, totalKeys, percent)
}
}
if err := s.db.BatchSetIndex(batch); err != nil {
log.Printf("[harvester] flush write error: %v", err)