- 支持 TCP / WebSocket 监听,配置热重载,systemd 集成 - meshAuthHook 实现用户名/密码认证与 ACL - meshLogHook 打印所有 MQTT 事件(CONNECT/PUBLISH/SUBSCRIBE 等) - meshDBHook 将 msh/# 主题 payload 异步写入数据库 - 数据库支持 SQLite(默认)和 MySQL,自动初始化并补充缺失配置 - payload_log 表字段:ID、client、topic、qos、payload、created_at、sender_ip - 自动补充 config.yaml 缺失字段(文件存在时写回) - .gitignore 屏蔽 data/ 和 .workbuddy/
199 lines
5.2 KiB
Go
199 lines
5.2 KiB
Go
package database
|
||
|
||
import (
|
||
"fmt"
|
||
"log"
|
||
"meshgo/config"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"gorm.io/driver/mysql"
|
||
"gorm.io/driver/sqlite"
|
||
"gorm.io/gorm"
|
||
"gorm.io/gorm/logger"
|
||
)
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// PayloadLog — payload 日志表(仅记录 msh/# 主题)
|
||
// ---------------------------------------------------------------------------
|
||
|
||
type PayloadLog struct {
|
||
ID uint64 `gorm:"primaryKey;autoIncrement"` // 自增 ID
|
||
Client string `gorm:"type:varchar(255);index"` // 客户端 ID
|
||
Topic string `gorm:"type:varchar(512);index"` // 完整主题
|
||
Qos byte // QoS 等级
|
||
Payload []byte // 原始 payload
|
||
CreatedAt int64 `gorm:"index"` // 发送时间(Unix 秒)
|
||
SenderIP string `gorm:"type:varchar(64)"` // 发送者 IP
|
||
}
|
||
|
||
// TableName 指定表名
|
||
func (PayloadLog) TableName() string { return "payload_log" }
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// DB 全局单例
|
||
// ---------------------------------------------------------------------------
|
||
|
||
var (
|
||
db *gorm.DB
|
||
dbMu sync.RWMutex
|
||
)
|
||
|
||
// Get 返回当前数据库实例(只读)
|
||
func Get() *gorm.DB {
|
||
dbMu.RLock()
|
||
defer dbMu.RUnlock()
|
||
return db
|
||
}
|
||
|
||
// Set 设置数据库实例(仅供内部和测试使用)
|
||
func Set(d *gorm.DB) {
|
||
dbMu.Lock()
|
||
db = d
|
||
dbMu.Unlock()
|
||
}
|
||
|
||
// Init 根据配置初始化数据库连接,完成自动迁移
|
||
// dbType: "sqlite3" | "mysql"
|
||
func Init(cfg *config.DatabaseConfig, dataDir string) error {
|
||
if !cfg.Enabled {
|
||
log.Println("[db] 数据库未启用,跳过初始化")
|
||
return nil
|
||
}
|
||
|
||
var (
|
||
db *gorm.DB
|
||
err error
|
||
)
|
||
|
||
switch strings.ToLower(cfg.Type) {
|
||
case "mysql":
|
||
db, err = initMySQL(cfg.DSN)
|
||
case "sqlite3", "":
|
||
db, err = initSQLite(cfg, dataDir)
|
||
default:
|
||
return fmt.Errorf("[db] 不支持的数据库类型: %s(支持: sqlite3, mysql)", cfg.Type)
|
||
}
|
||
if err != nil {
|
||
return fmt.Errorf("[db] 初始化失败: %w", err)
|
||
}
|
||
|
||
// 自动迁移表结构(仅新增列,不会删列)
|
||
if err = db.AutoMigrate(&PayloadLog{}); err != nil {
|
||
return fmt.Errorf("[db] 表迁移失败: %w", err)
|
||
}
|
||
|
||
Set(db)
|
||
log.Printf("[db] 已连接 %s", cfg.Type)
|
||
return nil
|
||
}
|
||
|
||
// initSQLite 构建 SQLite 连接,dbFile 相对于 dataDir
|
||
func initSQLite(cfg *config.DatabaseConfig, dataDir string) (*gorm.DB, error) {
|
||
path := filepath.Join(dataDir, cfg.File)
|
||
// 确保父目录存在
|
||
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
|
||
return nil, fmt.Errorf("创建数据库目录失败: %w", err)
|
||
}
|
||
log.Printf("[db] SQLite 数据库路径: %s", path)
|
||
|
||
// 默认 foreign_keys=ON;sqlite3 driver 支持 _loc=Local
|
||
dsn := fmt.Sprintf("%s?_foreign_keys=on&_loc=Local", path)
|
||
|
||
gormDB, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{
|
||
Logger: logger.Default.LogMode(logger.Silent),
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 复用底层 sql.DB 设置连接池
|
||
sqlDB, err := gormDB.DB()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
sqlDB.SetMaxOpenConns(1) // SQLite 建议单连接
|
||
sqlDB.SetMaxIdleConns(1)
|
||
sqlDB.SetConnMaxLifetime(time.Hour)
|
||
return gormDB, nil
|
||
}
|
||
|
||
// initMySQL 构建 MySQL 连接
|
||
func initMySQL(dsn string) (*gorm.DB, error) {
|
||
if dsn == "" {
|
||
return nil, fmt.Errorf("MySQL DSN 未配置(请填写 database.dsn 字段)")
|
||
}
|
||
gormDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
|
||
Logger: logger.Default.LogMode(logger.Silent),
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
sqlDB, err := gormDB.DB()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
sqlDB.SetMaxOpenConns(25)
|
||
sqlDB.SetMaxIdleConns(5)
|
||
sqlDB.SetConnMaxLifetime(5 * time.Minute)
|
||
return gormDB, nil
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Close 关闭数据库连接
|
||
// ---------------------------------------------------------------------------
|
||
|
||
func Close() error {
|
||
gormDB := Get()
|
||
if gormDB == nil {
|
||
return nil
|
||
}
|
||
sqlDB, err := gormDB.DB()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return sqlDB.Close()
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// WritePayloadLog 异步写入 payload 日志(丢到 channel 不阻塞主流程)
|
||
// ---------------------------------------------------------------------------
|
||
|
||
var WriteCh = make(chan *PayloadLog, 1000) // 有缓冲 channel,导出供 main.go 关闭
|
||
|
||
// StartWriter 启动异步写入 worker
|
||
func StartWriter() {
|
||
go func() {
|
||
for entry := range WriteCh {
|
||
if err := insertPayloadLog(entry); err != nil {
|
||
log.Printf("[db] 写入 payload_log 失败: %v", err)
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// Insert 将待写入对象推入队列(非阻塞)
|
||
// 数据库未初始化时静默跳过(由 main.go 保证 Serve 之前已 Init,此为安全兜底)
|
||
func Insert(entry *PayloadLog) {
|
||
// 安全兜底:若 WriteCh 未初始化(极端情况),直接丢弃
|
||
if WriteCh == nil {
|
||
return
|
||
}
|
||
select {
|
||
case WriteCh <- entry:
|
||
default:
|
||
log.Printf("[db] 写入队列已满,丢弃日志: topic=%s", entry.Topic)
|
||
}
|
||
}
|
||
|
||
func insertPayloadLog(entry *PayloadLog) error {
|
||
gormDB := Get()
|
||
if gormDB == nil {
|
||
return nil // Init 尚未完成,静默跳过
|
||
}
|
||
return gormDB.Create(entry).Error
|
||
}
|