From ce2a91d9f505c325eb3a6a5a3d87af03174151b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=96=87=E5=B3=B0?= Date: Thu, 9 Apr 2026 11:28:43 +0800 Subject: [PATCH] up --- main.go | 5 +- search/server.go | 96 ++++++++++++++++++++++++++++++----- storage/storage.go | 124 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 200 insertions(+), 25 deletions(-) diff --git a/main.go b/main.go index f15aede..f5d6f05 100644 --- a/main.go +++ b/main.go @@ -52,7 +52,7 @@ func initConfig() error { index: max_urls_per_key: 11000 # 每个索引词最多保存的 URL 数量上限 max_same_domain_per_key: 20 # 同一域名在每个索引词下最多出现的次数 - big_clean_threshold: 10000000 # 内存中累计多少条索引后触发一次刷盘清理 + big_clean_threshold: 2000000 # 内存中累计多少条索引后触发一次刷盘清理 max_new_urls_per_key: 10000 # 每次刷盘时,每个索引词最多写入的新 URL 数量上限 min_urls_for_new_key: 3 # 新索引词如果 URL 数少于该值则丢弃,不写入磁盘 @@ -76,7 +76,7 @@ search: consecutive_key_weight: 1.3 # 连续关键词命中权重:多词连续出现时加分 backlink_weight: 1.0 # 反向链接权重:指向该 URL 的链接越多得分越高 server_port: 80 # 搜索服务和收获服务的统一 HTTP 监听端口 - flush_interval_seconds: 60 # 定期刷盘间隔(秒):将内存索引批量写入磁盘 + flush_interval_seconds: 30 # 定期刷盘间隔(秒):将内存索引批量写入磁盘 # 反向链接(PageRank 类)计算相关配置 backlink: @@ -140,6 +140,7 @@ func main() { log.Fatalf("failed to open storage: %v", err) } defer db.Close() + db.StartWriteFlusher() // 启动异步写缓冲后台刷盘 // ---- 2. Info 服务:加载繁荣表、调整表和屏蔽词 ---- infoSvc := info.New(*storageDir) diff --git a/search/server.go b/search/server.go index 2be27d0..68b475a 100644 --- a/search/server.go +++ b/search/server.go @@ -40,6 +40,11 @@ type Server struct { memMu sync.RWMutex // 保护内存索引的读写(刷盘时读操作不阻塞) rowCount int64 // 内存中累计的索引条目总数(触发刷盘) flushMu sync.Mutex // 确保同一时刻只有一个 flush 在执行 + + // flush 期间的索引读缓存:避免 mergeKey 对每个 key 单独开读事务 + indexCache map[string][]storage.IndexEntry + indexCacheMu sync.RWMutex + indexCacheHits int64 // 缓存命中计数(原子) } // New 创建一个 search Server(内嵌收获服务,统一在同一端口)。 @@ -1205,22 +1210,53 @@ func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { return } s.memMu.Lock() + // 阶段 1:收集需要阈值检查的 key,快速释放锁 + type keyCheck struct { + word string + weight float32 + memLen int + entries []storage.IndexEntry + } + toCheck := make([]keyCheck, 0, len(payload.Keywords)) for _, kw := range payload.Keywords { - key := kw.Word - entries := s.mem[key] + entries := s.mem[kw.Word] if len(entries) > 15 { - low := s.lowThreshold(key) - if float64(kw.Weight) < low { - continue - } + toCheck = append(toCheck, keyCheck{kw.Word, kw.Weight, len(entries), entries}) + } else { + s.mem[kw.Word] = append(entries, storage.IndexEntry{ + Weight: kw.Weight, + URL: payload.URL, + }) + atomic.AddInt64(&s.rowCount, 1) } - s.mem[key] = append(entries, storage.IndexEntry{ - Weight: kw.Weight, - URL: payload.URL, - }) - atomic.AddInt64(&s.rowCount, 1) } s.memMu.Unlock() + + // 阶段 2:锁外读 db 做阈值检查(避免持锁时做慢 I/O) + if len(toCheck) > 0 { + s.memMu.Lock() + for _, kc := range toCheck { + // 重新读取最新长度(可能被其他请求修改) + current := s.mem[kc.word] + if len(current) > kc.memLen { + // 条目被其他请求增加了,重新检查 + low := s.lowThreshold(kc.word) + if float64(kc.weight) < low { + continue + } + } + low := s.lowThreshold(kc.word) + if float64(kc.weight) < low { + continue + } + s.mem[kc.word] = append(s.mem[kc.word], storage.IndexEntry{ + Weight: kc.weight, + URL: payload.URL, + }) + atomic.AddInt64(&s.rowCount, 1) + } + s.memMu.Unlock() + } if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold()) { go s.Flush() } @@ -1235,7 +1271,7 @@ func (s *Server) handleFlush(w http.ResponseWriter, r *http.Request) { // lowThreshold 返回某关键词在已有大量条目时,新条目所需的最低权重阈值。 func (s *Server) lowThreshold(key string) float64 { - existing, _ := s.db.GetIndex(key) + existing := s.getCachedIndex(key) maxURLsPerKey := config.MaxURLsPerKey() if len(existing) < maxURLsPerKey { return -1 @@ -1252,7 +1288,13 @@ func (s *Server) flush() { if !s.flushMu.TryLock() { return } - defer s.flushMu.Unlock() + defer func() { + s.flushMu.Unlock() + // 清除索引读缓存 + s.indexCacheMu.Lock() + s.indexCache = nil + s.indexCacheMu.Unlock() + }() s.memMu.Lock() snapshot := s.mem s.mem = make(map[string][]storage.IndexEntry) @@ -1260,6 +1302,17 @@ func (s *Server) flush() { s.memMu.Unlock() totalKeys := len(snapshot) log.Printf("[harvester] flushing %d keys", totalKeys) + + // 预热索引读缓存:一次 ForEachIndex 读取全部索引,避免 mergeKey 逐个读事务 + indexCache := make(map[string][]storage.IndexEntry, totalKeys) + s.db.ForEachIndex(func(keyword string, entries []storage.IndexEntry) error { + indexCache[keyword] = entries + return nil + }) + s.indexCacheMu.Lock() + s.indexCache = indexCache + s.indexCacheMu.Unlock() + log.Printf("[harvester] index cache warmed: %d keys loaded", len(indexCache)) items := make([]struct { key string entries []storage.IndexEntry @@ -1309,9 +1362,24 @@ func (s *Server) flush() { log.Printf("[harvester] flush done, %d keys written", len(batch)) } +// getCachedIndex 优先从读缓存获取索引,缓存未命中则读 db。 +func (s *Server) getCachedIndex(key string) []storage.IndexEntry { + s.indexCacheMu.RLock() + if s.indexCache != nil { + if entries, ok := s.indexCache[key]; ok { + s.indexCacheMu.RUnlock() + atomic.AddInt64(&s.indexCacheHits, 1) + return entries + } + } + s.indexCacheMu.RUnlock() + entries, _ := s.db.GetIndex(key) + return entries +} + // mergeKey 将新条目和磁盘已有条目合并后返回最终列表。 func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry { - existing, _ := s.db.GetIndex(key) + existing := s.getCachedIndex(key) if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey() { return nil } diff --git a/storage/storage.go b/storage/storage.go index ac5213b..c42bc84 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -15,6 +15,8 @@ import ( "log" // 日志输出 "os" // 操作系统功能(创建目录等) "path/filepath" // 路径拼接 + "sync" // 互斥锁(保护写缓冲) + "time" // bbolt 超时配置和写缓冲定时器 "github.com/andybalholm/brotli" // Brotli 无损压缩库(用于压缩存储数据) bolt "go.etcd.io/bbolt" // BoltDB,纯 Go 嵌入式 KV 数据库 @@ -44,10 +46,23 @@ var ( bucketPriority = []byte("priority") // 优先爬取 URL bucket ) +// writeOp 表示一个待写入的操作。 +type writeOp struct { + opType int // 0 = set snippet, 1 = set site info + key string // URL 或 host + data []byte // marshalCompress 后的数据 +} + // DB 封装一个 bbolt 数据库,提供类型化的存取接口。 // bbolt 内部已实现并发安全,无需额外加锁。 type DB struct { db *bolt.DB // 底层 bbolt 数据库句柄 + + // 异步写缓冲:SetSnippet/SetSiteInfo 先写到内存,定期批量刷入 bbolt。 + writeBuf map[string]*writeOp // key → 待写入的操作 + writeBufMu sync.Mutex + writeTicker *time.Ticker + writeDone chan struct{} } // Open 在指定目录路径下创建或打开 bbolt 数据库文件。 @@ -60,7 +75,13 @@ func Open(dir string) (*DB, error) { // 拼接数据库文件路径:dir/sese.db path := filepath.Join(dir, "sese.db") // 打开/创建数据库文件,文件权限 0600(仅所有者可读写) - db, err := bolt.Open(path, 0o600, nil) + // NoSync: true — 不在每次写事务后 fsync,交由 OS 决定刷盘时机。 + // 在高并发写入场景下大幅减少磁盘 I/O 阻塞,代价是极端断电可能丢失最近几秒数据(可接受)。 + db, err := bolt.Open(path, 0o600, &bolt.Options{ + NoSync: true, + Timeout: 5 * time.Second, + PageSize: 4096, + }) if err != nil { return nil, fmt.Errorf("storage.Open bolt: %w", err) } @@ -79,8 +100,80 @@ func Open(dir string) (*DB, error) { return &DB{db: db}, nil } +// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine。 +func (d *DB) StartWriteFlusher() { + d.writeBuf = make(map[string]*writeOp) + d.writeTicker = time.NewTicker(2 * time.Second) // 每 2 秒刷一次 + d.writeDone = make(chan struct{}) + go func() { + for { + select { + case <-d.writeTicker.C: + d.flushWriteBuf() + case <-d.writeDone: + return + } + } + }() +} + +// flushWriteBuf 将写缓冲中的所有待写入操作批量刷入 bbolt。 +func (d *DB) flushWriteBuf() { + d.writeBufMu.Lock() + if len(d.writeBuf) == 0 { + d.writeBufMu.Unlock() + return + } + // 快照并清空缓冲 + snapshot := d.writeBuf + d.writeBuf = make(map[string]*writeOp) + d.writeBufMu.Unlock() + + // 预先按 bucket 分组 + snippets := make(map[string][]byte) + siteInfos := make(map[string][]byte) + for key, op := range snapshot { + if op.opType == 0 { + snippets[key] = op.data + } else { + siteInfos[key] = op.data + } + } + + // 单个事务批量写入 + if err := d.db.Update(func(tx *bolt.Tx) error { + if len(snippets) > 0 { + b := tx.Bucket(bucketGate) + for k, v := range snippets { + if err := b.Put([]byte(k), v); err != nil { + return err + } + } + } + if len(siteInfos) > 0 { + b := tx.Bucket(bucketSiteGate) + for k, v := range siteInfos { + if err := b.Put([]byte(k), v); err != nil { + return err + } + } + } + return nil + }); err != nil { + log.Printf("[storage] flushWriteBuf error: %v", err) + } +} + // Close 关闭底层 bbolt 数据库连接。 +// 先刷完写缓冲,再关闭 ticker,最后关闭数据库。 func (d *DB) Close() error { + if d.writeTicker != nil { + d.writeTicker.Stop() + } + d.flushWriteBuf() // 最后刷一次,确保数据不丢失 + if d.writeDone != nil { + close(d.writeDone) + } return d.db.Close() } @@ -279,15 +372,22 @@ func (d *DB) GetSnippet(url string) (*SnippetEntry, error) { return &entry, nil } -// SetSnippet 将某 URL 的摘要信息写入缓存(覆盖已有数据)。 +// SetSnippet 将某 URL 的摘要信息写入写缓冲(异步批量刷入磁盘)。 func (d *DB) SetSnippet(url string, entry *SnippetEntry) error { data, err := marshalCompress(entry) if err != nil { return err } - return d.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(bucketGate).Put([]byte(url), data) - }) + d.writeBufMu.Lock() + d.writeBuf["snippet:"+url] = &writeOp{opType: 0, key: url, data: data} + // 缓冲过大时同步刷一次,防止内存膨胀 + if len(d.writeBuf) >= 5000 { + d.writeBufMu.Unlock() + d.flushWriteBuf() + return nil + } + d.writeBufMu.Unlock() + return nil } // ---- 网站之门(SiteGate):网站元信息相关方法 ---- @@ -333,15 +433,21 @@ func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) { return &info, nil } -// SetSiteInfo 将某主机名的网站元信息写入存储(覆盖已有数据)。 +// SetSiteInfo 将网站元信息写入写缓冲(异步批量刷入磁盘)。 func (d *DB) SetSiteInfo(host string, info *SiteInfo) error { data, err := marshalCompress(info) if err != nil { return err } - return d.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(bucketSiteGate).Put([]byte(host), data) - }) + d.writeBufMu.Lock() + d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data} + if len(d.writeBuf) >= 5000 { + d.writeBufMu.Unlock() + d.flushWriteBuf() + return nil + } + d.writeBufMu.Unlock() + return nil } // ForEachSite 遍历所有网站元信息条目,对每个条目调用 fn。