Files
kevin bf41e82a43 feat: 初始化 meshgo MQTT 服务
- 支持 TCP / WebSocket 监听,配置热重载,systemd 集成
- meshAuthHook 实现用户名/密码认证与 ACL
- meshLogHook 打印所有 MQTT 事件(CONNECT/PUBLISH/SUBSCRIBE 等)
- meshDBHook 将 msh/# 主题 payload 异步写入数据库
- 数据库支持 SQLite(默认)和 MySQL,自动初始化并补充缺失配置
- payload_log 表字段:ID、client、topic、qos、payload、created_at、sender_ip
- 自动补充 config.yaml 缺失字段(文件存在时写回)
- .gitignore 屏蔽 data/ 和 .workbuddy/
2026-05-15 18:09:39 +08:00

199 lines
5.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package database
import (
"fmt"
"log"
"meshgo/config"
"os"
"path/filepath"
"strings"
"sync"
"time"
"gorm.io/driver/mysql"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// ---------------------------------------------------------------------------
// PayloadLog — payload 日志表(仅记录 msh/# 主题)
// ---------------------------------------------------------------------------
type PayloadLog struct {
ID uint64 `gorm:"primaryKey;autoIncrement"` // 自增 ID
Client string `gorm:"type:varchar(255);index"` // 客户端 ID
Topic string `gorm:"type:varchar(512);index"` // 完整主题
Qos byte // QoS 等级
Payload []byte // 原始 payload
CreatedAt int64 `gorm:"index"` // 发送时间(Unix 秒)
SenderIP string `gorm:"type:varchar(64)"` // 发送者 IP
}
// TableName 指定表名
func (PayloadLog) TableName() string { return "payload_log" }
// ---------------------------------------------------------------------------
// DB 全局单例
// ---------------------------------------------------------------------------
var (
db *gorm.DB
dbMu sync.RWMutex
)
// Get 返回当前数据库实例(只读)
func Get() *gorm.DB {
dbMu.RLock()
defer dbMu.RUnlock()
return db
}
// Set 设置数据库实例(仅供内部和测试使用)
func Set(d *gorm.DB) {
dbMu.Lock()
db = d
dbMu.Unlock()
}
// Init 根据配置初始化数据库连接,完成自动迁移
// dbType: "sqlite3" | "mysql"
func Init(cfg *config.DatabaseConfig, dataDir string) error {
if !cfg.Enabled {
log.Println("[db] 数据库未启用,跳过初始化")
return nil
}
var (
db *gorm.DB
err error
)
switch strings.ToLower(cfg.Type) {
case "mysql":
db, err = initMySQL(cfg.DSN)
case "sqlite3", "":
db, err = initSQLite(cfg, dataDir)
default:
return fmt.Errorf("[db] 不支持的数据库类型: %s(支持: sqlite3, mysql", cfg.Type)
}
if err != nil {
return fmt.Errorf("[db] 初始化失败: %w", err)
}
// 自动迁移表结构(仅新增列,不会删列)
if err = db.AutoMigrate(&PayloadLog{}); err != nil {
return fmt.Errorf("[db] 表迁移失败: %w", err)
}
Set(db)
log.Printf("[db] 已连接 %s", cfg.Type)
return nil
}
// initSQLite 构建 SQLite 连接,dbFile 相对于 dataDir
func initSQLite(cfg *config.DatabaseConfig, dataDir string) (*gorm.DB, error) {
path := filepath.Join(dataDir, cfg.File)
// 确保父目录存在
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, fmt.Errorf("创建数据库目录失败: %w", err)
}
log.Printf("[db] SQLite 数据库路径: %s", path)
// 默认 foreign_keys=ONsqlite3 driver 支持 _loc=Local
dsn := fmt.Sprintf("%s?_foreign_keys=on&_loc=Local", path)
gormDB, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, err
}
// 复用底层 sql.DB 设置连接池
sqlDB, err := gormDB.DB()
if err != nil {
return nil, err
}
sqlDB.SetMaxOpenConns(1) // SQLite 建议单连接
sqlDB.SetMaxIdleConns(1)
sqlDB.SetConnMaxLifetime(time.Hour)
return gormDB, nil
}
// initMySQL 构建 MySQL 连接
func initMySQL(dsn string) (*gorm.DB, error) {
if dsn == "" {
return nil, fmt.Errorf("MySQL DSN 未配置(请填写 database.dsn 字段)")
}
gormDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, err
}
sqlDB, err := gormDB.DB()
if err != nil {
return nil, err
}
sqlDB.SetMaxOpenConns(25)
sqlDB.SetMaxIdleConns(5)
sqlDB.SetConnMaxLifetime(5 * time.Minute)
return gormDB, nil
}
// ---------------------------------------------------------------------------
// Close 关闭数据库连接
// ---------------------------------------------------------------------------
func Close() error {
gormDB := Get()
if gormDB == nil {
return nil
}
sqlDB, err := gormDB.DB()
if err != nil {
return err
}
return sqlDB.Close()
}
// ---------------------------------------------------------------------------
// WritePayloadLog 异步写入 payload 日志(丢到 channel 不阻塞主流程)
// ---------------------------------------------------------------------------
var WriteCh = make(chan *PayloadLog, 1000) // 有缓冲 channel,导出供 main.go 关闭
// StartWriter 启动异步写入 worker
func StartWriter() {
go func() {
for entry := range WriteCh {
if err := insertPayloadLog(entry); err != nil {
log.Printf("[db] 写入 payload_log 失败: %v", err)
}
}
}()
}
// Insert 将待写入对象推入队列(非阻塞)
// 数据库未初始化时静默跳过(由 main.go 保证 Serve 之前已 Init,此为安全兜底)
func Insert(entry *PayloadLog) {
// 安全兜底:若 WriteCh 未初始化(极端情况),直接丢弃
if WriteCh == nil {
return
}
select {
case WriteCh <- entry:
default:
log.Printf("[db] 写入队列已满,丢弃日志: topic=%s", entry.Topic)
}
}
func insertPayloadLog(entry *PayloadLog) error {
gormDB := Get()
if gormDB == nil {
return nil // Init 尚未完成,静默跳过
}
return gormDB.Create(entry).Error
}