diff --git a/storage/storage.go b/storage/storage.go index 9df51b8..f38de83 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -46,11 +46,9 @@ var ( bucketPriority = []byte("priority") // 优先爬取 URL bucket ) -// writeOp 表示一个待写入的操作。 +// writeOp 表示一个待写入的 snippet 操作。 type writeOp struct { - opType int // 0 = set snippet, 1 = set site info - key string // URL 或 host - data []byte // marshalCompress 后的数据 + data []byte // marshalCompress 后的数据 } // DB 封装一个 bbolt 数据库,提供类型化的存取接口。 @@ -58,11 +56,20 @@ type writeOp struct { type DB struct { db *bolt.DB // 底层 bbolt 数据库句柄 - // 异步写缓冲:SetSnippet/SetSiteInfo 先写到内存,定期批量刷入 bbolt。 + // 异步写缓冲:SetSnippet 先写到内存,定期批量刷入 bbolt。 writeBuf map[string]*writeOp // key → 待写入的操作 writeBufMu sync.Mutex writeTicker *time.Ticker writeDone chan struct{} + + // SiteInfo 独立内存缓存:读全走缓存(零 bbolt 读事务),定期批量刷入 bbolt。 + // 使用 RWMutex 允许多个读并发,写(UpdateSiteInfo)独占。 + siteCache map[string]*SiteInfo // host → 最新 SiteInfo + siteCacheMu sync.RWMutex + siteDirty map[string]struct{} // 需要刷盘的 host 集合 + siteDirtyMu sync.Mutex + siteTicker *time.Ticker + siteDone chan struct{} } // Open 在指定目录路径下创建或打开 bbolt 数据库文件。 @@ -97,13 +104,37 @@ func Open(dir string) (*DB, error) { if err != nil { return nil, fmt.Errorf("storage.Open create buckets: %w", err) } - return &DB{db: db}, nil + + d := &DB{db: db} + + // 启动时预热 SiteInfo 缓存:一次性从 bbolt 加载所有 SiteInfo 到内存 + d.siteCache = make(map[string]*SiteInfo) + _ = db.View(func(tx *bolt.Tx) error { + return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error { + var info SiteInfo + if err := decompressUnmarshal(v, &info); err != nil { + return nil // 跳过损坏条目 + } + if info.Languages == nil { + info.Languages = make(map[string]float64) + } + if info.Redirects == nil { + info.Redirects = make(map[string]string) + } + d.siteCache[string(k)] = &info + return nil + }) + }) + log.Printf("[storage] siteCache warmed: %d hosts loaded", len(d.siteCache)) + + return d, nil } -// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine。 +// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine(snippet 和 SiteInfo 各自独立)。 func (d *DB) StartWriteFlusher() { + // Snippet 写缓冲 d.writeBuf = make(map[string]*writeOp) - d.writeTicker = time.NewTicker(2 * time.Second) // 每 2 秒刷一次 + d.writeTicker = time.NewTicker(2 * time.Second) d.writeDone = make(chan struct{}) go func() { for { @@ -115,9 +146,25 @@ func (d *DB) StartWriteFlusher() { } } }() + + // SiteInfo 独立缓存 + d.siteCache = make(map[string]*SiteInfo) + d.siteDirty = make(map[string]struct{}) + d.siteTicker = time.NewTicker(5 * time.Second) + d.siteDone = make(chan struct{}) + go func() { + for { + select { + case <-d.siteTicker.C: + d.flushSiteCache() + case <-d.siteDone: + return + } + } + }() } -// flushWriteBuf 将写缓冲中的所有待写入操作批量刷入 bbolt。 +// flushWriteBuf 将写缓冲中的 snippet 操作批量刷入 bbolt。 func (d *DB) flushWriteBuf() { d.writeBufMu.Lock() if len(d.writeBuf) == 0 { @@ -129,33 +176,12 @@ func (d *DB) flushWriteBuf() { d.writeBuf = make(map[string]*writeOp) d.writeBufMu.Unlock() - // 预先按 bucket 分组 - snippets := make(map[string][]byte) - siteInfos := make(map[string][]byte) - for key, op := range snapshot { - if op.opType == 0 { - snippets[key] = op.data - } else { - siteInfos[key] = op.data - } - } - // 单个事务批量写入 if err := d.db.Update(func(tx *bolt.Tx) error { - if len(snippets) > 0 { - b := tx.Bucket(bucketGate) - for k, v := range snippets { - if err := b.Put([]byte(k), v); err != nil { - return err - } - } - } - if len(siteInfos) > 0 { - b := tx.Bucket(bucketSiteGate) - for k, v := range siteInfos { - if err := b.Put([]byte(k), v); err != nil { - return err - } + b := tx.Bucket(bucketGate) + for k, op := range snapshot { + if err := b.Put([]byte(k), op.data); err != nil { + return err } } return nil @@ -164,20 +190,93 @@ func (d *DB) flushWriteBuf() { } } +// flushSiteCache 将脏的 SiteInfo 批量刷入 bbolt。 +func (d *DB) flushSiteCache() { + d.siteDirtyMu.Lock() + if len(d.siteDirty) == 0 { + d.siteDirtyMu.Unlock() + return + } + dirty := d.siteDirty + d.siteDirty = make(map[string]struct{}) + d.siteDirtyMu.Unlock() + + // 在读锁下快照所有脏数据并预压缩 + d.siteCacheMu.RLock() + type kv struct { + host string + data []byte + } + items := make([]kv, 0, len(dirty)) + for host := range dirty { + if info, ok := d.siteCache[host]; ok { + data, err := marshalCompress(info) + if err != nil { + log.Printf("[storage] flushSiteCache marshal error for %s: %v", host, err) + continue + } + items = append(items, kv{host, data}) + } + } + d.siteCacheMu.RUnlock() + + if len(items) == 0 { + return + } + + // 单个事务批量写入 bbolt + if err := d.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketSiteGate) + for _, item := range items { + if err := b.Put([]byte(item.host), item.data); err != nil { + return err + } + } + return nil + }); err != nil { + log.Printf("[storage] flushSiteCache error: %v", err) + } +} + // Close 关闭底层 bbolt 数据库连接。 -// 先刷完写缓冲,再关闭 ticker,最后关闭数据库。 +// 先刷完写缓冲和 SiteInfo 缓存,再关闭定时器,最后关闭数据库。 func (d *DB) Close() error { if d.writeTicker != nil { d.writeTicker.Stop() } - d.flushWriteBuf() // 最后刷一次,确保数据不丢失 + if d.siteTicker != nil { + d.siteTicker.Stop() + } + d.flushWriteBuf() + d.flushSiteCache() if d.writeDone != nil { close(d.writeDone) } + if d.siteDone != nil { + close(d.siteDone) + } return d.db.Close() } -// ---- 辅助函数:压缩与解压 ---- +// ---- 辅助函数:压缩与解压、map 拷贝 ---- + +// copyMapF64 深拷贝 map[string]float64。 +func copyMapF64(m map[string]float64) map[string]float64 { + cp := make(map[string]float64, len(m)) + for k, v := range m { + cp[k] = v + } + return cp +} + +// copyMapStr 深拷贝 map[string]string。 +func copyMapStr(m map[string]string) map[string]string { + cp := make(map[string]string, len(m)) + for k, v := range m { + cp[k] = v + } + return cp +} // compress 将字节数组用 brotli 压缩后返回。 // brotli 压缩比高于 gzip,适合大量文本的存储空间优化。 @@ -379,7 +478,7 @@ func (d *DB) SetSnippet(url string, entry *SnippetEntry) error { return err } d.writeBufMu.Lock() - d.writeBuf["snippet:"+url] = &writeOp{opType: 0, key: url, data: data} + d.writeBuf[url] = &writeOp{data: data} // 缓冲过大时同步刷一次,防止内存膨胀 if len(d.writeBuf) >= 5000 { d.writeBufMu.Unlock() @@ -410,82 +509,97 @@ type SiteInfo struct { } // GetSiteInfo 根据主机名查询网站元信息。 -// 优先从写缓冲中读取(保证与最近 SetSiteInfo 的数据一致),未命中再读 bbolt。 +// 全程只读内存缓存(零 bbolt 读事务),适合高并发调用。 // 若不存在则返回仅有默认空 map 的空 SiteInfo(不报错,方便调用方直接使用)。 func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) { - // 先检查写缓冲中是否有该 host 的最新待写入数据 - d.writeBufMu.Lock() - if op, ok := d.writeBuf["site:"+host]; ok { - d.writeBufMu.Unlock() - var info SiteInfo - if err := decompressUnmarshal(op.data, &info); err == nil { - if info.Languages == nil { - info.Languages = make(map[string]float64) - } - if info.Redirects == nil { - info.Redirects = make(map[string]string) - } - return &info, nil + d.siteCacheMu.RLock() + info, ok := d.siteCache[host] + d.siteCacheMu.RUnlock() + + if ok { + // 返回深拷贝,防止调用方修改缓存 + cp := *info + if cp.Languages == nil { + cp.Languages = make(map[string]float64) + } else { + cp.Languages = copyMapF64(cp.Languages) } - // 反序列化失败(不应发生),fall through 到读 db - } else { - d.writeBufMu.Unlock() + if cp.Redirects == nil { + cp.Redirects = make(map[string]string) + } else { + cp.Redirects = copyMapStr(cp.Redirects) + } + return &cp, nil } - var info SiteInfo - err := d.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket(bucketSiteGate).Get([]byte(host)) - if v == nil { - return fmt.Errorf("not found") - } - return decompressUnmarshal(v, &info) - }) + // 缓存未命中,从 bbolt 加载 + // 注意:不能在 bbolt 事务回调(如 ForEachSnippet 的 fn)内调用此路径(bbolt 不支持嵌套事务)。 + // 正常情况下预热已加载全部数据,不会走到这里。 + var si SiteInfo + var err error + func() { + defer func() { + if r := recover(); r != nil { + // bbolt 嵌套事务 panic,返回空 SiteInfo + si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} + err = nil + } + }() + err = d.db.View(func(tx *bolt.Tx) error { + v := tx.Bucket(bucketSiteGate).Get([]byte(host)) + if v == nil { + return fmt.Errorf("not found") + } + return decompressUnmarshal(v, &si) + }) + }() if err != nil { - // 未找到时返回带默认空 map 的空结构,避免调用方空指针 panic return &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}, nil } - if info.Languages == nil { - info.Languages = make(map[string]float64) + if si.Languages == nil { + si.Languages = make(map[string]float64) } - if info.Redirects == nil { - info.Redirects = make(map[string]string) + if si.Redirects == nil { + si.Redirects = make(map[string]string) } - return &info, nil + // 回填缓存 + d.siteCacheMu.Lock() + if existing, exists := d.siteCache[host]; !exists { + d.siteCache[host] = &si + } else { + si = *existing // 用更新的版本 + } + d.siteCacheMu.Unlock() + return &si, nil } -// SetSiteInfo 将网站元信息写入写缓冲(异步批量刷入磁盘)。 +// SetSiteInfo 将网站元信息直接写入内存缓存并标记为脏(异步刷入磁盘)。 func (d *DB) SetSiteInfo(host string, info *SiteInfo) error { - data, err := marshalCompress(info) - if err != nil { - return err + cp := *info // 浅拷贝,断开与调用方的引用 + if cp.Languages != nil { + cp.Languages = copyMapF64(cp.Languages) } - d.writeBufMu.Lock() - d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data} - if len(d.writeBuf) >= 5000 { - d.writeBufMu.Unlock() - d.flushWriteBuf() - return nil + if cp.Redirects != nil { + cp.Redirects = copyMapStr(cp.Redirects) } - d.writeBufMu.Unlock() + d.siteCacheMu.Lock() + d.siteCache[host] = &cp + d.siteCacheMu.Unlock() + d.siteDirtyMu.Lock() + d.siteDirty[host] = struct{}{} + d.siteDirtyMu.Unlock() return nil } -// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回。 -// 整个读-改-写过程在 writeBufMu 锁内完成,消除并发 lost update 竞态。 -// 适用于多个 goroutine 对同一 host 的 SiteInfo 进行读-改-写的场景。 +// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回缓存。 +// 只用 siteCacheMu(不碰 bbolt),不会与 flush 产生跨锁依赖。 func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error { - d.writeBufMu.Lock() + d.siteCacheMu.Lock() - // 从缓冲或 db 读取最新 SiteInfo - var info *SiteInfo - if op, ok := d.writeBuf["site:"+host]; ok { - var si SiteInfo - if err := decompressUnmarshal(op.data, &si); err == nil { - info = &si - } - } - - if info == nil { + // 从缓存读取(缓存未命中会返回 nil,后续初始化为空) + info, ok := d.siteCache[host] + if !ok { + // 从 bbolt 加载(注意:这是唯一会做 bbolt 读的地方,但只在首次 miss 时触发) var si SiteInfo err := d.db.View(func(tx *bolt.Tx) error { v := tx.Bucket(bucketSiteGate).Get([]byte(host)) @@ -495,34 +609,28 @@ func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error { return decompressUnmarshal(v, &si) }) if err != nil { - info = &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} + si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} } else { - info = &si + if si.Languages == nil { + si.Languages = make(map[string]float64) + } + if si.Redirects == nil { + si.Redirects = make(map[string]string) + } } - } - if info.Languages == nil { - info.Languages = make(map[string]float64) - } - if info.Redirects == nil { - info.Redirects = make(map[string]string) + d.siteCache[host] = &si + info = &si } - // 在锁内调用修改函数 + // 在锁内调用修改函数(直接修改缓存中的对象) fn(info) - // 序列化并写回缓冲 - data, err := marshalCompress(info) - if err != nil { - d.writeBufMu.Unlock() - return err - } - d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data} - if len(d.writeBuf) >= 5000 { - d.writeBufMu.Unlock() - d.flushWriteBuf() - return nil - } - d.writeBufMu.Unlock() + d.siteCacheMu.Unlock() + + // 标记脏(锁外) + d.siteDirtyMu.Lock() + d.siteDirty[host] = struct{}{} + d.siteDirtyMu.Unlock() return nil }