// Package storage provides the persistent index and site-info storage backed by bbolt. // storage 包提供基于 bbolt 的持久化存储,负责保存倒排索引、URL摘要缓存和网站元信息。 // // 索引空间(index bucket):key = 关键词(string),value = brotli 压缩的 JSON 数组,每项为 [权重, URL] 对。 // 融合之门(gate bucket):key = URL(string),value = brotli 压缩的 JSON 数组 [标题, 描述, 正文, 时间戳]。 // 网站之门(site_gate bucket):key = 主机名(string),value = brotli 压缩的 JSON SiteInfo 结构体。 // // Python 版使用自定义哈希桶结构;Go 版直接交由 bbolt 原生处理。 package storage import ( "encoding/json" // JSON 序列化/反序列化 "fmt" // 格式化错误信息 "io" // io.EOF 常量 "log" // 日志输出 "os" // 操作系统功能(创建目录等) "path/filepath" // 路径拼接 "sync" // 互斥锁(保护写缓冲) "time" // bbolt 超时配置和写缓冲定时器 "github.com/andybalholm/brotli" // Brotli 无损压缩库(用于压缩存储数据) bolt "go.etcd.io/bbolt" // BoltDB,纯 Go 嵌入式 KV 数据库 ) // IndexEntry 是倒排索引中的单个条目。 // 一条索引记录表示"某个 URL 与某个关键词的相关性权重"。 type IndexEntry struct { Weight float32 `json:"w"` // 该 URL 在该关键词下的得分/权重 URL string `json:"u"` // 网页 URL } // SnippetEntry 是 URL 对应的摘要信息缓存。 // 包含页面标题、描述、正文片段和抓取时间戳。 type SnippetEntry struct { Title string `json:"title"` // 网页标题 Description string `json:"desc"` // meta description 或自动生成的描述 Text string `json:"text"` // 正文前 N 字符的文本片段 Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳 } // 四个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节) var ( bucketIndex = []byte("index") // 倒排索引 bucket bucketGate = []byte("gate") // URL 摘要缓存 bucket bucketSiteGate = []byte("site_gate") // 网站元信息 bucket bucketPriority = []byte("priority") // 优先爬取 URL bucket ) // writeOp 表示一个待写入的 snippet 操作。 type writeOp struct { data []byte // marshalCompress 后的数据 } // DB 封装一个 bbolt 数据库,提供类型化的存取接口。 // bbolt 内部已实现并发安全,无需额外加锁。 type DB struct { db *bolt.DB // 底层 bbolt 数据库句柄 // 异步写缓冲:SetSnippet 先写到内存,定期批量刷入 bbolt。 writeBuf map[string]*writeOp // key → 待写入的操作 writeBufMu sync.Mutex writeTicker *time.Ticker writeDone chan struct{} // SiteInfo 独立内存缓存:读全走缓存(零 bbolt 读事务),定期批量刷入 bbolt。 // 使用 RWMutex 允许多个读并发,写(UpdateSiteInfo)独占。 siteCache map[string]*SiteInfo // host → 最新 SiteInfo siteCacheMu sync.RWMutex siteDirty map[string]struct{} // 需要刷盘的 host 集合 siteDirtyMu sync.Mutex siteTicker *time.Ticker siteDone chan struct{} } // Open 在指定目录路径下创建或打开 bbolt 数据库文件。 // 如果目录不存在会自动创建。数据库文件名为 sese.db。 func Open(dir string) (*DB, error) { // 确保存储目录存在(0775 权限:所有者读写执行,组用户读执行,其他读执行) if err := os.MkdirAll(dir, 0o755); err != nil { return nil, fmt.Errorf("storage.Open mkdir: %w", err) } // 拼接数据库文件路径:dir/sese.db path := filepath.Join(dir, "sese.db") // 打开/创建数据库文件,文件权限 0600(仅所有者可读写) // NoSync: true — 不在每次写事务后 fsync,交由 OS 决定刷盘时机。 // 在高并发写入场景下大幅减少磁盘 I/O 阻塞,代价是极端断电可能丢失最近几秒数据(可接受)。 db, err := bolt.Open(path, 0o600, &bolt.Options{ NoSync: true, Timeout: 5 * time.Second, PageSize: 4096, }) if err != nil { return nil, fmt.Errorf("storage.Open bolt: %w", err) } // 启动时确保四个 bucket 都存在(不存在则创建) err = db.Update(func(tx *bolt.Tx) error { for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate, bucketPriority} { if _, err := tx.CreateBucketIfNotExists(b); err != nil { return err } } return nil }) if err != nil { return nil, fmt.Errorf("storage.Open create buckets: %w", err) } d := &DB{db: db} // 启动时预热 SiteInfo 缓存:一次性从 bbolt 加载所有 SiteInfo 到内存 d.siteCache = make(map[string]*SiteInfo) _ = db.View(func(tx *bolt.Tx) error { return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error { var info SiteInfo if err := decompressUnmarshal(v, &info); err != nil { return nil // 跳过损坏条目 } if info.Languages == nil { info.Languages = make(map[string]float64) } if info.Redirects == nil { info.Redirects = make(map[string]string) } d.siteCache[string(k)] = &info return nil }) }) log.Printf("[storage] siteCache warmed: %d hosts loaded", len(d.siteCache)) return d, nil } // StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine(snippet 和 SiteInfo 各自独立)。 func (d *DB) StartWriteFlusher() { // Snippet 写缓冲 d.writeBuf = make(map[string]*writeOp) d.writeTicker = time.NewTicker(2 * time.Second) d.writeDone = make(chan struct{}) go func() { for { select { case <-d.writeTicker.C: d.flushWriteBuf() case <-d.writeDone: return } } }() // SiteInfo 独立缓存 d.siteCache = make(map[string]*SiteInfo) d.siteDirty = make(map[string]struct{}) d.siteTicker = time.NewTicker(5 * time.Second) d.siteDone = make(chan struct{}) go func() { for { select { case <-d.siteTicker.C: d.flushSiteCache() case <-d.siteDone: return } } }() } // flushWriteBuf 将写缓冲中的 snippet 操作批量刷入 bbolt。 func (d *DB) flushWriteBuf() { d.writeBufMu.Lock() if len(d.writeBuf) == 0 { d.writeBufMu.Unlock() return } // 快照并清空缓冲 snapshot := d.writeBuf d.writeBuf = make(map[string]*writeOp) d.writeBufMu.Unlock() // 单个事务批量写入 if err := d.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(bucketGate) for k, op := range snapshot { if err := b.Put([]byte(k), op.data); err != nil { return err } } return nil }); err != nil { log.Printf("[storage] flushWriteBuf error: %v", err) } } // flushSiteCache 将脏的 SiteInfo 批量刷入 bbolt。 func (d *DB) flushSiteCache() { d.siteDirtyMu.Lock() if len(d.siteDirty) == 0 { d.siteDirtyMu.Unlock() return } dirty := d.siteDirty d.siteDirty = make(map[string]struct{}) d.siteDirtyMu.Unlock() // 在读锁下快照所有脏数据并预压缩 d.siteCacheMu.RLock() type kv struct { host string data []byte } items := make([]kv, 0, len(dirty)) for host := range dirty { if info, ok := d.siteCache[host]; ok { data, err := marshalCompress(info) if err != nil { log.Printf("[storage] flushSiteCache marshal error for %s: %v", host, err) continue } items = append(items, kv{host, data}) } } d.siteCacheMu.RUnlock() if len(items) == 0 { return } // 单个事务批量写入 bbolt if err := d.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(bucketSiteGate) for _, item := range items { if err := b.Put([]byte(item.host), item.data); err != nil { return err } } return nil }); err != nil { log.Printf("[storage] flushSiteCache error: %v", err) } } // Close 关闭底层 bbolt 数据库连接。 // 先刷完写缓冲和 SiteInfo 缓存,再关闭定时器,最后关闭数据库。 func (d *DB) Close() error { if d.writeTicker != nil { d.writeTicker.Stop() } if d.siteTicker != nil { d.siteTicker.Stop() } d.flushWriteBuf() d.flushSiteCache() if d.writeDone != nil { close(d.writeDone) } if d.siteDone != nil { close(d.siteDone) } return d.db.Close() } // ---- 辅助函数:压缩与解压、map 拷贝 ---- // copyMapF64 深拷贝 map[string]float64。 func copyMapF64(m map[string]float64) map[string]float64 { cp := make(map[string]float64, len(m)) for k, v := range m { cp[k] = v } return cp } // copyMapStr 深拷贝 map[string]string。 func copyMapStr(m map[string]string) map[string]string { cp := make(map[string]string, len(m)) for k, v := range m { cp[k] = v } return cp } // compress 将字节数组用 brotli 压缩后返回。 // brotli 压缩比高于 gzip,适合大量文本的存储空间优化。 func compress(data []byte) ([]byte, error) { buf := make([]byte, 0, len(data)) // 预分配,避免反复扩容 w := brotli.NewWriterLevel((*appendWriter)(&buf), 3) // 压缩级别 3(优先速度,压缩比损失约 10-15%) if _, err := w.Write(data); err != nil { return nil, err } if err := w.Close(); err != nil { return nil, err } return buf, nil } // decompress 将 brotli 压缩的字节数组解压还原。 func decompress(data []byte) ([]byte, error) { // brotli.NewReader 从字节数组创建读取器(通过 byteReader 适配 io.Reader 接口) r := brotli.NewReader( (*byteReader)(&data), ) out := make([]byte, 0, len(data)*3) // 预分配约 3 倍空间(解压后通常更大) tmp := make([]byte, 4096) // 每次最多读 4KB for { n, err := r.Read(tmp) out = append(out, tmp[:n]...) // 追加本次读取的字节 if err != nil { if err == io.EOF { break // 正常读完 } return out, err // 其他错误(非 EOF)则返回 } } return out, nil } // appendWriter 将 *[]byte 适配为 io.Writer 接口(写入时直接 append)。 type appendWriter []byte // Write 将数据 p 追加到 appendWriter 末尾,返回写入字节数。 func (a *appendWriter) Write(p []byte) (int, error) { *a = append(*a, p...) return len(p), nil } // byteReader 将 []byte 适配为 io.Reader 接口(顺序读取,支持读完后返回 EOF)。 type byteReader []byte // Read 从字节数组读取最多 len(p) 字节到 p 中,返回实际读取字节数和可能的错误。 // 当字节数组全部读完后返回 io.EOF。 func (b *byteReader) Read(p []byte) (int, error) { if len(*b) == 0 { return 0, io.EOF // 已读完 } n := copy(p, *b) // 复制最多 len(p) 字节 *b = (*b)[n:] // 前进指针 return n, nil } // marshalCompress 将任意可序列化对象先 JSON 编码,再 brotli 压缩,返回压缩后的字节。 func marshalCompress(v any) ([]byte, error) { raw, err := json.Marshal(v) // 先序列化为 JSON if err != nil { return nil, err } return compress(raw) // 再压缩 } // decompressUnmarshal 将压缩字节先解压,再 JSON 反序列化到目标对象 v。 func decompressUnmarshal(data []byte, v any) error { raw, err := decompress(data) // 先解压 if err != nil { return err } return json.Unmarshal(raw, v) // 再反序列化 } // ---- 倒排索引(Index)相关方法 ---- // GetIndex 根据关键词查询倒排索引,返回该词关联的所有 [权重, URL] 条目列表。 func (d *DB) GetIndex(keyword string) ([]IndexEntry, error) { var entries []IndexEntry err := d.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(bucketIndex) v := b.Get([]byte(keyword)) // 在 index bucket 中按关键词查询 if v == nil { return nil // 不存在该词,返回空列表 } return decompressUnmarshal(v, &entries) }) return entries, err } // SetIndex 将某关键词的完整索引条目列表覆盖写入(替换旧数据)。 func (d *DB) SetIndex(keyword string, entries []IndexEntry) error { data, err := marshalCompress(entries) if err != nil { return err } return d.db.Update(func(tx *bolt.Tx) error { return tx.Bucket(bucketIndex).Put([]byte(keyword), data) }) } // BatchSetIndex 分批写入多个关键词→条目列表的映射。 // 大数据量时拆分为多个小事务,避免单个大事务导致的长时间阻塞。 func (d *DB) BatchSetIndex(batch map[string][]IndexEntry) error { const batchSize = 1000 items := make([]struct { keyword string entries []IndexEntry }, 0, len(batch)) for k, v := range batch { items = append(items, struct { keyword string entries []IndexEntry }{k, v}) } totalBatches := (len(items) + batchSize - 1) / batchSize for i := 0; i < len(items); i += batchSize { end := i + batchSize if end > len(items) { end = len(items) } batchNum := i/batchSize + 1 // 事务外预先完成所有序列化和压缩,减少事务持锁时间 preItems := make([]struct { keyword string data []byte }, 0, end-i) for _, item := range items[i:end] { data, err := marshalCompress(item.entries) if err != nil { return err } preItems = append(preItems, struct { keyword string data []byte }{item.keyword, data}) } // 事务内只做纯内存写入,持锁时间极短 if err := d.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(bucketIndex) for _, p := range preItems { if err := b.Put([]byte(p.keyword), p.data); err != nil { return err } } return nil }); err != nil { return err } if totalBatches > 1 { log.Printf("[storage] BatchSetIndex progress: batch %d/%d (%d keys)", batchNum, totalBatches, end) } } return nil } // ForEachIndex 遍历倒排索引中所有关键词及其关联条目,对每个条目调用 fn。 // 用于全量读取索引、做备份或重新计算等场景。 func (d *DB) ForEachIndex(fn func(keyword string, entries []IndexEntry) error) error { return d.db.View(func(tx *bolt.Tx) error { return tx.Bucket(bucketIndex).ForEach(func(k, v []byte) error { var entries []IndexEntry if err := decompressUnmarshal(v, &entries); err != nil { return nil // 跳过损坏条目,不中断遍历 } return fn(string(k), entries) }) }) } // ---- 融合之门(Gate):URL 摘要缓存相关方法 ---- // GetSnippet 根据 URL 查询缓存的摘要信息(标题/描述/正文片段)。 // 若未命中返回 error。 func (d *DB) GetSnippet(url string) (*SnippetEntry, error) { var entry SnippetEntry err := d.db.View(func(tx *bolt.Tx) error { v := tx.Bucket(bucketGate).Get([]byte(url)) if v == nil { return fmt.Errorf("not found") } return decompressUnmarshal(v, &entry) }) if err != nil { return nil, err } return &entry, nil } // SetSnippet 将某 URL 的摘要信息写入写缓冲(异步批量刷入磁盘)。 func (d *DB) SetSnippet(url string, entry *SnippetEntry) error { data, err := marshalCompress(entry) if err != nil { return err } d.writeBufMu.Lock() d.writeBuf[url] = &writeOp{data: data} // 缓冲过大时同步刷一次,防止内存膨胀 if len(d.writeBuf) >= 5000 { d.writeBufMu.Unlock() d.flushWriteBuf() return nil } d.writeBufMu.Unlock() return nil } // ---- 网站之门(SiteGate):网站元信息相关方法 ---- // SiteInfo 存放每个域名/主机的元信息,与 Python 版网站.py 的 dataclass 对应。 type SiteInfo struct { VisitCount int `json:"visit_count"` // 累计访问该网站的次数 LastVisitTime int64 `json:"last_visit_time"` // 上次访问该网站的时间戳 Fingerprint any `json:"fingerprint,omitempty"` // 网站指纹(用于识别重复站点) SuccessRate *float64 `json:"success_rate,omitempty"` // 访问成功率(成功次数/总访问次数) HTMLStructure string `json:"html_structure,omitempty"` // HTML 结构特征摘要 IPs []string `json:"ips,omitempty"` // 该域名解析出的 IP 列表 Quality *float64 `json:"quality,omitempty"` // 网站质量评分(0~1) HTTPSAvailable *bool `json:"https_available,omitempty"` // 是否支持 HTTPS Keywords []string `json:"keywords,omitempty"` // 该网站的高频关键词列表 OutLinks []string `json:"out_links,omitempty"` // 从该网站页面提取的出站链接列表 Languages map[string]float64 `json:"languages,omitempty"` // 网站语种分布(语种代码 → 占比) Redirects map[string]string `json:"redirects,omitempty"` // 重定向链(URL → 最终 URL) ServerTypes []string `json:"server_types,omitempty"` // 网站使用的 HTTP Server 类型列表 } // GetSiteInfo 根据主机名查询网站元信息。 // 全程只读内存缓存(零 bbolt 读事务),适合高并发调用。 // 若不存在则返回仅有默认空 map 的空 SiteInfo(不报错,方便调用方直接使用)。 func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) { d.siteCacheMu.RLock() info, ok := d.siteCache[host] d.siteCacheMu.RUnlock() if ok { // 返回深拷贝,防止调用方修改缓存 cp := *info if cp.Languages == nil { cp.Languages = make(map[string]float64) } else { cp.Languages = copyMapF64(cp.Languages) } if cp.Redirects == nil { cp.Redirects = make(map[string]string) } else { cp.Redirects = copyMapStr(cp.Redirects) } return &cp, nil } // 缓存未命中,从 bbolt 加载 // 注意:不能在 bbolt 事务回调(如 ForEachSnippet 的 fn)内调用此路径(bbolt 不支持嵌套事务)。 // 正常情况下预热已加载全部数据,不会走到这里。 var si SiteInfo var err error func() { defer func() { if r := recover(); r != nil { // bbolt 嵌套事务 panic,返回空 SiteInfo si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} err = nil } }() err = d.db.View(func(tx *bolt.Tx) error { v := tx.Bucket(bucketSiteGate).Get([]byte(host)) if v == nil { return fmt.Errorf("not found") } return decompressUnmarshal(v, &si) }) }() if err != nil { return &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}, nil } if si.Languages == nil { si.Languages = make(map[string]float64) } if si.Redirects == nil { si.Redirects = make(map[string]string) } // 回填缓存 d.siteCacheMu.Lock() if existing, exists := d.siteCache[host]; !exists { d.siteCache[host] = &si } else { si = *existing // 用更新的版本 } d.siteCacheMu.Unlock() return &si, nil } // SetSiteInfo 将网站元信息直接写入内存缓存并标记为脏(异步刷入磁盘)。 func (d *DB) SetSiteInfo(host string, info *SiteInfo) error { cp := *info // 浅拷贝,断开与调用方的引用 if cp.Languages != nil { cp.Languages = copyMapF64(cp.Languages) } if cp.Redirects != nil { cp.Redirects = copyMapStr(cp.Redirects) } d.siteCacheMu.Lock() d.siteCache[host] = &cp d.siteCacheMu.Unlock() d.siteDirtyMu.Lock() d.siteDirty[host] = struct{}{} d.siteDirtyMu.Unlock() return nil } // UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回缓存。 // 只用 siteCacheMu(不碰 bbolt),不会与 flush 产生跨锁依赖。 func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error { d.siteCacheMu.Lock() // 从缓存读取(缓存未命中会返回 nil,后续初始化为空) info, ok := d.siteCache[host] if !ok { // 从 bbolt 加载(注意:这是唯一会做 bbolt 读的地方,但只在首次 miss 时触发) var si SiteInfo err := d.db.View(func(tx *bolt.Tx) error { v := tx.Bucket(bucketSiteGate).Get([]byte(host)) if v == nil { return fmt.Errorf("not found") } return decompressUnmarshal(v, &si) }) if err != nil { si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} } else { if si.Languages == nil { si.Languages = make(map[string]float64) } if si.Redirects == nil { si.Redirects = make(map[string]string) } } d.siteCache[host] = &si info = &si } // 在锁内调用修改函数(直接修改缓存中的对象) fn(info) d.siteCacheMu.Unlock() // 标记脏(锁外) d.siteDirtyMu.Lock() d.siteDirty[host] = struct{}{} d.siteDirtyMu.Unlock() return nil } // ForEachSite 遍历所有网站元信息条目,对每个条目调用 fn。 func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error { return d.db.View(func(tx *bolt.Tx) error { return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error { var info SiteInfo if err := decompressUnmarshal(v, &info); err != nil { return nil // 跳过损坏条目 } return fn(string(k), &info) }) }) } // ForEachSnippet 遍历所有 URL 摘要条目,对每个条目调用 fn。 func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error { return d.db.View(func(tx *bolt.Tx) error { return tx.Bucket(bucketGate).ForEach(func(k, v []byte) error { var entry SnippetEntry if err := decompressUnmarshal(v, &entry); err != nil { return nil // 跳过损坏条目 } return fn(string(k), &entry) }) }) } // ---- 优先爬取队列(Priority Queue)相关方法 ---- // PriorityEntry 记录一条待优先爬取的 URL 或域名。 type PriorityEntry struct { URL string `json:"url"` // 用户提交的 URL 或域名(会自动规范化为带 scheme 的 URL) IsDomain bool `json:"domain"` // 是否为纯域名(true=仅域名,false=完整 URL) AddedAt int64 `json:"added_at"` // 添加时的 Unix 时间戳 Visited bool `json:"visited"` // 是否已爬取(crawler 爬完后标记) } // GetPriorityURLs 返回所有未访问的 priority 条目(按添加时间升序)。 func (d *DB) GetPriorityURLs() ([]PriorityEntry, error) { var entries []PriorityEntry err := d.db.View(func(tx *bolt.Tx) error { return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error { var e PriorityEntry if err := decompressUnmarshal(v, &e); err != nil { return nil // 跳过损坏条目 } if !e.Visited { entries = append(entries, e) } return nil }) }) return entries, err } // AddPriorityURL 添加一条 priority 条目(key = URL,value = PriorityEntry)。 // 若已存在(且未访问)则忽略。 func (d *DB) AddPriorityURL(entry PriorityEntry) error { return d.db.Update(func(tx *bolt.Tx) error { k := []byte(entry.URL) existing := tx.Bucket(bucketPriority).Get(k) if existing != nil { var e PriorityEntry if err := decompressUnmarshal(existing, &e); err == nil && !e.Visited { return nil // 已存在且未访问,忽略 } } data, err := marshalCompress(entry) if err != nil { return err } return tx.Bucket(bucketPriority).Put(k, data) }) } // RemovePriorityURL 删除指定 URL 的 priority 条目。 func (d *DB) RemovePriorityURL(url string) error { return d.db.Update(func(tx *bolt.Tx) error { return tx.Bucket(bucketPriority).Delete([]byte(url)) }) } // ClearVisitedPriorityURLs 批量删除所有已标记为 visited 的条目(crawler 爬完后调用)。 func (d *DB) ClearVisitedPriorityURLs() error { return d.db.Update(func(tx *bolt.Tx) error { return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error { var e PriorityEntry if err := decompressUnmarshal(v, &e); err != nil { return nil } if e.Visited { return tx.Bucket(bucketPriority).Delete(k) } return nil }) }) }