fix语种bug
This commit is contained in:
+224
-116
@@ -46,11 +46,9 @@ var (
|
||||
bucketPriority = []byte("priority") // 优先爬取 URL bucket
|
||||
)
|
||||
|
||||
// writeOp 表示一个待写入的操作。
|
||||
// writeOp 表示一个待写入的 snippet 操作。
|
||||
type writeOp struct {
|
||||
opType int // 0 = set snippet, 1 = set site info
|
||||
key string // URL 或 host
|
||||
data []byte // marshalCompress 后的数据
|
||||
data []byte // marshalCompress 后的数据
|
||||
}
|
||||
|
||||
// DB 封装一个 bbolt 数据库,提供类型化的存取接口。
|
||||
@@ -58,11 +56,20 @@ type writeOp struct {
|
||||
type DB struct {
|
||||
db *bolt.DB // 底层 bbolt 数据库句柄
|
||||
|
||||
// 异步写缓冲:SetSnippet/SetSiteInfo 先写到内存,定期批量刷入 bbolt。
|
||||
// 异步写缓冲:SetSnippet 先写到内存,定期批量刷入 bbolt。
|
||||
writeBuf map[string]*writeOp // key → 待写入的操作
|
||||
writeBufMu sync.Mutex
|
||||
writeTicker *time.Ticker
|
||||
writeDone chan struct{}
|
||||
|
||||
// SiteInfo 独立内存缓存:读全走缓存(零 bbolt 读事务),定期批量刷入 bbolt。
|
||||
// 使用 RWMutex 允许多个读并发,写(UpdateSiteInfo)独占。
|
||||
siteCache map[string]*SiteInfo // host → 最新 SiteInfo
|
||||
siteCacheMu sync.RWMutex
|
||||
siteDirty map[string]struct{} // 需要刷盘的 host 集合
|
||||
siteDirtyMu sync.Mutex
|
||||
siteTicker *time.Ticker
|
||||
siteDone chan struct{}
|
||||
}
|
||||
|
||||
// Open 在指定目录路径下创建或打开 bbolt 数据库文件。
|
||||
@@ -97,13 +104,37 @@ func Open(dir string) (*DB, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage.Open create buckets: %w", err)
|
||||
}
|
||||
return &DB{db: db}, nil
|
||||
|
||||
d := &DB{db: db}
|
||||
|
||||
// 启动时预热 SiteInfo 缓存:一次性从 bbolt 加载所有 SiteInfo 到内存
|
||||
d.siteCache = make(map[string]*SiteInfo)
|
||||
_ = db.View(func(tx *bolt.Tx) error {
|
||||
return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error {
|
||||
var info SiteInfo
|
||||
if err := decompressUnmarshal(v, &info); err != nil {
|
||||
return nil // 跳过损坏条目
|
||||
}
|
||||
if info.Languages == nil {
|
||||
info.Languages = make(map[string]float64)
|
||||
}
|
||||
if info.Redirects == nil {
|
||||
info.Redirects = make(map[string]string)
|
||||
}
|
||||
d.siteCache[string(k)] = &info
|
||||
return nil
|
||||
})
|
||||
})
|
||||
log.Printf("[storage] siteCache warmed: %d hosts loaded", len(d.siteCache))
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine。
|
||||
// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine(snippet 和 SiteInfo 各自独立)。
|
||||
func (d *DB) StartWriteFlusher() {
|
||||
// Snippet 写缓冲
|
||||
d.writeBuf = make(map[string]*writeOp)
|
||||
d.writeTicker = time.NewTicker(2 * time.Second) // 每 2 秒刷一次
|
||||
d.writeTicker = time.NewTicker(2 * time.Second)
|
||||
d.writeDone = make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
@@ -115,9 +146,25 @@ func (d *DB) StartWriteFlusher() {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// SiteInfo 独立缓存
|
||||
d.siteCache = make(map[string]*SiteInfo)
|
||||
d.siteDirty = make(map[string]struct{})
|
||||
d.siteTicker = time.NewTicker(5 * time.Second)
|
||||
d.siteDone = make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-d.siteTicker.C:
|
||||
d.flushSiteCache()
|
||||
case <-d.siteDone:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// flushWriteBuf 将写缓冲中的所有待写入操作批量刷入 bbolt。
|
||||
// flushWriteBuf 将写缓冲中的 snippet 操作批量刷入 bbolt。
|
||||
func (d *DB) flushWriteBuf() {
|
||||
d.writeBufMu.Lock()
|
||||
if len(d.writeBuf) == 0 {
|
||||
@@ -129,33 +176,12 @@ func (d *DB) flushWriteBuf() {
|
||||
d.writeBuf = make(map[string]*writeOp)
|
||||
d.writeBufMu.Unlock()
|
||||
|
||||
// 预先按 bucket 分组
|
||||
snippets := make(map[string][]byte)
|
||||
siteInfos := make(map[string][]byte)
|
||||
for key, op := range snapshot {
|
||||
if op.opType == 0 {
|
||||
snippets[key] = op.data
|
||||
} else {
|
||||
siteInfos[key] = op.data
|
||||
}
|
||||
}
|
||||
|
||||
// 单个事务批量写入
|
||||
if err := d.db.Update(func(tx *bolt.Tx) error {
|
||||
if len(snippets) > 0 {
|
||||
b := tx.Bucket(bucketGate)
|
||||
for k, v := range snippets {
|
||||
if err := b.Put([]byte(k), v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(siteInfos) > 0 {
|
||||
b := tx.Bucket(bucketSiteGate)
|
||||
for k, v := range siteInfos {
|
||||
if err := b.Put([]byte(k), v); err != nil {
|
||||
return err
|
||||
}
|
||||
b := tx.Bucket(bucketGate)
|
||||
for k, op := range snapshot {
|
||||
if err := b.Put([]byte(k), op.data); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -164,20 +190,93 @@ func (d *DB) flushWriteBuf() {
|
||||
}
|
||||
}
|
||||
|
||||
// flushSiteCache 将脏的 SiteInfo 批量刷入 bbolt。
|
||||
func (d *DB) flushSiteCache() {
|
||||
d.siteDirtyMu.Lock()
|
||||
if len(d.siteDirty) == 0 {
|
||||
d.siteDirtyMu.Unlock()
|
||||
return
|
||||
}
|
||||
dirty := d.siteDirty
|
||||
d.siteDirty = make(map[string]struct{})
|
||||
d.siteDirtyMu.Unlock()
|
||||
|
||||
// 在读锁下快照所有脏数据并预压缩
|
||||
d.siteCacheMu.RLock()
|
||||
type kv struct {
|
||||
host string
|
||||
data []byte
|
||||
}
|
||||
items := make([]kv, 0, len(dirty))
|
||||
for host := range dirty {
|
||||
if info, ok := d.siteCache[host]; ok {
|
||||
data, err := marshalCompress(info)
|
||||
if err != nil {
|
||||
log.Printf("[storage] flushSiteCache marshal error for %s: %v", host, err)
|
||||
continue
|
||||
}
|
||||
items = append(items, kv{host, data})
|
||||
}
|
||||
}
|
||||
d.siteCacheMu.RUnlock()
|
||||
|
||||
if len(items) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 单个事务批量写入 bbolt
|
||||
if err := d.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucketSiteGate)
|
||||
for _, item := range items {
|
||||
if err := b.Put([]byte(item.host), item.data); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Printf("[storage] flushSiteCache error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Close 关闭底层 bbolt 数据库连接。
|
||||
// 先刷完写缓冲,再关闭 ticker,最后关闭数据库。
|
||||
// 先刷完写缓冲和 SiteInfo 缓存,再关闭定时器,最后关闭数据库。
|
||||
func (d *DB) Close() error {
|
||||
if d.writeTicker != nil {
|
||||
d.writeTicker.Stop()
|
||||
}
|
||||
d.flushWriteBuf() // 最后刷一次,确保数据不丢失
|
||||
if d.siteTicker != nil {
|
||||
d.siteTicker.Stop()
|
||||
}
|
||||
d.flushWriteBuf()
|
||||
d.flushSiteCache()
|
||||
if d.writeDone != nil {
|
||||
close(d.writeDone)
|
||||
}
|
||||
if d.siteDone != nil {
|
||||
close(d.siteDone)
|
||||
}
|
||||
return d.db.Close()
|
||||
}
|
||||
|
||||
// ---- 辅助函数:压缩与解压 ----
|
||||
// ---- 辅助函数:压缩与解压、map 拷贝 ----
|
||||
|
||||
// copyMapF64 深拷贝 map[string]float64。
|
||||
func copyMapF64(m map[string]float64) map[string]float64 {
|
||||
cp := make(map[string]float64, len(m))
|
||||
for k, v := range m {
|
||||
cp[k] = v
|
||||
}
|
||||
return cp
|
||||
}
|
||||
|
||||
// copyMapStr 深拷贝 map[string]string。
|
||||
func copyMapStr(m map[string]string) map[string]string {
|
||||
cp := make(map[string]string, len(m))
|
||||
for k, v := range m {
|
||||
cp[k] = v
|
||||
}
|
||||
return cp
|
||||
}
|
||||
|
||||
// compress 将字节数组用 brotli 压缩后返回。
|
||||
// brotli 压缩比高于 gzip,适合大量文本的存储空间优化。
|
||||
@@ -379,7 +478,7 @@ func (d *DB) SetSnippet(url string, entry *SnippetEntry) error {
|
||||
return err
|
||||
}
|
||||
d.writeBufMu.Lock()
|
||||
d.writeBuf["snippet:"+url] = &writeOp{opType: 0, key: url, data: data}
|
||||
d.writeBuf[url] = &writeOp{data: data}
|
||||
// 缓冲过大时同步刷一次,防止内存膨胀
|
||||
if len(d.writeBuf) >= 5000 {
|
||||
d.writeBufMu.Unlock()
|
||||
@@ -410,82 +509,97 @@ type SiteInfo struct {
|
||||
}
|
||||
|
||||
// GetSiteInfo 根据主机名查询网站元信息。
|
||||
// 优先从写缓冲中读取(保证与最近 SetSiteInfo 的数据一致),未命中再读 bbolt。
|
||||
// 全程只读内存缓存(零 bbolt 读事务),适合高并发调用。
|
||||
// 若不存在则返回仅有默认空 map 的空 SiteInfo(不报错,方便调用方直接使用)。
|
||||
func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) {
|
||||
// 先检查写缓冲中是否有该 host 的最新待写入数据
|
||||
d.writeBufMu.Lock()
|
||||
if op, ok := d.writeBuf["site:"+host]; ok {
|
||||
d.writeBufMu.Unlock()
|
||||
var info SiteInfo
|
||||
if err := decompressUnmarshal(op.data, &info); err == nil {
|
||||
if info.Languages == nil {
|
||||
info.Languages = make(map[string]float64)
|
||||
}
|
||||
if info.Redirects == nil {
|
||||
info.Redirects = make(map[string]string)
|
||||
}
|
||||
return &info, nil
|
||||
d.siteCacheMu.RLock()
|
||||
info, ok := d.siteCache[host]
|
||||
d.siteCacheMu.RUnlock()
|
||||
|
||||
if ok {
|
||||
// 返回深拷贝,防止调用方修改缓存
|
||||
cp := *info
|
||||
if cp.Languages == nil {
|
||||
cp.Languages = make(map[string]float64)
|
||||
} else {
|
||||
cp.Languages = copyMapF64(cp.Languages)
|
||||
}
|
||||
// 反序列化失败(不应发生),fall through 到读 db
|
||||
} else {
|
||||
d.writeBufMu.Unlock()
|
||||
if cp.Redirects == nil {
|
||||
cp.Redirects = make(map[string]string)
|
||||
} else {
|
||||
cp.Redirects = copyMapStr(cp.Redirects)
|
||||
}
|
||||
return &cp, nil
|
||||
}
|
||||
|
||||
var info SiteInfo
|
||||
err := d.db.View(func(tx *bolt.Tx) error {
|
||||
v := tx.Bucket(bucketSiteGate).Get([]byte(host))
|
||||
if v == nil {
|
||||
return fmt.Errorf("not found")
|
||||
}
|
||||
return decompressUnmarshal(v, &info)
|
||||
})
|
||||
// 缓存未命中,从 bbolt 加载
|
||||
// 注意:不能在 bbolt 事务回调(如 ForEachSnippet 的 fn)内调用此路径(bbolt 不支持嵌套事务)。
|
||||
// 正常情况下预热已加载全部数据,不会走到这里。
|
||||
var si SiteInfo
|
||||
var err error
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// bbolt 嵌套事务 panic,返回空 SiteInfo
|
||||
si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}
|
||||
err = nil
|
||||
}
|
||||
}()
|
||||
err = d.db.View(func(tx *bolt.Tx) error {
|
||||
v := tx.Bucket(bucketSiteGate).Get([]byte(host))
|
||||
if v == nil {
|
||||
return fmt.Errorf("not found")
|
||||
}
|
||||
return decompressUnmarshal(v, &si)
|
||||
})
|
||||
}()
|
||||
if err != nil {
|
||||
// 未找到时返回带默认空 map 的空结构,避免调用方空指针 panic
|
||||
return &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}, nil
|
||||
}
|
||||
if info.Languages == nil {
|
||||
info.Languages = make(map[string]float64)
|
||||
if si.Languages == nil {
|
||||
si.Languages = make(map[string]float64)
|
||||
}
|
||||
if info.Redirects == nil {
|
||||
info.Redirects = make(map[string]string)
|
||||
if si.Redirects == nil {
|
||||
si.Redirects = make(map[string]string)
|
||||
}
|
||||
return &info, nil
|
||||
// 回填缓存
|
||||
d.siteCacheMu.Lock()
|
||||
if existing, exists := d.siteCache[host]; !exists {
|
||||
d.siteCache[host] = &si
|
||||
} else {
|
||||
si = *existing // 用更新的版本
|
||||
}
|
||||
d.siteCacheMu.Unlock()
|
||||
return &si, nil
|
||||
}
|
||||
|
||||
// SetSiteInfo 将网站元信息写入写缓冲(异步批量刷入磁盘)。
|
||||
// SetSiteInfo 将网站元信息直接写入内存缓存并标记为脏(异步刷入磁盘)。
|
||||
func (d *DB) SetSiteInfo(host string, info *SiteInfo) error {
|
||||
data, err := marshalCompress(info)
|
||||
if err != nil {
|
||||
return err
|
||||
cp := *info // 浅拷贝,断开与调用方的引用
|
||||
if cp.Languages != nil {
|
||||
cp.Languages = copyMapF64(cp.Languages)
|
||||
}
|
||||
d.writeBufMu.Lock()
|
||||
d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data}
|
||||
if len(d.writeBuf) >= 5000 {
|
||||
d.writeBufMu.Unlock()
|
||||
d.flushWriteBuf()
|
||||
return nil
|
||||
if cp.Redirects != nil {
|
||||
cp.Redirects = copyMapStr(cp.Redirects)
|
||||
}
|
||||
d.writeBufMu.Unlock()
|
||||
d.siteCacheMu.Lock()
|
||||
d.siteCache[host] = &cp
|
||||
d.siteCacheMu.Unlock()
|
||||
d.siteDirtyMu.Lock()
|
||||
d.siteDirty[host] = struct{}{}
|
||||
d.siteDirtyMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回。
|
||||
// 整个读-改-写过程在 writeBufMu 锁内完成,消除并发 lost update 竞态。
|
||||
// 适用于多个 goroutine 对同一 host 的 SiteInfo 进行读-改-写的场景。
|
||||
// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回缓存。
|
||||
// 只用 siteCacheMu(不碰 bbolt),不会与 flush 产生跨锁依赖。
|
||||
func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error {
|
||||
d.writeBufMu.Lock()
|
||||
d.siteCacheMu.Lock()
|
||||
|
||||
// 从缓冲或 db 读取最新 SiteInfo
|
||||
var info *SiteInfo
|
||||
if op, ok := d.writeBuf["site:"+host]; ok {
|
||||
var si SiteInfo
|
||||
if err := decompressUnmarshal(op.data, &si); err == nil {
|
||||
info = &si
|
||||
}
|
||||
}
|
||||
|
||||
if info == nil {
|
||||
// 从缓存读取(缓存未命中会返回 nil,后续初始化为空)
|
||||
info, ok := d.siteCache[host]
|
||||
if !ok {
|
||||
// 从 bbolt 加载(注意:这是唯一会做 bbolt 读的地方,但只在首次 miss 时触发)
|
||||
var si SiteInfo
|
||||
err := d.db.View(func(tx *bolt.Tx) error {
|
||||
v := tx.Bucket(bucketSiteGate).Get([]byte(host))
|
||||
@@ -495,34 +609,28 @@ func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error {
|
||||
return decompressUnmarshal(v, &si)
|
||||
})
|
||||
if err != nil {
|
||||
info = &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}
|
||||
si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}
|
||||
} else {
|
||||
info = &si
|
||||
if si.Languages == nil {
|
||||
si.Languages = make(map[string]float64)
|
||||
}
|
||||
if si.Redirects == nil {
|
||||
si.Redirects = make(map[string]string)
|
||||
}
|
||||
}
|
||||
}
|
||||
if info.Languages == nil {
|
||||
info.Languages = make(map[string]float64)
|
||||
}
|
||||
if info.Redirects == nil {
|
||||
info.Redirects = make(map[string]string)
|
||||
d.siteCache[host] = &si
|
||||
info = &si
|
||||
}
|
||||
|
||||
// 在锁内调用修改函数
|
||||
// 在锁内调用修改函数(直接修改缓存中的对象)
|
||||
fn(info)
|
||||
|
||||
// 序列化并写回缓冲
|
||||
data, err := marshalCompress(info)
|
||||
if err != nil {
|
||||
d.writeBufMu.Unlock()
|
||||
return err
|
||||
}
|
||||
d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data}
|
||||
if len(d.writeBuf) >= 5000 {
|
||||
d.writeBufMu.Unlock()
|
||||
d.flushWriteBuf()
|
||||
return nil
|
||||
}
|
||||
d.writeBufMu.Unlock()
|
||||
d.siteCacheMu.Unlock()
|
||||
|
||||
// 标记脏(锁外)
|
||||
d.siteDirtyMu.Lock()
|
||||
d.siteDirty[host] = struct{}{}
|
||||
d.siteDirtyMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user