刷盘与搜索都是独立线程

This commit is contained in:
2026-04-09 02:33:31 +08:00
parent 7f2cb12e71
commit 5c90ad246c
2 changed files with 30 additions and 7 deletions
+5 -4
View File
@@ -250,7 +250,7 @@ func (r *Runner) aggregate(filter func(*storage.SiteInfo) bool, stats *siteStats
}) })
// 向量余弦过滤:去除 Server 类型特征偏离核心向量的域名(可能是噪音/作弊) // 向量余弦过滤:去除 Server 类型特征偏离核心向量的域名(可能是噪音/作弊)
d = vectorFilter(d, vectors, desc) d = vectorFilter(d, vectors, desc, r.storagePath)
// 最终清理:分数 ≤ 0.16 的域名不写入(低于此阈值认为不繁荣) // 最终清理:分数 ≤ 0.16 的域名不写入(低于此阈值认为不繁荣)
for k, v := range d { for k, v := range d {
@@ -313,7 +313,7 @@ func (r *Runner) aggregateWithScores(scores map[string]float64, stats *siteStats
return nil return nil
}) })
d = vectorFilter(d, vectors, desc) d = vectorFilter(d, vectors, desc, r.storagePath)
for k, v := range d { for k, v := range d {
if v <= 0.16 { if v <= 0.16 {
delete(d, k) delete(d, k)
@@ -326,7 +326,7 @@ func (r *Runner) aggregateWithScores(scores map[string]float64, stats *siteStats
// vectorFilter 使用余弦相似度过滤域名分数:保留与核心 Server 类型向量相似的域名。 // vectorFilter 使用余弦相似度过滤域名分数:保留与核心 Server 类型向量相似的域名。
// 与核心方向偏离的域名可能是噪音(如作弊农场、链接买卖)。 // 与核心方向偏离的域名可能是噪音(如作弊农场、链接买卖)。
func vectorFilter(d map[string]float64, vectors map[string][]float32, desc string) map[string]float64 { func vectorFilter(d map[string]float64, vectors map[string][]float32, desc string, storagePath string) map[string]float64 {
// 计算全网站的 Server 类型核心向量(所有向量求和) // 计算全网站的 Server 类型核心向量(所有向量求和)
core := make([]float64, 64) core := make([]float64, 64)
for _, vec := range vectors { for _, vec := range vectors {
@@ -370,7 +370,8 @@ func vectorFilter(d map[string]float64, vectors map[string][]float32, desc strin
cosMap[k] = dot32_64(vec, core) / (vn * coreNorm) cosMap[k] = dot32_64(vec, core) / (vn * coreNorm)
} }
} }
_ = writeJSON(desc+"_cos.json", cosMap) cosPath := filepath.Join(storagePath, desc+"_cos.json")
_ = writeJSON(cosPath, cosMap)
return newD return newD
} }
+25 -3
View File
@@ -37,7 +37,7 @@ type Server struct {
// 以下为收获服务(harvester)内嵌字段 // 以下为收获服务(harvester)内嵌字段
mem map[string][]storage.IndexEntry // 内存索引聚合器:关键词 → [权重, URL] 条目 mem map[string][]storage.IndexEntry // 内存索引聚合器:关键词 → [权重, URL] 条目
memMu sync.Mutex // 保护内存索引的并发写入 memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
rowCount int64 // 内存中累计的索引条目总数(触发刷盘) rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
} }
@@ -508,7 +508,7 @@ func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]sear
return nil, 0 return nil, 0
} }
// 加载每个词对应的倒排索引条目 // 加载每个词对应的倒排索引条目(磁盘 + 内存)
type tokenIndex struct { type tokenIndex struct {
token string token string
entries []storage.IndexEntry entries []storage.IndexEntry
@@ -516,8 +516,29 @@ func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]sear
} }
tokenIndexes := make([]tokenIndex, 0, len(tokens)) tokenIndexes := make([]tokenIndex, 0, len(tokens))
maxURLsPerKey := config.MaxURLsPerKey() maxURLsPerKey := config.MaxURLsPerKey()
// 读锁保护内存索引访问(与刷盘互斥,但多个搜索可并发)
s.memMu.RLock()
for _, t := range tokens { for _, t := range tokens {
entries, _ := s.db.GetIndex(t) // 1. 从磁盘加载
diskEntries, _ := s.db.GetIndex(t)
// 2. 从内存加载(尚未刷盘的数据)
memEntries := s.mem[t]
// 3. 合并(内存数据优先,因为更新)
entries := make([]storage.IndexEntry, 0, len(diskEntries)+len(memEntries))
seen := make(map[string]bool, len(diskEntries)+len(memEntries))
for _, e := range memEntries {
if !seen[e.URL] {
entries = append(entries, e)
seen[e.URL] = true
}
}
for _, e := range diskEntries {
if !seen[e.URL] {
entries = append(entries, e)
seen[e.URL] = true
}
}
// 计算缺省权重:当条目数达到上限时,权重低于第 MaxURLsPerKey 名的条目使用缺省权重 // 计算缺省权重:当条目数达到上限时,权重低于第 MaxURLsPerKey 名的条目使用缺省权重
defVal := 1.0 / 10000 * float64(max(100, len(entries))) / float64(maxURLsPerKey) defVal := 1.0 / 10000 * float64(max(100, len(entries))) / float64(maxURLsPerKey)
if len(entries) >= maxURLsPerKey { if len(entries) >= maxURLsPerKey {
@@ -530,6 +551,7 @@ func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]sear
} }
tokenIndexes = append(tokenIndexes, tokenIndex{t, entries, defVal}) tokenIndexes = append(tokenIndexes, tokenIndex{t, entries, defVal})
} }
s.memMu.RUnlock()
// 构建 URL → (词 → 权重) 映射,收集所有候选 URL // 构建 URL → (词 → 权重) 映射,收集所有候选 URL
urlWeights := make(map[string]map[string]float64) urlWeights := make(map[string]map[string]float64)