package database import ( "fmt" "log" "meshgo/config" "os" "path/filepath" "strings" "sync" "time" "gorm.io/driver/mysql" "gorm.io/driver/sqlite" "gorm.io/gorm" "gorm.io/gorm/logger" ) // --------------------------------------------------------------------------- // PayloadLog — payload 日志表(仅记录 msh/# 主题) // --------------------------------------------------------------------------- type PayloadLog struct { ID uint64 `gorm:"primaryKey;autoIncrement"` // 自增 ID Client string `gorm:"type:varchar(255);index"` // 客户端 ID Topic string `gorm:"type:varchar(512);index"` // 完整主题 Qos byte // QoS 等级 Payload []byte // 原始 payload CreatedAt int64 `gorm:"index"` // 发送时间(Unix 秒) SenderIP string `gorm:"type:varchar(64)"` // 发送者 IP } // TableName 指定表名 func (PayloadLog) TableName() string { return "payload_log" } // --------------------------------------------------------------------------- // DB 全局单例 // --------------------------------------------------------------------------- var ( db *gorm.DB dbMu sync.RWMutex ) // Get 返回当前数据库实例(只读) func Get() *gorm.DB { dbMu.RLock() defer dbMu.RUnlock() return db } // Set 设置数据库实例(仅供内部和测试使用) func Set(d *gorm.DB) { dbMu.Lock() db = d dbMu.Unlock() } // Init 根据配置初始化数据库连接,完成自动迁移 // dbType: "sqlite3" | "mysql" func Init(cfg *config.DatabaseConfig, dataDir string) error { if !cfg.Enabled { log.Println("[db] 数据库未启用,跳过初始化") return nil } var ( db *gorm.DB err error ) switch strings.ToLower(cfg.Type) { case "mysql": db, err = initMySQL(cfg.DSN) case "sqlite3", "": db, err = initSQLite(cfg, dataDir) default: return fmt.Errorf("[db] 不支持的数据库类型: %s(支持: sqlite3, mysql)", cfg.Type) } if err != nil { return fmt.Errorf("[db] 初始化失败: %w", err) } // 自动迁移表结构(仅新增列,不会删列) if err = db.AutoMigrate(&PayloadLog{}); err != nil { return fmt.Errorf("[db] 表迁移失败: %w", err) } Set(db) log.Printf("[db] 已连接 %s", cfg.Type) return nil } // initSQLite 构建 SQLite 连接,dbFile 相对于 dataDir func initSQLite(cfg *config.DatabaseConfig, dataDir string) (*gorm.DB, error) { path := filepath.Join(dataDir, cfg.File) // 确保父目录存在 if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { return nil, fmt.Errorf("创建数据库目录失败: %w", err) } log.Printf("[db] SQLite 数据库路径: %s", path) // 默认 foreign_keys=ON;sqlite3 driver 支持 _loc=Local dsn := fmt.Sprintf("%s?_foreign_keys=on&_loc=Local", path) gormDB, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { return nil, err } // 复用底层 sql.DB 设置连接池 sqlDB, err := gormDB.DB() if err != nil { return nil, err } sqlDB.SetMaxOpenConns(1) // SQLite 建议单连接 sqlDB.SetMaxIdleConns(1) sqlDB.SetConnMaxLifetime(time.Hour) return gormDB, nil } // initMySQL 构建 MySQL 连接 func initMySQL(dsn string) (*gorm.DB, error) { if dsn == "" { return nil, fmt.Errorf("MySQL DSN 未配置(请填写 database.dsn 字段)") } gormDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) if err != nil { return nil, err } sqlDB, err := gormDB.DB() if err != nil { return nil, err } sqlDB.SetMaxOpenConns(25) sqlDB.SetMaxIdleConns(5) sqlDB.SetConnMaxLifetime(5 * time.Minute) return gormDB, nil } // --------------------------------------------------------------------------- // Close 关闭数据库连接 // --------------------------------------------------------------------------- func Close() error { gormDB := Get() if gormDB == nil { return nil } sqlDB, err := gormDB.DB() if err != nil { return err } return sqlDB.Close() } // --------------------------------------------------------------------------- // WritePayloadLog 异步写入 payload 日志(丢到 channel 不阻塞主流程) // --------------------------------------------------------------------------- var WriteCh = make(chan *PayloadLog, 1000) // 有缓冲 channel,导出供 main.go 关闭 // StartWriter 启动异步写入 worker func StartWriter() { go func() { for entry := range WriteCh { if err := insertPayloadLog(entry); err != nil { log.Printf("[db] 写入 payload_log 失败: %v", err) } } }() } // Insert 将待写入对象推入队列(非阻塞) // 数据库未初始化时静默跳过(由 main.go 保证 Serve 之前已 Init,此为安全兜底) func Insert(entry *PayloadLog) { // 安全兜底:若 WriteCh 未初始化(极端情况),直接丢弃 if WriteCh == nil { return } select { case WriteCh <- entry: default: log.Printf("[db] 写入队列已满,丢弃日志: topic=%s", entry.Topic) } } func insertPayloadLog(entry *PayloadLog) error { gormDB := Get() if gormDB == nil { return nil // Init 尚未完成,静默跳过 } return gormDB.Create(entry).Error }