Files
2026-04-20 18:26:54 +08:00

446 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package mysql
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"
goredis "github.com/redis/go-redis/v9"
)
// Flusher manages the background job that copies data from Redis into MySQL.
type Flusher struct {
	// redisDB is the Redis client the flush routines read from.
	redisDB *goredis.Client
	// interval is the period between two scheduled flush runs.
	interval time.Duration
	// batchSize is the number of entries handled per batch; the flush
	// routines pass it to SCAN as the COUNT argument.
	batchSize int

	// mu prevents two flush runs from executing at the same time.
	mu sync.Mutex
	// stopCh is closed to tell the background goroutine to exit.
	stopCh chan struct{}
	// wg tracks the background goroutine so Stop can wait for it.
	wg sync.WaitGroup
}
// NewFlusher creates a Flusher bound to the given Redis client.
//
// A non-positive interval falls back to 5 minutes and a non-positive
// batchSize falls back to 1000. No goroutine is started here; call Start
// to begin periodic flushing.
func NewFlusher(redisDB *goredis.Client, interval time.Duration, batchSize int) *Flusher {
	// Begin with the defaults and override them only with usable values.
	flusher := &Flusher{
		redisDB:   redisDB,
		interval:  5 * time.Minute,
		batchSize: 1000,
		stopCh:    make(chan struct{}),
	}
	if interval > 0 {
		flusher.interval = interval
	}
	if batchSize > 0 {
		flusher.batchSize = batchSize
	}
	return flusher
}
// Start launches the background goroutine that calls RunAll once per
// interval until the stop channel is closed. It returns immediately after
// the goroutine has been scheduled.
func (f *Flusher) Start() {
	loop := func() {
		defer f.wg.Done()

		tick := time.NewTicker(f.interval)
		defer tick.Stop()

		for {
			select {
			case <-f.stopCh:
				log.Printf("[mysql-flusher] stopped")
				return
			case <-tick.C:
				f.RunAll()
			}
		}
	}

	// Register with the WaitGroup before the goroutine exists so that Stop
	// cannot return ahead of it.
	f.wg.Add(1)
	go loop()

	log.Printf("[mysql-flusher] started (interval=%v, batchSize=%d)", f.interval, f.batchSize)
}
// Stop signals the background goroutine to exit and blocks until it has
// returned (including any flush that is currently in progress on it).
//
// Calling Stop again after it has returned is a no-op instead of a
// "close of closed channel" panic. Concurrent calls to Stop from several
// goroutines are still not supported.
func (f *Flusher) Stop() {
	select {
	case <-f.stopCh:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(f.stopCh)
	}
	f.wg.Wait()
}
// RunAll performs one complete flush pass over every data type while
// holding the flusher mutex, so overlapping callers run one after another.
// It logs the start of the pass and the elapsed time at the end.
func (f *Flusher) RunAll() {
	f.mu.Lock()
	defer f.mu.Unlock()

	began := time.Now()
	log.Printf("[mysql-flusher] === starting flush ===")

	// Stage order: snippets, sites, index, then priority URLs. The original
	// note says the first three are ordered from smallest to largest data
	// volume.
	stages := [...]func(){
		f.flushSnippets,
		f.flushSites,
		f.flushIndex,
		f.flushPriorityURLs,
	}
	for _, stage := range stages {
		stage()
	}

	log.Printf("[mysql-flusher] === flush done (took %v) ===", time.Since(began))
}
// flushSnippets walks the Redis keys matching gate:* with SCAN and hands
// each non-empty batch to batchUpsertSnippets (target table: url_snippets).
//
// A scan error is logged and ends the walk. The total that is logged counts
// keys handed to the upsert step, not rows confirmed written.
func (f *Flusher) flushSnippets() {
	var (
		ctx     = context.Background()
		cursor  uint64
		handled int
	)

	// SCAN reports completion by returning cursor 0, so at least one call is
	// always made and the loop continues while the cursor is non-zero.
	for more := true; more; more = cursor != 0 {
		batch, next, err := f.redisDB.Scan(ctx, cursor, "gate:*", int64(f.batchSize)).Result()
		if err != nil {
			log.Printf("[mysql-flusher][snippets] scan error: %v", err)
			return
		}
		cursor = next

		if len(batch) == 0 {
			continue
		}
		f.batchUpsertSnippets(ctx, batch)
		handled += len(batch)
	}

	if handled == 0 {
		return
	}
	log.Printf("[mysql-flusher][snippets] flushed %d entries", handled)
}
// batchUpsertSnippets upserts one batch of Redis hashes into url_snippets
// inside a single MySQL transaction.
//
// Each key is read with HGETALL. Keys whose read fails are logged and
// skipped; keys with an empty hash are skipped silently. A failed row insert
// is logged and does not stop the remaining rows. Nothing is done when keys
// is empty or the package-level DB is nil.
func (f *Flusher) batchUpsertSnippets(ctx context.Context, keys []string) {
	if len(keys) == 0 || DB == nil {
		return
	}

	query := `INSERT INTO url_snippets (url, url_hash, title, description, text, timestamp, content_hash)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
title = VALUES(title),
description = VALUES(description),
text = VALUES(text),
timestamp = VALUES(timestamp),
content_hash = VALUES(content_hash)`

	tx, err := DB.BeginTx(ctx, nil)
	if err != nil {
		log.Printf("[mysql-flusher][snippets] begin tx error: %v", err)
		return
	}
	// Rollback after a successful Commit is a harmless no-op; this covers
	// every early return below.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		log.Printf("[mysql-flusher][snippets] prepare error: %v", err)
		return
	}
	defer stmt.Close()

	for _, key := range keys {
		data, err := f.redisDB.HGetAll(ctx, key).Result()
		if err != nil {
			// Report read failures instead of dropping them silently, so a
			// missing row can be traced back to its cause.
			log.Printf("[mysql-flusher][snippets] hgetall error for %s: %v", key, err)
			continue
		}
		if len(data) == 0 {
			continue
		}

		url := data["url"]
		urlHash := data["url_hash"]
		if urlHash == "" {
			// Fall back to the hash embedded in the key (format: gate:<hash>).
			urlHash = strings.TrimPrefix(key, "gate:")
		}
		title := data["title"]
		description := data["desc"]
		text := data["text"]
		ts := parseInt64(data["ts"])
		contentHash := data["hash"]

		if _, err := stmt.ExecContext(ctx, url, urlHash, title, description, text, ts, contentHash); err != nil {
			log.Printf("[mysql-flusher][snippets] exec error for %s: %v", url, err)
		}
	}

	if err := tx.Commit(); err != nil {
		log.Printf("[mysql-flusher][snippets] commit error: %v", err)
	}
}
// flushSites walks the Redis keys matching site:* with SCAN and hands each
// non-empty batch to batchUpsertSites (target table: site_info).
//
// A scan error is logged and ends the walk. The total that is logged counts
// keys handed to the upsert step, not rows confirmed written.
func (f *Flusher) flushSites() {
	var (
		ctx     = context.Background()
		cursor  uint64
		handled int
	)

	// SCAN reports completion by returning cursor 0, so at least one call is
	// always made and the loop continues while the cursor is non-zero.
	for more := true; more; more = cursor != 0 {
		batch, next, err := f.redisDB.Scan(ctx, cursor, "site:*", int64(f.batchSize)).Result()
		if err != nil {
			log.Printf("[mysql-flusher][sites] scan error: %v", err)
			return
		}
		cursor = next

		if len(batch) == 0 {
			continue
		}
		f.batchUpsertSites(ctx, batch)
		handled += len(batch)
	}

	if handled == 0 {
		return
	}
	log.Printf("[mysql-flusher][sites] flushed %d entries", handled)
}
// batchUpsertSites upserts one batch of Redis hashes into site_info inside a
// single MySQL transaction.
//
// The host is the key with its "site:" prefix removed. Keys whose HGETALL
// fails are logged and skipped; keys with an empty hash are skipped
// silently. A failed row insert is logged and does not stop the remaining
// rows. Nothing is done when keys is empty or the package-level DB is nil.
func (f *Flusher) batchUpsertSites(ctx context.Context, keys []string) {
	if len(keys) == 0 || DB == nil {
		return
	}

	query := `INSERT INTO site_info (host, visit_count, last_visit_time, fingerprint, success_rate,
html_structure, ips, quality, https_available, keywords, out_links,
languages, redirects, server_types)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
visit_count = VALUES(visit_count),
last_visit_time = VALUES(last_visit_time),
fingerprint = VALUES(fingerprint),
success_rate = VALUES(success_rate),
html_structure = VALUES(html_structure),
ips = VALUES(ips),
quality = VALUES(quality),
https_available = VALUES(https_available),
keywords = VALUES(keywords),
out_links = VALUES(out_links),
languages = VALUES(languages),
redirects = VALUES(redirects),
server_types = VALUES(server_types)`

	// optionalFloat yields nil (stored as NULL) for an empty field and a
	// pointer to the parsed number otherwise.
	optionalFloat := func(raw string) *float64 {
		if raw == "" {
			return nil
		}
		parsed := parseFloat(raw)
		return &parsed
	}

	// jsonOrEmptyArray substitutes "[]" for an empty field. Per the original
	// note these columns are of MySQL's JSON type, which does not accept an
	// empty string.
	jsonOrEmptyArray := func(raw string) string {
		if raw == "" {
			return "[]"
		}
		return raw
	}

	tx, err := DB.BeginTx(ctx, nil)
	if err != nil {
		log.Printf("[mysql-flusher][sites] begin tx error: %v", err)
		return
	}
	// Rollback after a successful Commit is a harmless no-op; this covers
	// every early return below.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		log.Printf("[mysql-flusher][sites] prepare error: %v", err)
		return
	}
	defer stmt.Close()

	for _, key := range keys {
		data, err := f.redisDB.HGetAll(ctx, key).Result()
		if err != nil {
			// Report read failures instead of dropping them silently.
			log.Printf("[mysql-flusher][sites] hgetall error for %s: %v", key, err)
			continue
		}
		if len(data) == 0 {
			continue
		}

		host := strings.TrimPrefix(key, "site:")
		visitCount := int(parseInt64(data["visit_count"]))
		lastVisitTime := parseInt64(data["last_visit_time"])
		fingerprint := data["fingerprint"]
		htmlStructure := data["html_structure"]

		// Optional numeric fields stay nil when absent so they are written
		// as NULL rather than 0.
		successRate := optionalFloat(data["success_rate"])
		quality := optionalFloat(data["quality"])

		var httpsAvailable *int8
		if v := data["https_available"]; v != "" {
			flag := int8(parseInt64(v))
			httpsAvailable = &flag
		}

		ips := jsonOrEmptyArray(data["ips"])
		keywords := jsonOrEmptyArray(data["keywords"])
		outLinks := jsonOrEmptyArray(data["out_links"])
		languages := jsonOrEmptyArray(data["languages"])
		redirects := jsonOrEmptyArray(data["redirects"])
		serverTypes := jsonOrEmptyArray(data["server_types"])

		_, err = stmt.ExecContext(ctx, host, visitCount, lastVisitTime, fingerprint, successRate,
			htmlStructure, ips, quality, httpsAvailable, keywords, outLinks,
			languages, redirects, serverTypes)
		if err != nil {
			log.Printf("[mysql-flusher][sites] exec error for %s: %v", host, err)
		}
	}

	if err := tx.Commit(); err != nil {
		log.Printf("[mysql-flusher][sites] commit error: %v", err)
	}
}
// flushIndex walks the Redis keys matching idx:* with SCAN and hands each
// non-empty batch to batchUpsertIndex (target table: index_entries).
//
// A scan error is logged and ends the walk. The total that is logged counts
// keys (keywords) handed to the upsert step, not rows confirmed written.
func (f *Flusher) flushIndex() {
	var (
		ctx     = context.Background()
		cursor  uint64
		handled int
	)

	// SCAN reports completion by returning cursor 0, so at least one call is
	// always made and the loop continues while the cursor is non-zero.
	for more := true; more; more = cursor != 0 {
		batch, next, err := f.redisDB.Scan(ctx, cursor, "idx:*", int64(f.batchSize)).Result()
		if err != nil {
			log.Printf("[mysql-flusher][index] scan error: %v", err)
			return
		}
		cursor = next

		if len(batch) == 0 {
			continue
		}
		f.batchUpsertIndex(ctx, batch)
		handled += len(batch)
	}

	if handled == 0 {
		return
	}
	log.Printf("[mysql-flusher][index] flushed %d keywords", handled)
}
// batchUpsertIndex upserts one batch of Redis sorted sets into index_entries
// inside a single MySQL transaction.
//
// The keyword is the key with its "idx:" prefix removed. Every member of the
// sorted set becomes one (keyword, url, weight) row, with the member as url
// and its score (narrowed to float32) as weight. Read errors, members that
// are not strings, and failed row inserts are logged and skipped. Nothing is
// done when keys is empty or the package-level DB is nil.
func (f *Flusher) batchUpsertIndex(ctx context.Context, keys []string) {
	if len(keys) == 0 || DB == nil {
		return
	}

	tx, err := DB.BeginTx(ctx, nil)
	if err != nil {
		log.Printf("[mysql-flusher][index] begin tx error: %v", err)
		return
	}
	// Rollback after a successful Commit is a harmless no-op; this covers
	// every early return below.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, `INSERT INTO index_entries (keyword, url, weight)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE weight = VALUES(weight)`)
	if err != nil {
		log.Printf("[mysql-flusher][index] prepare error: %v", err)
		return
	}
	defer stmt.Close()

	for _, key := range keys {
		keyword := strings.TrimPrefix(key, "idx:")

		// Fetch every member of the sorted set together with its score.
		entries, err := f.redisDB.ZRevRangeWithScores(ctx, key, 0, -1).Result()
		if err != nil {
			log.Printf("[mysql-flusher][index] zrange error for %s: %v", keyword, err)
			continue
		}

		for _, entry := range entries {
			// Use the checked form of the assertion so an unexpected member
			// type is reported rather than panicking the flusher goroutine.
			url, ok := entry.Member.(string)
			if !ok {
				log.Printf("[mysql-flusher][index] unexpected member type %T for %s", entry.Member, keyword)
				continue
			}
			weight := float32(entry.Score)

			if _, err := stmt.ExecContext(ctx, keyword, url, weight); err != nil {
				log.Printf("[mysql-flusher][index] exec error for %s/%s: %v", keyword, url, err)
			}
		}
	}

	if err := tx.Commit(); err != nil {
		log.Printf("[mysql-flusher][index] commit error: %v", err)
	}
}
// flushPriorityURLs copies the Redis keys matching priority:* into the
// priority_urls table. The URL is the key with its "priority:" prefix
// removed; rows that already exist are left untouched (INSERT IGNORE).
//
// Keys are collected with an incremental SCAN rather than KEYS, which
// matches the other flush routines and avoids a single blocking pass over
// the whole keyspace. SCAN may return a key more than once, so keys are
// de-duplicated before insertion. All inserts run in one transaction.
// Nothing is done when the package-level DB is nil or no key matches.
func (f *Flusher) flushPriorityURLs() {
	// Without a database there is nothing to write, so skip the Redis walk.
	if DB == nil {
		return
	}
	ctx := context.Background()

	var (
		keys   []string
		cursor uint64
		seen   = make(map[string]struct{})
	)
	for {
		batch, next, err := f.redisDB.Scan(ctx, cursor, "priority:*", int64(f.batchSize)).Result()
		if err != nil {
			log.Printf("[mysql-flusher][priority] scan error: %v", err)
			return
		}
		for _, key := range batch {
			if _, dup := seen[key]; dup {
				continue
			}
			seen[key] = struct{}{}
			keys = append(keys, key)
		}
		cursor = next
		if cursor == 0 {
			break
		}
	}
	if len(keys) == 0 {
		return
	}

	tx, err := DB.BeginTx(ctx, nil)
	if err != nil {
		log.Printf("[mysql-flusher][priority] begin tx error: %v", err)
		return
	}
	// Rollback after a successful Commit is a harmless no-op; this covers
	// every early return below.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, `INSERT IGNORE INTO priority_urls (url) VALUES (?)`)
	if err != nil {
		log.Printf("[mysql-flusher][priority] prepare error: %v", err)
		return
	}
	defer stmt.Close()

	for _, key := range keys {
		url := strings.TrimPrefix(key, "priority:")
		if _, err := stmt.ExecContext(ctx, url); err != nil {
			log.Printf("[mysql-flusher][priority] exec error for %s: %v", url, err)
		}
	}

	if err := tx.Commit(); err != nil {
		log.Printf("[mysql-flusher][priority] commit error: %v", err)
		return
	}
	log.Printf("[mysql-flusher][priority] flushed %d entries", len(keys))
}
// ============================================
// 辅助函数
// ============================================
// parseInt64 leniently parses a leading decimal integer from s using
// fmt.Sscanf with the %d verb. It returns 0 when s does not start with a
// parsable integer (for example when s is empty); the scan error is
// deliberately discarded.
func parseInt64(s string) (v int64) {
	_, _ = fmt.Sscanf(s, "%d", &v)
	return v
}
// parseFloat leniently parses a leading floating-point number from s using
// fmt.Sscanf with the %f verb. It returns 0 when s does not start with a
// parsable number (for example when s is empty); the scan error is
// deliberately discarded.
func parseFloat(s string) (v float64) {
	_, _ = fmt.Sscanf(s, "%f", &v)
	return v
}