Files
sese-engine-go/storage/storage.go
T
2026-04-09 11:58:53 +08:00

625 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package storage provides the persistent index and site-info storage backed by bbolt.
// storage 包提供基于 bbolt 的持久化存储,负责保存倒排索引、URL摘要缓存和网站元信息。
//
// 索引空间(index bucket):key = 关键词(string),value = brotli 压缩的 JSON 数组,每项为 [权重, URL] 对。
// 融合之门(gate bucket):key = URLstring),value = brotli 压缩的 JSON 数组 [标题, 描述, 正文, 时间戳]。
// 网站之门(site_gate bucket):key = 主机名(string),value = brotli 压缩的 JSON SiteInfo 结构体。
//
// Python 版使用自定义哈希桶结构;Go 版直接交由 bbolt 原生处理。
package storage
import (
"encoding/json" // JSON 序列化/反序列化
"fmt" // 格式化错误信息
"io" // io.EOF 常量
"log" // 日志输出
"os" // 操作系统功能(创建目录等)
"path/filepath" // 路径拼接
"sync" // 互斥锁(保护写缓冲)
"time" // bbolt 超时配置和写缓冲定时器
"github.com/andybalholm/brotli" // Brotli 无损压缩库(用于压缩存储数据)
bolt "go.etcd.io/bbolt" // BoltDB,纯 Go 嵌入式 KV 数据库
)
// IndexEntry 是倒排索引中的单个条目。
// 一条索引记录表示"某个 URL 与某个关键词的相关性权重"。
type IndexEntry struct {
Weight float32 `json:"w"` // 该 URL 在该关键词下的得分/权重
URL string `json:"u"` // 网页 URL
}
// SnippetEntry 是 URL 对应的摘要信息缓存。
// 包含页面标题、描述、正文片段和抓取时间戳。
type SnippetEntry struct {
Title string `json:"title"` // 网页标题
Description string `json:"desc"` // meta description 或自动生成的描述
Text string `json:"text"` // 正文前 N 字符的文本片段
Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳
}
// 四个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节)
var (
bucketIndex = []byte("index") // 倒排索引 bucket
bucketGate = []byte("gate") // URL 摘要缓存 bucket
bucketSiteGate = []byte("site_gate") // 网站元信息 bucket
bucketPriority = []byte("priority") // 优先爬取 URL bucket
)
// writeOp 表示一个待写入的操作。
type writeOp struct {
opType int // 0 = set snippet, 1 = set site info
key string // URL 或 host
data []byte // marshalCompress 后的数据
}
// DB 封装一个 bbolt 数据库,提供类型化的存取接口。
// bbolt 内部已实现并发安全,无需额外加锁。
type DB struct {
db *bolt.DB // 底层 bbolt 数据库句柄
// 异步写缓冲:SetSnippet/SetSiteInfo 先写到内存,定期批量刷入 bbolt。
writeBuf map[string]*writeOp // key → 待写入的操作
writeBufMu sync.Mutex
writeTicker *time.Ticker
writeDone chan struct{}
}
// Open 在指定目录路径下创建或打开 bbolt 数据库文件。
// 如果目录不存在会自动创建。数据库文件名为 sese.db。
func Open(dir string) (*DB, error) {
// 确保存储目录存在(0775 权限:所有者读写执行,组用户读执行,其他读执行)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("storage.Open mkdir: %w", err)
}
// 拼接数据库文件路径:dir/sese.db
path := filepath.Join(dir, "sese.db")
// 打开/创建数据库文件,文件权限 0600(仅所有者可读写)
// NoSync: true — 不在每次写事务后 fsync,交由 OS 决定刷盘时机。
// 在高并发写入场景下大幅减少磁盘 I/O 阻塞,代价是极端断电可能丢失最近几秒数据(可接受)。
db, err := bolt.Open(path, 0o600, &bolt.Options{
NoSync: true,
Timeout: 5 * time.Second,
PageSize: 4096,
})
if err != nil {
return nil, fmt.Errorf("storage.Open bolt: %w", err)
}
// 启动时确保四个 bucket 都存在(不存在则创建)
err = db.Update(func(tx *bolt.Tx) error {
for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate, bucketPriority} {
if _, err := tx.CreateBucketIfNotExists(b); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("storage.Open create buckets: %w", err)
}
return &DB{db: db}, nil
}
// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine。
func (d *DB) StartWriteFlusher() {
d.writeBuf = make(map[string]*writeOp)
d.writeTicker = time.NewTicker(2 * time.Second) // 每 2 秒刷一次
d.writeDone = make(chan struct{})
go func() {
for {
select {
case <-d.writeTicker.C:
d.flushWriteBuf()
case <-d.writeDone:
return
}
}
}()
}
// flushWriteBuf 将写缓冲中的所有待写入操作批量刷入 bbolt。
func (d *DB) flushWriteBuf() {
d.writeBufMu.Lock()
if len(d.writeBuf) == 0 {
d.writeBufMu.Unlock()
return
}
// 快照并清空缓冲
snapshot := d.writeBuf
d.writeBuf = make(map[string]*writeOp)
d.writeBufMu.Unlock()
// 预先按 bucket 分组
snippets := make(map[string][]byte)
siteInfos := make(map[string][]byte)
for key, op := range snapshot {
if op.opType == 0 {
snippets[key] = op.data
} else {
siteInfos[key] = op.data
}
}
// 单个事务批量写入
if err := d.db.Update(func(tx *bolt.Tx) error {
if len(snippets) > 0 {
b := tx.Bucket(bucketGate)
for k, v := range snippets {
if err := b.Put([]byte(k), v); err != nil {
return err
}
}
}
if len(siteInfos) > 0 {
b := tx.Bucket(bucketSiteGate)
for k, v := range siteInfos {
if err := b.Put([]byte(k), v); err != nil {
return err
}
}
}
return nil
}); err != nil {
log.Printf("[storage] flushWriteBuf error: %v", err)
}
}
// Close 关闭底层 bbolt 数据库连接。
// 先刷完写缓冲,再关闭 ticker,最后关闭数据库。
func (d *DB) Close() error {
if d.writeTicker != nil {
d.writeTicker.Stop()
}
d.flushWriteBuf() // 最后刷一次,确保数据不丢失
if d.writeDone != nil {
close(d.writeDone)
}
return d.db.Close()
}
// ---- 辅助函数:压缩与解压 ----
// compress 将字节数组用 brotli 压缩后返回。
// brotli 压缩比高于 gzip,适合大量文本的存储空间优化。
func compress(data []byte) ([]byte, error) {
buf := make([]byte, 0, len(data)) // 预分配,避免反复扩容
w := brotli.NewWriterLevel((*appendWriter)(&buf), 3) // 压缩级别 3(优先速度,压缩比损失约 10-15%)
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf, nil
}
// decompress 将 brotli 压缩的字节数组解压还原。
func decompress(data []byte) ([]byte, error) {
// brotli.NewReader 从字节数组创建读取器(通过 byteReader 适配 io.Reader 接口)
r := brotli.NewReader(
(*byteReader)(&data),
)
out := make([]byte, 0, len(data)*3) // 预分配约 3 倍空间(解压后通常更大)
tmp := make([]byte, 4096) // 每次最多读 4KB
for {
n, err := r.Read(tmp)
out = append(out, tmp[:n]...) // 追加本次读取的字节
if err != nil {
if err == io.EOF {
break // 正常读完
}
return out, err // 其他错误(非 EOF)则返回
}
}
return out, nil
}
// appendWriter 将 *[]byte 适配为 io.Writer 接口(写入时直接 append)。
type appendWriter []byte
// Write 将数据 p 追加到 appendWriter 末尾,返回写入字节数。
func (a *appendWriter) Write(p []byte) (int, error) {
*a = append(*a, p...)
return len(p), nil
}
// byteReader 将 []byte 适配为 io.Reader 接口(顺序读取,支持读完后返回 EOF)。
type byteReader []byte
// Read 从字节数组读取最多 len(p) 字节到 p 中,返回实际读取字节数和可能的错误。
// 当字节数组全部读完后返回 io.EOF。
func (b *byteReader) Read(p []byte) (int, error) {
if len(*b) == 0 {
return 0, io.EOF // 已读完
}
n := copy(p, *b) // 复制最多 len(p) 字节
*b = (*b)[n:] // 前进指针
return n, nil
}
// marshalCompress 将任意可序列化对象先 JSON 编码,再 brotli 压缩,返回压缩后的字节。
func marshalCompress(v any) ([]byte, error) {
raw, err := json.Marshal(v) // 先序列化为 JSON
if err != nil {
return nil, err
}
return compress(raw) // 再压缩
}
// decompressUnmarshal 将压缩字节先解压,再 JSON 反序列化到目标对象 v。
func decompressUnmarshal(data []byte, v any) error {
raw, err := decompress(data) // 先解压
if err != nil {
return err
}
return json.Unmarshal(raw, v) // 再反序列化
}
// ---- 倒排索引(Index)相关方法 ----
// GetIndex 根据关键词查询倒排索引,返回该词关联的所有 [权重, URL] 条目列表。
func (d *DB) GetIndex(keyword string) ([]IndexEntry, error) {
var entries []IndexEntry
err := d.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketIndex)
v := b.Get([]byte(keyword)) // 在 index bucket 中按关键词查询
if v == nil {
return nil // 不存在该词,返回空列表
}
return decompressUnmarshal(v, &entries)
})
return entries, err
}
// SetIndex 将某关键词的完整索引条目列表覆盖写入(替换旧数据)。
func (d *DB) SetIndex(keyword string, entries []IndexEntry) error {
data, err := marshalCompress(entries)
if err != nil {
return err
}
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketIndex).Put([]byte(keyword), data)
})
}
// BatchSetIndex 分批写入多个关键词→条目列表的映射。
// 大数据量时拆分为多个小事务,避免单个大事务导致的长时间阻塞。
func (d *DB) BatchSetIndex(batch map[string][]IndexEntry) error {
const batchSize = 1000
items := make([]struct {
keyword string
entries []IndexEntry
}, 0, len(batch))
for k, v := range batch {
items = append(items, struct {
keyword string
entries []IndexEntry
}{k, v})
}
totalBatches := (len(items) + batchSize - 1) / batchSize
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batchNum := i/batchSize + 1
// 事务外预先完成所有序列化和压缩,减少事务持锁时间
preItems := make([]struct {
keyword string
data []byte
}, 0, end-i)
for _, item := range items[i:end] {
data, err := marshalCompress(item.entries)
if err != nil {
return err
}
preItems = append(preItems, struct {
keyword string
data []byte
}{item.keyword, data})
}
// 事务内只做纯内存写入,持锁时间极短
if err := d.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketIndex)
for _, p := range preItems {
if err := b.Put([]byte(p.keyword), p.data); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
if totalBatches > 1 {
log.Printf("[storage] BatchSetIndex progress: batch %d/%d (%d keys)", batchNum, totalBatches, end)
}
}
return nil
}
// ForEachIndex 遍历倒排索引中所有关键词及其关联条目,对每个条目调用 fn。
// 用于全量读取索引、做备份或重新计算等场景。
func (d *DB) ForEachIndex(fn func(keyword string, entries []IndexEntry) error) error {
return d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketIndex).ForEach(func(k, v []byte) error {
var entries []IndexEntry
if err := decompressUnmarshal(v, &entries); err != nil {
return nil // 跳过损坏条目,不中断遍历
}
return fn(string(k), entries)
})
})
}
// ---- 融合之门(Gate):URL 摘要缓存相关方法 ----
// GetSnippet 根据 URL 查询缓存的摘要信息(标题/描述/正文片段)。
// 若未命中返回 error。
func (d *DB) GetSnippet(url string) (*SnippetEntry, error) {
var entry SnippetEntry
err := d.db.View(func(tx *bolt.Tx) error {
v := tx.Bucket(bucketGate).Get([]byte(url))
if v == nil {
return fmt.Errorf("not found")
}
return decompressUnmarshal(v, &entry)
})
if err != nil {
return nil, err
}
return &entry, nil
}
// SetSnippet 将某 URL 的摘要信息写入写缓冲(异步批量刷入磁盘)。
func (d *DB) SetSnippet(url string, entry *SnippetEntry) error {
data, err := marshalCompress(entry)
if err != nil {
return err
}
d.writeBufMu.Lock()
d.writeBuf["snippet:"+url] = &writeOp{opType: 0, key: url, data: data}
// 缓冲过大时同步刷一次,防止内存膨胀
if len(d.writeBuf) >= 5000 {
d.writeBufMu.Unlock()
d.flushWriteBuf()
return nil
}
d.writeBufMu.Unlock()
return nil
}
// ---- 网站之门(SiteGate):网站元信息相关方法 ----
// SiteInfo 存放每个域名/主机的元信息,与 Python 版网站.py 的 dataclass 对应。
type SiteInfo struct {
VisitCount int `json:"visit_count"` // 累计访问该网站的次数
LastVisitTime int64 `json:"last_visit_time"` // 上次访问该网站的时间戳
Fingerprint any `json:"fingerprint,omitempty"` // 网站指纹(用于识别重复站点)
SuccessRate *float64 `json:"success_rate,omitempty"` // 访问成功率(成功次数/总访问次数)
HTMLStructure string `json:"html_structure,omitempty"` // HTML 结构特征摘要
IPs []string `json:"ips,omitempty"` // 该域名解析出的 IP 列表
Quality *float64 `json:"quality,omitempty"` // 网站质量评分(0~1
HTTPSAvailable *bool `json:"https_available,omitempty"` // 是否支持 HTTPS
Keywords []string `json:"keywords,omitempty"` // 该网站的高频关键词列表
OutLinks []string `json:"out_links,omitempty"` // 从该网站页面提取的出站链接列表
Languages map[string]float64 `json:"languages,omitempty"` // 网站语种分布(语种代码 → 占比)
Redirects map[string]string `json:"redirects,omitempty"` // 重定向链(URL → 最终 URL)
ServerTypes []string `json:"server_types,omitempty"` // 网站使用的 HTTP Server 类型列表
}
// GetSiteInfo 根据主机名查询网站元信息。
// 优先从写缓冲中读取(保证与最近 SetSiteInfo 的数据一致),未命中再读 bbolt。
// 若不存在则返回仅有默认空 map 的空 SiteInfo(不报错,方便调用方直接使用)。
func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) {
// 先检查写缓冲中是否有该 host 的最新待写入数据
d.writeBufMu.Lock()
if op, ok := d.writeBuf["site:"+host]; ok {
d.writeBufMu.Unlock()
var info SiteInfo
if err := decompressUnmarshal(op.data, &info); err == nil {
if info.Languages == nil {
info.Languages = make(map[string]float64)
}
if info.Redirects == nil {
info.Redirects = make(map[string]string)
}
return &info, nil
}
// 反序列化失败(不应发生),fall through 到读 db
} else {
d.writeBufMu.Unlock()
}
var info SiteInfo
err := d.db.View(func(tx *bolt.Tx) error {
v := tx.Bucket(bucketSiteGate).Get([]byte(host))
if v == nil {
return fmt.Errorf("not found")
}
return decompressUnmarshal(v, &info)
})
if err != nil {
// 未找到时返回带默认空 map 的空结构,避免调用方空指针 panic
return &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}, nil
}
if info.Languages == nil {
info.Languages = make(map[string]float64)
}
if info.Redirects == nil {
info.Redirects = make(map[string]string)
}
return &info, nil
}
// SetSiteInfo 将网站元信息写入写缓冲(异步批量刷入磁盘)。
func (d *DB) SetSiteInfo(host string, info *SiteInfo) error {
data, err := marshalCompress(info)
if err != nil {
return err
}
d.writeBufMu.Lock()
d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data}
if len(d.writeBuf) >= 5000 {
d.writeBufMu.Unlock()
d.flushWriteBuf()
return nil
}
d.writeBufMu.Unlock()
return nil
}
// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回。
// 整个读-改-写过程在 writeBufMu 锁内完成,消除并发 lost update 竞态。
// 适用于多个 goroutine 对同一 host 的 SiteInfo 进行读-改-写的场景。
func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error {
d.writeBufMu.Lock()
// 从缓冲或 db 读取最新 SiteInfo
var info *SiteInfo
if op, ok := d.writeBuf["site:"+host]; ok {
var si SiteInfo
if err := decompressUnmarshal(op.data, &si); err == nil {
info = &si
}
}
if info == nil {
var si SiteInfo
err := d.db.View(func(tx *bolt.Tx) error {
v := tx.Bucket(bucketSiteGate).Get([]byte(host))
if v == nil {
return fmt.Errorf("not found")
}
return decompressUnmarshal(v, &si)
})
if err != nil {
info = &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}
} else {
info = &si
}
}
if info.Languages == nil {
info.Languages = make(map[string]float64)
}
if info.Redirects == nil {
info.Redirects = make(map[string]string)
}
// 在锁内调用修改函数
fn(info)
// 序列化并写回缓冲
data, err := marshalCompress(info)
if err != nil {
d.writeBufMu.Unlock()
return err
}
d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data}
if len(d.writeBuf) >= 5000 {
d.writeBufMu.Unlock()
d.flushWriteBuf()
return nil
}
d.writeBufMu.Unlock()
return nil
}
// ForEachSite 遍历所有网站元信息条目,对每个条目调用 fn。
func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error {
return d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error {
var info SiteInfo
if err := decompressUnmarshal(v, &info); err != nil {
return nil // 跳过损坏条目
}
return fn(string(k), &info)
})
})
}
// ForEachSnippet 遍历所有 URL 摘要条目,对每个条目调用 fn。
func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error {
return d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketGate).ForEach(func(k, v []byte) error {
var entry SnippetEntry
if err := decompressUnmarshal(v, &entry); err != nil {
return nil // 跳过损坏条目
}
return fn(string(k), &entry)
})
})
}
// ---- 优先爬取队列(Priority Queue)相关方法 ----
// PriorityEntry 记录一条待优先爬取的 URL 或域名。
type PriorityEntry struct {
URL string `json:"url"` // 用户提交的 URL 或域名(会自动规范化为带 scheme 的 URL)
IsDomain bool `json:"domain"` // 是否为纯域名(true=仅域名,false=完整 URL
AddedAt int64 `json:"added_at"` // 添加时的 Unix 时间戳
Visited bool `json:"visited"` // 是否已爬取(crawler 爬完后标记)
}
// GetPriorityURLs 返回所有未访问的 priority 条目(按添加时间升序)。
func (d *DB) GetPriorityURLs() ([]PriorityEntry, error) {
var entries []PriorityEntry
err := d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error {
var e PriorityEntry
if err := decompressUnmarshal(v, &e); err != nil {
return nil // 跳过损坏条目
}
if !e.Visited {
entries = append(entries, e)
}
return nil
})
})
return entries, err
}
// AddPriorityURL 添加一条 priority 条目(key = URLvalue = PriorityEntry)。
// 若已存在(且未访问)则忽略。
func (d *DB) AddPriorityURL(entry PriorityEntry) error {
return d.db.Update(func(tx *bolt.Tx) error {
k := []byte(entry.URL)
existing := tx.Bucket(bucketPriority).Get(k)
if existing != nil {
var e PriorityEntry
if err := decompressUnmarshal(existing, &e); err == nil && !e.Visited {
return nil // 已存在且未访问,忽略
}
}
data, err := marshalCompress(entry)
if err != nil {
return err
}
return tx.Bucket(bucketPriority).Put(k, data)
})
}
// RemovePriorityURL 删除指定 URL 的 priority 条目。
func (d *DB) RemovePriorityURL(url string) error {
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).Delete([]byte(url))
})
}
// ClearVisitedPriorityURLs 批量删除所有已标记为 visited 的条目(crawler 爬完后调用)。
func (d *DB) ClearVisitedPriorityURLs() error {
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error {
var e PriorityEntry
if err := decompressUnmarshal(v, &e); err != nil {
return nil
}
if e.Visited {
return tx.Bucket(bucketPriority).Delete(k)
}
return nil
})
})
}