恢复,去除缓存机制

This commit is contained in:
2026-04-09 09:28:55 +08:00
parent c96d622366
commit a36c51de1e
+60 -111
View File
@@ -40,12 +40,6 @@ type Server struct {
memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
// 统计缓存(按需更新:请求到来时触发,5s 内有结果则用新值,否则返回旧缓存)
statsCache map[string]any // 缓存的统计结果
statsCacheMu sync.RWMutex // 保护统计缓存及更新状态
statsUpdating bool // 是否正在后台更新(防止重复启动)
statsUpdateDone chan struct{} // 当前更新完成时关闭此 channel
}
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。
@@ -58,7 +52,6 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
httpCli: &http.Client{
Timeout: time.Duration(config.OnlineSnippetTimeout()) * time.Second,
},
statsCache: make(map[string]any),
}
// 启动定期刷盘 goroutine
go s.runPeriodicFlush()
@@ -74,99 +67,6 @@ func (s *Server) runPeriodicFlush() {
}
}
// triggerStatsUpdate 触发一次后台统计更新(若已在更新中则不重复启动)。
// 返回一个 channel,调用方可等待本次更新完成。
func (s *Server) triggerStatsUpdate() <-chan struct{} {
s.statsCacheMu.Lock()
defer s.statsCacheMu.Unlock()
if s.statsUpdating {
// 已有更新在跑,直接返回其 done channel
return s.statsUpdateDone
}
// 开启新一轮更新
done := make(chan struct{})
s.statsUpdating = true
s.statsUpdateDone = done
go func() {
s.updateStatsCache()
s.statsCacheMu.Lock()
s.statsUpdating = false
s.statsCacheMu.Unlock()
close(done)
}()
return done
}
// updateStatsCache 计算统计信息并更新缓存。
func (s *Server) updateStatsCache() {
domainCount := make(map[string]int)
langCount := make(map[string]int)
totalWords := 0
total := 0
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
total++
domain := netloc(url)
domainCount[domain]++
totalWords += len(snippet.Text)
siteInfo, _ := s.db.GetSiteInfo(domain)
if siteInfo != nil {
for lang, ratio := range siteInfo.Languages {
if ratio > 0.1 {
langCount[lang]++
}
}
}
return nil
})
// 排序取 Top
type kv struct{ k string; v int }
topDomains := make([]kv, 0, len(domainCount))
for k, v := range domainCount {
topDomains = append(topDomains, kv{k, v})
}
sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v })
if len(topDomains) > 20 {
topDomains = topDomains[:20]
}
topLangs := make([]kv, 0, len(langCount))
for k, v := range langCount {
topLangs = append(topLangs, kv{k, v})
}
sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v })
if len(topLangs) > 10 {
topLangs = topLangs[:10]
}
domainsMap := make(map[string]int)
for _, kv := range topDomains {
domainsMap[kv.k] = kv.v
}
langsMap := make(map[string]int)
for _, kv := range topLangs {
langsMap[kv.k] = kv.v
}
cache := map[string]any{
"total_urls": total,
"total_words": totalWords,
"total_domains": len(domainCount),
"domains": domainsMap,
"languages": langsMap,
"pending": atomic.LoadInt64(&s.rowCount),
"cached_at": time.Now().Unix(),
}
s.statsCacheMu.Lock()
s.statsCache = cache
s.statsCacheMu.Unlock()
}
// Flush 公开的刷盘方法,供定时任务和外部调用。
func (s *Server) Flush() { s.flush() }
@@ -334,25 +234,74 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(resp)
}
// handleAdminStats 返回全局统计:触发后台更新,5s 内有结果返回新值,否则返回旧缓存
// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
done := s.triggerStatsUpdate()
domainCount := make(map[string]int)
langCount := make(map[string]int)
totalWords := 0
total := 0
select {
case <-done:
// 更新完成,返回最新缓存
case <-time.After(5 * time.Second):
// 超时,使用旧缓存
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
total++
domain := netloc(url)
domainCount[domain]++
totalWords += len(snippet.Text)
siteInfo, _ := s.db.GetSiteInfo(domain)
if siteInfo != nil {
for lang, ratio := range siteInfo.Languages {
if ratio > 0.1 {
langCount[lang]++
}
}
}
return nil
})
// 排序取 Top
type kv struct {
k string
v int
}
topDomains := make([]kv, 0, len(domainCount))
for k, v := range domainCount {
topDomains = append(topDomains, kv{k, v})
}
sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v })
if len(topDomains) > 20 {
topDomains = topDomains[:20]
}
topLangs := make([]kv, 0, len(langCount))
for k, v := range langCount {
topLangs = append(topLangs, kv{k, v})
}
sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v })
if len(topLangs) > 10 {
topLangs = topLangs[:10]
}
s.statsCacheMu.RLock()
cache := s.statsCache
s.statsCacheMu.RUnlock()
domainsMap := make(map[string]int)
for _, kv := range topDomains {
domainsMap[kv.k] = kv.v
}
langsMap := make(map[string]int)
for _, kv := range topLangs {
langsMap[kv.k] = kv.v
}
json.NewEncoder(w).Encode(cache)
resp := map[string]any{
"total_urls": total,
"total_words": totalWords,
"total_domains": len(domainCount), // 真实的域名总数(非Top 20
"domains": domainsMap,
"languages": langsMap,
"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
}
json.NewEncoder(w).Encode(resp)
}
// handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。