Files
sese-engine-go/storage/storage.go
T

734 lines
23 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package storage provides the persistent index and site-info storage backed by bbolt.
// storage 包提供基于 bbolt 的持久化存储,负责保存倒排索引、URL摘要缓存和网站元信息。
//
// 索引空间(index bucket):key = 关键词(string),value = brotli 压缩的 JSON 数组,每项为 [权重, URL] 对。
// 融合之门(gate bucket):key = URLstring),value = brotli 压缩的 JSON 数组 [标题, 描述, 正文, 时间戳]。
// 网站之门(site_gate bucket):key = 主机名(string),value = brotli 压缩的 JSON SiteInfo 结构体。
//
// Python 版使用自定义哈希桶结构;Go 版直接交由 bbolt 原生处理。
package storage
import (
"encoding/json" // JSON 序列化/反序列化
"fmt" // 格式化错误信息
"io" // io.EOF 常量
"log" // 日志输出
"os" // 操作系统功能(创建目录等)
"path/filepath" // 路径拼接
"sync" // 互斥锁(保护写缓冲)
"time" // bbolt 超时配置和写缓冲定时器
"github.com/andybalholm/brotli" // Brotli 无损压缩库(用于压缩存储数据)
bolt "go.etcd.io/bbolt" // BoltDB,纯 Go 嵌入式 KV 数据库
)
// IndexEntry 是倒排索引中的单个条目。
// 一条索引记录表示"某个 URL 与某个关键词的相关性权重"。
type IndexEntry struct {
Weight float32 `json:"w"` // 该 URL 在该关键词下的得分/权重
URL string `json:"u"` // 网页 URL
}
// SnippetEntry 是 URL 对应的摘要信息缓存。
// 包含页面标题、描述、正文片段、抓取时间戳和内容哈希(用于增量重爬检测)。
type SnippetEntry struct {
Title string `json:"title"` // 网页标题
Description string `json:"desc"` // meta description 或自动生成的描述
Text string `json:"text"` // 正文前 N 字符的文本片段
Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳
ContentHash string `json:"hash"` // 正文内容的 FNV-1a 哈希(用于增量重爬判断内容是否变化)
}
// 四个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节)
var (
bucketIndex = []byte("index") // 倒排索引 bucket
bucketGate = []byte("gate") // URL 摘要缓存 bucket
bucketSiteGate = []byte("site_gate") // 网站元信息 bucket
bucketPriority = []byte("priority") // 优先爬取 URL bucket
)
// writeOp 表示一个待写入的 snippet 操作。
type writeOp struct {
data []byte // marshalCompress 后的数据
}
// DB 封装一个 bbolt 数据库,提供类型化的存取接口。
// bbolt 内部已实现并发安全,无需额外加锁。
type DB struct {
db *bolt.DB // 底层 bbolt 数据库句柄
// 异步写缓冲:SetSnippet 先写到内存,定期批量刷入 bbolt。
writeBuf map[string]*writeOp // key → 待写入的操作
writeBufMu sync.Mutex
writeTicker *time.Ticker
writeDone chan struct{}
// SiteInfo 独立内存缓存:读全走缓存(零 bbolt 读事务),定期批量刷入 bbolt。
// 使用 RWMutex 允许多个读并发,写(UpdateSiteInfo)独占。
siteCache map[string]*SiteInfo // host → 最新 SiteInfo
siteCacheMu sync.RWMutex
siteDirty map[string]struct{} // 需要刷盘的 host 集合
siteDirtyMu sync.Mutex
siteTicker *time.Ticker
siteDone chan struct{}
}
// Open 在指定目录路径下创建或打开 bbolt 数据库文件。
// 如果目录不存在会自动创建。数据库文件名为 sese.db。
func Open(dir string) (*DB, error) {
// 确保存储目录存在(0775 权限:所有者读写执行,组用户读执行,其他读执行)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("storage.Open mkdir: %w", err)
}
// 拼接数据库文件路径:dir/sese.db
path := filepath.Join(dir, "sese.db")
// 打开/创建数据库文件,文件权限 0600(仅所有者可读写)
// NoSync: true — 不在每次写事务后 fsync,交由 OS 决定刷盘时机。
// 在高并发写入场景下大幅减少磁盘 I/O 阻塞,代价是极端断电可能丢失最近几秒数据(可接受)。
db, err := bolt.Open(path, 0o600, &bolt.Options{
NoSync: true,
Timeout: 5 * time.Second,
PageSize: 4096,
})
if err != nil {
return nil, fmt.Errorf("storage.Open bolt: %w", err)
}
// 启动时确保四个 bucket 都存在(不存在则创建)
err = db.Update(func(tx *bolt.Tx) error {
for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate, bucketPriority} {
if _, err := tx.CreateBucketIfNotExists(b); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("storage.Open create buckets: %w", err)
}
d := &DB{db: db}
// 启动时预热 SiteInfo 缓存:一次性从 bbolt 加载所有 SiteInfo 到内存
d.siteCache = make(map[string]*SiteInfo)
_ = db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error {
var info SiteInfo
if err := decompressUnmarshal(v, &info); err != nil {
return nil // 跳过损坏条目
}
if info.Languages == nil {
info.Languages = make(map[string]float64)
}
if info.Redirects == nil {
info.Redirects = make(map[string]string)
}
d.siteCache[string(k)] = &info
return nil
})
})
log.Printf("[storage] siteCache warmed: %d hosts loaded", len(d.siteCache))
return d, nil
}
// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutinesnippet 和 SiteInfo 各自独立)。
func (d *DB) StartWriteFlusher() {
// Snippet 写缓冲
d.writeBuf = make(map[string]*writeOp)
d.writeTicker = time.NewTicker(2 * time.Second)
d.writeDone = make(chan struct{})
go func() {
for {
select {
case <-d.writeTicker.C:
d.flushWriteBuf()
case <-d.writeDone:
return
}
}
}()
// SiteInfo 独立缓存
d.siteCache = make(map[string]*SiteInfo)
d.siteDirty = make(map[string]struct{})
d.siteTicker = time.NewTicker(5 * time.Second)
d.siteDone = make(chan struct{})
go func() {
for {
select {
case <-d.siteTicker.C:
d.flushSiteCache()
case <-d.siteDone:
return
}
}
}()
}
// flushWriteBuf 将写缓冲中的 snippet 操作批量刷入 bbolt。
func (d *DB) flushWriteBuf() {
d.writeBufMu.Lock()
if len(d.writeBuf) == 0 {
d.writeBufMu.Unlock()
return
}
// 快照并清空缓冲
snapshot := d.writeBuf
d.writeBuf = make(map[string]*writeOp)
d.writeBufMu.Unlock()
// 单个事务批量写入
if err := d.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketGate)
for k, op := range snapshot {
if err := b.Put([]byte(k), op.data); err != nil {
return err
}
}
return nil
}); err != nil {
log.Printf("[storage] flushWriteBuf error: %v", err)
}
}
// flushSiteCache 将脏的 SiteInfo 批量刷入 bbolt。
func (d *DB) flushSiteCache() {
d.siteDirtyMu.Lock()
if len(d.siteDirty) == 0 {
d.siteDirtyMu.Unlock()
return
}
dirty := d.siteDirty
d.siteDirty = make(map[string]struct{})
d.siteDirtyMu.Unlock()
// 在读锁下快照所有脏数据并预压缩
d.siteCacheMu.RLock()
type kv struct {
host string
data []byte
}
items := make([]kv, 0, len(dirty))
for host := range dirty {
if info, ok := d.siteCache[host]; ok {
data, err := marshalCompress(info)
if err != nil {
log.Printf("[storage] flushSiteCache marshal error for %s: %v", host, err)
continue
}
items = append(items, kv{host, data})
}
}
d.siteCacheMu.RUnlock()
if len(items) == 0 {
return
}
// 单个事务批量写入 bbolt
if err := d.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketSiteGate)
for _, item := range items {
if err := b.Put([]byte(item.host), item.data); err != nil {
return err
}
}
return nil
}); err != nil {
log.Printf("[storage] flushSiteCache error: %v", err)
}
}
// Close 关闭底层 bbolt 数据库连接。
// 先刷完写缓冲和 SiteInfo 缓存,再关闭定时器,最后关闭数据库。
func (d *DB) Close() error {
if d.writeTicker != nil {
d.writeTicker.Stop()
}
if d.siteTicker != nil {
d.siteTicker.Stop()
}
d.flushWriteBuf()
d.flushSiteCache()
if d.writeDone != nil {
close(d.writeDone)
}
if d.siteDone != nil {
close(d.siteDone)
}
return d.db.Close()
}
// ---- 辅助函数:压缩与解压、map 拷贝 ----
// copyMapF64 深拷贝 map[string]float64。
func copyMapF64(m map[string]float64) map[string]float64 {
cp := make(map[string]float64, len(m))
for k, v := range m {
cp[k] = v
}
return cp
}
// copyMapStr 深拷贝 map[string]string。
func copyMapStr(m map[string]string) map[string]string {
cp := make(map[string]string, len(m))
for k, v := range m {
cp[k] = v
}
return cp
}
// compress 将字节数组用 brotli 压缩后返回。
// brotli 压缩比高于 gzip,适合大量文本的存储空间优化。
func compress(data []byte) ([]byte, error) {
buf := make([]byte, 0, len(data)) // 预分配,避免反复扩容
w := brotli.NewWriterLevel((*appendWriter)(&buf), 3) // 压缩级别 3(优先速度,压缩比损失约 10-15%)
if _, err := w.Write(data); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
return buf, nil
}
// decompress 将 brotli 压缩的字节数组解压还原。
func decompress(data []byte) ([]byte, error) {
// brotli.NewReader 从字节数组创建读取器(通过 byteReader 适配 io.Reader 接口)
r := brotli.NewReader(
(*byteReader)(&data),
)
out := make([]byte, 0, len(data)*3) // 预分配约 3 倍空间(解压后通常更大)
tmp := make([]byte, 4096) // 每次最多读 4KB
for {
n, err := r.Read(tmp)
out = append(out, tmp[:n]...) // 追加本次读取的字节
if err != nil {
if err == io.EOF {
break // 正常读完
}
return out, err // 其他错误(非 EOF)则返回
}
}
return out, nil
}
// appendWriter 将 *[]byte 适配为 io.Writer 接口(写入时直接 append)。
type appendWriter []byte
// Write 将数据 p 追加到 appendWriter 末尾,返回写入字节数。
func (a *appendWriter) Write(p []byte) (int, error) {
*a = append(*a, p...)
return len(p), nil
}
// byteReader 将 []byte 适配为 io.Reader 接口(顺序读取,支持读完后返回 EOF)。
type byteReader []byte
// Read 从字节数组读取最多 len(p) 字节到 p 中,返回实际读取字节数和可能的错误。
// 当字节数组全部读完后返回 io.EOF。
func (b *byteReader) Read(p []byte) (int, error) {
if len(*b) == 0 {
return 0, io.EOF // 已读完
}
n := copy(p, *b) // 复制最多 len(p) 字节
*b = (*b)[n:] // 前进指针
return n, nil
}
// marshalCompress 将任意可序列化对象先 JSON 编码,再 brotli 压缩,返回压缩后的字节。
func marshalCompress(v any) ([]byte, error) {
raw, err := json.Marshal(v) // 先序列化为 JSON
if err != nil {
return nil, err
}
return compress(raw) // 再压缩
}
// decompressUnmarshal 将压缩字节先解压,再 JSON 反序列化到目标对象 v。
func decompressUnmarshal(data []byte, v any) error {
raw, err := decompress(data) // 先解压
if err != nil {
return err
}
return json.Unmarshal(raw, v) // 再反序列化
}
// ---- 倒排索引(Index)相关方法 ----
// GetIndex 根据关键词查询倒排索引,返回该词关联的所有 [权重, URL] 条目列表。
func (d *DB) GetIndex(keyword string) ([]IndexEntry, error) {
var entries []IndexEntry
err := d.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketIndex)
v := b.Get([]byte(keyword)) // 在 index bucket 中按关键词查询
if v == nil {
return nil // 不存在该词,返回空列表
}
return decompressUnmarshal(v, &entries)
})
return entries, err
}
// SetIndex 将某关键词的完整索引条目列表覆盖写入(替换旧数据)。
func (d *DB) SetIndex(keyword string, entries []IndexEntry) error {
data, err := marshalCompress(entries)
if err != nil {
return err
}
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketIndex).Put([]byte(keyword), data)
})
}
// BatchSetIndex 分批写入多个关键词→条目列表的映射。
// 大数据量时拆分为多个小事务,避免单个大事务导致的长时间阻塞。
func (d *DB) BatchSetIndex(batch map[string][]IndexEntry) error {
const batchSize = 1000
items := make([]struct {
keyword string
entries []IndexEntry
}, 0, len(batch))
for k, v := range batch {
items = append(items, struct {
keyword string
entries []IndexEntry
}{k, v})
}
totalBatches := (len(items) + batchSize - 1) / batchSize
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batchNum := i/batchSize + 1
// 事务外预先完成所有序列化和压缩,减少事务持锁时间
preItems := make([]struct {
keyword string
data []byte
}, 0, end-i)
for _, item := range items[i:end] {
data, err := marshalCompress(item.entries)
if err != nil {
return err
}
preItems = append(preItems, struct {
keyword string
data []byte
}{item.keyword, data})
}
// 事务内只做纯内存写入,持锁时间极短
if err := d.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketIndex)
for _, p := range preItems {
if err := b.Put([]byte(p.keyword), p.data); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
if totalBatches > 1 {
log.Printf("[storage] BatchSetIndex progress: batch %d/%d (%d keys)", batchNum, totalBatches, end)
}
}
return nil
}
// ForEachIndex 遍历倒排索引中所有关键词及其关联条目,对每个条目调用 fn。
// 用于全量读取索引、做备份或重新计算等场景。
func (d *DB) ForEachIndex(fn func(keyword string, entries []IndexEntry) error) error {
return d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketIndex).ForEach(func(k, v []byte) error {
var entries []IndexEntry
if err := decompressUnmarshal(v, &entries); err != nil {
return nil // 跳过损坏条目,不中断遍历
}
return fn(string(k), entries)
})
})
}
// ---- 融合之门(Gate):URL 摘要缓存相关方法 ----
// GetSnippet 根据 URL 查询缓存的摘要信息(标题/描述/正文片段)。
// 若未命中返回 error。
func (d *DB) GetSnippet(url string) (*SnippetEntry, error) {
var entry SnippetEntry
err := d.db.View(func(tx *bolt.Tx) error {
v := tx.Bucket(bucketGate).Get([]byte(url))
if v == nil {
return fmt.Errorf("not found")
}
return decompressUnmarshal(v, &entry)
})
if err != nil {
return nil, err
}
return &entry, nil
}
// SetSnippet 将某 URL 的摘要信息写入写缓冲(异步批量刷入磁盘)。
func (d *DB) SetSnippet(url string, entry *SnippetEntry) error {
data, err := marshalCompress(entry)
if err != nil {
return err
}
d.writeBufMu.Lock()
d.writeBuf[url] = &writeOp{data: data}
// 缓冲过大时同步刷一次,防止内存膨胀
if len(d.writeBuf) >= 5000 {
d.writeBufMu.Unlock()
d.flushWriteBuf()
return nil
}
d.writeBufMu.Unlock()
return nil
}
// ---- 网站之门(SiteGate):网站元信息相关方法 ----
// SiteInfo 存放每个域名/主机的元信息,与 Python 版网站.py 的 dataclass 对应。
type SiteInfo struct {
VisitCount int `json:"visit_count"` // 累计访问该网站的次数
LastVisitTime int64 `json:"last_visit_time"` // 上次访问该网站的时间戳
Fingerprint any `json:"fingerprint,omitempty"` // 网站指纹(用于识别重复站点)
SuccessRate *float64 `json:"success_rate,omitempty"` // 访问成功率(成功次数/总访问次数)
HTMLStructure string `json:"html_structure,omitempty"` // HTML 结构特征摘要
IPs []string `json:"ips,omitempty"` // 该域名解析出的 IP 列表
Quality *float64 `json:"quality,omitempty"` // 网站质量评分(0~1
HTTPSAvailable *bool `json:"https_available,omitempty"` // 是否支持 HTTPS
Keywords []string `json:"keywords,omitempty"` // 该网站的高频关键词列表
OutLinks []string `json:"out_links,omitempty"` // 从该网站页面提取的出站链接列表
Languages map[string]float64 `json:"languages,omitempty"` // 网站语种分布(语种代码 → 占比)
Redirects map[string]string `json:"redirects,omitempty"` // 重定向链(URL → 最终 URL)
ServerTypes []string `json:"server_types,omitempty"` // 网站使用的 HTTP Server 类型列表
}
// GetSiteInfo 根据主机名查询网站元信息。
// 全程只读内存缓存(零 bbolt 读事务),适合高并发调用。
// 若不存在则返回仅有默认空 map 的空 SiteInfo(不报错,方便调用方直接使用)。
func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) {
d.siteCacheMu.RLock()
info, ok := d.siteCache[host]
d.siteCacheMu.RUnlock()
if ok {
// 返回深拷贝,防止调用方修改缓存
cp := *info
if cp.Languages == nil {
cp.Languages = make(map[string]float64)
} else {
cp.Languages = copyMapF64(cp.Languages)
}
if cp.Redirects == nil {
cp.Redirects = make(map[string]string)
} else {
cp.Redirects = copyMapStr(cp.Redirects)
}
return &cp, nil
}
// 缓存未命中,从 bbolt 加载
// 注意:不能在 bbolt 事务回调(如 ForEachSnippet 的 fn)内调用此路径(bbolt 不支持嵌套事务)。
// 正常情况下预热已加载全部数据,不会走到这里。
var si SiteInfo
var err error
func() {
defer func() {
if r := recover(); r != nil {
// bbolt 嵌套事务 panic,返回空 SiteInfo
si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}
err = nil
}
}()
err = d.db.View(func(tx *bolt.Tx) error {
v := tx.Bucket(bucketSiteGate).Get([]byte(host))
if v == nil {
return fmt.Errorf("not found")
}
return decompressUnmarshal(v, &si)
})
}()
if err != nil {
return &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}, nil
}
if si.Languages == nil {
si.Languages = make(map[string]float64)
}
if si.Redirects == nil {
si.Redirects = make(map[string]string)
}
// 回填缓存
d.siteCacheMu.Lock()
if existing, exists := d.siteCache[host]; !exists {
d.siteCache[host] = &si
} else {
si = *existing // 用更新的版本
}
d.siteCacheMu.Unlock()
return &si, nil
}
// SetSiteInfo 将网站元信息直接写入内存缓存并标记为脏(异步刷入磁盘)。
func (d *DB) SetSiteInfo(host string, info *SiteInfo) error {
cp := *info // 浅拷贝,断开与调用方的引用
if cp.Languages != nil {
cp.Languages = copyMapF64(cp.Languages)
}
if cp.Redirects != nil {
cp.Redirects = copyMapStr(cp.Redirects)
}
d.siteCacheMu.Lock()
d.siteCache[host] = &cp
d.siteCacheMu.Unlock()
d.siteDirtyMu.Lock()
d.siteDirty[host] = struct{}{}
d.siteDirtyMu.Unlock()
return nil
}
// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回缓存。
// 只用 siteCacheMu(不碰 bbolt),不会与 flush 产生跨锁依赖。
func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error {
d.siteCacheMu.Lock()
// 从缓存读取(缓存未命中会返回 nil,后续初始化为空)
info, ok := d.siteCache[host]
if !ok {
// 从 bbolt 加载(注意:这是唯一会做 bbolt 读的地方,但只在首次 miss 时触发)
var si SiteInfo
err := d.db.View(func(tx *bolt.Tx) error {
v := tx.Bucket(bucketSiteGate).Get([]byte(host))
if v == nil {
return fmt.Errorf("not found")
}
return decompressUnmarshal(v, &si)
})
if err != nil {
si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}
} else {
if si.Languages == nil {
si.Languages = make(map[string]float64)
}
if si.Redirects == nil {
si.Redirects = make(map[string]string)
}
}
d.siteCache[host] = &si
info = &si
}
// 在锁内调用修改函数(直接修改缓存中的对象)
fn(info)
d.siteCacheMu.Unlock()
// 标记脏(锁外)
d.siteDirtyMu.Lock()
d.siteDirty[host] = struct{}{}
d.siteDirtyMu.Unlock()
return nil
}
// ForEachSite 遍历所有网站元信息条目,对每个条目调用 fn。
func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error {
return d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error {
var info SiteInfo
if err := decompressUnmarshal(v, &info); err != nil {
return nil // 跳过损坏条目
}
return fn(string(k), &info)
})
})
}
// ForEachSnippet 遍历所有 URL 摘要条目,对每个条目调用 fn。
func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error {
return d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketGate).ForEach(func(k, v []byte) error {
var entry SnippetEntry
if err := decompressUnmarshal(v, &entry); err != nil {
return nil // 跳过损坏条目
}
return fn(string(k), &entry)
})
})
}
// ---- 优先爬取队列(Priority Queue)相关方法 ----
// PriorityEntry 记录一条待优先爬取的 URL 或域名。
type PriorityEntry struct {
URL string `json:"url"` // 用户提交的 URL 或域名(会自动规范化为带 scheme 的 URL)
IsDomain bool `json:"domain"` // 是否为纯域名(true=仅域名,false=完整 URL
AddedAt int64 `json:"added_at"` // 添加时的 Unix 时间戳
Visited bool `json:"visited"` // 是否已爬取(crawler 爬完后标记)
}
// GetPriorityURLs 返回所有未访问的 priority 条目(按添加时间升序)。
func (d *DB) GetPriorityURLs() ([]PriorityEntry, error) {
var entries []PriorityEntry
err := d.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error {
var e PriorityEntry
if err := decompressUnmarshal(v, &e); err != nil {
return nil // 跳过损坏条目
}
if !e.Visited {
entries = append(entries, e)
}
return nil
})
})
return entries, err
}
// AddPriorityURL 添加一条 priority 条目(key = URLvalue = PriorityEntry)。
// 若已存在(且未访问)则忽略。
func (d *DB) AddPriorityURL(entry PriorityEntry) error {
return d.db.Update(func(tx *bolt.Tx) error {
k := []byte(entry.URL)
existing := tx.Bucket(bucketPriority).Get(k)
if existing != nil {
var e PriorityEntry
if err := decompressUnmarshal(existing, &e); err == nil && !e.Visited {
return nil // 已存在且未访问,忽略
}
}
data, err := marshalCompress(entry)
if err != nil {
return err
}
return tx.Bucket(bucketPriority).Put(k, data)
})
}
// RemovePriorityURL 删除指定 URL 的 priority 条目。
func (d *DB) RemovePriorityURL(url string) error {
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).Delete([]byte(url))
})
}
// ClearVisitedPriorityURLs 批量删除所有已标记为 visited 的条目(crawler 爬完后调用)。
func (d *DB) ClearVisitedPriorityURLs() error {
return d.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error {
var e PriorityEntry
if err := decompressUnmarshal(v, &e); err != nil {
return nil
}
if e.Visited {
return tx.Bucket(bucketPriority).Delete(k)
}
return nil
})
})
}