up
This commit is contained in:
+82
-14
@@ -40,6 +40,11 @@ type Server struct {
|
||||
memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞)
|
||||
rowCount int64 // 内存中累计的索引条目总数(触发刷盘)
|
||||
flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行
|
||||
|
||||
// flush 期间的索引读缓存:避免 mergeKey 对每个 key 单独开读事务
|
||||
indexCache map[string][]storage.IndexEntry
|
||||
indexCacheMu sync.RWMutex
|
||||
indexCacheHits int64 // 缓存命中计数(原子)
|
||||
}
|
||||
|
||||
// New 创建一个 search Server(内嵌收获服务,统一在同一端口)。
|
||||
@@ -1205,22 +1210,53 @@ func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
s.memMu.Lock()
|
||||
// 阶段 1:收集需要阈值检查的 key,快速释放锁
|
||||
type keyCheck struct {
|
||||
word string
|
||||
weight float32
|
||||
memLen int
|
||||
entries []storage.IndexEntry
|
||||
}
|
||||
toCheck := make([]keyCheck, 0, len(payload.Keywords))
|
||||
for _, kw := range payload.Keywords {
|
||||
key := kw.Word
|
||||
entries := s.mem[key]
|
||||
entries := s.mem[kw.Word]
|
||||
if len(entries) > 15 {
|
||||
low := s.lowThreshold(key)
|
||||
if float64(kw.Weight) < low {
|
||||
continue
|
||||
}
|
||||
toCheck = append(toCheck, keyCheck{kw.Word, kw.Weight, len(entries), entries})
|
||||
} else {
|
||||
s.mem[kw.Word] = append(entries, storage.IndexEntry{
|
||||
Weight: kw.Weight,
|
||||
URL: payload.URL,
|
||||
})
|
||||
atomic.AddInt64(&s.rowCount, 1)
|
||||
}
|
||||
s.mem[key] = append(entries, storage.IndexEntry{
|
||||
Weight: kw.Weight,
|
||||
URL: payload.URL,
|
||||
})
|
||||
atomic.AddInt64(&s.rowCount, 1)
|
||||
}
|
||||
s.memMu.Unlock()
|
||||
|
||||
// 阶段 2:锁外读 db 做阈值检查(避免持锁时做慢 I/O)
|
||||
if len(toCheck) > 0 {
|
||||
s.memMu.Lock()
|
||||
for _, kc := range toCheck {
|
||||
// 重新读取最新长度(可能被其他请求修改)
|
||||
current := s.mem[kc.word]
|
||||
if len(current) > kc.memLen {
|
||||
// 条目被其他请求增加了,重新检查
|
||||
low := s.lowThreshold(kc.word)
|
||||
if float64(kc.weight) < low {
|
||||
continue
|
||||
}
|
||||
}
|
||||
low := s.lowThreshold(kc.word)
|
||||
if float64(kc.weight) < low {
|
||||
continue
|
||||
}
|
||||
s.mem[kc.word] = append(s.mem[kc.word], storage.IndexEntry{
|
||||
Weight: kc.weight,
|
||||
URL: payload.URL,
|
||||
})
|
||||
atomic.AddInt64(&s.rowCount, 1)
|
||||
}
|
||||
s.memMu.Unlock()
|
||||
}
|
||||
if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold()) {
|
||||
go s.Flush()
|
||||
}
|
||||
@@ -1235,7 +1271,7 @@ func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// lowThreshold 返回某关键词在已有大量条目时,新条目所需的最低权重阈值。
|
||||
func (s *Server) lowThreshold(key string) float64 {
|
||||
existing, _ := s.db.GetIndex(key)
|
||||
existing := s.getCachedIndex(key)
|
||||
maxURLsPerKey := config.MaxURLsPerKey()
|
||||
if len(existing) < maxURLsPerKey {
|
||||
return -1
|
||||
@@ -1252,7 +1288,13 @@ func (s *Server) flush() {
|
||||
if !s.flushMu.TryLock() {
|
||||
return
|
||||
}
|
||||
defer s.flushMu.Unlock()
|
||||
defer func() {
|
||||
s.flushMu.Unlock()
|
||||
// 清除索引读缓存
|
||||
s.indexCacheMu.Lock()
|
||||
s.indexCache = nil
|
||||
s.indexCacheMu.Unlock()
|
||||
}()
|
||||
s.memMu.Lock()
|
||||
snapshot := s.mem
|
||||
s.mem = make(map[string][]storage.IndexEntry)
|
||||
@@ -1260,6 +1302,17 @@ func (s *Server) flush() {
|
||||
s.memMu.Unlock()
|
||||
totalKeys := len(snapshot)
|
||||
log.Printf("[harvester] flushing %d keys", totalKeys)
|
||||
|
||||
// 预热索引读缓存:一次 ForEachIndex 读取全部索引,避免 mergeKey 逐个读事务
|
||||
indexCache := make(map[string][]storage.IndexEntry, totalKeys)
|
||||
s.db.ForEachIndex(func(keyword string, entries []storage.IndexEntry) error {
|
||||
indexCache[keyword] = entries
|
||||
return nil
|
||||
})
|
||||
s.indexCacheMu.Lock()
|
||||
s.indexCache = indexCache
|
||||
s.indexCacheMu.Unlock()
|
||||
log.Printf("[harvester] index cache warmed: %d keys loaded", len(indexCache))
|
||||
items := make([]struct {
|
||||
key string
|
||||
entries []storage.IndexEntry
|
||||
@@ -1309,9 +1362,24 @@ func (s *Server) flush() {
|
||||
log.Printf("[harvester] flush done, %d keys written", len(batch))
|
||||
}
|
||||
|
||||
// getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。
|
||||
func (s *Server) getCachedIndex(key string) []storage.IndexEntry {
|
||||
s.indexCacheMu.RLock()
|
||||
if s.indexCache != nil {
|
||||
if entries, ok := s.indexCache[key]; ok {
|
||||
s.indexCacheMu.RUnlock()
|
||||
atomic.AddInt64(&s.indexCacheHits, 1)
|
||||
return entries
|
||||
}
|
||||
}
|
||||
s.indexCacheMu.RUnlock()
|
||||
entries, _ := s.db.GetIndex(key)
|
||||
return entries
|
||||
}
|
||||
|
||||
// mergeKey 将新条目和磁盘已有条目合并后返回最终列表。
|
||||
func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry {
|
||||
existing, _ := s.db.GetIndex(key)
|
||||
existing := s.getCachedIndex(key)
|
||||
if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey() {
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user