446 lines
10 KiB
Go
446 lines
10 KiB
Go
package mysql
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
goredis "github.com/redis/go-redis/v9"
|
||
)
|
||
|
||
// Flusher 管理 Redis → MySQL 刷盘任务
|
||
type Flusher struct {
|
||
redisDB *goredis.Client // Redis 客户端引用
|
||
interval time.Duration // 刷盘间隔
|
||
batchSize int // 每批次处理数量
|
||
mu sync.Mutex // 防止并发刷盘
|
||
stopCh chan struct{}
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
// NewFlusher 创建刷盘器
|
||
func NewFlusher(redisDB *goredis.Client, interval time.Duration, batchSize int) *Flusher {
|
||
if interval <= 0 {
|
||
interval = 5 * time.Minute
|
||
}
|
||
if batchSize <= 0 {
|
||
batchSize = 1000
|
||
}
|
||
return &Flusher{
|
||
redisDB: redisDB,
|
||
interval: interval,
|
||
batchSize: batchSize,
|
||
stopCh: make(chan struct{}),
|
||
}
|
||
}
|
||
|
||
// Start 启动后台刷盘任务
|
||
func (f *Flusher) Start() {
|
||
f.wg.Add(1)
|
||
go func() {
|
||
defer f.wg.Done()
|
||
ticker := time.NewTicker(f.interval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
f.RunAll()
|
||
case <-f.stopCh:
|
||
log.Printf("[mysql-flusher] stopped")
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
log.Printf("[mysql-flusher] started (interval=%v, batchSize=%d)", f.interval, f.batchSize)
|
||
}
|
||
|
||
// Stop 停止刷盘任务
|
||
func (f *Flusher) Stop() {
|
||
close(f.stopCh)
|
||
f.wg.Wait()
|
||
}
|
||
|
||
// RunAll 执行所有类型的刷盘
|
||
func (f *Flusher) RunAll() {
|
||
f.mu.Lock()
|
||
defer f.mu.Unlock()
|
||
|
||
start := time.Now()
|
||
log.Printf("[mysql-flusher] === starting flush ===")
|
||
|
||
// 刷盘顺序:snippet → site → index(按数据量从小到大)
|
||
f.flushSnippets()
|
||
f.flushSites()
|
||
f.flushIndex()
|
||
f.flushPriorityURLs()
|
||
|
||
log.Printf("[mysql-flusher] === flush done (took %v) ===", time.Since(start))
|
||
}
|
||
|
||
// flushSnippets 将 Redis gate:* 数据刷到 url_snippets 表
|
||
func (f *Flusher) flushSnippets() {
|
||
ctx := context.Background()
|
||
var cursor uint64
|
||
total := 0
|
||
|
||
for {
|
||
keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "gate:*", int64(f.batchSize)).Result()
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][snippets] scan error: %v", err)
|
||
return
|
||
}
|
||
|
||
if len(keys) > 0 {
|
||
f.batchUpsertSnippets(ctx, keys)
|
||
total += len(keys)
|
||
}
|
||
|
||
cursor = nextCursor
|
||
if cursor == 0 {
|
||
break
|
||
}
|
||
}
|
||
|
||
if total > 0 {
|
||
log.Printf("[mysql-flusher][snippets] flushed %d entries", total)
|
||
}
|
||
}
|
||
|
||
// batchUpsertSnippets 批量 upsert url_snippets
|
||
func (f *Flusher) batchUpsertSnippets(ctx context.Context, keys []string) {
|
||
if len(keys) == 0 || DB == nil {
|
||
return
|
||
}
|
||
|
||
query := `INSERT INTO url_snippets (url, url_hash, title, description, text, timestamp, content_hash)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||
ON DUPLICATE KEY UPDATE
|
||
title = VALUES(title),
|
||
description = VALUES(description),
|
||
text = VALUES(text),
|
||
timestamp = VALUES(timestamp),
|
||
content_hash = VALUES(content_hash)`
|
||
|
||
tx, err := DB.BeginTx(ctx, nil)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][snippets] begin tx error: %v", err)
|
||
return
|
||
}
|
||
defer tx.Rollback()
|
||
|
||
stmt, err := tx.PrepareContext(ctx, query)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][snippets] prepare error: %v", err)
|
||
return
|
||
}
|
||
defer stmt.Close()
|
||
|
||
for _, key := range keys {
|
||
data, err := f.redisDB.HGetAll(ctx, key).Result()
|
||
if err != nil || len(data) == 0 {
|
||
continue
|
||
}
|
||
|
||
url := data["url"]
|
||
urlHash := data["url_hash"]
|
||
if urlHash == "" {
|
||
// 从 key 中提取 hash(key 格式:gate:<hash>)
|
||
urlHash = strings.TrimPrefix(key, "gate:")
|
||
}
|
||
|
||
title := data["title"]
|
||
description := data["desc"]
|
||
text := data["text"]
|
||
ts := parseInt64(data["ts"])
|
||
contentHash := data["hash"]
|
||
|
||
_, err = stmt.ExecContext(ctx, url, urlHash, title, description, text, ts, contentHash)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][snippets] exec error for %s: %v", url, err)
|
||
}
|
||
}
|
||
|
||
if err := tx.Commit(); err != nil {
|
||
log.Printf("[mysql-flusher][snippets] commit error: %v", err)
|
||
}
|
||
}
|
||
|
||
// flushSites 将 Redis site:* 数据刷到 site_info 表
|
||
func (f *Flusher) flushSites() {
|
||
ctx := context.Background()
|
||
var cursor uint64
|
||
total := 0
|
||
|
||
for {
|
||
keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "site:*", int64(f.batchSize)).Result()
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][sites] scan error: %v", err)
|
||
return
|
||
}
|
||
|
||
if len(keys) > 0 {
|
||
f.batchUpsertSites(ctx, keys)
|
||
total += len(keys)
|
||
}
|
||
|
||
cursor = nextCursor
|
||
if cursor == 0 {
|
||
break
|
||
}
|
||
}
|
||
|
||
if total > 0 {
|
||
log.Printf("[mysql-flusher][sites] flushed %d entries", total)
|
||
}
|
||
}
|
||
|
||
// batchUpsertSites 批量 upsert site_info
|
||
func (f *Flusher) batchUpsertSites(ctx context.Context, keys []string) {
|
||
if len(keys) == 0 || DB == nil {
|
||
return
|
||
}
|
||
|
||
query := `INSERT INTO site_info (host, visit_count, last_visit_time, fingerprint, success_rate,
|
||
html_structure, ips, quality, https_available, keywords, out_links,
|
||
languages, redirects, server_types)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
ON DUPLICATE KEY UPDATE
|
||
visit_count = VALUES(visit_count),
|
||
last_visit_time = VALUES(last_visit_time),
|
||
fingerprint = VALUES(fingerprint),
|
||
success_rate = VALUES(success_rate),
|
||
html_structure = VALUES(html_structure),
|
||
ips = VALUES(ips),
|
||
quality = VALUES(quality),
|
||
https_available = VALUES(https_available),
|
||
keywords = VALUES(keywords),
|
||
out_links = VALUES(out_links),
|
||
languages = VALUES(languages),
|
||
redirects = VALUES(redirects),
|
||
server_types = VALUES(server_types)`
|
||
|
||
tx, err := DB.BeginTx(ctx, nil)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][sites] begin tx error: %v", err)
|
||
return
|
||
}
|
||
defer tx.Rollback()
|
||
|
||
stmt, err := tx.PrepareContext(ctx, query)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][sites] prepare error: %v", err)
|
||
return
|
||
}
|
||
defer stmt.Close()
|
||
|
||
for _, key := range keys {
|
||
data, err := f.redisDB.HGetAll(ctx, key).Result()
|
||
if err != nil || len(data) == 0 {
|
||
continue
|
||
}
|
||
|
||
host := strings.TrimPrefix(key, "site:")
|
||
|
||
visitCount := int(parseInt64(data["visit_count"]))
|
||
lastVisitTime := parseInt64(data["last_visit_time"])
|
||
fingerprint := data["fingerprint"]
|
||
htmlStructure := data["html_structure"]
|
||
|
||
var successRate *float64
|
||
if v := data["success_rate"]; v != "" {
|
||
f := parseFloat(v)
|
||
successRate = &f
|
||
}
|
||
|
||
var quality *float64
|
||
if v := data["quality"]; v != "" {
|
||
q := parseFloat(v)
|
||
quality = &q
|
||
}
|
||
|
||
var httpsAvailable *int8
|
||
if v := data["https_available"]; v != "" {
|
||
i := int8(parseInt64(v))
|
||
httpsAvailable = &i
|
||
}
|
||
|
||
// JSON 字段:空字符串转为 NULL 或 "[]"
|
||
// MySQL JSON 类型不接受空字符串
|
||
ips := data["ips"]
|
||
if ips == "" {
|
||
ips = "[]"
|
||
}
|
||
keywords := data["keywords"]
|
||
if keywords == "" {
|
||
keywords = "[]"
|
||
}
|
||
outLinks := data["out_links"]
|
||
if outLinks == "" {
|
||
outLinks = "[]"
|
||
}
|
||
languages := data["languages"]
|
||
if languages == "" {
|
||
languages = "[]"
|
||
}
|
||
redirects := data["redirects"]
|
||
if redirects == "" {
|
||
redirects = "[]"
|
||
}
|
||
serverTypes := data["server_types"]
|
||
if serverTypes == "" {
|
||
serverTypes = "[]"
|
||
}
|
||
|
||
_, err = stmt.ExecContext(ctx, host, visitCount, lastVisitTime, fingerprint, successRate,
|
||
htmlStructure, ips, quality, httpsAvailable, keywords, outLinks,
|
||
languages, redirects, serverTypes)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][sites] exec error for %s: %v", host, err)
|
||
}
|
||
}
|
||
|
||
if err := tx.Commit(); err != nil {
|
||
log.Printf("[mysql-flusher][sites] commit error: %v", err)
|
||
}
|
||
}
|
||
|
||
// flushIndex 将 Redis idx:* 数据刷到 index_entries 表
|
||
func (f *Flusher) flushIndex() {
|
||
ctx := context.Background()
|
||
var cursor uint64
|
||
total := 0
|
||
|
||
for {
|
||
keys, nextCursor, err := f.redisDB.Scan(ctx, cursor, "idx:*", int64(f.batchSize)).Result()
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][index] scan error: %v", err)
|
||
return
|
||
}
|
||
|
||
if len(keys) > 0 {
|
||
f.batchUpsertIndex(ctx, keys)
|
||
total += len(keys)
|
||
}
|
||
|
||
cursor = nextCursor
|
||
if cursor == 0 {
|
||
break
|
||
}
|
||
}
|
||
|
||
if total > 0 {
|
||
log.Printf("[mysql-flusher][index] flushed %d keywords", total)
|
||
}
|
||
}
|
||
|
||
// batchUpsertIndex 批量 upsert index_entries
|
||
func (f *Flusher) batchUpsertIndex(ctx context.Context, keys []string) {
|
||
if len(keys) == 0 || DB == nil {
|
||
return
|
||
}
|
||
|
||
tx, err := DB.BeginTx(ctx, nil)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][index] begin tx error: %v", err)
|
||
return
|
||
}
|
||
defer tx.Rollback()
|
||
|
||
stmt, err := tx.PrepareContext(ctx, `INSERT INTO index_entries (keyword, url, weight)
|
||
VALUES (?, ?, ?)
|
||
ON DUPLICATE KEY UPDATE weight = VALUES(weight)`)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][index] prepare error: %v", err)
|
||
return
|
||
}
|
||
defer stmt.Close()
|
||
|
||
for _, key := range keys {
|
||
keyword := strings.TrimPrefix(key, "idx:")
|
||
|
||
// 获取有序集合中的所有成员
|
||
entries, err := f.redisDB.ZRevRangeWithScores(ctx, key, 0, -1).Result()
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][index] zrange error for %s: %v", keyword, err)
|
||
continue
|
||
}
|
||
|
||
for _, entry := range entries {
|
||
url := entry.Member.(string)
|
||
weight := float32(entry.Score)
|
||
_, err = stmt.ExecContext(ctx, keyword, url, weight)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][index] exec error for %s/%s: %v", keyword, url, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := tx.Commit(); err != nil {
|
||
log.Printf("[mysql-flusher][index] commit error: %v", err)
|
||
}
|
||
}
|
||
|
||
// flushPriorityURLs 将 Redis priority:* 数据刷到 priority_urls 表
|
||
func (f *Flusher) flushPriorityURLs() {
|
||
ctx := context.Background()
|
||
|
||
keys, err := f.redisDB.Keys(ctx, "priority:*").Result()
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][priority] keys error: %v", err)
|
||
return
|
||
}
|
||
|
||
if len(keys) == 0 || DB == nil {
|
||
return
|
||
}
|
||
|
||
tx, err := DB.BeginTx(ctx, nil)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][priority] begin tx error: %v", err)
|
||
return
|
||
}
|
||
defer tx.Rollback()
|
||
|
||
stmt, err := tx.PrepareContext(ctx, `INSERT IGNORE INTO priority_urls (url) VALUES (?)`)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][priority] prepare error: %v", err)
|
||
return
|
||
}
|
||
defer stmt.Close()
|
||
|
||
for _, key := range keys {
|
||
url := strings.TrimPrefix(key, "priority:")
|
||
_, err = stmt.ExecContext(ctx, url)
|
||
if err != nil {
|
||
log.Printf("[mysql-flusher][priority] exec error for %s: %v", url, err)
|
||
}
|
||
}
|
||
|
||
if err := tx.Commit(); err != nil {
|
||
log.Printf("[mysql-flusher][priority] commit error: %v", err)
|
||
return
|
||
}
|
||
|
||
log.Printf("[mysql-flusher][priority] flushed %d entries", len(keys))
|
||
}
|
||
|
||
// ============================================
|
||
// 辅助函数
|
||
// ============================================
|
||
|
||
// parseInt64 leniently reads a leading base-10 integer (optionally signed)
// from a Redis hash field value. Anything fmt.Sscanf cannot scan, including
// the empty string, yields 0.
func parseInt64(s string) (n int64) {
	// Best-effort by design: on a scan error n simply keeps its zero value.
	_, _ = fmt.Sscanf(s, "%d", &n)
	return n
}
|
||
|
||
// parseFloat leniently reads a leading floating-point number from a Redis
// hash field value. Anything fmt.Sscanf cannot scan, including the empty
// string, yields 0.
func parseFloat(s string) (x float64) {
	// Best-effort by design: on a scan error x simply keeps its zero value.
	_, _ = fmt.Sscanf(s, "%f", &x)
	return x
}
|