up
This commit is contained in:
+96
-58
@@ -40,6 +40,10 @@ type Server struct {
|
|||||||
memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
|
memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
|
||||||
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
|
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
|
||||||
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
|
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
|
||||||
|
|
||||||
|
// 统计缓存(后台定期更新,避免 API 阻塞)
|
||||||
|
statsCache map[string]any // 缓存的统计结果
|
||||||
|
statsCacheMu sync.RWMutex // 保护统计缓存
|
||||||
}
|
}
|
||||||
|
|
||||||
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。
|
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。
|
||||||
@@ -52,9 +56,12 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server {
|
|||||||
httpCli: &http.Client{
|
httpCli: &http.Client{
|
||||||
Timeout: time.Duration(config.OnlineSnippetTimeout()) * time.Second,
|
Timeout: time.Duration(config.OnlineSnippetTimeout()) * time.Second,
|
||||||
},
|
},
|
||||||
|
statsCache: make(map[string]any),
|
||||||
}
|
}
|
||||||
// 启动定期刷盘 goroutine
|
// 启动定期刷盘 goroutine
|
||||||
go s.runPeriodicFlush()
|
go s.runPeriodicFlush()
|
||||||
|
// 启动统计缓存更新 goroutine(每 60 秒更新一次)
|
||||||
|
go s.runStatsCacheUpdater()
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,6 +74,84 @@ func (s *Server) runPeriodicFlush() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runStatsCacheUpdater 定期更新统计缓存(后台线程,避免 API 阻塞)。
|
||||||
|
func (s *Server) runStatsCacheUpdater() {
|
||||||
|
ticker := time.NewTicker(60 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
// 立即执行一次
|
||||||
|
s.updateStatsCache()
|
||||||
|
for range ticker.C {
|
||||||
|
s.updateStatsCache()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateStatsCache 计算统计信息并更新缓存。
|
||||||
|
func (s *Server) updateStatsCache() {
|
||||||
|
domainCount := make(map[string]int)
|
||||||
|
langCount := make(map[string]int)
|
||||||
|
totalWords := 0
|
||||||
|
total := 0
|
||||||
|
|
||||||
|
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
|
||||||
|
total++
|
||||||
|
domain := netloc(url)
|
||||||
|
domainCount[domain]++
|
||||||
|
totalWords += len(snippet.Text)
|
||||||
|
|
||||||
|
siteInfo, _ := s.db.GetSiteInfo(domain)
|
||||||
|
if siteInfo != nil {
|
||||||
|
for lang, ratio := range siteInfo.Languages {
|
||||||
|
if ratio > 0.1 {
|
||||||
|
langCount[lang]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// 排序取 Top
|
||||||
|
type kv struct{ k string; v int }
|
||||||
|
topDomains := make([]kv, 0, len(domainCount))
|
||||||
|
for k, v := range domainCount {
|
||||||
|
topDomains = append(topDomains, kv{k, v})
|
||||||
|
}
|
||||||
|
sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v })
|
||||||
|
if len(topDomains) > 20 {
|
||||||
|
topDomains = topDomains[:20]
|
||||||
|
}
|
||||||
|
topLangs := make([]kv, 0, len(langCount))
|
||||||
|
for k, v := range langCount {
|
||||||
|
topLangs = append(topLangs, kv{k, v})
|
||||||
|
}
|
||||||
|
sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v })
|
||||||
|
if len(topLangs) > 10 {
|
||||||
|
topLangs = topLangs[:10]
|
||||||
|
}
|
||||||
|
|
||||||
|
domainsMap := make(map[string]int)
|
||||||
|
for _, kv := range topDomains {
|
||||||
|
domainsMap[kv.k] = kv.v
|
||||||
|
}
|
||||||
|
langsMap := make(map[string]int)
|
||||||
|
for _, kv := range topLangs {
|
||||||
|
langsMap[kv.k] = kv.v
|
||||||
|
}
|
||||||
|
|
||||||
|
cache := map[string]any{
|
||||||
|
"total_urls": total,
|
||||||
|
"total_words": totalWords,
|
||||||
|
"total_domains": len(domainCount),
|
||||||
|
"domains": domainsMap,
|
||||||
|
"languages": langsMap,
|
||||||
|
"pending": atomic.LoadInt64(&s.rowCount),
|
||||||
|
"cached_at": time.Now().Unix(),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.statsCacheMu.Lock()
|
||||||
|
s.statsCache = cache
|
||||||
|
s.statsCacheMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// Flush 公开的刷盘方法,供定时任务和外部调用。
|
// Flush 公开的刷盘方法,供定时任务和外部调用。
|
||||||
func (s *Server) Flush() { s.flush() }
|
func (s *Server) Flush() { s.flush() }
|
||||||
|
|
||||||
@@ -234,71 +319,24 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) {
|
|||||||
json.NewEncoder(w).Encode(resp)
|
json.NewEncoder(w).Encode(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。
|
// handleAdminStats 返回缓存的全局统计(后台线程定期更新,API 不阻塞)。
|
||||||
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
|
||||||
domainCount := make(map[string]int)
|
s.statsCacheMu.RLock()
|
||||||
langCount := make(map[string]int)
|
cache := s.statsCache
|
||||||
totalWords := 0
|
s.statsCacheMu.RUnlock()
|
||||||
total := 0
|
|
||||||
|
|
||||||
s.db.ForEachSnippet(func(url string, snippet *storage.SnippetEntry) error {
|
// 如果缓存为空,同步计算一次(首次启动时)
|
||||||
total++
|
if len(cache) == 0 {
|
||||||
domain := netloc(url)
|
s.updateStatsCache()
|
||||||
domainCount[domain]++
|
s.statsCacheMu.RLock()
|
||||||
totalWords += len(snippet.Text)
|
cache = s.statsCache
|
||||||
|
s.statsCacheMu.RUnlock()
|
||||||
siteInfo, _ := s.db.GetSiteInfo(domain)
|
|
||||||
if siteInfo != nil {
|
|
||||||
for lang, ratio := range siteInfo.Languages {
|
|
||||||
if ratio > 0.1 {
|
|
||||||
langCount[lang]++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
// 排序取 Top
|
|
||||||
type kv struct{ k string; v int }
|
|
||||||
topDomains := make([]kv, 0, len(domainCount))
|
|
||||||
for k, v := range domainCount {
|
|
||||||
topDomains = append(topDomains, kv{k, v})
|
|
||||||
}
|
|
||||||
sort.Slice(topDomains, func(i, j int) bool { return topDomains[i].v > topDomains[j].v })
|
|
||||||
if len(topDomains) > 20 {
|
|
||||||
topDomains = topDomains[:20]
|
|
||||||
}
|
|
||||||
topLangs := make([]kv, 0, len(langCount))
|
|
||||||
for k, v := range langCount {
|
|
||||||
topLangs = append(topLangs, kv{k, v})
|
|
||||||
}
|
|
||||||
sort.Slice(topLangs, func(i, j int) bool { return topLangs[i].v > topLangs[j].v })
|
|
||||||
if len(topLangs) > 10 {
|
|
||||||
topLangs = topLangs[:10]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
domainsMap := make(map[string]int)
|
json.NewEncoder(w).Encode(cache)
|
||||||
for _, kv := range topDomains {
|
|
||||||
domainsMap[kv.k] = kv.v
|
|
||||||
}
|
|
||||||
langsMap := make(map[string]int)
|
|
||||||
for _, kv := range topLangs {
|
|
||||||
langsMap[kv.k] = kv.v
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := map[string]any{
|
|
||||||
"total_urls": total,
|
|
||||||
"total_words": totalWords,
|
|
||||||
"total_domains": len(domainCount), // 真实的域名总数(非Top 20)
|
|
||||||
"domains": domainsMap,
|
|
||||||
"languages": langsMap,
|
|
||||||
"pending": atomic.LoadInt64(&s.rowCount), // 内存中未刷盘的索引条目数
|
|
||||||
}
|
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。
|
// handleAdminPriority 处理 /admin/priority 的 GET/POST/DELETE 请求。
|
||||||
|
|||||||
Reference in New Issue
Block a user