diff --git a/storage/storage.go b/storage/storage.go index 13f7fd6..4b4893a 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -12,6 +12,7 @@ import ( "encoding/json" // JSON 序列化/反序列化 "fmt" // 格式化错误信息 "io" // io.EOF 常量 + "log" // 日志输出 "os" // 操作系统功能(创建目录等) "path/filepath" // 路径拼接 @@ -188,22 +189,47 @@ func (d *DB) SetIndex(keyword string, entries []IndexEntry) error { }) } -// BatchSetIndex 在一次事务中批量写入多个关键词→条目列表的映射。 -// 比多次调用 SetIndex 效率更高(减少事务开销)。 +// BatchSetIndex 分批写入多个关键词→条目列表的映射。 +// 大数据量时拆分为多个小事务,避免单个大事务导致的长时间阻塞。 func (d *DB) BatchSetIndex(batch map[string][]IndexEntry) error { - return d.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketIndex) - for keyword, entries := range batch { - data, err := marshalCompress(entries) - if err != nil { - return err - } - if err := b.Put([]byte(keyword), data); err != nil { - return err - } + const batchSize = 1000 + items := make([]struct { + keyword string + entries []IndexEntry + }, 0, len(batch)) + for k, v := range batch { + items = append(items, struct { + keyword string + entries []IndexEntry + }{k, v}) + } + totalBatches := (len(items) + batchSize - 1) / batchSize + for i := 0; i < len(items); i += batchSize { + end := i + batchSize + if end > len(items) { + end = len(items) } - return nil - }) + batchNum := i/batchSize + 1 + if err := d.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketIndex) + for _, item := range items[i:end] { + data, err := marshalCompress(item.entries) + if err != nil { + return err + } + if err := b.Put([]byte(item.keyword), data); err != nil { + return err + } + } + return nil + }); err != nil { + return err + } + if totalBatches > 1 { + log.Printf("[storage] BatchSetIndex progress: batch %d/%d (%d keys)", batchNum, totalBatches, end) + } + } + return nil } // ForEachIndex 遍历倒排索引中所有关键词及其关联条目,对每个条目调用 fn。