package mysql import ( "context" "fmt" "log" "strings" "sync" "time" goredis "github.com/redis/go-redis/v9" ) // Flusher 管理 Redis → MySQL 刷盘任务 type Flusher struct { redisDB *goredis.Client // Redis 客户端引用 interval time.Duration // 刷盘间隔 batchSize int // 每批次处理数量 mu sync.Mutex // 防止并发刷盘 stopCh chan struct{} wg sync.WaitGroup } // NewFlusher 创建刷盘器 func NewFlusher(redisDB *goredis.Client, interval time.Duration, batchSize int) *Flusher { if interval <= 0 { interval = 5 * time.Minute } if batchSize <= 0 { batchSize = 1000 } return &Flusher{ redisDB: redisDB, interval: interval, batchSize: batchSize, stopCh: make(chan struct{}), } } // Start 启动后台刷盘任务 func (f *Flusher) Start() { f.wg.Add(1) go func() { defer f.wg.Done() ticker := time.NewTicker(f.interval) defer ticker.Stop() for { select { case <-ticker.C: f.RunAll() case <-f.stopCh: log.Printf("[mysql-flusher] stopped") return } } }() log.Printf("[mysql-flusher] started (interval=%v, batchSize=%d)", f.interval, f.batchSize) } // Stop 停止刷盘任务 func (f *Flusher) Stop() { close(f.stopCh) f.wg.Wait() } // RunAll 执行所有类型的刷盘 func (f *Flusher) RunAll() { f.mu.Lock() defer f.mu.Unlock() start := time.Now() log.Printf("[mysql-flusher] === starting flush ===") // 刷盘顺序:snippet → site → index(按数据量从小到大) f.flushSnippets() f.flushSites() f.flushIndex() f.flushPriorityURLs() log.Printf("[mysql-flusher] === flush done (took %v) ===", time.Since(start)) } // flushSnippets 将 Redis gate:* 数据刷到 url_snippets 表 func (f *Flusher) flushSnippets() { ctx := context.Background() var cursor uint64 total := 0 for { keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "gate:*", int64(f.batchSize)).Result() if err != nil { log.Printf("[mysql-flusher][snippets] scan error: %v", err) return } if len(keys) > 0 { f.batchUpsertSnippets(ctx, keys) total += len(keys) } cursor = nextCursor if cursor == 0 { break } } if total > 0 { log.Printf("[mysql-flusher][snippets] flushed %d entries", total) } } // batchUpsertSnippets 批量 upsert url_snippets func (f *Flusher) batchUpsertSnippets(ctx context.Context, keys []string) { if len(keys) == 0 || DB == nil { return } query := `INSERT INTO url_snippets (url, url_hash, title, description, text, timestamp, content_hash) VALUES (?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE title = VALUES(title), description = VALUES(description), text = VALUES(text), timestamp = VALUES(timestamp), content_hash = VALUES(content_hash)` tx, err := DB.BeginTx(ctx, nil) if err != nil { log.Printf("[mysql-flusher][snippets] begin tx error: %v", err) return } defer tx.Rollback() stmt, err := tx.PrepareContext(ctx, query) if err != nil { log.Printf("[mysql-flusher][snippets] prepare error: %v", err) return } defer stmt.Close() for _, key := range keys { data, err := f.redisDB.HGetAll(ctx, key).Result() if err != nil || len(data) == 0 { continue } url := data["url"] urlHash := data["url_hash"] if urlHash == "" { // 从 key 中提取 hash(key 格式:gate:) urlHash = strings.TrimPrefix(key, "gate:") } title := data["title"] description := data["desc"] text := data["text"] ts := parseInt64(data["ts"]) contentHash := data["hash"] _, err = stmt.ExecContext(ctx, url, urlHash, title, description, text, ts, contentHash) if err != nil { log.Printf("[mysql-flusher][snippets] exec error for %s: %v", url, err) } } if err := tx.Commit(); err != nil { log.Printf("[mysql-flusher][snippets] commit error: %v", err) } } // flushSites 将 Redis site:* 数据刷到 site_info 表 func (f *Flusher) flushSites() { ctx := context.Background() var cursor uint64 total := 0 for { keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "site:*", int64(f.batchSize)).Result() if err != nil { log.Printf("[mysql-flusher][sites] scan error: %v", err) return } if len(keys) > 0 { f.batchUpsertSites(ctx, keys) total += len(keys) } cursor = nextCursor if cursor == 0 { break } } if total > 0 { log.Printf("[mysql-flusher][sites] flushed %d entries", total) } } // batchUpsertSites 批量 upsert site_info func (f *Flusher) batchUpsertSites(ctx context.Context, keys []string) { if len(keys) == 0 || DB == nil { return } query := `INSERT INTO site_info (host, visit_count, last_visit_time, fingerprint, success_rate, html_structure, ips, quality, https_available, keywords, out_links, languages, redirects, server_types) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE visit_count = VALUES(visit_count), last_visit_time = VALUES(last_visit_time), fingerprint = VALUES(fingerprint), success_rate = VALUES(success_rate), html_structure = VALUES(html_structure), ips = VALUES(ips), quality = VALUES(quality), https_available = VALUES(https_available), keywords = VALUES(keywords), out_links = VALUES(out_links), languages = VALUES(languages), redirects = VALUES(redirects), server_types = VALUES(server_types)` tx, err := DB.BeginTx(ctx, nil) if err != nil { log.Printf("[mysql-flusher][sites] begin tx error: %v", err) return } defer tx.Rollback() stmt, err := tx.PrepareContext(ctx, query) if err != nil { log.Printf("[mysql-flusher][sites] prepare error: %v", err) return } defer stmt.Close() for _, key := range keys { data, err := f.redisDB.HGetAll(ctx, key).Result() if err != nil || len(data) == 0 { continue } host := strings.TrimPrefix(key, "site:") visitCount := int(parseInt64(data["visit_count"])) lastVisitTime := parseInt64(data["last_visit_time"]) fingerprint := data["fingerprint"] htmlStructure := data["html_structure"] var successRate *float64 if v := data["success_rate"]; v != "" { f := parseFloat(v) successRate = &f } var quality *float64 if v := data["quality"]; v != "" { q := parseFloat(v) quality = &q } var httpsAvailable *int8 if v := data["https_available"]; v != "" { i := int8(parseInt64(v)) httpsAvailable = &i } // JSON 字段:空字符串转为 NULL 或 "[]" // MySQL JSON 类型不接受空字符串 ips := data["ips"] if ips == "" { ips = "[]" } keywords := data["keywords"] if keywords == "" { keywords = "[]" } outLinks := data["out_links"] if outLinks == "" { outLinks = "[]" } languages := data["languages"] if languages == "" { languages = "[]" } redirects := data["redirects"] if redirects == "" { redirects = "[]" } serverTypes := data["server_types"] if serverTypes == "" { serverTypes = "[]" } _, err = stmt.ExecContext(ctx, host, visitCount, lastVisitTime, fingerprint, successRate, htmlStructure, ips, quality, httpsAvailable, keywords, outLinks, languages, redirects, serverTypes) if err != nil { log.Printf("[mysql-flusher][sites] exec error for %s: %v", host, err) } } if err := tx.Commit(); err != nil { log.Printf("[mysql-flusher][sites] commit error: %v", err) } } // flushIndex 将 Redis idx:* 数据刷到 index_entries 表 func (f *Flusher) flushIndex() { ctx := context.Background() var cursor uint64 total := 0 for { keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "idx:*", int64(f.batchSize)).Result() if err != nil { log.Printf("[mysql-flusher][index] scan error: %v", err) return } if len(keys) > 0 { f.batchUpsertIndex(ctx, keys) total += len(keys) } cursor = nextCursor if cursor == 0 { break } } if total > 0 { log.Printf("[mysql-flusher][index] flushed %d keywords", total) } } // batchUpsertIndex 批量 upsert index_entries func (f *Flusher) batchUpsertIndex(ctx context.Context, keys []string) { if len(keys) == 0 || DB == nil { return } tx, err := DB.BeginTx(ctx, nil) if err != nil { log.Printf("[mysql-flusher][index] begin tx error: %v", err) return } defer tx.Rollback() stmt, err := tx.PrepareContext(ctx, `INSERT INTO index_entries (keyword, url, weight) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE weight = VALUES(weight)`) if err != nil { log.Printf("[mysql-flusher][index] prepare error: %v", err) return } defer stmt.Close() for _, key := range keys { keyword := strings.TrimPrefix(key, "idx:") // 获取有序集合中的所有成员 entries, err := f.redisDB.ZRevRangeWithScores(ctx, key, 0, -1).Result() if err != nil { log.Printf("[mysql-flusher][index] zrange error for %s: %v", keyword, err) continue } for _, entry := range entries { url := entry.Member.(string) weight := float32(entry.Score) _, err = stmt.ExecContext(ctx, keyword, url, weight) if err != nil { log.Printf("[mysql-flusher][index] exec error for %s/%s: %v", keyword, url, err) } } } if err := tx.Commit(); err != nil { log.Printf("[mysql-flusher][index] commit error: %v", err) } } // flushPriorityURLs 将 Redis priority:* 数据刷到 priority_urls 表 func (f *Flusher) flushPriorityURLs() { ctx := context.Background() keys, err := f.redisDB.Keys(ctx, "priority:*").Result() if err != nil { log.Printf("[mysql-flusher][priority] keys error: %v", err) return } if len(keys) == 0 || DB == nil { return } tx, err := DB.BeginTx(ctx, nil) if err != nil { log.Printf("[mysql-flusher][priority] begin tx error: %v", err) return } defer tx.Rollback() stmt, err := tx.PrepareContext(ctx, `INSERT IGNORE INTO priority_urls (url) VALUES (?)`) if err != nil { log.Printf("[mysql-flusher][priority] prepare error: %v", err) return } defer stmt.Close() for _, key := range keys { url := strings.TrimPrefix(key, "priority:") _, err = stmt.ExecContext(ctx, url) if err != nil { log.Printf("[mysql-flusher][priority] exec error for %s: %v", url, err) } } if err := tx.Commit(); err != nil { log.Printf("[mysql-flusher][priority] commit error: %v", err) return } log.Printf("[mysql-flusher][priority] flushed %d entries", len(keys)) } // ============================================ // 辅助函数 // ============================================ func parseInt64(s string) int64 { var v int64 fmt.Sscanf(s, "%d", &v) return v } func parseFloat(s string) float64 { var v float64 fmt.Sscanf(s, "%f", &v) return v }