修复一个错误

This commit is contained in:
2026-04-10 00:51:31 +08:00
parent dddc445955
commit a13ba3e992
+74 -20
View File
@@ -51,6 +51,11 @@ type Server struct {
statsCache map[string]any statsCache map[string]any
statsCacheMu sync.RWMutex statsCacheMu sync.RWMutex
// recent 快照缓存:后台定时刷新,避免每次请求全量遍历 bbolt
recentCache map[int][]recentItem // limit → 预截取的结果列表
recentCacheMu sync.RWMutex
recentTotal int // 总条目数(不截取)
// backlinkRunner 反向链接计算器(可为 nil,仅用于 admin 手动触发) // backlinkRunner 反向链接计算器(可为 nil,仅用于 admin 手动触发)
backlinkRunner interface { backlinkRunner interface {
Status() map[string]interface{} Status() map[string]interface{}
@@ -71,8 +76,8 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
} }
// 启动定期刷盘 goroutine // 启动定期刷盘 goroutine
go s.runPeriodicFlush() go s.runPeriodicFlush()
// 启动 stats 缓存定期刷新 goroutine // 启动 stats + recent 缓存定期刷新 goroutine
go s.runStatsCacheRefresher() go s.runCacheRefresher()
return s return s
} }
@@ -204,6 +209,7 @@ type recentItem struct {
} }
// handleAdminRecent 返回最近爬取的条目列表,按爬取时间倒序。 // handleAdminRecent 返回最近爬取的条目列表,按爬取时间倒序。
// 直接返回缓存快照,不阻塞 bbolt,响应时间 <1ms。
// 参数:limit(默认50,最大200)。 // 参数:limit(默认50,最大200)。
func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -219,31 +225,64 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
limit = 200 limit = 200
} }
s.recentCacheMu.RLock()
cached := s.recentCache[limit]
total := s.recentTotal
s.recentCacheMu.RUnlock()
if cached == nil {
// 缓存尚未就绪,返回空列表
json.NewEncoder(w).Encode(map[string]any{
"items": []recentItem{},
"total": 0,
})
return
}
json.NewEncoder(w).Encode(map[string]any{
"items": cached,
"total": total,
})
}
// refreshRecentCache 全量遍历 bbolt 计算 recent 快照,预截取常用 limit,存入 recentCache。
func (s *Server) refreshRecentCache() {
type entry struct { type entry struct {
url string url string
snippet *storage.SnippetEntry snippet *storage.SnippetEntry
siteInfo *storage.SiteInfo domain string
} }
var items []entry var items []entry
domainSet := make(map[string]bool)
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error { s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
siteInfo, _ := s.db.GetSiteInfo(netloc(url)) domain := netloc(url)
items = append(items, entry{url, snippet, siteInfo}) items = append(items, entry{url: url, snippet: snippet, domain: domain})
domainSet[domain] = true
return nil return nil
}) })
// 遍历结束后批量查 SiteInfo(避免嵌套事务)
siteInfoMap := make(map[string]*storage.SiteInfo)
for domain := range domainSet {
if si, _ := s.db.GetSiteInfo(domain); si != nil {
siteInfoMap[domain] = si
}
}
// 按时间倒序 // 按时间倒序
sort.Slice(items, func(i, j int) bool { sort.Slice(items, func(i, j int) bool {
return items[i].snippet.Timestamp > items[j].snippet.Timestamp return items[i].snippet.Timestamp > items[j].snippet.Timestamp
}) })
if len(items) > limit { // 转换为 recentItem
items = items[:limit] allItems := make([]recentItem, 0, len(items))
}
result := make([]recentItem, 0, len(items))
for _, e := range items { for _, e := range items {
lang := e.siteInfo.Languages lang := make(map[string]float64)
if si := siteInfoMap[e.domain]; si != nil {
lang = si.Languages
}
if lang == nil { if lang == nil {
lang = make(map[string]float64) lang = make(map[string]float64)
} }
@@ -251,22 +290,34 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
if len(desc) > 200 { if len(desc) > 200 {
desc = desc[:200] desc = desc[:200]
} }
result = append(result, recentItem{ allItems = append(allItems, recentItem{
URL: e.url, URL: e.url,
Title: e.snippet.Title, Title: e.snippet.Title,
Description: desc, Description: desc,
Domain: netloc(e.url), Domain: e.domain,
Language: lang, Language: lang,
WordCount: len(e.snippet.Text), WordCount: len(e.snippet.Text),
CrawledAt: e.snippet.Timestamp, CrawledAt: e.snippet.Timestamp,
}) })
} }
resp := map[string]any{ // 预截取常用 limit 值(50, 100, 200
"items": result, cache := make(map[int][]recentItem, 3)
"total": len(items), for _, l := range []int{50, 100, 200} {
if len(allItems) > l {
copied := make([]recentItem, l)
copy(copied, allItems[:l])
cache[l] = copied
} else {
cache[l] = allItems
} }
json.NewEncoder(w).Encode(resp) }
s.recentCacheMu.Lock()
s.recentCache = cache
s.recentTotal = len(allItems)
s.recentCacheMu.Unlock()
log.Printf("[recent] cache refreshed: %d items", len(allItems))
} }
// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。 // handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。
@@ -390,13 +441,15 @@ func (s *Server) refreshStatsCache() {
log.Printf("[stats] cache refreshed: %d urls, %d domains, %d words", total, len(domainCount), totalWords) log.Printf("[stats] cache refreshed: %d urls, %d domains, %d words", total, len(domainCount), totalWords)
} }
// runStatsCacheRefresher 后台定时刷新 stats 缓存。 // runCacheRefresher 后台定时刷新 stats 和 recent 缓存。
func (s *Server) runStatsCacheRefresher() { // 统一由一个 goroutine 交替刷新,避免同时全量遍历 bbolt 造成压力。
func (s *Server) runCacheRefresher() {
interval := time.Duration(config.StatsRefreshInterval()) * time.Second interval := time.Duration(config.StatsRefreshInterval()) * time.Second
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
s.refreshStatsCache() s.refreshStatsCache()
s.refreshRecentCache()
} }
} }
@@ -1595,8 +1648,9 @@ func (s *Server) flush() {
log.Printf("[harvester] flush write error: %v", err) log.Printf("[harvester] flush write error: %v", err)
} }
log.Printf("[harvester] flush done, %d keys written", len(batch)) log.Printf("[harvester] flush done, %d keys written", len(batch))
// flush 完成后立即刷新 stats 缓存(确保数据实时性) // flush 完成后立即刷新 stats + recent 缓存(确保数据实时性)
go s.refreshStatsCache() go s.refreshStatsCache()
go s.refreshRecentCache()
} }
// getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。 // getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。