From a9cb0b248131376b8ea4cb09d1679e52937de491 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=96=87=E5=B3=B0?= Date: Mon, 20 Apr 2026 18:26:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=88=90redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 82 ++--- backlink/backlink.go | 12 +- config/config.example.yml | 115 ++++++ config/config.go | 193 ++++++++++ crawler/crawler.go | 38 +- crawler/fetcher.go | 11 +- go.mod | 10 +- go.sum | 22 +- main.go | 61 +++- mysql/flusher.go | 445 ++++++++++++++++++++++ mysql/init_db.sql | 98 +++++ mysql/mysql.go | 416 +++++++++++++++++++++ redis/redis.go | 82 +++++ search/server.go | 318 ++++++++++++---- storage/storage.go | 750 -------------------------------------- storage/store.go | 651 +++++++++++++++++++++++++++++++++ storage/types.go | 37 ++ 17 files changed, 2408 insertions(+), 933 deletions(-) create mode 100644 config/config.example.yml create mode 100644 mysql/flusher.go create mode 100644 mysql/init_db.sql create mode 100644 mysql/mysql.go create mode 100644 redis/redis.go delete mode 100644 storage/storage.go create mode 100644 storage/store.go create mode 100644 storage/types.go diff --git a/README.md b/README.md index 270dd29..2804039 100644 --- a/README.md +++ b/README.md @@ -5,85 +5,61 @@ Python 原版的 Go 语言重构,使用标准英文命名,单二进制部署 ## 目录结构 ``` -golang/ +sese-engine-go/ ├── main.go # 主入口,goroutine 启动所有模块 ├── go.mod ├── config/ -│ └── config.go # 全局配置参数(对应 配置.py) +│ └── config.go # 全局配置参数 ├── storage/ -│ └── storage.go # bbolt 持久化层(对应 存储.py,替换 rimo_storage) +│ ├── types.go # 核心数据类型定义 +│ └── store.go # Redis + MySQL 存储层 ├── crawler/ -│ ├── crawler.go # BFS 爬虫调度(对应 上网.py) -│ └── fetcher.go # HTTP 获取 + robots.txt + 限流(对应 虫.py) +│ ├── crawler.go # BFS 爬虫调度 +│ └── fetcher.go # HTTP 获取 + robots.txt + 限流 ├── parser/ -│ └── parser.go # HTML 解析(对应 文.py) +│ └── parser.go # HTML 解析 ├── analyzer/ -│ └── analyzer.go # 分词 + 关键词权重(对应 分析.py + utils.py 分词部分) -│ 使用 gojieba(中文)+ gofasttext(语言检测) -├── harvester/ -│ └── harvester.go # 索引写入服务,监听 :5000(对应 收获服务器.py) +│ └── analyzer.go # 分词 + 关键词权重 + 语言检测 ├── search/ -│ └── server.go # 搜索 API,监听 :80(对应 人服务器.py) +│ └── server.go # 搜索 API + 收获服务 ├── backlink/ -│ └── backlink.go # 反向链接计算,每 48h 运行(对应 回.py) +│ └── backlink.go # 反向链接计算 └── info/ - └── info.go # 繁荣表 / 调整表 / 屏蔽词加载(对应 信息.py) + └── info.go # 繁荣表 / 调整表 / 屏蔽词 ``` ## 依赖项 -| Go 包 | 替代 Python 包 | 用途 | -|-------|--------------|------| -| `github.com/yanyiwu/gojieba` | `jieba` | 中文分词 | -| `github.com/nicholasgasior/gofasttext` | `fasttext` | 语言检测 | -| `go.etcd.io/bbolt` | `rimo_storage` | KV 存储 / 倒排索引 | -| `github.com/andybalholm/brotli` | `brotli` | 压缩 | -| `golang.org/x/net/html` | `lxml` | HTML 解析 | -| `golang.org/x/net/html/charset` | chardet | 编码检测 | +| Go 包 | 用途 | +|-------|------| +| `github.com/yanyiwu/gojieba` | 中文分词 | +| `github.com/pemistahl/lingua-go` | 语言检测(内置模型) | +| `github.com/redis/go-redis/v9` | Redis 客户端(高性能内存存储) | +| `github.com/go-sql-driver/mysql` | MySQL 驱动(持久化存储) | +| `github.com/andybalholm/brotli` | Brotli 压缩 | +| `golang.org/x/net/html` | HTML 解析 | ## 构建与运行 ```bash -cd golang - -# 下载依赖(需要 CGo 编译器,用于 gojieba / gofasttext) +# 下载依赖(需要 CGo 编译器,用于 gojieba) go mod tidy # 构建 go build -o sese-engine . -# 运行(在 sese-engine 项目根目录下) -cd .. -./golang/sese-engine \ - --storage ./savedata \ - --entry https://zh.wikipedia.org/ \ - --fasttext ./lid.176.ftz \ - --stopwords ./data/标点符号.json +# 运行 +./sese-engine --storage ./savedata --entry https://zh.wikipedia.org/ ``` -一个进程启动所有模块: -- `:5000` — 收获服务器(爬虫推送关键词) -- `:80` — 搜索 API(`GET /search?q=关键词`) -- 后台 goroutine — BFS 爬虫 -- 后台 goroutine — 每 48 小时反向链接计算 +## 架构说明 -## 与 Python 版的主要差异 - -| 方面 | Python 版 | Go 版 | -|------|---------|-------| -| 并发 | GIL + 线程池(假并发) | goroutine 真并发 | -| 存储 | rimo_storage(自研)| bbolt(嵌入式 KV) | -| 部署 | 需要 Python 环境 | 单二进制,无运行时依赖 | -| 命名 | 全中文 | 标准英文 | -| 进程数 | 3~4 个进程 | 1 个进程多 goroutine | -| 编码检测 | requests 自动检测 | `golang.org/x/net/html/charset` | -| Prometheus | 可选 | 暂未集成(可后续添加) | +- **Redis**:高性能内存存储,用于实时索引读写 +- **MySQL**:持久化存储,用于数据备份和复杂查询 +- **内存聚合**:写入先到 Redis,定期合并到 MySQL ## 注意事项 -1. **CGo 依赖**:gojieba 和 gofasttext 均需要 C/C++ 编译器(gcc/clang)。 - Windows 下建议使用 MinGW 或 WSL。 -2. **fasttext 模型**:`lid.176.ftz` 需要与 Python 版共用,路径通过 `--fasttext` 指定。 -3. **数据迁移**:存储格式(bbolt JSON)与 Python 版(rimo_storage 二进制)不兼容, - 需要全新爬取,或编写迁移脚本。 -4. **stop words 文件**:复用 Python 版的 `data/标点符号.json`。 +1. **CGo 依赖**:gojieba 需要 C/C++ 编译器(gcc/clang)。 +2. **Redis + MySQL**:需要提前部署并配置连接参数。 +3. **配置文件**:`config/config.yml` 中配置数据库连接信息。 diff --git a/backlink/backlink.go b/backlink/backlink.go index 8d47007..f18dc20 100644 --- a/backlink/backlink.go +++ b/backlink/backlink.go @@ -21,7 +21,7 @@ import ( // Runner 管理反向链接计算循环。 type Runner struct { - db *storage.DB + store *storage.RedisStoreV2 storagePath string // 存储根目录(用于写入 prosper.json) mu sync.Mutex running bool // 是否正在计算中 @@ -31,8 +31,8 @@ type Runner struct { } // New 创建一个 Runner 实例。 -func New(db *storage.DB, storagePath string) *Runner { - return &Runner{db: db, storagePath: storagePath} +func New(store *storage.RedisStoreV2, storagePath string) *Runner { + return &Runner{store: store, storagePath: storagePath} } // Status 返回反链计算器的当前状态快照。 @@ -179,7 +179,7 @@ func (r *Runner) collectStats() *siteStats { serverCount: make(map[string]int), } - _ = r.db.ForEachSite(func(host string, info *storage.SiteInfo) error { + _ = r.store.ForEachSite(func(host string, info *storage.SiteInfo) error { super := superDomain(host) stats.subdomainCount[super]++ @@ -235,7 +235,7 @@ func (r *Runner) aggregate(filter func(*storage.SiteInfo) bool, stats *siteStats pruneThreshold := 0.02 i := 0 - _ = r.db.ForEachSite(func(host string, info *storage.SiteInfo) error { + _ = r.store.ForEachSite(func(host string, info *storage.SiteInfo) error { if filter != nil && !filter(info) { return nil } @@ -336,7 +336,7 @@ func (r *Runner) aggregateWithScores(scores map[string]float64, stats *siteStats serverTable := buildServerTable(stats.serverCount) vectors := make(map[string][]float32) - _ = r.db.ForEachSite(func(host string, info *storage.SiteInfo) error { + _ = r.store.ForEachSite(func(host string, info *storage.SiteInfo) error { score, ok := scores[host] if !ok || strings.Contains(host, "/") { return nil diff --git a/config/config.example.yml b/config/config.example.yml new file mode 100644 index 0000000..757a66d --- /dev/null +++ b/config/config.example.yml @@ -0,0 +1,115 @@ +# ============================================ +# sese-engine 配置文件示例 +# ============================================ + +index: + max_urls_per_key: 11000 # 每个关键词最大URL数量 + max_same_domain_per_key: 20 # 每个关键词同域名最大数量 + big_clean_threshold: 2000000 # 大清理阈值 + max_new_urls_per_key: 10000 # 每个关键词最大新增URL + min_urls_for_new_key: 3 # 新关键词最小URL数量 + +crawler: + spider_name: "Haibara_AI_spider" # 爬虫名称 + cooldown: 3 # 请求间隔(秒) + workers: 22 # 并发数 + crawl_focus: 0.7 # 爬取聚焦系数 + max_keywords_per_page: 250 # 每页最大关键词数 + max_epoch: 100 # 最大轮数 + expected_prosper_ratio: 0.6 # 期望繁荣值比例 + entry_url: "https://haibara.ai/" # 入口URL + max_page_size: 5242880 # 单页最大5MB + recrawl_max_age: 2592000 # 重爬过期30天 + recrawl_check_interval: 3600 # 检查间隔1小时 + recrawl_batch_size: 500 # 每批500个 + max_priority_children: 100 # 优先队列子链接上限 + +search: + use_online_snippet: true + online_snippet_timeout: 3 + weight_daily_decay: 0.996 + language_weight: 0.5 + consecutive_key_weight: 1.3 + backlink_weight: 1.0 + server_port: 50082 + flush_interval_seconds: 300 + stats_refresh_interval: 30 + miss_penalty: 0.15 + unix_socket: "" # Unix Socket路径(可选) + +backlink: + baseline: 200000 # 反链基准值 + +storage: + path: "./savedata" # 存储路径 + +# ============================================ +# MySQL 配置(默认关闭,启用时需设置 enabled: true) +# ============================================ +# 支持两种连接方式:Unix Socket 和 TCP +# 优先级:UnixSocket > TCP(如果UnixSocket非空则优先使用) +mysql: + # 是否启用 MySQL(默认 false,不启用则不连接、不刷盘) + enabled: false + + # 连接方式(可选,默认tcp,可设为 "socket" 或 "tcp") + # network: "tcp" + + # ----- Unix Socket 连接(推荐,本地MySQL性能更好)----- + # 设置为 MySQL socket 路径即可,TCP配置将被忽略 + unix_socket: "/var/run/mysqld/mysqld.sock" + # unix_socket: "/tmp/mysql.sock" # macOS 常见路径 + # unix_socket: "" # 留空则使用TCP连接 + + # ----- TCP 连接 ----- + host: "localhost" + port: 3306 + + # ----- 认证信息 ----- + user: "root" + password: "your_password_here" + database: "sese_engine" + + # ----- 连接池配置 ----- + conn_max_lifetime: 3600 # 连接最大生命周期(秒),默认1小时 + max_idle_conns: 10 # 最大空闲连接数 + max_open_conns: 100 # 最大打开连接数 + +# ============================================ +# Redis 配置 +# ============================================ +# 支持两种连接方式:Unix Socket 和 TCP +# 优先级:UnixSocket > TCP(如果UnixSocket非空则优先使用) +redis: + # 连接方式(可选,默认tcp) + # network: "tcp" + + # ----- Unix Socket 连接(推荐,本地Redis性能更好)----- + # 设置为 Redis socket 路径即可,TCP配置将被忽略 + unix_socket: "/var/run/redis/redis.sock" + # unix_socket: "/tmp/redis.sock" # macOS 常见路径 + # unix_socket: "" # 留空则使用TCP连接 + + # ----- TCP 连接 ----- + host: "localhost" + port: 6379 + + # ----- 认证信息(无密码则留空)----- + password: "" + + # ----- 数据库配置 ----- + # 数据库编号(0-15),默认使用15号数据库 + db: 15 + + # ----- 连接池配置 ----- + pool_size: 100 # 连接池最大连接数 + min_idle_conns: 10 # 最小空闲连接数 + + # ----- 超时配置(毫秒)----- + read_timeout: 500 # 读超时 + write_timeout: 500 # 写超时 + +prometheus: + crawler_port: 14950 + backlink_port: 14952 + search_port: 14953 diff --git a/config/config.go b/config/config.go index 0609e98..4259221 100644 --- a/config/config.go +++ b/config/config.go @@ -23,6 +23,8 @@ type Config struct { Search SearchConfig `yaml:"search"` Backlink BacklinkConfig `yaml:"backlink"` Storage StorageConfig `yaml:"storage"` + MySQL MySQLConfig `yaml:"mysql"` + Redis RedisConfig `yaml:"redis"` Prometheus PrometheusConfig `yaml:"prometheus"` } @@ -77,6 +79,62 @@ type StorageConfig struct { Path string `yaml:"path"` } +// MySQLConfig MySQL数据库连接配置 +// 支持两种连接方式:Unix Socket 和 TCP +// 优先级:UnixSocket > TCP(如果UnixSocket非空则优先使用) +type MySQLConfig struct { + // 是否启用 MySQL(默认关闭) + Enabled bool `yaml:"enabled"` + // 连接方式: "socket" 或 "tcp"(自动推断,可不填) + Network string `yaml:"network"` + // Unix Socket 路径(Linux/macOS),优先使用 + // 示例: "/var/run/mysqld/mysqld.sock" 或 "/tmp/mysql.sock" + UnixSocket string `yaml:"unix_socket"` + // TCP 连接方式:服务器地址 + Host string `yaml:"host"` + // TCP 连接方式:端口号 + Port int `yaml:"port"` + // 用户名 + User string `yaml:"user"` + // 密码 + Password string `yaml:"password"` + // 数据库名 + Database string `yaml:"database"` + // 连接超时时间(秒) + ConnMaxLifetime int `yaml:"conn_max_lifetime"` // 秒 + // 最大空闲连接数 + MaxIdleConns int `yaml:"max_idle_conns"` + // 最大打开连接数 + MaxOpenConns int `yaml:"max_open_conns"` +} + +// RedisConfig Redis连接配置 +// 支持两种连接方式:Unix Socket 和 TCP +// 优先级:UnixSocket > TCP(如果UnixSocket非空则优先使用) +type RedisConfig struct { + // 连接方式: "socket" 或 "tcp"(自动推断,可不填) + Network string `yaml:"network"` + // Unix Socket 路径,优先使用 + // 示例: "/var/run/redis/redis.sock" 或 "/tmp/redis.sock" + UnixSocket string `yaml:"unix_socket"` + // TCP 连接方式:服务器地址 + Host string `yaml:"host"` + // TCP 连接方式:端口号 + Port int `yaml:"port"` + // 密码(无密码则留空) + Password string `yaml:"password"` + // 数据库编号(0-15),默认 15 + DB int `yaml:"db"` + // 池大小(最大连接数) + PoolSize int `yaml:"pool_size"` + // 最小空闲连接数 + MinIdleConns int `yaml:"min_idle_conns"` + // 读超时时间(毫秒) + ReadTimeout int `yaml:"read_timeout"` // 毫秒 + // 写超时时间(毫秒) + WriteTimeout int `yaml:"write_timeout"` // 毫秒 +} + // PrometheusConfig Prometheus监控端口配置 type PrometheusConfig struct { CrawlerPort int `yaml:"crawler_port"` @@ -217,6 +275,31 @@ func GetDefaultConfig() Config { Storage: StorageConfig{ Path: "./savedata", }, + MySQL: MySQLConfig{ + Enabled: false, + Network: "tcp", + UnixSocket: "", + Host: "localhost", + Port: 3306, + User: "root", + Password: "", + Database: "sese_engine", + ConnMaxLifetime: 3600, // 1小时 + MaxIdleConns: 10, + MaxOpenConns: 100, + }, + Redis: RedisConfig{ + Network: "tcp", + UnixSocket: "", + Host: "localhost", + Port: 6379, + Password: "", + DB: 15, // 默认使用15号数据库 + PoolSize: 100, + MinIdleConns: 10, + ReadTimeout: 500, // 毫秒 + WriteTimeout: 500, // 毫秒 + }, Prometheus: PrometheusConfig{ CrawlerPort: 14950, BacklinkPort: 14952, @@ -357,3 +440,113 @@ func MaxPriorityChildren() int { // 为了向后兼容,保留 StoragePath 常量 const StoragePath = "./savedata" + +// ---- MySQL 配置访问函数 ---- + +// MySQLEnabled 返回是否启用 MySQL(默认关闭) +func MySQLEnabled() bool { + return Global.MySQL.Enabled +} + +// MySQLDSN 返回 MySQL 连接字符串(DSN) +// 根据配置自动选择 Unix Socket 或 TCP 方式 +func MySQLDSN() string { + cfg := Global.MySQL + if cfg.UnixSocket != "" { + // 使用 Unix Socket 连接(推荐,本地连接性能更好) + return cfg.User + ":" + cfg.Password + "@unix(" + cfg.UnixSocket + ")/" + cfg.Database + "?parseTime=true&loc=Local" + } + // 使用 TCP 连接 + return cfg.User + ":" + cfg.Password + "@tcp(" + cfg.Host + ":" + itoa(cfg.Port) + ")/" + cfg.Database + "?parseTime=true&loc=Local" +} + +// MySQLConnMaxLifetime 返回连接最大生命周期(秒) +func MySQLConnMaxLifetime() int { + if Global.MySQL.ConnMaxLifetime <= 0 { + return 3600 + } + return Global.MySQL.ConnMaxLifetime +} + +// MySQLMaxIdleConns 返回最大空闲连接数 +func MySQLMaxIdleConns() int { + if Global.MySQL.MaxIdleConns <= 0 { + return 10 + } + return Global.MySQL.MaxIdleConns +} + +// MySQLMaxOpenConns 返回最大打开连接数 +func MySQLMaxOpenConns() int { + if Global.MySQL.MaxOpenConns <= 0 { + return 100 + } + return Global.MySQL.MaxOpenConns +} + +// ---- Redis 配置访问函数 ---- + +// RedisAddr 返回 Redis 连接地址 +// 根据配置自动选择 Unix Socket 或 TCP 方式 +func RedisAddr() string { + cfg := Global.Redis + if cfg.UnixSocket != "" { + return cfg.UnixSocket + } + return cfg.Host + ":" + itoa(cfg.Port) +} + +// RedisPoolSize 返回连接池大小 +func RedisPoolSize() int { + if Global.Redis.PoolSize <= 0 { + return 100 + } + return Global.Redis.PoolSize +} + +// RedisMinIdleConns 返回最小空闲连接数 +func RedisMinIdleConns() int { + if Global.Redis.MinIdleConns <= 0 { + return 10 + } + return Global.Redis.MinIdleConns +} + +// RedisDB 返回数据库编号 +func RedisDB() int { + return Global.Redis.DB +} + +// RedisPassword 返回密码(空字符串表示无密码) +func RedisPassword() string { + return Global.Redis.Password +} + +// RedisReadTimeout 返回读超时时间(毫秒) +func RedisReadTimeout() int { + if Global.Redis.ReadTimeout <= 0 { + return 500 + } + return Global.Redis.ReadTimeout +} + +// RedisWriteTimeout 返回写超时时间(毫秒) +func RedisWriteTimeout() int { + if Global.Redis.WriteTimeout <= 0 { + return 500 + } + return Global.Redis.WriteTimeout +} + +// itoa 将 int 转换为字符串(避免导入 strconv) +func itoa(n int) string { + if n == 0 { + return "0" + } + result := "" + for n > 0 { + result = string(rune('0'+n%10)) + result + n /= 10 + } + return result +} diff --git a/crawler/crawler.go b/crawler/crawler.go index b47e2fd..cbc1412 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -51,11 +51,11 @@ const ( // Crawler 编排整个 BFS 爬取流程。 type Crawler struct { - fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流) - db *storage.DB // 持久化数据库 - analyzer *analyzer.Analyzer // 分词和关键词分析 - prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值") - stats Stats // 原子计数器 + fetcher *Fetcher // HTTP 抓取器(含 robots.txt 和限流) + store *storage.RedisStoreV2 // 持久化存储 + analyzer *analyzer.Analyzer // 分词和关键词分析 + prosperMap map[string]float64 // 域名 → 反向链接繁荣值(来自 info 模块,越大越"有价值") + stats Stats // 原子计数器 // visited 记录已访问的 URL 集合(跨 epoch 持久,启动时从 DB 预热) visited map[string]bool @@ -164,10 +164,10 @@ func DecrementPriorityLevel2Inflight(n int64) { // New 创建一个 Crawler 实例。 // prosperMap 由 info 模块加载,传入域名繁荣值用于调度优先级计算。 -func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler { +func New(store *storage.RedisStoreV2, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler { c := &Crawler{ fetcher: NewFetcher(config.SpiderName(), time.Duration(config.CrawlerCooldown())*time.Second), - db: db, + store: store, analyzer: a, prosperMap: prosperMap, visited: make(map[string]bool), @@ -192,7 +192,7 @@ func (c *Crawler) warmVisited() { expired := 0 maxAge := int64(config.RecrawlMaxAge()) now := time.Now().Unix() - _ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error { + _ = c.store.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error { if now-entry.Timestamp < maxAge { c.visited[u] = true // 未过期,仍然跳过 count++ @@ -221,7 +221,7 @@ func (c *Crawler) startRecrawlTicker() { removed := 0 c.visitedMu.Lock() - _ = c.db.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error { + _ = c.store.ForEachSnippet(func(u string, entry *storage.SnippetEntry) error { if removed >= batchSize { return fmt.Errorf("batch full") // 提前终止遍历 } @@ -365,7 +365,7 @@ func (c *Crawler) priorityCrawlLoop(rawURL string, level int) { }() // 标记 DB 中该 URL 为已访问,防止重启后再次被调度 - _ = c.db.MarkPriorityURLVisited(rawURL) + _ = c.store.MarkPriorityURLVisited(rawURL) // 两级都不限制子链接数量 children := c.visitURLUnlimited(rawURL) @@ -441,7 +441,7 @@ func (c *Crawler) isVisited(url string) bool { // 将未访问的插入队列前端(prepend),已爬取的条目从存储中清除。 // 返回本次插入队列的 URL 数量。 func (c *Crawler) fetchAndApplyPriorityURLs(queue *[]string) int { - entries, err := c.db.GetPriorityURLs() + entries, err := c.store.GetPriorityURLs() if err != nil || len(entries) == 0 { return 0 } @@ -449,14 +449,14 @@ func (c *Crawler) fetchAndApplyPriorityURLs(queue *[]string) int { added := 0 for _, e := range entries { if c.isVisited(e.URL) { - _ = c.db.RemovePriorityURL(e.URL) + _ = c.store.RemovePriorityURL(e.URL) continue } *queue = append([]string{e.URL}, *queue...) added++ } - _ = c.db.ClearVisitedPriorityURLs() + _ = c.store.ClearVisitedPriorityURLs() return added } @@ -713,7 +713,7 @@ func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text // 增量重爬检测:查询上次爬取的哈希,内容未变则跳过关键词提取 isRecrawl := false - oldEntry, _ := c.db.GetSnippet(res.FinalURL) + oldEntry, _ := c.store.GetSnippet(res.FinalURL) if !forceIndex && oldEntry != nil && oldEntry.ContentHash != "" && oldEntry.ContentHash == contentHash { isRecrawl = true //log.Printf("[crawler] unchanged (recrawl skip): %s", res.FinalURL) @@ -721,7 +721,7 @@ func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text // 缓存 URL 摘要(仅对短 URL 缓存,防止超长 URL 浪费空间) if len(res.FinalURL) < 250 { - _ = c.db.SetSnippet(res.FinalURL, &storage.SnippetEntry{ + _ = c.store.SetSnippet(res.FinalURL, &storage.SnippetEntry{ Title: title, Description: truncate(desc, 256), Text: truncate(text, 256), @@ -756,7 +756,7 @@ func (c *Crawler) visitURLRaw(rawURL string, forceIndex bool) (title, desc, text if fromHost == "" { continue } - _ = c.db.UpdateSiteInfo(fromHost, func(info *storage.SiteInfo) { + _ = c.store.UpdateSiteInfo(fromHost, func(info *storage.SiteInfo) { if info.Redirects == nil { info.Redirects = make(map[string]string) } @@ -799,7 +799,7 @@ func (c *Crawler) updateSiteFailure(rawURL string) { if host == "" { return } - _ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) { + _ = c.store.UpdateSiteInfo(host, func(info *storage.SiteInfo) { if info.SuccessRate == nil { zero := 0.0 info.SuccessRate = &zero @@ -831,7 +831,7 @@ func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, } sampled := sampleStrings(external, 10) - _ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) { + _ = c.store.UpdateSiteInfo(host, func(info *storage.SiteInfo) { // 访问计数 +1,更新最后访问时间 info.VisitCount++ info.LastVisitTime = now @@ -978,7 +978,7 @@ func (c *Crawler) schedule(links []URLWeight) []string { wg.Add(1) go func(host string) { defer wg.Done() - info, _ := c.db.GetSiteInfo(host) + info, _ := c.store.GetSiteInfo(host) mu.Lock() siteCache[host] = info mu.Unlock() diff --git a/crawler/fetcher.go b/crawler/fetcher.go index ee6e57e..5e9553e 100644 --- a/crawler/fetcher.go +++ b/crawler/fetcher.go @@ -13,7 +13,8 @@ import ( "sync" // 互斥锁(保护限流表和 robots.txt 缓存) "time" // 时间(限流间隔计算、robots.txt 缓存过期) - "golang.org/x/net/html/charset" // HTML 字符集自动检测(将各种编码转为 UTF-8) + "golang.org/x/net/html/charset" // HTML 字符集自动检测(将各种编码转为 UTF-8) + "golang.org/x/text/encoding/simplifiedchinese" // GBK → UTF-8 转换兜底 ) // ErrCrawl 表示爬取过程中的预期错误(404、被 robots.txt 禁止、非 HTML 类型等)。 @@ -341,11 +342,17 @@ func decodeBody(r io.Reader, contentType string, sizeLimit int) (string, error) // 使用 golang.org/x/net/html/charset 自动检测 HTML 编码并转为 UTF-8 utf8Reader, err := charset.NewReader(reader, contentType) if err != nil { - // 备选方案:直接以 UTF-8 读取(可能乱码但不崩溃) + // charset 检测失败时,先读取原始字节,再尝试 GBK 兜底 data, readErr := io.ReadAll(reader) if readErr != nil { return "", readErr } + // 将 GBK 字节流转为 UTF-8 字符串 + utf8Bytes, convErr := simplifiedchinese.GBK.NewDecoder().Bytes(data) + if convErr == nil { + return string(utf8Bytes), nil + } + // 转换失败则返回原始字节(可能乱码但不崩溃) return string(data), nil } data, err := io.ReadAll(utf8Reader) diff --git a/go.mod b/go.mod index 98d531d..fac2bab 100644 --- a/go.mod +++ b/go.mod @@ -3,18 +3,20 @@ module sese-engine go 1.21 require ( - github.com/andybalholm/brotli v1.1.0 + github.com/go-sql-driver/mysql v1.8.1 github.com/pemistahl/lingua-go v1.4.0 + github.com/redis/go-redis/v9 v9.5.1 github.com/yanyiwu/gojieba v1.4.4 - go.etcd.io/bbolt v1.3.9 golang.org/x/net v0.23.0 + golang.org/x/text v0.14.0 gopkg.in/yaml.v3 v3.0.1 ) require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/shopspring/decimal v1.3.1 // indirect golang.org/x/exp v0.0.0-20221106115401-f9659909a136 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.31.0 // indirect ) diff --git a/go.sum b/go.sum index 08983f9..f452a1f 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,17 @@ -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= @@ -10,22 +20,18 @@ github.com/pemistahl/lingua-go v1.4.0 h1:ifYhthrlW7iO4icdubwlduYnmwU37V1sbNrwhKB github.com/pemistahl/lingua-go v1.4.0/go.mod h1:ECuM1Hp/3hvyh7k8aWSqNCPlTxLemFZsRjocUf3KgME= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yanyiwu/gojieba v1.4.4 h1:Iukkf8WlIfqAKtsGZjUhGR1ArKa7DtLDNmW8bvUI8JI= github.com/yanyiwu/gojieba v1.4.4/go.mod h1:JUq4DddFVGdHXJHxxepxRmhrKlDpaBxR8O28v6fKYLY= -go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= -go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= golang.org/x/exp v0.0.0-20221106115401-f9659909a136 h1:Fq7F/w7MAa1KJ5bt2aJ62ihqp9HDcRuyILskkpIAurw= golang.org/x/exp v0.0.0-20221106115401-f9659909a136/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go index c7b0d14..f13b5ae 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ package main import ( + "context" "flag" // 命令行参数解析 "fmt" // 格式化(搜索服务端口) "log" // 日志输出 @@ -17,12 +18,15 @@ import ( "os/signal" // 信号捕获 "path/filepath" // 路径处理 "syscall" // 系统调用(SIGTERM) + "time" // 时间(刷盘间隔) "sese-engine/analyzer" // 文本分析和关键词提取 "sese-engine/backlink" // 反向链接(繁荣值)计算 "sese-engine/config" // 全局配置 "sese-engine/crawler" // BFS 爬虫 "sese-engine/info" // info 服务(繁荣表、调整表、屏蔽词) + "sese-engine/mysql" // MySQL 数据库连接 + "sese-engine/redis" // Redis 连接 "sese-engine/search" // 搜索服务器(内嵌收获服务) "sese-engine/storage" // 持久化存储 ) @@ -138,13 +142,44 @@ func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) log.Printf("sese-engine starting storage=%s entry=%s", *storageDir, *entryURL) - // ---- 1. 存储层:打开 bbolt 数据库 ---- - db, err := storage.Open(*storageDir) - if err != nil { - log.Fatalf("failed to open storage: %v", err) + // ---- 1. Redis 连接(高性能内存存储)---- + if err := redis.Open(); err != nil { + log.Fatalf("failed to open redis: %v", err) } - defer db.Close() - db.StartWriteFlusher() // 启动异步写缓冲后台刷盘 + defer redis.Close() + + // ---- 1.1 MySQL 连接(持久化存储,默认关闭)---- + var flusher *mysql.Flusher + if config.MySQLEnabled() { + if err := mysql.Open(); err != nil { + log.Fatalf("failed to open mysql: %v", err) + } + defer mysql.Close() + + // 从 MySQL 恢复数据到 Redis(如 Redis 数据丢失) + // 仅当 Redis 为空时才执行恢复,避免覆盖已有数据 + ctx := context.Background() + size, _ := redis.Client.DBSize(ctx).Result() + if size > 0 { + log.Printf("[mysql-restore] Redis has %d keys, skipping restore", size) + } else if err := mysql.RestoreFromMySQLToRedis(redis.Client); err != nil { + log.Printf("[mysql-restore] warning: %v", err) + } + + // MySQL 刷盘器:定期将 Redis 数据刷到 MySQL + flusher = mysql.NewFlusher(redis.Client, 5*time.Minute, 1000) + flusher.Start() + defer flusher.Stop() + } else { + log.Println("[mysql] disabled in config, skipping MySQL init") + } + + // ---- 1.3 存储层:初始化 Redis 存储 ---- + store := storage.NewRedisStoreV2() + if err := store.Init(); err != nil { + log.Fatalf("failed to init redis store: %v", err) + } + defer store.Close() // ---- 2. Info 服务:加载繁荣表、调整表和屏蔽词 ---- infoSvc := info.New(*storageDir) @@ -158,7 +193,7 @@ func main() { defer anal.Close() // ---- 4. 搜索服务器(默认 :80):对外提供搜索 API,同时内嵌收获服务(统一端口) - searchSrv := search.New(db, infoSvc, anal) + searchSrv := search.New(store, infoSvc, anal) go func() { addr := fmt.Sprintf(":%d", config.SearchServerPort()) if err := searchSrv.ListenAndServe(addr, config.UnixSocket()); err != nil { @@ -167,14 +202,14 @@ func main() { }() // ---- 6. 反向链接计算器:每 48 小时运行一次 ---- - bl := backlink.New(db, *storageDir) + bl := backlink.New(store, *storageDir) searchSrv.SetBacklinkRunner(bl) go bl.Run() // ---- 7. 爬虫:从入口 URL 开始 BFS 爬取 ---- // 从 info 服务获取繁荣表快照,用于调度优先级决策 prosperMap := infoSvc.ProsperMap() - crawl := crawler.New(db, anal, prosperMap) + crawl := crawler.New(store, anal, prosperMap) searchSrv.SetCrawler(crawl) // 注入爬虫用于 Priority URL 立即触发 go crawl.Run(*entryURL, config.MaxEpoch()) @@ -194,8 +229,12 @@ func main() { crawl.WaitUntilStopped() log.Println("crawler stopped") - // 最后刷盘,确保数据不丢失 - log.Println("flushing index...") + // 最后刷盘(Redis → Disk,条件刷 MySQL),确保数据不丢失 + log.Println("flushing index to disk...") searchSrv.Flush() + if flusher != nil { + log.Println("flushing data to mysql...") + flusher.RunAll() // 同步执行一次 MySQL 刷盘 + } log.Println("shutdown complete") } diff --git a/mysql/flusher.go b/mysql/flusher.go new file mode 100644 index 0000000..f21bf31 --- /dev/null +++ b/mysql/flusher.go @@ -0,0 +1,445 @@ +package mysql + +import ( + "context" + "fmt" + "log" + "strings" + "sync" + "time" + + goredis "github.com/redis/go-redis/v9" +) + +// Flusher 管理 Redis → MySQL 刷盘任务 +type Flusher struct { + redisDB *goredis.Client // Redis 客户端引用 + interval time.Duration // 刷盘间隔 + batchSize int // 每批次处理数量 + mu sync.Mutex // 防止并发刷盘 + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewFlusher 创建刷盘器 +func NewFlusher(redisDB *goredis.Client, interval time.Duration, batchSize int) *Flusher { + if interval <= 0 { + interval = 5 * time.Minute + } + if batchSize <= 0 { + batchSize = 1000 + } + return &Flusher{ + redisDB: redisDB, + interval: interval, + batchSize: batchSize, + stopCh: make(chan struct{}), + } +} + +// Start 启动后台刷盘任务 +func (f *Flusher) Start() { + f.wg.Add(1) + go func() { + defer f.wg.Done() + ticker := time.NewTicker(f.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + f.RunAll() + case <-f.stopCh: + log.Printf("[mysql-flusher] stopped") + return + } + } + }() + log.Printf("[mysql-flusher] started (interval=%v, batchSize=%d)", f.interval, f.batchSize) +} + +// Stop 停止刷盘任务 +func (f *Flusher) Stop() { + close(f.stopCh) + f.wg.Wait() +} + +// RunAll 执行所有类型的刷盘 +func (f *Flusher) RunAll() { + f.mu.Lock() + defer f.mu.Unlock() + + start := time.Now() + log.Printf("[mysql-flusher] === starting flush ===") + + // 刷盘顺序:snippet → site → index(按数据量从小到大) + f.flushSnippets() + f.flushSites() + f.flushIndex() + f.flushPriorityURLs() + + log.Printf("[mysql-flusher] === flush done (took %v) ===", time.Since(start)) +} + +// flushSnippets 将 Redis gate:* 数据刷到 url_snippets 表 +func (f *Flusher) flushSnippets() { + ctx := context.Background() + var cursor uint64 + total := 0 + + for { + keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "gate:*", int64(f.batchSize)).Result() + if err != nil { + log.Printf("[mysql-flusher][snippets] scan error: %v", err) + return + } + + if len(keys) > 0 { + f.batchUpsertSnippets(ctx, keys) + total += len(keys) + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + + if total > 0 { + log.Printf("[mysql-flusher][snippets] flushed %d entries", total) + } +} + +// batchUpsertSnippets 批量 upsert url_snippets +func (f *Flusher) batchUpsertSnippets(ctx context.Context, keys []string) { + if len(keys) == 0 || DB == nil { + return + } + + query := `INSERT INTO url_snippets (url, url_hash, title, description, text, timestamp, content_hash) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + title = VALUES(title), + description = VALUES(description), + text = VALUES(text), + timestamp = VALUES(timestamp), + content_hash = VALUES(content_hash)` + + tx, err := DB.BeginTx(ctx, nil) + if err != nil { + log.Printf("[mysql-flusher][snippets] begin tx error: %v", err) + return + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, query) + if err != nil { + log.Printf("[mysql-flusher][snippets] prepare error: %v", err) + return + } + defer stmt.Close() + + for _, key := range keys { + data, err := f.redisDB.HGetAll(ctx, key).Result() + if err != nil || len(data) == 0 { + continue + } + + url := data["url"] + urlHash := data["url_hash"] + if urlHash == "" { + // 从 key 中提取 hash(key 格式:gate:) + urlHash = strings.TrimPrefix(key, "gate:") + } + + title := data["title"] + description := data["desc"] + text := data["text"] + ts := parseInt64(data["ts"]) + contentHash := data["hash"] + + _, err = stmt.ExecContext(ctx, url, urlHash, title, description, text, ts, contentHash) + if err != nil { + log.Printf("[mysql-flusher][snippets] exec error for %s: %v", url, err) + } + } + + if err := tx.Commit(); err != nil { + log.Printf("[mysql-flusher][snippets] commit error: %v", err) + } +} + +// flushSites 将 Redis site:* 数据刷到 site_info 表 +func (f *Flusher) flushSites() { + ctx := context.Background() + var cursor uint64 + total := 0 + + for { + keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "site:*", int64(f.batchSize)).Result() + if err != nil { + log.Printf("[mysql-flusher][sites] scan error: %v", err) + return + } + + if len(keys) > 0 { + f.batchUpsertSites(ctx, keys) + total += len(keys) + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + + if total > 0 { + log.Printf("[mysql-flusher][sites] flushed %d entries", total) + } +} + +// batchUpsertSites 批量 upsert site_info +func (f *Flusher) batchUpsertSites(ctx context.Context, keys []string) { + if len(keys) == 0 || DB == nil { + return + } + + query := `INSERT INTO site_info (host, visit_count, last_visit_time, fingerprint, success_rate, + html_structure, ips, quality, https_available, keywords, out_links, + languages, redirects, server_types) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + visit_count = VALUES(visit_count), + last_visit_time = VALUES(last_visit_time), + fingerprint = VALUES(fingerprint), + success_rate = VALUES(success_rate), + html_structure = VALUES(html_structure), + ips = VALUES(ips), + quality = VALUES(quality), + https_available = VALUES(https_available), + keywords = VALUES(keywords), + out_links = VALUES(out_links), + languages = VALUES(languages), + redirects = VALUES(redirects), + server_types = VALUES(server_types)` + + tx, err := DB.BeginTx(ctx, nil) + if err != nil { + log.Printf("[mysql-flusher][sites] begin tx error: %v", err) + return + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, query) + if err != nil { + log.Printf("[mysql-flusher][sites] prepare error: %v", err) + return + } + defer stmt.Close() + + for _, key := range keys { + data, err := f.redisDB.HGetAll(ctx, key).Result() + if err != nil || len(data) == 0 { + continue + } + + host := strings.TrimPrefix(key, "site:") + + visitCount := int(parseInt64(data["visit_count"])) + lastVisitTime := parseInt64(data["last_visit_time"]) + fingerprint := data["fingerprint"] + htmlStructure := data["html_structure"] + + var successRate *float64 + if v := data["success_rate"]; v != "" { + f := parseFloat(v) + successRate = &f + } + + var quality *float64 + if v := data["quality"]; v != "" { + q := parseFloat(v) + quality = &q + } + + var httpsAvailable *int8 + if v := data["https_available"]; v != "" { + i := int8(parseInt64(v)) + httpsAvailable = &i + } + + // JSON 字段:空字符串转为 NULL 或 "[]" + // MySQL JSON 类型不接受空字符串 + ips := data["ips"] + if ips == "" { + ips = "[]" + } + keywords := data["keywords"] + if keywords == "" { + keywords = "[]" + } + outLinks := data["out_links"] + if outLinks == "" { + outLinks = "[]" + } + languages := data["languages"] + if languages == "" { + languages = "[]" + } + redirects := data["redirects"] + if redirects == "" { + redirects = "[]" + } + serverTypes := data["server_types"] + if serverTypes == "" { + serverTypes = "[]" + } + + _, err = stmt.ExecContext(ctx, host, visitCount, lastVisitTime, fingerprint, successRate, + htmlStructure, ips, quality, httpsAvailable, keywords, outLinks, + languages, redirects, serverTypes) + if err != nil { + log.Printf("[mysql-flusher][sites] exec error for %s: %v", host, err) + } + } + + if err := tx.Commit(); err != nil { + log.Printf("[mysql-flusher][sites] commit error: %v", err) + } +} + +// flushIndex 将 Redis idx:* 数据刷到 index_entries 表 +func (f *Flusher) flushIndex() { + ctx := context.Background() + var cursor uint64 + total := 0 + + for { + keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "idx:*", int64(f.batchSize)).Result() + if err != nil { + log.Printf("[mysql-flusher][index] scan error: %v", err) + return + } + + if len(keys) > 0 { + f.batchUpsertIndex(ctx, keys) + total += len(keys) + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + + if total > 0 { + log.Printf("[mysql-flusher][index] flushed %d keywords", total) + } +} + +// batchUpsertIndex 批量 upsert index_entries +func (f *Flusher) batchUpsertIndex(ctx context.Context, keys []string) { + if len(keys) == 0 || DB == nil { + return + } + + tx, err := DB.BeginTx(ctx, nil) + if err != nil { + log.Printf("[mysql-flusher][index] begin tx error: %v", err) + return + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, `INSERT INTO index_entries (keyword, url, weight) + VALUES (?, ?, ?) + ON DUPLICATE KEY UPDATE weight = VALUES(weight)`) + if err != nil { + log.Printf("[mysql-flusher][index] prepare error: %v", err) + return + } + defer stmt.Close() + + for _, key := range keys { + keyword := strings.TrimPrefix(key, "idx:") + + // 获取有序集合中的所有成员 + entries, err := f.redisDB.ZRevRangeWithScores(ctx, key, 0, -1).Result() + if err != nil { + log.Printf("[mysql-flusher][index] zrange error for %s: %v", keyword, err) + continue + } + + for _, entry := range entries { + url := entry.Member.(string) + weight := float32(entry.Score) + _, err = stmt.ExecContext(ctx, keyword, url, weight) + if err != nil { + log.Printf("[mysql-flusher][index] exec error for %s/%s: %v", keyword, url, err) + } + } + } + + if err := tx.Commit(); err != nil { + log.Printf("[mysql-flusher][index] commit error: %v", err) + } +} + +// flushPriorityURLs 将 Redis priority:* 数据刷到 priority_urls 表 +func (f *Flusher) flushPriorityURLs() { + ctx := context.Background() + + keys, err := f.redisDB.Keys(ctx, "priority:*").Result() + if err != nil { + log.Printf("[mysql-flusher][priority] keys error: %v", err) + return + } + + if len(keys) == 0 || DB == nil { + return + } + + tx, err := DB.BeginTx(ctx, nil) + if err != nil { + log.Printf("[mysql-flusher][priority] begin tx error: %v", err) + return + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, `INSERT IGNORE INTO priority_urls (url) VALUES (?)`) + if err != nil { + log.Printf("[mysql-flusher][priority] prepare error: %v", err) + return + } + defer stmt.Close() + + for _, key := range keys { + url := strings.TrimPrefix(key, "priority:") + _, err = stmt.ExecContext(ctx, url) + if err != nil { + log.Printf("[mysql-flusher][priority] exec error for %s: %v", url, err) + } + } + + if err := tx.Commit(); err != nil { + log.Printf("[mysql-flusher][priority] commit error: %v", err) + return + } + + log.Printf("[mysql-flusher][priority] flushed %d entries", len(keys)) +} + +// ============================================ +// 辅助函数 +// ============================================ + +func parseInt64(s string) int64 { + var v int64 + fmt.Sscanf(s, "%d", &v) + return v +} + +func parseFloat(s string) float64 { + var v float64 + fmt.Sscanf(s, "%f", &v) + return v +} diff --git a/mysql/init_db.sql b/mysql/init_db.sql new file mode 100644 index 0000000..35eeaab --- /dev/null +++ b/mysql/init_db.sql @@ -0,0 +1,98 @@ +-- ============================================ +-- sese-engine MySQL 初始化脚本 +-- 自动创建数据库(如果不存在)并创建表结构 +-- ============================================ + +-- 创建数据库(如果不存在) +CREATE DATABASE IF NOT EXISTS sese_engine DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +CREATE DATABASE IF NOT EXISTS sese_test2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +-- ============================================ +-- 倒排索引表 +-- 存储关键词到URL的映射及其权重 +-- ============================================ +CREATE TABLE IF NOT EXISTS index_entries ( + keyword VARCHAR(255) NOT NULL COMMENT '关键词', + url VARCHAR(2048) NOT NULL COMMENT 'URL地址', + weight FLOAT NOT NULL COMMENT '权重分数', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + PRIMARY KEY (keyword(255), url(255)), + INDEX idx_keyword (keyword), + INDEX idx_weight (weight), + INDEX idx_url (url(255)) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='倒排索引表'; + +-- ============================================ +-- URL 摘要表 +-- 存储每个URL的标题、描述、正文片段等 +-- ============================================ +CREATE TABLE IF NOT EXISTS url_snippets ( + id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, + url VARCHAR(2048) NOT NULL COMMENT 'URL地址(唯一)', + url_hash VARCHAR(64) NOT NULL COMMENT 'URL的MD5哈希(用于快速查询)', + title VARCHAR(512) COMMENT '页面标题', + description TEXT COMMENT 'meta description或自动生成的描述', + text MEDIUMTEXT COMMENT '正文文本片段', + timestamp BIGINT COMMENT '抓取时间戳', + content_hash VARCHAR(64) COMMENT '内容哈希(用于增量检测)', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + UNIQUE KEY uk_url (url(255)), + INDEX idx_url_hash (url_hash), + INDEX idx_timestamp (timestamp) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='URL摘要缓存表'; + +-- ============================================ +-- 网站信息表 +-- 存储每个域名的元信息 +-- ============================================ +CREATE TABLE IF NOT EXISTS site_info ( + host VARCHAR(255) PRIMARY KEY COMMENT '域名/主机名', + visit_count INT UNSIGNED DEFAULT 0 COMMENT '访问次数', + last_visit_time BIGINT COMMENT '最后访问时间戳', + fingerprint TEXT COMMENT '网站指纹', + success_rate FLOAT COMMENT '成功率', + html_structure TEXT COMMENT 'HTML结构特征', + ips JSON COMMENT 'IP地址列表', + quality FLOAT COMMENT '质量评分', + https_available TINYINT COMMENT '是否支持HTTPS(1=是,0=否)', + keywords JSON COMMENT '高频关键词列表', + out_links JSON COMMENT '出站链接列表', + languages JSON COMMENT '语种分布', + redirects JSON COMMENT '重定向链', + server_types JSON COMMENT 'Server类型', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='网站元信息表'; + +-- ============================================ +-- 刷盘记录表 +-- 用于断点续传,记录刷盘进度 +-- ============================================ +CREATE TABLE IF NOT EXISTS flush_marker ( + id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, + flush_type VARCHAR(50) NOT NULL COMMENT '刷盘类型:index, gate, site', + last_key VARCHAR(255) COMMENT '最后处理的key', + last_cursor BIGINT DEFAULT 0 COMMENT 'Redis SCAN游标', + processed_count INT UNSIGNED DEFAULT 0 COMMENT '本批次处理数量', + flush_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '刷盘时间', + INDEX idx_type (flush_type), + INDEX idx_time (flush_time) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='刷盘进度记录表'; + +-- ============================================ +-- 优先爬取URL表 +-- 存储需要优先爬取的URL +-- ============================================ +CREATE TABLE IF NOT EXISTS priority_urls ( + url VARCHAR(2048) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (url(255)), + INDEX idx_created (created_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='优先爬取URL表'; diff --git a/mysql/mysql.go b/mysql/mysql.go new file mode 100644 index 0000000..a40733a --- /dev/null +++ b/mysql/mysql.go @@ -0,0 +1,416 @@ +// Package mysql 提供 MySQL 数据库连接和管理功能。 +// 支持 Unix Socket 和 TCP 两种连接方式,自动初始化数据表和恢复数据。 +package mysql + +import ( + "context" + "database/sql" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "time" + + goredis "github.com/redis/go-redis/v9" + _ "github.com/go-sql-driver/mysql" + "sese-engine/config" +) + +// DB 是 MySQL 数据库连接池 +var DB *sql.DB + +// Open 初始化 MySQL 连接 +// 根据配置自动选择 Unix Socket 或 TCP 连接 +func Open() error { + dsn := config.MySQLDSN() + + db, err := sql.Open("mysql", dsn) + if err != nil { + return fmt.Errorf("mysql.Open: %w", err) + } + + // 配置连接池 + db.SetConnMaxLifetime(time.Duration(config.MySQLConnMaxLifetime()) * time.Second) + db.SetMaxIdleConns(config.MySQLMaxIdleConns()) + db.SetMaxOpenConns(config.MySQLMaxOpenConns()) + + // 验证连接 + if err := db.Ping(); err != nil { + return fmt.Errorf("mysql.Ping: %w", err) + } + + DB = db + log.Printf("[mysql] connected via %s", formatDSN(dsn)) + + // 自动初始化数据表 + if err := initSchema(); err != nil { + return fmt.Errorf("mysql init schema: %w", err) + } + + return nil +} + +// initSchema 自动执行 init_db.sql 初始化数据表 +func initSchema() error { + // 查找 init_db.sql 文件 + execPath, err := os.Executable() + if err != nil { + execPath = os.Args[0] + } + sqlFile := filepath.Join(filepath.Dir(execPath), "mysql", "init_db.sql") + if _, err := os.Stat(sqlFile); os.IsNotExist(err) { + // 尝试从当前工作目录查找 + cwd, _ := os.Getwd() + sqlFile = filepath.Join(cwd, "mysql", "init_db.sql") + } + + data, err := os.ReadFile(sqlFile) + if err != nil { + return fmt.Errorf("read init_db.sql: %w", err) + } + + // 获取配置的数据库名 + dbName := config.Global.MySQL.Database + if dbName == "" { + dbName = "sese_engine" + } + log.Printf("[mysql] init schema: database=%s", dbName) + + // 先切换到目标数据库 + if _, err := DB.Exec("USE " + dbName); err != nil { + return fmt.Errorf("mysql USE database: %w", err) + } + + // 分割 SQL 语句(按分号分割) + statements := splitStatements(string(data)) + log.Printf("[mysql] found %d SQL statements to execute", len(statements)) + + execed := 0 + for i, stmt := range statements { + trimmed := strings.TrimSpace(stmt) + // 跳过空行和注释 + if trimmed == "" || strings.HasPrefix(trimmed, "--") || strings.HasPrefix(trimmed, "/*") { + log.Printf("[mysql] [%d/%d] SKIP (empty/comment): %s", i+1, len(statements), truncate(trimmed, 60)) + continue + } + if _, err := DB.Exec(trimmed); err != nil { + log.Printf("[mysql] [%d/%d] FAILED: %v\n SQL: %s", i+1, len(statements), err, truncate(trimmed, 200)) + continue + } + execed++ + log.Printf("[mysql] [%d/%d] OK: %s", i+1, len(statements), truncate(trimmed, 60)) + } + + log.Printf("[mysql] init schema done, executed=%d statements", execed) + return nil +} + +// splitStatements 按分号分割 SQL 语句(处理多行 CREATE TABLE) +func splitStatements(sql string) []string { + var statements []string + var buf strings.Builder + inComment := false + + for _, line := range strings.Split(sql, "\n") { + trimmed := strings.TrimSpace(line) + + // 单行注释 + if strings.HasPrefix(trimmed, "--") || strings.HasPrefix(trimmed, "//") { + continue + } + + // 多行注释开始/结束 + if strings.Contains(trimmed, "/*") { + inComment = true + } + if inComment { + if strings.Contains(trimmed, "*/") { + inComment = false + } + continue + } + + // 空行跳过 + if trimmed == "" { + continue + } + + buf.WriteString(line) + buf.WriteString("\n") + + // 检查是否以分号结尾 + trimmed = strings.TrimSpace(buf.String()) + if strings.HasSuffix(trimmed, ";") { + statements = append(statements, trimmed) + buf.Reset() + } + } + + // 处理最后一条(可能没有分号) + if buf.Len() > 0 { + trimmed := strings.TrimSpace(buf.String()) + if trimmed != "" { + statements = append(statements, trimmed) + } + } + + return statements +} + +// truncate 截断字符串 +func truncate(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} + +// Close 关闭 MySQL 连接 +func Close() error { + if DB != nil { + return DB.Close() + } + return nil +} + +// Ping 检查 MySQL 连接是否正常 +func Ping() error { + if DB == nil { + return fmt.Errorf("mysql not initialized") + } + return DB.Ping() +} + +// formatDSN 格式化 DSN 用于日志(隐藏密码) +func formatDSN(dsn string) string { + // 简化日志输出 + cfg := config.Global.MySQL + if cfg.UnixSocket != "" { + return fmt.Sprintf("unix_socket=%s database=%s", cfg.UnixSocket, cfg.Database) + } + return fmt.Sprintf("tcp=%s:%d database=%s", cfg.Host, cfg.Port, cfg.Database) +} + +// RestoreFromMySQLToRedis 从 MySQL 恢复数据到 Redis +// 用于 Redis 数据丢失后重建索引 +func RestoreFromMySQLToRedis(redisDB *goredis.Client) error { + if DB == nil { + return fmt.Errorf("mysql not initialized") + } + + start := time.Now() + log.Printf("[mysql-restore] starting restoration from MySQL to Redis...") + + ctx := context.Background() + + // 1. 恢复 index_entries → Redis idx:* ZSet + if err := restoreIndexEntries(ctx, redisDB); err != nil { + return fmt.Errorf("restore index_entries: %w", err) + } + + // 2. 恢复 url_snippets → Redis gate:* + url2hash:* + if err := restoreUrlSnippets(ctx, redisDB); err != nil { + return fmt.Errorf("restore url_snippets: %w", err) + } + + // 3. 恢复 site_info → Redis site:* + if err := restoreSiteInfo(ctx, redisDB); err != nil { + return fmt.Errorf("restore site_info: %w", err) + } + + // 4. 恢复 priority_urls → Redis priority:* + if err := restorePriorityURLs(ctx, redisDB); err != nil { + return fmt.Errorf("restore priority_urls: %w", err) + } + + log.Printf("[mysql-restore] restoration completed in %v", time.Since(start)) + return nil +} + +// restoreIndexEntries 恢复倒排索引 +func restoreIndexEntries(ctx context.Context, redisDB *goredis.Client) error { + rows, err := DB.Query("SELECT keyword, url, weight FROM index_entries") + if err != nil { + // 表不存在时跳过 + log.Printf("[mysql-restore][index] skip: %v", err) + return nil + } + defer rows.Close() + + // 按 keyword 分组 + type indexRow struct { + URL string + Weight float32 + } + keywordMap := make(map[string][]indexRow) + count := 0 + + for rows.Next() { + var keyword, url string + var weight float32 + if err := rows.Scan(&keyword, &url, &weight); err != nil { + continue + } + keywordMap[keyword] = append(keywordMap[keyword], indexRow{URL: url, Weight: weight}) + count++ + } + + // 批量写入 Redis + for keyword, entries := range keywordMap { + if len(entries) == 0 { + continue + } + zSlice := make([]goredis.Z, len(entries)) + for i, e := range entries { + zSlice[i] = goredis.Z{Score: float64(e.Weight), Member: e.URL} + } + if err := redisDB.ZAdd(ctx, "idx:"+keyword, zSlice...).Err(); err != nil { + log.Printf("[mysql-restore][index] failed to restore %s: %v", keyword, err) + } + } + + log.Printf("[mysql-restore][index] restored %d entries (%d keywords)", count, len(keywordMap)) + return nil +} + +// restoreUrlSnippets 恢复 URL 摘要 +func restoreUrlSnippets(ctx context.Context, redisDB *goredis.Client) error { + rows, err := DB.Query("SELECT url, url_hash, title, description, text, timestamp, content_hash FROM url_snippets") + if err != nil { + // 表不存在时跳过 + log.Printf("[mysql-restore][snippets] skip: %v", err) + return nil + } + defer rows.Close() + + count := 0 + for rows.Next() { + var url, urlHash, title, description, text, contentHash sql.NullString + var timestamp sql.NullInt64 + if err := rows.Scan(&url, &urlHash, &title, &description, &text, ×tamp, &contentHash); err != nil { + continue + } + if !url.Valid || urlHash.Valid == false { + continue + } + + fields := map[string]interface{}{ + "url": url.String, + "title": nullString(title), + "desc": nullString(description), + "text": nullString(text), + "ts": nullInt64(timestamp), + "hash": nullString(contentHash), + } + + if err := redisDB.HMSet(ctx, "gate:"+urlHash.String, fields).Err(); err != nil { + continue + } + // 同时写入 URL→hash 映射 + redisDB.Set(ctx, "url2hash:"+url.String, urlHash.String, 0) + count++ + } + + log.Printf("[mysql-restore][snippets] restored %d entries", count) + return nil +} + +// restoreSiteInfo 恢复网站信息 +func restoreSiteInfo(ctx context.Context, redisDB *goredis.Client) error { + rows, err := DB.Query("SELECT host, visit_count, last_visit_time, success_rate, https_available FROM site_info") + if err != nil { + // 表不存在时跳过 + log.Printf("[mysql-restore][site] skip: %v", err) + return nil + } + defer rows.Close() + + count := 0 + for rows.Next() { + var host string + var visitCount sql.NullInt64 + var lastVisitTime sql.NullInt64 + var successRate sql.NullFloat64 + var httpsAvailable sql.NullInt64 + + if err := rows.Scan(&host, &visitCount, &lastVisitTime, &successRate, &httpsAvailable); err != nil { + continue + } + if host == "" { + continue + } + + fields := map[string]interface{}{ + "visit_count": nullInt64(visitCount), + "last_visit_time": nullInt64(lastVisitTime), + } + if successRate.Valid { + fields["success_rate"] = successRate.Float64 + } + if httpsAvailable.Valid { + fields["https_available"] = httpsAvailable.Int64 + } + + if err := redisDB.HMSet(ctx, "site:"+host, fields).Err(); err != nil { + continue + } + count++ + } + + log.Printf("[mysql-restore][site] restored %d entries", count) + return nil +} + +// restorePriorityURLs 恢复优先 URL +func restorePriorityURLs(ctx context.Context, redisDB *goredis.Client) error { + rows, err := DB.Query("SELECT url FROM priority_urls") + if err != nil { + // 表不存在时跳过 + log.Printf("[mysql-restore][priority] skip: %v", err) + return nil + } + defer rows.Close() + + count := 0 + for rows.Next() { + var url string + if err := rows.Scan(&url); err != nil { + continue + } + if url == "" { + continue + } + + fields := map[string]interface{}{ + "url": url, + "is_domain": "0", + "added_at": time.Now().Unix(), + "visited": "0", + } + + if err := redisDB.HMSet(ctx, "priority:"+url, fields).Err(); err != nil { + continue + } + count++ + } + + log.Printf("[mysql-restore][priority] restored %d entries", count) + return nil +} + +// ---- 辅助函数 ---- + +func nullString(v sql.NullString) string { + if v.Valid { + return v.String + } + return "" +} + +func nullInt64(v sql.NullInt64) int64 { + if v.Valid { + return v.Int64 + } + return 0 +} diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..f6a450c --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,82 @@ +// Package redis 提供 Redis 数据库连接和管理功能。 +// 支持 Unix Socket 和 TCP 两种连接方式。 +// 注意:此包作为纯内存缓存使用,不开启持久化。 +package redis + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/redis/go-redis/v9" + "sese-engine/config" +) + +// Client 是 Redis 客户端连接 +var Client *redis.Client + +// Open 初始化 Redis 连接 +// 根据配置自动选择 Unix Socket 或 TCP 连接 +// 注意:不开持久化,作为纯内存缓存使用 +func Open() error { + cfg := config.Global.Redis + + opt := &redis.Options{ + Addr: config.RedisAddr(), + Password: config.RedisPassword(), + DB: config.RedisDB(), + PoolSize: config.RedisPoolSize(), + MinIdleConns: config.RedisMinIdleConns(), + ReadTimeout: time.Duration(config.RedisReadTimeout()) * time.Millisecond, + WriteTimeout: time.Duration(config.RedisWriteTimeout()) * time.Millisecond, + // 禁用持久化 - 作为纯内存缓存 + // 不设置任何 save 策略即可禁用 RDB + } + + client := redis.NewClient(opt) + Client = client + + // 验证连接 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + return fmt.Errorf("redis.Ping: %w", err) + } + + log.Printf("[redis] connected via %s (DB=%d)", formatAddr(opt), cfg.DB) + return nil +} + +// Close 关闭 Redis 连接 +func Close() error { + if Client != nil { + return Client.Close() + } + return nil +} + +// Ping 检查 Redis 连接是否正常 +func Ping() error { + if Client == nil { + return fmt.Errorf("redis not initialized") + } + return Client.Ping(context.Background()).Err() +} + +// FlushDB 清空当前数据库(谨慎使用,仅用于测试或重置) +func FlushDB() error { + if Client == nil { + return fmt.Errorf("redis not initialized") + } + return Client.FlushDB(context.Background()).Err() +} + +// formatAddr 格式化地址用于日志 +func formatAddr(opt *redis.Options) string { + if opt.Addr != "" { + return opt.Addr + } + return fmt.Sprintf("%s:%d", opt.Addr, 6379) +} diff --git a/search/server.go b/search/server.go index f4d43a4..47448aa 100644 --- a/search/server.go +++ b/search/server.go @@ -5,6 +5,7 @@ package search import ( "container/heap" // 堆结构(域名交错排序) "container/list" // 双向链表(LRU 缓存) + "context" // 超时控制(Redis 操作) "encoding/json" // JSON 序列化(响应输出) "fmt" // 错误格式化 "io" // 读取请求体 @@ -24,27 +25,34 @@ import ( "sync/atomic" // 原子操作(计数器) "time" // 时间戳 - "sese-engine/analyzer" // 分词和语言检测 - "sese-engine/config" // 排序权重配置 - "sese-engine/crawler" // 爬虫(读取活跃线程数) - "sese-engine/info" // info 服务 - "sese-engine/parser" // HTML 解析(在线摘要) - "sese-engine/storage" // 持久化存储 + goredis "github.com/redis/go-redis/v9" // Redis 客户端 + "sese-engine/analyzer" // 分词和语言检测 + "sese-engine/config" // 排序权重配置 + "sese-engine/crawler" // 爬虫(读取活跃线程数) + "sese-engine/info" // info 服务 + "sese-engine/parser" // HTML 解析(在线摘要) + sredis "sese-engine/redis" // 本地 Redis 连接(用于访问 sredis.Client) + "sese-engine/storage" // 持久化存储 ) -// urlKeywordsCache URL→关键词 的 LRU 缓存 +// urlKeywordsCache URL→关键词 的 LRU 缓存(内存)+ Redis 镜像(TTL 2小时) type urlKeywordsCache struct { mu sync.RWMutex items map[string]*list.Element // URL → list 节点 - order *list.List // 按访问时间排序的双向链表 + order *list.List // 按访问时间排序的双向链表 maxSize int + + // Redis 双写 + rdb *goredis.Client // Redis 客户端(懒初始化) + redisKey string // Redis Hash key + ttl time.Duration // TTL,默认 2 小时 } // urlKeywordsEntry LRU 缓存条目 type urlKeywordsEntry struct { URL string // URL(用于删除时从 map 中移除) - Title string // 页面标题(从 bbolt Snippet 缓存获取) - Snippet string // 摘要(从 bbolt Snippet 缓存获取) + Title string // 页面标题(从 Redis Snippet 缓存获取) + Snippet string // 摘要(从 Redis Snippet 缓存获取) Keywords []urlKeywordInfo // 关键词列表 } @@ -54,102 +62,250 @@ type urlKeywordInfo struct { Weight float32 `json:"weight"` // 权重 } -// newURLKeywordsCache 创建一个新的 LRU 缓存 +// newURLKeywordsCache 创建一个新的 LRU 缓存(Redis 镜像通过 AttachRedis 注入) func newURLKeywordsCache(maxSize int) *urlKeywordsCache { return &urlKeywordsCache{ - items: make(map[string]*list.Element), - order: list.New(), - maxSize: maxSize, + items: make(map[string]*list.Element), + order: list.New(), + maxSize: maxSize, + redisKey: "url_keywords:cache", + ttl: 2 * time.Hour, } } -// Put 写入或更新缓存 +// AttachRedis 将 Redis 客户端注入缓存(用于双写) +func (c *urlKeywordsCache) AttachRedis(rdb *goredis.Client) { + c.rdb = rdb +} + +// Put 写入或更新缓存(内存 LRU + Redis 双写,2小时 TTL) func (c *urlKeywordsCache) Put(url string, title, snippet string, keywords []urlKeywordInfo) { c.mu.Lock() defer c.mu.Unlock() + var entry *urlKeywordsEntry + // 已存在,移到队尾(更新新鲜度) if elem, ok := c.items[url]; ok { c.order.MoveToBack(elem) - entry := elem.Value.(*urlKeywordsEntry) - entry.Keywords = keywords - entry.Title = title - entry.Snippet = snippet - return - } + e := elem.Value.(*urlKeywordsEntry) + e.Keywords = keywords + e.Title = title + e.Snippet = snippet + entry = e + } else { + // 新增到队尾 + entry = &urlKeywordsEntry{URL: url, Title: title, Snippet: snippet, Keywords: keywords} + elem := c.order.PushBack(entry) + c.items[url] = elem - // 新增到队尾 - entry := &urlKeywordsEntry{URL: url, Title: title, Snippet: snippet, Keywords: keywords} - elem := c.order.PushBack(entry) - c.items[url] = elem - - // 超过上限,删除队首(最旧) - if c.order.Len() > c.maxSize { - oldest := c.order.Front() - if oldest != nil { - c.order.Remove(oldest) - delete(c.items, oldest.Value.(*urlKeywordsEntry).URL) + // 超过上限,删除队首(最旧) + if c.order.Len() > c.maxSize { + oldest := c.order.Front() + if oldest != nil { + c.order.Remove(oldest) + delete(c.items, oldest.Value.(*urlKeywordsEntry).URL) + } } } + + // Redis 双写(异步,不阻塞主流程) + go c.redisPut(url, entry) } -// Get 读取缓存,同时更新新鲜度 -func (c *urlKeywordsCache) Get(url string) (*urlKeywordsEntry, bool) { - c.mu.Lock() - defer c.mu.Unlock() +// redisPut 将条目写入 Redis Hash(TTL 2小时) +func (c *urlKeywordsCache) redisPut(url string, entry *urlKeywordsEntry) { + if c.rdb == nil { + return + } + data, err := json.Marshal(entry) + if err != nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + pipe := c.rdb.Pipeline() + pipe.HSet(ctx, c.redisKey, url, string(data)) + pipe.Expire(ctx, c.redisKey, c.ttl) + pipe.Exec(ctx) +} - if elem, ok := c.items[url]; ok { - c.order.MoveToBack(elem) // 访问过,移到队尾 +// Get 读取缓存(内存优先,Redis 回填) +func (c *urlKeywordsCache) Get(url string) (*urlKeywordsEntry, bool) { + // 先查内存 + c.mu.RLock() + elem, ok := c.items[url] + if ok { entry := elem.Value.(*urlKeywordsEntry) + c.mu.RUnlock() + // 异步更新 Redis TTL + go c.touchRedisTTL(url) return entry, true } - return nil, false + c.mu.RUnlock() + + // 内存 miss,查 Redis + entry, ok := c.redisGet(url) + if !ok { + return nil, false + } + + // 回填内存(写锁) + c.mu.Lock() + // 二次检查(可能已有其他协程写入) + if _, exists := c.items[url]; !exists { + // 超过上限时,先删最旧的再加入 + if c.order.Len() >= c.maxSize { + oldest := c.order.Front() + if oldest != nil { + c.order.Remove(oldest) + delete(c.items, oldest.Value.(*urlKeywordsEntry).URL) + } + } + elem := c.order.PushBack(entry) + c.items[url] = elem + } + c.mu.Unlock() + return entry, true } -// Stats 返回缓存统计信息 +// redisGet 从 Redis Hash 获取单条缓存 +func (c *urlKeywordsCache) redisGet(url string) (*urlKeywordsEntry, bool) { + if c.rdb == nil { + return nil, false + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + data, err := c.rdb.HGet(ctx, c.redisKey, url).Result() + if err != nil { + return nil, false + } + var entry urlKeywordsEntry + if err := json.Unmarshal([]byte(data), &entry); err != nil { + return nil, false + } + // 续命 Redis TTL + go c.touchRedisTTL(url) + return &entry, true +} + +// touchRedisTTL 续命 Redis 条目 TTL +func (c *urlKeywordsCache) touchRedisTTL(url string) { + if c.rdb == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + c.rdb.Expire(ctx, c.redisKey, c.ttl) +} + +// Stats 返回缓存统计信息(优先 Redis 总数,包含 TTL 内的冷数据) func (c *urlKeywordsCache) Stats() (size int, maxSize int) { c.mu.RLock() - defer c.mu.RUnlock() - return len(c.items), c.maxSize -} + memSize := len(c.items) + maxSize = c.maxSize + c.mu.RUnlock() -// ListAll 返回所有缓存条目列表(按访问时间从旧到新) -func (c *urlKeywordsCache) ListAll() []*urlKeywordsEntry { - c.mu.RLock() - defer c.mu.RUnlock() - entries := make([]*urlKeywordsEntry, 0, len(c.items)) - for elem := c.order.Front(); elem != nil; elem = elem.Next() { - entries = append(entries, elem.Value.(*urlKeywordsEntry)) + // 尝试获取 Redis 总数(更准确,包含未淘汰的冷数据) + if c.rdb != nil { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + n, err := c.rdb.HLen(ctx, c.redisKey).Result() + if err == nil && n > 0 { + return int(n), maxSize + } } - return entries + return memSize, maxSize } -// ListPage 返回分页缓存条目(按访问时间从新到旧,最新访问的在前) -func (c *urlKeywordsCache) ListPage(page, pageSize int) []*urlKeywordsEntry { +// ListAll 返回所有缓存条目列表(内存 + Redis 冷数据合并) +func (c *urlKeywordsCache) ListAll() []*urlKeywordsEntry { + // 收集内存数据 c.mu.RLock() - defer c.mu.RUnlock() - total := len(c.items) + memEntries := make([]*urlKeywordsEntry, 0, len(c.items)) + for elem := c.order.Front(); elem != nil; elem = elem.Next() { + memEntries = append(memEntries, elem.Value.(*urlKeywordsEntry)) + } + c.mu.RUnlock() + + // 尝试从 Redis 获取补充数据 + if c.rdb == nil { + return memEntries + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + all, err := c.rdb.HGetAll(ctx, c.redisKey).Result() + if err != nil || len(all) == 0 { + return memEntries + } + + // 去重合并(内存优先) + seen := make(map[string]bool, len(memEntries)) + for _, e := range memEntries { + seen[e.URL] = true + } + for _, v := range all { + var entry urlKeywordsEntry + if err := json.Unmarshal([]byte(v), &entry); err != nil { + continue + } + if !seen[entry.URL] { + memEntries = append(memEntries, &entry) + seen[entry.URL] = true + } + } + return memEntries +} + +// ListPage 返回分页缓存条目(内存 + Redis 冷数据合并,按 URL 字典序排列) +func (c *urlKeywordsCache) ListPage(page, pageSize int) []*urlKeywordsEntry { + // 收集内存数据 + c.mu.RLock() + memEntries := make([]*urlKeywordsEntry, 0, len(c.items)) + for elem := c.order.Front(); elem != nil; elem = elem.Next() { + memEntries = append(memEntries, elem.Value.(*urlKeywordsEntry)) + } + c.mu.RUnlock() + + // 从 Redis 补充冷数据 + if c.rdb != nil { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + all, err := c.rdb.HGetAll(ctx, c.redisKey).Result() + if err == nil && len(all) > 0 { + seen := make(map[string]bool, len(memEntries)) + for _, e := range memEntries { + seen[e.URL] = true + } + for _, v := range all { + var entry urlKeywordsEntry + if err := json.Unmarshal([]byte(v), &entry); err != nil { + continue + } + if !seen[entry.URL] { + memEntries = append(memEntries, &entry) + seen[entry.URL] = true + } + } + } + } + + // 分页(从合并数据中按字典序取 page 页) + total := len(memEntries) offset := (page - 1) * pageSize if offset >= total { return []*urlKeywordsEntry{} } - // 从 Back(最新)向前遍历,跳过 offset 个 - elem := c.order.Back() - for i := 0; i < offset && elem != nil; i++ { - elem = elem.Prev() + end := offset + pageSize + if end > total { + end = total } - // 收集 pageSize 条 - entries := make([]*urlKeywordsEntry, 0, pageSize) - for i := 0; i < pageSize && elem != nil; i++ { - entries = append(entries, elem.Value.(*urlKeywordsEntry)) - elem = elem.Prev() - } - return entries + return memEntries[offset:end] } // Server 是搜索 HTTP 服务器,同时内嵌收获服务(统一在同一端口)。 type Server struct { - db *storage.DB + db *storage.RedisStoreV2 infoSvc *info.Service analyzer *analyzer.Analyzer httpCli *http.Client // 在线摘要抓取(无 robots.txt 检查) @@ -166,11 +322,11 @@ type Server struct { indexCacheMu sync.RWMutex indexCacheHits int64 // 缓存命中计数(原子) - // stats 快照缓存:后台定时刷新,避免每次请求全量遍历 bbolt + // stats 快照缓存:后台定时刷新,避免每次请求全量遍历 Redis statsCache map[string]any statsCacheMu sync.RWMutex - // recent 快照缓存:后台定时刷新,避免每次请求全量遍历 bbolt + // recent 快照缓存:后台定时刷新,避免每次请求全量遍历 Redis recentCache map[int][]recentItem // limit → 预截取的结果列表 recentCacheMu sync.RWMutex recentTotal int // 总条目数(不截取) @@ -192,7 +348,7 @@ type Server struct { } // New 创建一个 search Server(内嵌收获服务,统一在同一端口)。 -func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { +func New(db *storage.RedisStoreV2, infoSvc *info.Service, a *analyzer.Analyzer) *Server { s := &Server{ db: db, infoSvc: infoSvc, @@ -203,6 +359,8 @@ func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { }, urlKeywords: newURLKeywordsCache(10000), // 缓存 1 万个 URL 的关键词 } + // 注入 Redis 客户端到 urlKeywords 缓存(用于双写,2小时 TTL) + s.urlKeywords.AttachRedis(sredis.Client) // 启动定期刷盘 goroutine go s.runPeriodicFlush() // 启动 stats + recent 缓存定期刷新 goroutine @@ -383,7 +541,7 @@ type recentItem struct { } // handleAdminRecent 返回最近爬取的条目列表,按爬取时间倒序。 -// 直接返回缓存快照,不阻塞 bbolt,响应时间 <1ms。 +// 直接返回缓存快照,不阻塞 Redis,响应时间 <1ms。 // 参数:limit(默认50,最大200)。 func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -419,7 +577,7 @@ func (s *Server) handleAdminRecent(w http.ResponseWriter, r *http.Request) { }) } -// refreshRecentCache 全量遍历 bbolt 计算 recent 快照,预截取常用 limit,存入 recentCache。 +// refreshRecentCache 全量遍历 Redis 计算 recent 快照,预截取常用 limit,存入 recentCache。 func (s *Server) refreshRecentCache() { type entry struct { url string @@ -495,7 +653,7 @@ func (s *Server) refreshRecentCache() { } // handleAdminStats 返回全局统计:域名分布、语种分布、总 URL 数、总词数。 -// 直接返回缓存快照,不阻塞 bbolt,响应时间 <1ms。 +// 直接返回缓存快照,不阻塞 Redis,响应时间 <1ms。 func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Content-Type", "application/json; charset=utf-8") @@ -523,7 +681,7 @@ func (s *Server) handleAdminStats(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(cached) } -// refreshStatsCache 全量遍历 bbolt 计算统计快照,存入 statsCache。 +// refreshStatsCache 全量遍历 Redis 计算统计快照,存入 statsCache。 func (s *Server) refreshStatsCache() { domainCount := make(map[string]int) langCount := make(map[string]int) @@ -556,7 +714,7 @@ func (s *Server) refreshStatsCache() { return nil }) - // 遍历结束后批量查 SiteInfo(避免 ForEachSnippet 回调中嵌套 bbolt 事务) + // 遍历结束后批量查 SiteInfo(避免 ForEachSnippet 回调中嵌套 Redis 事务) for _, domain := range snippetDomains { siteInfo, _ := s.db.GetSiteInfo(domain) if siteInfo != nil { @@ -616,7 +774,7 @@ func (s *Server) refreshStatsCache() { } // runCacheRefresher 后台定时刷新 stats 和 recent 缓存。 -// 统一由一个 goroutine 交替刷新,避免同时全量遍历 bbolt 造成压力。 +// 统一由一个 goroutine 交替刷新,避免同时全量遍历 Redis 造成压力。 func (s *Server) runCacheRefresher() { interval := time.Duration(config.StatsRefreshInterval()) * time.Second ticker := time.NewTicker(interval) @@ -1487,8 +1645,8 @@ func timeMul(si *storage.SiteInfo, now int64) float64 { } // urlTimeMul 根据该 URL 的摘要抓取时间计算时间衰减倍数(30 天内不衰减)。 -func urlTimeMul(db *storage.DB, rawURL string, now int64) float64 { - entry, err := db.GetSnippet(rawURL) +func urlTimeMul(store *storage.RedisStoreV2, rawURL string, now int64) float64 { + entry, err := store.GetSnippet(rawURL) if err != nil || entry == nil { return 1.0 } @@ -1941,7 +2099,7 @@ func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { go s.Flush() } - // 更新 URL→关键词 LRU 缓存(从 bbolt 获取标题和摘要) + // 更新 URL→关键词 LRU 缓存(从 Redis 获取标题和摘要) keywords := make([]urlKeywordInfo, len(payload.Keywords)) for i, kw := range payload.Keywords { keywords[i] = urlKeywordInfo{ @@ -1950,7 +2108,7 @@ func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { } } title, snippet := "", "" - if snippetEntry, err := s.db.GetSnippet(payload.URL); err == nil { + if snippetEntry, err := s.db.GetSnippet(payload.URL); err == nil && snippetEntry != nil { title = snippetEntry.Title snippet = snippetEntry.Description if snippet == "" { diff --git a/storage/storage.go b/storage/storage.go deleted file mode 100644 index a052ad4..0000000 --- a/storage/storage.go +++ /dev/null @@ -1,750 +0,0 @@ -// Package storage provides the persistent index and site-info storage backed by bbolt. -// storage 包提供基于 bbolt 的持久化存储,负责保存倒排索引、URL摘要缓存和网站元信息。 -// -// 索引空间(index bucket):key = 关键词(string),value = brotli 压缩的 JSON 数组,每项为 [权重, URL] 对。 -// 融合之门(gate bucket):key = URL(string),value = brotli 压缩的 JSON 数组 [标题, 描述, 正文, 时间戳]。 -// 网站之门(site_gate bucket):key = 主机名(string),value = brotli 压缩的 JSON SiteInfo 结构体。 -// -// Python 版使用自定义哈希桶结构;Go 版直接交由 bbolt 原生处理。 -package storage - -import ( - "encoding/json" // JSON 序列化/反序列化 - "fmt" // 格式化错误信息 - "io" // io.EOF 常量 - "log" // 日志输出 - "os" // 操作系统功能(创建目录等) - "path/filepath" // 路径拼接 - "sync" // 互斥锁(保护写缓冲) - "time" // bbolt 超时配置和写缓冲定时器 - - "github.com/andybalholm/brotli" // Brotli 无损压缩库(用于压缩存储数据) - bolt "go.etcd.io/bbolt" // BoltDB,纯 Go 嵌入式 KV 数据库 -) - -// IndexEntry 是倒排索引中的单个条目。 -// 一条索引记录表示"某个 URL 与某个关键词的相关性权重"。 -type IndexEntry struct { - Weight float32 `json:"w"` // 该 URL 在该关键词下的得分/权重 - URL string `json:"u"` // 网页 URL -} - -// SnippetEntry 是 URL 对应的摘要信息缓存。 -// 包含页面标题、描述、正文片段、抓取时间戳和内容哈希(用于增量重爬检测)。 -type SnippetEntry struct { - Title string `json:"title"` // 网页标题 - Description string `json:"desc"` // meta description 或自动生成的描述 - Text string `json:"text"` // 正文前 N 字符的文本片段 - Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳 - ContentHash string `json:"hash"` // 正文内容的 FNV-1a 哈希(用于增量重爬判断内容是否变化) -} - -// 四个 bbolt bucket 的名称(以字节数组存储,bbolt 要求 key/value 均为字节) -var ( - bucketIndex = []byte("index") // 倒排索引 bucket - bucketGate = []byte("gate") // URL 摘要缓存 bucket - bucketSiteGate = []byte("site_gate") // 网站元信息 bucket - bucketPriority = []byte("priority") // 优先爬取 URL bucket -) - -// writeOp 表示一个待写入的 snippet 操作。 -type writeOp struct { - data []byte // marshalCompress 后的数据 -} - -// DB 封装一个 bbolt 数据库,提供类型化的存取接口。 -// bbolt 内部已实现并发安全,无需额外加锁。 -type DB struct { - db *bolt.DB // 底层 bbolt 数据库句柄 - - // 异步写缓冲:SetSnippet 先写到内存,定期批量刷入 bbolt。 - writeBuf map[string]*writeOp // key → 待写入的操作 - writeBufMu sync.Mutex - writeTicker *time.Ticker - writeDone chan struct{} - - // SiteInfo 独立内存缓存:读全走缓存(零 bbolt 读事务),定期批量刷入 bbolt。 - // 使用 RWMutex 允许多个读并发,写(UpdateSiteInfo)独占。 - siteCache map[string]*SiteInfo // host → 最新 SiteInfo - siteCacheMu sync.RWMutex - siteDirty map[string]struct{} // 需要刷盘的 host 集合 - siteDirtyMu sync.Mutex - siteTicker *time.Ticker - siteDone chan struct{} -} - -// Open 在指定目录路径下创建或打开 bbolt 数据库文件。 -// 如果目录不存在会自动创建。数据库文件名为 sese.db。 -func Open(dir string) (*DB, error) { - // 确保存储目录存在(0775 权限:所有者读写执行,组用户读执行,其他读执行) - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, fmt.Errorf("storage.Open mkdir: %w", err) - } - // 拼接数据库文件路径:dir/sese.db - path := filepath.Join(dir, "sese.db") - // 打开/创建数据库文件,文件权限 0600(仅所有者可读写) - // NoSync: true — 不在每次写事务后 fsync,交由 OS 决定刷盘时机。 - // 在高并发写入场景下大幅减少磁盘 I/O 阻塞,代价是极端断电可能丢失最近几秒数据(可接受)。 - db, err := bolt.Open(path, 0o600, &bolt.Options{ - NoSync: true, - Timeout: 5 * time.Second, - PageSize: 4096, - }) - if err != nil { - return nil, fmt.Errorf("storage.Open bolt: %w", err) - } - // 启动时确保四个 bucket 都存在(不存在则创建) - err = db.Update(func(tx *bolt.Tx) error { - for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate, bucketPriority} { - if _, err := tx.CreateBucketIfNotExists(b); err != nil { - return err - } - } - return nil - }) - if err != nil { - return nil, fmt.Errorf("storage.Open create buckets: %w", err) - } - - d := &DB{db: db} - - // 启动时预热 SiteInfo 缓存:一次性从 bbolt 加载所有 SiteInfo 到内存 - d.siteCache = make(map[string]*SiteInfo) - _ = db.View(func(tx *bolt.Tx) error { - return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error { - var info SiteInfo - if err := decompressUnmarshal(v, &info); err != nil { - return nil // 跳过损坏条目 - } - if info.Languages == nil { - info.Languages = make(map[string]float64) - } - if info.Redirects == nil { - info.Redirects = make(map[string]string) - } - d.siteCache[string(k)] = &info - return nil - }) - }) - log.Printf("[storage] siteCache warmed: %d hosts loaded", len(d.siteCache)) - - return d, nil -} - -// StartWriteFlusher 启动后台写缓冲定时刷盘 goroutine(snippet 和 SiteInfo 各自独立)。 -func (d *DB) StartWriteFlusher() { - // Snippet 写缓冲 - d.writeBuf = make(map[string]*writeOp) - d.writeTicker = time.NewTicker(2 * time.Second) - d.writeDone = make(chan struct{}) - go func() { - for { - select { - case <-d.writeTicker.C: - d.flushWriteBuf() - case <-d.writeDone: - return - } - } - }() - - // SiteInfo 独立缓存 - d.siteCache = make(map[string]*SiteInfo) - d.siteDirty = make(map[string]struct{}) - d.siteTicker = time.NewTicker(5 * time.Second) - d.siteDone = make(chan struct{}) - go func() { - for { - select { - case <-d.siteTicker.C: - d.flushSiteCache() - case <-d.siteDone: - return - } - } - }() -} - -// flushWriteBuf 将写缓冲中的 snippet 操作批量刷入 bbolt。 -func (d *DB) flushWriteBuf() { - d.writeBufMu.Lock() - if len(d.writeBuf) == 0 { - d.writeBufMu.Unlock() - return - } - // 快照并清空缓冲 - snapshot := d.writeBuf - d.writeBuf = make(map[string]*writeOp) - d.writeBufMu.Unlock() - - // 单个事务批量写入 - if err := d.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketGate) - for k, op := range snapshot { - if err := b.Put([]byte(k), op.data); err != nil { - return err - } - } - return nil - }); err != nil { - log.Printf("[storage] flushWriteBuf error: %v", err) - } -} - -// flushSiteCache 将脏的 SiteInfo 批量刷入 bbolt。 -func (d *DB) flushSiteCache() { - d.siteDirtyMu.Lock() - if len(d.siteDirty) == 0 { - d.siteDirtyMu.Unlock() - return - } - dirty := d.siteDirty - d.siteDirty = make(map[string]struct{}) - d.siteDirtyMu.Unlock() - - // 在读锁下快照所有脏数据并预压缩 - d.siteCacheMu.RLock() - type kv struct { - host string - data []byte - } - items := make([]kv, 0, len(dirty)) - for host := range dirty { - if info, ok := d.siteCache[host]; ok { - data, err := marshalCompress(info) - if err != nil { - log.Printf("[storage] flushSiteCache marshal error for %s: %v", host, err) - continue - } - items = append(items, kv{host, data}) - } - } - d.siteCacheMu.RUnlock() - - if len(items) == 0 { - return - } - - // 单个事务批量写入 bbolt - if err := d.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketSiteGate) - for _, item := range items { - if err := b.Put([]byte(item.host), item.data); err != nil { - return err - } - } - return nil - }); err != nil { - log.Printf("[storage] flushSiteCache error: %v", err) - } -} - -// Close 关闭底层 bbolt 数据库连接。 -// 先刷完写缓冲和 SiteInfo 缓存,再关闭定时器,最后关闭数据库。 -func (d *DB) Close() error { - if d.writeTicker != nil { - d.writeTicker.Stop() - } - if d.siteTicker != nil { - d.siteTicker.Stop() - } - d.flushWriteBuf() - d.flushSiteCache() - if d.writeDone != nil { - close(d.writeDone) - } - if d.siteDone != nil { - close(d.siteDone) - } - return d.db.Close() -} - -// ---- 辅助函数:压缩与解压、map 拷贝 ---- - -// copyMapF64 深拷贝 map[string]float64。 -func copyMapF64(m map[string]float64) map[string]float64 { - cp := make(map[string]float64, len(m)) - for k, v := range m { - cp[k] = v - } - return cp -} - -// copyMapStr 深拷贝 map[string]string。 -func copyMapStr(m map[string]string) map[string]string { - cp := make(map[string]string, len(m)) - for k, v := range m { - cp[k] = v - } - return cp -} - -// compress 将字节数组用 brotli 压缩后返回。 -// brotli 压缩比高于 gzip,适合大量文本的存储空间优化。 -func compress(data []byte) ([]byte, error) { - buf := make([]byte, 0, len(data)) // 预分配,避免反复扩容 - w := brotli.NewWriterLevel((*appendWriter)(&buf), 3) // 压缩级别 3(优先速度,压缩比损失约 10-15%) - if _, err := w.Write(data); err != nil { - return nil, err - } - if err := w.Close(); err != nil { - return nil, err - } - return buf, nil -} - -// decompress 将 brotli 压缩的字节数组解压还原。 -func decompress(data []byte) ([]byte, error) { - // brotli.NewReader 从字节数组创建读取器(通过 byteReader 适配 io.Reader 接口) - r := brotli.NewReader( - (*byteReader)(&data), - ) - out := make([]byte, 0, len(data)*3) // 预分配约 3 倍空间(解压后通常更大) - tmp := make([]byte, 4096) // 每次最多读 4KB - for { - n, err := r.Read(tmp) - out = append(out, tmp[:n]...) // 追加本次读取的字节 - if err != nil { - if err == io.EOF { - break // 正常读完 - } - return out, err // 其他错误(非 EOF)则返回 - } - } - return out, nil -} - -// appendWriter 将 *[]byte 适配为 io.Writer 接口(写入时直接 append)。 -type appendWriter []byte - -// Write 将数据 p 追加到 appendWriter 末尾,返回写入字节数。 -func (a *appendWriter) Write(p []byte) (int, error) { - *a = append(*a, p...) - return len(p), nil -} - -// byteReader 将 []byte 适配为 io.Reader 接口(顺序读取,支持读完后返回 EOF)。 -type byteReader []byte - -// Read 从字节数组读取最多 len(p) 字节到 p 中,返回实际读取字节数和可能的错误。 -// 当字节数组全部读完后返回 io.EOF。 -func (b *byteReader) Read(p []byte) (int, error) { - if len(*b) == 0 { - return 0, io.EOF // 已读完 - } - n := copy(p, *b) // 复制最多 len(p) 字节 - *b = (*b)[n:] // 前进指针 - return n, nil -} - -// marshalCompress 将任意可序列化对象先 JSON 编码,再 brotli 压缩,返回压缩后的字节。 -func marshalCompress(v any) ([]byte, error) { - raw, err := json.Marshal(v) // 先序列化为 JSON - if err != nil { - return nil, err - } - return compress(raw) // 再压缩 -} - -// decompressUnmarshal 将压缩字节先解压,再 JSON 反序列化到目标对象 v。 -func decompressUnmarshal(data []byte, v any) error { - raw, err := decompress(data) // 先解压 - if err != nil { - return err - } - return json.Unmarshal(raw, v) // 再反序列化 -} - -// ---- 倒排索引(Index)相关方法 ---- - -// GetIndex 根据关键词查询倒排索引,返回该词关联的所有 [权重, URL] 条目列表。 -func (d *DB) GetIndex(keyword string) ([]IndexEntry, error) { - var entries []IndexEntry - err := d.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketIndex) - v := b.Get([]byte(keyword)) // 在 index bucket 中按关键词查询 - if v == nil { - return nil // 不存在该词,返回空列表 - } - return decompressUnmarshal(v, &entries) - }) - return entries, err -} - -// SetIndex 将某关键词的完整索引条目列表覆盖写入(替换旧数据)。 -func (d *DB) SetIndex(keyword string, entries []IndexEntry) error { - data, err := marshalCompress(entries) - if err != nil { - return err - } - return d.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(bucketIndex).Put([]byte(keyword), data) - }) -} - -// BatchSetIndex 分批写入多个关键词→条目列表的映射。 -// 大数据量时拆分为多个小事务,避免单个大事务导致的长时间阻塞。 -func (d *DB) BatchSetIndex(batch map[string][]IndexEntry) error { - const batchSize = 1000 - items := make([]struct { - keyword string - entries []IndexEntry - }, 0, len(batch)) - for k, v := range batch { - items = append(items, struct { - keyword string - entries []IndexEntry - }{k, v}) - } - totalBatches := (len(items) + batchSize - 1) / batchSize - for i := 0; i < len(items); i += batchSize { - end := i + batchSize - if end > len(items) { - end = len(items) - } - batchNum := i/batchSize + 1 - - // 事务外预先完成所有序列化和压缩,减少事务持锁时间 - preItems := make([]struct { - keyword string - data []byte - }, 0, end-i) - for _, item := range items[i:end] { - data, err := marshalCompress(item.entries) - if err != nil { - return err - } - preItems = append(preItems, struct { - keyword string - data []byte - }{item.keyword, data}) - } - - // 事务内只做纯内存写入,持锁时间极短 - if err := d.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(bucketIndex) - for _, p := range preItems { - if err := b.Put([]byte(p.keyword), p.data); err != nil { - return err - } - } - return nil - }); err != nil { - return err - } - if totalBatches > 1 { - log.Printf("[storage] BatchSetIndex progress: batch %d/%d (%d keys)", batchNum, totalBatches, end) - } - } - return nil -} - -// ForEachIndex 遍历倒排索引中所有关键词及其关联条目,对每个条目调用 fn。 -// 用于全量读取索引、做备份或重新计算等场景。 -func (d *DB) ForEachIndex(fn func(keyword string, entries []IndexEntry) error) error { - return d.db.View(func(tx *bolt.Tx) error { - return tx.Bucket(bucketIndex).ForEach(func(k, v []byte) error { - var entries []IndexEntry - if err := decompressUnmarshal(v, &entries); err != nil { - return nil // 跳过损坏条目,不中断遍历 - } - return fn(string(k), entries) - }) - }) -} - -// ---- 融合之门(Gate):URL 摘要缓存相关方法 ---- - -// GetSnippet 根据 URL 查询缓存的摘要信息(标题/描述/正文片段)。 -// 若未命中返回 error。 -func (d *DB) GetSnippet(url string) (*SnippetEntry, error) { - var entry SnippetEntry - err := d.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket(bucketGate).Get([]byte(url)) - if v == nil { - return fmt.Errorf("not found") - } - return decompressUnmarshal(v, &entry) - }) - if err != nil { - return nil, err - } - return &entry, nil -} - -// SetSnippet 将某 URL 的摘要信息写入写缓冲(异步批量刷入磁盘)。 -func (d *DB) SetSnippet(url string, entry *SnippetEntry) error { - data, err := marshalCompress(entry) - if err != nil { - return err - } - d.writeBufMu.Lock() - d.writeBuf[url] = &writeOp{data: data} - // 缓冲过大时同步刷一次,防止内存膨胀 - if len(d.writeBuf) >= 5000 { - d.writeBufMu.Unlock() - d.flushWriteBuf() - return nil - } - d.writeBufMu.Unlock() - return nil -} - -// ---- 网站之门(SiteGate):网站元信息相关方法 ---- - -// SiteInfo 存放每个域名/主机的元信息,与 Python 版网站.py 的 dataclass 对应。 -type SiteInfo struct { - VisitCount int `json:"visit_count"` // 累计访问该网站的次数 - LastVisitTime int64 `json:"last_visit_time"` // 上次访问该网站的时间戳 - Fingerprint any `json:"fingerprint,omitempty"` // 网站指纹(用于识别重复站点) - SuccessRate *float64 `json:"success_rate,omitempty"` // 访问成功率(成功次数/总访问次数) - HTMLStructure string `json:"html_structure,omitempty"` // HTML 结构特征摘要 - IPs []string `json:"ips,omitempty"` // 该域名解析出的 IP 列表 - Quality *float64 `json:"quality,omitempty"` // 网站质量评分(0~1) - HTTPSAvailable *bool `json:"https_available,omitempty"` // 是否支持 HTTPS - Keywords []string `json:"keywords,omitempty"` // 该网站的高频关键词列表 - OutLinks []string `json:"out_links,omitempty"` // 从该网站页面提取的出站链接列表 - Languages map[string]float64 `json:"languages,omitempty"` // 网站语种分布(语种代码 → 占比) - Redirects map[string]string `json:"redirects,omitempty"` // 重定向链(URL → 最终 URL) - ServerTypes []string `json:"server_types,omitempty"` // 网站使用的 HTTP Server 类型列表 -} - -// GetSiteInfo 根据主机名查询网站元信息。 -// 全程只读内存缓存(零 bbolt 读事务),适合高并发调用。 -// 若不存在则返回仅有默认空 map 的空 SiteInfo(不报错,方便调用方直接使用)。 -func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) { - d.siteCacheMu.RLock() - info, ok := d.siteCache[host] - d.siteCacheMu.RUnlock() - - if ok { - // 返回深拷贝,防止调用方修改缓存 - cp := *info - if cp.Languages == nil { - cp.Languages = make(map[string]float64) - } else { - cp.Languages = copyMapF64(cp.Languages) - } - if cp.Redirects == nil { - cp.Redirects = make(map[string]string) - } else { - cp.Redirects = copyMapStr(cp.Redirects) - } - return &cp, nil - } - - // 缓存未命中,从 bbolt 加载 - // 注意:不能在 bbolt 事务回调(如 ForEachSnippet 的 fn)内调用此路径(bbolt 不支持嵌套事务)。 - // 正常情况下预热已加载全部数据,不会走到这里。 - var si SiteInfo - var err error - func() { - defer func() { - if r := recover(); r != nil { - // bbolt 嵌套事务 panic,返回空 SiteInfo - si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} - err = nil - } - }() - err = d.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket(bucketSiteGate).Get([]byte(host)) - if v == nil { - return fmt.Errorf("not found") - } - return decompressUnmarshal(v, &si) - }) - }() - if err != nil { - return &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}, nil - } - if si.Languages == nil { - si.Languages = make(map[string]float64) - } - if si.Redirects == nil { - si.Redirects = make(map[string]string) - } - // 回填缓存 - d.siteCacheMu.Lock() - if existing, exists := d.siteCache[host]; !exists { - d.siteCache[host] = &si - } else { - si = *existing // 用更新的版本 - } - d.siteCacheMu.Unlock() - return &si, nil -} - -// SetSiteInfo 将网站元信息直接写入内存缓存并标记为脏(异步刷入磁盘)。 -func (d *DB) SetSiteInfo(host string, info *SiteInfo) error { - cp := *info // 浅拷贝,断开与调用方的引用 - if cp.Languages != nil { - cp.Languages = copyMapF64(cp.Languages) - } - if cp.Redirects != nil { - cp.Redirects = copyMapStr(cp.Redirects) - } - d.siteCacheMu.Lock() - d.siteCache[host] = &cp - d.siteCacheMu.Unlock() - d.siteDirtyMu.Lock() - d.siteDirty[host] = struct{}{} - d.siteDirtyMu.Unlock() - return nil -} - -// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回缓存。 -// 只用 siteCacheMu(不碰 bbolt),不会与 flush 产生跨锁依赖。 -func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error { - d.siteCacheMu.Lock() - - // 从缓存读取(缓存未命中会返回 nil,后续初始化为空) - info, ok := d.siteCache[host] - if !ok { - // 从 bbolt 加载(注意:这是唯一会做 bbolt 读的地方,但只在首次 miss 时触发) - var si SiteInfo - err := d.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket(bucketSiteGate).Get([]byte(host)) - if v == nil { - return fmt.Errorf("not found") - } - return decompressUnmarshal(v, &si) - }) - if err != nil { - si = SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} - } else { - if si.Languages == nil { - si.Languages = make(map[string]float64) - } - if si.Redirects == nil { - si.Redirects = make(map[string]string) - } - } - d.siteCache[host] = &si - info = &si - } - - // 在锁内调用修改函数(直接修改缓存中的对象) - fn(info) - - d.siteCacheMu.Unlock() - - // 标记脏(锁外) - d.siteDirtyMu.Lock() - d.siteDirty[host] = struct{}{} - d.siteDirtyMu.Unlock() - return nil -} - -// ForEachSite 遍历所有网站元信息条目,对每个条目调用 fn。 -func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error { - return d.db.View(func(tx *bolt.Tx) error { - return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error { - var info SiteInfo - if err := decompressUnmarshal(v, &info); err != nil { - return nil // 跳过损坏条目 - } - return fn(string(k), &info) - }) - }) -} - -// ForEachSnippet 遍历所有 URL 摘要条目,对每个条目调用 fn。 -func (d *DB) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error { - return d.db.View(func(tx *bolt.Tx) error { - return tx.Bucket(bucketGate).ForEach(func(k, v []byte) error { - var entry SnippetEntry - if err := decompressUnmarshal(v, &entry); err != nil { - return nil // 跳过损坏条目 - } - return fn(string(k), &entry) - }) - }) -} - -// ---- 优先爬取队列(Priority Queue)相关方法 ---- - -// PriorityEntry 记录一条待优先爬取的 URL 或域名。 -type PriorityEntry struct { - URL string `json:"url"` // 用户提交的 URL 或域名(会自动规范化为带 scheme 的 URL) - IsDomain bool `json:"domain"` // 是否为纯域名(true=仅域名,false=完整 URL) - AddedAt int64 `json:"added_at"` // 添加时的 Unix 时间戳 - Visited bool `json:"visited"` // 是否已爬取(crawler 爬完后标记) -} - -// GetPriorityURLs 返回所有未访问的 priority 条目(按添加时间升序)。 -func (d *DB) GetPriorityURLs() ([]PriorityEntry, error) { - var entries []PriorityEntry - err := d.db.View(func(tx *bolt.Tx) error { - return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error { - var e PriorityEntry - if err := decompressUnmarshal(v, &e); err != nil { - return nil // 跳过损坏条目 - } - if !e.Visited { - entries = append(entries, e) - } - return nil - }) - }) - return entries, err -} - -// AddPriorityURL 添加一条 priority 条目(key = URL,value = PriorityEntry)。 -// 每次手动添加都会重新爬取,不做去重检查。 -func (d *DB) AddPriorityURL(entry PriorityEntry) error { - return d.db.Update(func(tx *bolt.Tx) error { - data, err := marshalCompress(entry) - if err != nil { - return err - } - return tx.Bucket(bucketPriority).Put([]byte(entry.URL), data) - }) -} - -// RemovePriorityURL 删除指定 URL 的 priority 条目。 -func (d *DB) RemovePriorityURL(url string) error { - return d.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(bucketPriority).Delete([]byte(url)) - }) -} - -// MarkPriorityURLVisited 将指定 URL 的 priority 条目标记为已访问。 -// 用于 priority 爬取完成后标记,避免 RemovePriorityURL 后同一 URL 被重复添加。 -func (d *DB) MarkPriorityURLVisited(url string) error { - return d.db.Update(func(tx *bolt.Tx) error { - k := []byte(url) - v := tx.Bucket(bucketPriority).Get(k) - if v == nil { - return nil // 条目不存在,无需处理 - } - var e PriorityEntry - if err := decompressUnmarshal(v, &e); err != nil { - return nil - } - if e.Visited { - return nil // 已是 visited 状态 - } - e.Visited = true - data, err := marshalCompress(e) - if err != nil { - return err - } - return tx.Bucket(bucketPriority).Put(k, data) - }) -} - -// ClearVisitedPriorityURLs 批量删除所有已标记为 visited 的条目(crawler 爬完后调用)。 -func (d *DB) ClearVisitedPriorityURLs() error { - return d.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(bucketPriority).ForEach(func(k, v []byte) error { - var e PriorityEntry - if err := decompressUnmarshal(v, &e); err != nil { - return nil - } - if e.Visited { - return tx.Bucket(bucketPriority).Delete(k) - } - return nil - }) - }) -} diff --git a/storage/store.go b/storage/store.go new file mode 100644 index 0000000..4b4b157 --- /dev/null +++ b/storage/store.go @@ -0,0 +1,651 @@ +// Package storage provides unified storage interface backed by Redis + MySQL. +// Redis is used as the primary high-performance store, MySQL is used for persistence. +package storage + +import ( + "context" + "crypto/md5" + "fmt" + "sync" + "sync/atomic" + + goredis "github.com/redis/go-redis/v9" + extredis "sese-engine/redis" +) + +// PriorityEntry 记录一条待优先爬取的 URL 或域名。 +type PriorityEntry struct { + URL string `json:"url"` // 用户提交的 URL 或域名(会自动规范化为带 scheme 的 URL) + IsDomain bool `json:"domain"` // 是否为纯域名(true=仅域名,false=完整 URL) + AddedAt int64 `json:"added_at"` // 添加时的 Unix 时间戳 + Visited bool `json:"visited"` // 是否已爬取(crawler 爬完后标记) +} + +// Store 统一存储接口,所有数据操作都通过此接口。 +type Store interface { + // 倒排索引操作 + GetIndex(keyword string) ([]IndexEntry, error) + BatchSetIndex(batch map[string][]IndexEntry) error + ForEachIndex(fn func(keyword string, entries []IndexEntry) error) error + + // URL 摘要操作 + GetSnippet(url string) (*SnippetEntry, error) + SetSnippet(url string, entry *SnippetEntry) error + ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error + + // 网站信息操作 + GetSiteInfo(host string) (*SiteInfo, error) + SetSiteInfo(host string, info *SiteInfo) error + UpdateSiteInfo(host string, fn func(*SiteInfo)) error + ForEachSite(fn func(host string, info *SiteInfo) error) error + + // Priority URL 操作 + GetPriorityURLs() ([]PriorityEntry, error) + AddPriorityURL(entry PriorityEntry) error + RemovePriorityURL(url string) error + MarkPriorityURLVisited(url string) error + ClearVisitedPriorityURLs() error + + // 生命周期 + Close() error +} + +// RedisStore 实现了 Store 接口,提供 Redis 存储能力。 +type RedisStoreV2 struct { + client *goredis.Client + + // 内存索引聚合(用于写入) + mem map[string][]IndexEntry + memMu sync.RWMutex + rowCount int64 + + // 站点信息内存缓存 + siteCache map[string]*SiteInfo + siteCacheMu sync.RWMutex +} + +// NewRedisStoreV2 创建新的 Redis 存储实例 +func NewRedisStoreV2() *RedisStoreV2 { + return &RedisStoreV2{ + mem: make(map[string][]IndexEntry), + siteCache: make(map[string]*SiteInfo), + } +} + +// Init 初始化 Redis 存储 +func (r *RedisStoreV2) Init() error { + if extredis.Client == nil { + return fmt.Errorf("redis not initialized, call redis.Open() first") + } + r.client = extredis.Client + return nil +} + +// Close 关闭存储 +func (r *RedisStoreV2) Close() error { + // 将内存数据刷到 Redis + if err := r.FlushMemToRedis(); err != nil { + return err + } + return nil +} + +// ---- 倒排索引操作 ---- + +// GetIndex 获取关键词的倒排索引 +func (r *RedisStoreV2) GetIndex(keyword string) ([]IndexEntry, error) { + ctx := context.Background() + + entries, err := r.client.ZRevRangeWithScores(ctx, "idx:"+keyword, 0, -1).Result() + if err != nil { + if err == goredis.Nil { + return nil, nil + } + return nil, err + } + + result := make([]IndexEntry, 0, len(entries)) + for _, e := range entries { + result = append(result, IndexEntry{ + Weight: float32(e.Score), + URL: e.Member.(string), + }) + } + return result, nil +} + +// BatchSetIndex 批量设置倒排索引 +func (r *RedisStoreV2) BatchSetIndex(batch map[string][]IndexEntry) error { + ctx := context.Background() + + for keyword, entries := range batch { + if len(entries) == 0 { + continue + } + + // 先删除旧的 + r.client.Del(ctx, "idx:"+keyword) + + // 添加新的 + if len(entries) > 0 { + zSlice := make([]goredis.Z, len(entries)) + for i, e := range entries { + zSlice[i] = goredis.Z{ + Score: float64(e.Weight), + Member: e.URL, + } + } + if err := r.client.ZAdd(ctx, "idx:"+keyword, zSlice...).Err(); err != nil { + return err + } + } + } + return nil +} + +// ForEachIndex 遍历所有倒排索引 +func (r *RedisStoreV2) ForEachIndex(fn func(keyword string, entries []IndexEntry) error) error { + ctx := context.Background() + var cursor uint64 + + for { + keys, nextCursor, err := r.client.Scan(ctx, cursor, "idx:*", 1000).Result() + if err != nil { + return err + } + + for _, key := range keys { + keyword := key[4:] // 去掉 "idx:" 前缀 + entries, err := r.GetIndex(keyword) + if err != nil { + continue + } + if err := fn(keyword, entries); err != nil { + return err + } + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + return nil +} + +// ---- URL 摘要操作 ---- + +// GetSnippet 获取 URL 摘要 +func (r *RedisStoreV2) GetSnippet(url string) (*SnippetEntry, error) { + ctx := context.Background() + hash := urlHash(url) + + data, err := r.client.HGetAll(ctx, "gate:"+hash).Result() + if err != nil { + return nil, err + } + if len(data) == 0 { + return nil, nil + } + + return &SnippetEntry{ + Title: data["title"], + Description: data["desc"], + Text: data["text"], + Timestamp: parseInt64(data["ts"]), + ContentHash: data["hash"], + }, nil +} + +// SetSnippet 设置 URL 摘要 +func (r *RedisStoreV2) SetSnippet(url string, entry *SnippetEntry) error { + ctx := context.Background() + hash := urlHash(url) + + fields := map[string]interface{}{ + "url": url, + "title": entry.Title, + "desc": entry.Description, + "text": entry.Text, + "ts": entry.Timestamp, + "hash": entry.ContentHash, + } + + err := r.client.HMSet(ctx, "gate:"+hash, fields).Err() + if err != nil { + return err + } + + // 同时存储 URL→hash 的映射 + r.client.Set(ctx, "url2hash:"+url, hash, 0) + return nil +} + +// ForEachSnippet 遍历所有 URL 摘要 +func (r *RedisStoreV2) ForEachSnippet(fn func(url string, entry *SnippetEntry) error) error { + ctx := context.Background() + var cursor uint64 + + for { + keys, nextCursor, err := r.client.Scan(ctx, cursor, "gate:*", 1000).Result() + if err != nil { + return err + } + + for _, key := range keys { + data, err := r.client.HGetAll(ctx, key).Result() + if err != nil || len(data) == 0 { + continue + } + + entry := &SnippetEntry{ + Title: data["title"], + Description: data["desc"], + Text: data["text"], + Timestamp: parseInt64(data["ts"]), + ContentHash: data["hash"], + } + + if err := fn(data["url"], entry); err != nil { + return err + } + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + return nil +} + +// ---- 网站信息操作 ---- + +// GetSiteInfo 获取网站信息 +func (r *RedisStoreV2) GetSiteInfo(host string) (*SiteInfo, error) { + // 先从内存缓存读取 + r.siteCacheMu.RLock() + if info, ok := r.siteCache[host]; ok { + r.siteCacheMu.RUnlock() + return info, nil + } + r.siteCacheMu.RUnlock() + + // 从 Redis 读取 + ctx := context.Background() + data, err := r.client.HGetAll(ctx, "site:"+host).Result() + if err != nil { + return nil, err + } + if len(data) == 0 { + return nil, nil + } + + info := &SiteInfo{ + VisitCount: int(parseInt64(data["visit_count"])), + LastVisitTime: parseInt64(data["last_visit_time"]), + } + + if v, ok := data["success_rate"]; ok { + f := parseFloat64(v) + info.SuccessRate = &f + } + if v, ok := data["https_available"]; ok { + b := parseInt64(v) == 1 + info.HTTPSAvailable = &b + } + + // 回填缓存 + r.siteCacheMu.Lock() + r.siteCache[host] = info + r.siteCacheMu.Unlock() + + return info, nil +} + +// SetSiteInfo 设置网站信息 +func (r *RedisStoreV2) SetSiteInfo(host string, info *SiteInfo) error { + ctx := context.Background() + + fields := map[string]interface{}{ + "visit_count": info.VisitCount, + "last_visit_time": info.LastVisitTime, + } + + if info.SuccessRate != nil { + fields["success_rate"] = *info.SuccessRate + } + if info.HTTPSAvailable != nil { + if *info.HTTPSAvailable { + fields["https_available"] = 1 + } else { + fields["https_available"] = 0 + } + } + + err := r.client.HMSet(ctx, "site:"+host, fields).Err() + if err != nil { + return err + } + + // 更新缓存 + r.siteCacheMu.Lock() + r.siteCache[host] = info + r.siteCacheMu.Unlock() + + return nil +} + +// UpdateSiteInfo 更新网站信息 +func (r *RedisStoreV2) UpdateSiteInfo(host string, fn func(*SiteInfo)) error { + info, err := r.GetSiteInfo(host) + if err != nil { + return err + } + if info == nil { + info = &SiteInfo{} + } + + // 调用更新函数 + fn(info) + + return r.SetSiteInfo(host, info) +} + +// ForEachSite 遍历所有网站信息 +func (r *RedisStoreV2) ForEachSite(fn func(host string, info *SiteInfo) error) error { + ctx := context.Background() + var cursor uint64 + + for { + keys, nextCursor, err := r.client.Scan(ctx, cursor, "site:*", 1000).Result() + if err != nil { + return err + } + + for _, key := range keys { + host := key[5:] // 去掉 "site:" 前缀 + info, err := r.GetSiteInfo(host) + if err != nil || info == nil { + continue + } + if err := fn(host, info); err != nil { + return err + } + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + return nil +} + +// ---- Priority URL 操作 ---- + +// GetPriorityURLs 获取所有未访问的 Priority URL +func (r *RedisStoreV2) GetPriorityURLs() ([]PriorityEntry, error) { + ctx := context.Background() + + keys, err := r.client.Keys(ctx, "priority:*").Result() + if err != nil { + return nil, err + } + + var entries []PriorityEntry + for _, key := range keys { + data, err := r.client.HGetAll(ctx, key).Result() + if err != nil || len(data) == 0 { + continue + } + + visited := data["visited"] == "1" + if visited { + continue + } + + entry := PriorityEntry{ + URL: data["url"], + IsDomain: data["is_domain"] == "1", + AddedAt: parseInt64(data["added_at"]), + Visited: visited, + } + entries = append(entries, entry) + } + + return entries, nil +} + +// AddPriorityURL 添加 Priority URL +func (r *RedisStoreV2) AddPriorityURL(entry PriorityEntry) error { + ctx := context.Background() + + fields := map[string]interface{}{ + "url": entry.URL, + "is_domain": boolToStr(entry.IsDomain), + "added_at": entry.AddedAt, + "visited": boolToStr(entry.Visited), + } + + key := "priority:" + entry.URL + return r.client.HMSet(ctx, key, fields).Err() +} + +// RemovePriorityURL 删除 Priority URL +func (r *RedisStoreV2) RemovePriorityURL(url string) error { + ctx := context.Background() + return r.client.Del(ctx, "priority:"+url).Err() +} + +// MarkPriorityURLVisited 标记 Priority URL 为已访问 +func (r *RedisStoreV2) MarkPriorityURLVisited(url string) error { + ctx := context.Background() + return r.client.HSet(ctx, "priority:"+url, "visited", "1").Err() +} + +// ClearVisitedPriorityURLs 清除已访问的 Priority URL +func (r *RedisStoreV2) ClearVisitedPriorityURLs() error { + ctx := context.Background() + + keys, err := r.client.Keys(ctx, "priority:*").Result() + if err != nil { + return err + } + + for _, key := range keys { + visited, _ := r.client.HGet(ctx, key, "visited").Result() + if visited == "1" { + r.client.Del(ctx, key) + } + } + + return nil +} + +// ---- 内存索引操作(用于写入)---- + +// GetMemIndex 获取内存中的索引条目 +func (r *RedisStoreV2) GetMemIndex(keyword string) []IndexEntry { + r.memMu.RLock() + defer r.memMu.RUnlock() + return r.mem[keyword] +} + +// SetMemIndex 设置内存中的索引条目 +func (r *RedisStoreV2) SetMemIndex(keyword string, entries []IndexEntry) { + r.memMu.Lock() + r.mem[keyword] = entries + r.memMu.Unlock() +} + +// GetAllMemIndexes 获取所有内存索引 +func (r *RedisStoreV2) GetAllMemIndexes() map[string][]IndexEntry { + r.memMu.RLock() + defer r.memMu.RUnlock() + + result := make(map[string][]IndexEntry, len(r.mem)) + for k, v := range r.mem { + result[k] = v + } + return result +} + +// GetRowCount 获取未刷盘的索引条目数 +func (r *RedisStoreV2) GetRowCount() int64 { + return atomic.LoadInt64(&r.rowCount) +} + +// AddRowCount 增加未刷盘的索引条目计数 +func (r *RedisStoreV2) AddRowCount(delta int64) { + atomic.AddInt64(&r.rowCount, delta) +} + +// SetRowCount 设置未刷盘的索引条目计数 +func (r *RedisStoreV2) SetRowCount(v int64) { + atomic.StoreInt64(&r.rowCount, v) +} + +// FlushMemToRedis 将内存索引刷到 Redis +func (r *RedisStoreV2) FlushMemToRedis() error { + r.memMu.Lock() + snapshot := r.mem + r.mem = make(map[string][]IndexEntry) + atomic.StoreInt64(&r.rowCount, 0) + r.memMu.Unlock() + + if len(snapshot) == 0 { + return nil + } + + // 合并内存和 Redis 数据,然后写回 + for keyword, memEntries := range snapshot { + // 从 Redis 读取已有数据 + diskEntries, _ := r.GetIndex(keyword) + + // 合并 + merged := mergeEntries(memEntries, diskEntries) + + // 写回 Redis + if err := r.setIndexEntries(keyword, merged); err != nil { + return err + } + } + + return nil +} + +func (r *RedisStoreV2) setIndexEntries(keyword string, entries []IndexEntry) error { + ctx := context.Background() + + // 删除旧的 + r.client.Del(ctx, "idx:"+keyword) + + if len(entries) > 0 { + zSlice := make([]goredis.Z, len(entries)) + for i, e := range entries { + zSlice[i] = goredis.Z{ + Score: float64(e.Weight), + Member: e.URL, + } + } + return r.client.ZAdd(ctx, "idx:"+keyword, zSlice...).Err() + } + + return nil +} + +func mergeEntries(newEntries, existingEntries []IndexEntry) []IndexEntry { + seen := make(map[string]bool) + var result []IndexEntry + + // 先添加新条目 + for _, e := range newEntries { + if !seen[e.URL] { + result = append(result, e) + seen[e.URL] = true + } + } + + // 添加已有条目中不在新条目里的 + for _, e := range existingEntries { + if !seen[e.URL] { + result = append(result, e) + seen[e.URL] = true + } + } + + return result +} + +// SiteCacheRefresh 从 Redis 刷新站点缓存 +func (r *RedisStoreV2) SiteCacheRefresh() error { + ctx := context.Background() + var cursor uint64 + + r.siteCacheMu.Lock() + defer r.siteCacheMu.Unlock() + + for { + keys, nextCursor, err := r.client.Scan(ctx, cursor, "site:*", 1000).Result() + if err != nil { + return err + } + + for _, key := range keys { + host := key[5:] // 去掉 "site:" 前缀 + data, err := r.client.HGetAll(ctx, key).Result() + if err != nil || len(data) == 0 { + continue + } + + info := &SiteInfo{ + VisitCount: int(parseInt64(data["visit_count"])), + LastVisitTime: parseInt64(data["last_visit_time"]), + } + + if v, ok := data["success_rate"]; ok { + f := parseFloat64(v) + info.SuccessRate = &f + } + if v, ok := data["https_available"]; ok { + b := parseInt64(v) == 1 + info.HTTPSAvailable = &b + } + + r.siteCache[host] = info + } + + cursor = nextCursor + if cursor == 0 { + break + } + } + + return nil +} + +// ---- 辅助函数 ---- + +func urlHash(url string) string { + return fmt.Sprintf("%x", md5.Sum([]byte(url))) +} + +func parseInt64(s string) int64 { + var v int64 + fmt.Sscanf(s, "%d", &v) + return v +} + +func parseFloat64(s string) float64 { + var v float64 + fmt.Sscanf(s, "%f", &v) + return v +} + +func boolToStr(b bool) string { + if b { + return "1" + } + return "0" +} diff --git a/storage/types.go b/storage/types.go new file mode 100644 index 0000000..ec2586e --- /dev/null +++ b/storage/types.go @@ -0,0 +1,37 @@ +// Package storage provides unified storage interface backed by Redis + MySQL. +// This file defines the core data types used throughout the storage system. +package storage + +// IndexEntry 是倒排索引中的单个条目。 +// 一条索引记录表示"某个 URL 与某个关键词的相关性权重"。 +type IndexEntry struct { + Weight float32 `json:"w"` // 该 URL 在该关键词下的得分/权重 + URL string `json:"u"` // 网页 URL +} + +// SnippetEntry 是 URL 对应的摘要信息缓存。 +// 包含页面标题、描述、正文片段、抓取时间戳和内容哈希(用于增量重爬检测)。 +type SnippetEntry struct { + Title string `json:"title"` // 网页标题 + Description string `json:"desc"` // meta description 或自动生成的描述 + Text string `json:"text"` // 正文前 N 字符的文本片段 + Timestamp int64 `json:"ts"` // 抓取该页面时的 Unix 时间戳 + ContentHash string `json:"hash"` // 正文内容的 FNV-1a 哈希(用于增量重爬判断内容是否变化) +} + +// SiteInfo 存放每个域名/主机的元信息。 +type SiteInfo struct { + VisitCount int `json:"visit_count"` // 累计访问该网站的次数 + LastVisitTime int64 `json:"last_visit_time"` // 上次访问该网站的时间戳 + Fingerprint any `json:"fingerprint,omitempty"` // 网站指纹(用于识别重复站点) + SuccessRate *float64 `json:"success_rate,omitempty"` // 访问成功率(成功次数/总访问次数) + HTMLStructure string `json:"html_structure,omitempty"` // HTML 结构特征摘要 + IPs []string `json:"ips,omitempty"` // 该域名解析出的 IP 列表 + Quality *float64 `json:"quality,omitempty"` // 网站质量评分(0~1) + HTTPSAvailable *bool `json:"https_available,omitempty"` // 是否支持 HTTPS + Keywords []string `json:"keywords,omitempty"` // 该网站的高频关键词列表 + OutLinks []string `json:"out_links,omitempty"` // 从该网站页面提取的出站链接列表 + Languages map[string]float64 `json:"languages,omitempty"` // 网站语种分布(语种代码 → 占比) + Redirects map[string]string `json:"redirects,omitempty"` // 重定向链(URL → 最终 URL) + ServerTypes []string `json:"server_types,omitempty"` // 网站使用的 HTTP Server 类型列表 +}