From bf41e82a438ceda54f0d0c47f671bbde7413a6bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=96=87=E5=B3=B0?= Date: Fri, 15 May 2026 18:09:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=9D=E5=A7=8B=E5=8C=96=20meshgo=20?= =?UTF-8?q?MQTT=20=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 支持 TCP / WebSocket 监听,配置热重载,systemd 集成 - meshAuthHook 实现用户名/密码认证与 ACL - meshLogHook 打印所有 MQTT 事件(CONNECT/PUBLISH/SUBSCRIBE 等) - meshDBHook 将 msh/# 主题 payload 异步写入数据库 - 数据库支持 SQLite(默认)和 MySQL,自动初始化并补充缺失配置 - payload_log 表字段:ID、client、topic、qos、payload、created_at、sender_ip - 自动补充 config.yaml 缺失字段(文件存在时写回) - .gitignore 屏蔽 data/ 和 .workbuddy/ --- .gitignore | 10 ++ Makefile | 40 ++++++++ config/config.go | 215 ++++++++++++++++++++++++++++++++++++++++++ database/database.go | 198 +++++++++++++++++++++++++++++++++++++++ go.mod | 19 ++++ go.sum | 27 ++++++ hooks.go | 217 +++++++++++++++++++++++++++++++++++++++++++ main.go | 181 ++++++++++++++++++++++++++++++++++++ meshgo.service | 31 +++++++ paths_unix.go | 10 ++ paths_windows.go | 10 ++ 11 files changed, 958 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 config/config.go create mode 100644 database/database.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 hooks.go create mode 100644 main.go create mode 100644 meshgo.service create mode 100644 paths_unix.go create mode 100644 paths_windows.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f7d66af --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +# data 目录(运行时数据,勿提交) +data/ + +# WorkBuddy 工作目录 +.workbuddy/ + +*.exe +*.o + +meshgo \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..c0b70aa --- /dev/null +++ b/Makefile @@ -0,0 +1,40 @@ +BINARY := meshgo +GOOS := linux +GOARCH := amd64 +OUT := ./build/$(BINARY) + +.PHONY: all build build-linux build-win install clean + +all: build + +## 本机编译(Windows CGO) +build: + @mkdir -p build + CGO_ENABLED=1 go build -trimpath -ldflags="-s -w" -o $(OUT) . + @echo ">>> 已生成: $(OUT)" + +## Linux 交叉编译(需要目标机器有 gcc,可选) +## 若遇 sqlite3 链接错误,请在 Linux 服务器上直接编译: +## make build-linux-native +build-linux: + @mkdir -p build + CGO_ENABLED=1 GOOS=$(GOOS) GOARCH=$(GOARCH) CC=x86_64-linux-gnu-gcc \ + go build -trimpath -ldflags="-s -w" -o $(OUT) . + @echo ">>> 已生成 Linux $(GOARCH) 二进制: $(OUT)" + +## Linux 服务器上原生编译(推荐生产环境使用) +build-linux-native: + @mkdir -p build + CGO_ENABLED=1 go build -trimpath -ldflags="-s -w" -o $(OUT) . + @echo ">>> 已生成: $(OUT)" + +## 部署到目标机器(需要 SSH_HOST 环境变量,如 make install SSH_HOST=root@192.168.1.10) +install: build + @test -n "$(SSH_HOST)" || (echo "请设置 SSH_HOST,例如: make install SSH_HOST=root@192.168.1.10" && exit 1) + scp $(OUT) $(SSH_HOST):/usr/local/bin/$(BINARY) + scp meshgo.service $(SSH_HOST):/etc/systemd/system/$(BINARY).service + ssh $(SSH_HOST) "systemctl daemon-reload && systemctl enable $(BINARY) && systemctl restart $(BINARY)" + @echo ">>> 部署完成" + +clean: + rm -rf build/ diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..d441f3f --- /dev/null +++ b/config/config.go @@ -0,0 +1,215 @@ +package config + +import ( + "log" + "os" + "sync" + + "gopkg.in/yaml.v3" +) + +const DefaultConfigPath = "/etc/meshgo/config.yaml" + +// Config 主配置结构 +type Config struct { + Server ServerConfig `yaml:"server"` + Auth AuthConfig `yaml:"auth"` + Logging LoggingConfig `yaml:"logging"` + Database DatabaseConfig `yaml:"database"` +} + +// ServerConfig MQTT 监听相关配置 +type ServerConfig struct { + // TCP 监听地址,如 :1883 + TCPAddr string `yaml:"tcp_addr"` + // WebSocket 监听地址,如 :8883(留空则不启动) + WSAddr string `yaml:"ws_addr"` + // 最大并发连接数,0 表示不限 + MaxConnections int `yaml:"max_connections"` + // 客户端消息写入超时(秒) + WriteTimeout int `yaml:"write_timeout"` +} + +// AuthConfig 认证配置 +type AuthConfig struct { + // 是否开启用户名/密码认证 + Enabled bool `yaml:"enabled"` + // 允许匿名连接 + AllowAnonymous bool `yaml:"allow_anonymous"` + // 内置用户列表(简单场景使用) + Users []UserEntry `yaml:"users"` +} + +// UserEntry 单条用户凭证 +type UserEntry struct { + Username string `yaml:"username"` + Password string `yaml:"password"` +} + +// LoggingConfig 日志配置 +type LoggingConfig struct { + // debug / info / warn / error + Level string `yaml:"level"` + // 日志文件路径,留空则输出到 stdout + File string `yaml:"file"` +} + +// DatabaseConfig 数据库配置 +type DatabaseConfig struct { + // 是否启用数据库,默认 false + Enabled bool `yaml:"enabled"` + // 数据库类型:sqlite3(默认) / mysql + Type string `yaml:"type"` + // SQLite 数据库文件路径(相对于 dataDir) + // MySQL 留空,DSN 填 DSN 字段 + File string `yaml:"file"` + // MySQL DSN,如 user:password@tcp(127.0.0.1:3306)/meshgo + // SQLite 模式下此字段被忽略 + DSN string `yaml:"dsn"` +} + +// --------------------------------------------------------------------------- +// 全局单例 +// --------------------------------------------------------------------------- + +var ( + current *Config + mu sync.RWMutex +) + +// Get 返回当前配置的只读快照 +func Get() *Config { + mu.RLock() + defer mu.RUnlock() + return current +} + +// Load 从磁盘加载配置,替换全局单例 +func Load(path string) error { + data, err := os.ReadFile(path) + if err != nil { + return err + } + cfg := defaultConfig() + if err = yaml.Unmarshal(data, cfg); err != nil { + return err + } + // 补充缺失字段的默认值 + applyDefaults(cfg) + mu.Lock() + current = cfg + mu.Unlock() + log.Printf("[config] 已加载配置文件: %s", path) + return nil +} + +// Reload 重新读取配置文件(SIGHUP 时调用) +func Reload(path string) { + log.Printf("[config] 收到重载信号,重新读取: %s", path) + if err := Load(path); err != nil { + log.Printf("[config] 重载失败,继续使用旧配置: %v", err) + } +} + +// EnsureConfigComplete 确保配置文件存在且字段完整 +// - 文件不存在:创建带注释的默认配置 +// - 文件存在但缺失字段:补充缺失字段并写回文件 +func EnsureConfigComplete(path string) error { + data, err := os.ReadFile(path) + if err != nil { + // 文件不存在,创建默认配置 + cfg := defaultConfig() + return writeDefaultConfig(path, cfg) + } + + // 文件存在,解析并补充缺失字段 + cfg := defaultConfig() + if err = yaml.Unmarshal(data, cfg); err != nil { + return err + } + if changed := applyDefaults(cfg); changed { + log.Printf("[config] 检测到配置文件缺失字段,已自动补充: %s", path) + if err = writeDefaultConfig(path, cfg); err != nil { + return err + } + } + return nil +} + +// writeDefaultConfig 写入带注释头的默认配置 +func writeDefaultConfig(path string, cfg *Config) error { + data, err := yaml.Marshal(cfg) + if err != nil { + return err + } + header := `# meshgo MQTT 服务配置文件 +# 修改后执行: systemctl reload meshgo 即可热重载(无需重启服务) +# +# 热重载支持的字段:auth / logging +# 需要重启才能生效的字段:server.tcp_addr / server.ws_addr / database + +` + return os.WriteFile(path, append([]byte(header), data...), 0640) +} + +// applyDefaults 补充零值字段的默认值,返回是否发生了修改 +func applyDefaults(cfg *Config) bool { + changed := false + // Server + if cfg.Server.TCPAddr == "" { + cfg.Server.TCPAddr = ":1883" + changed = true + } + if cfg.Server.WriteTimeout == 0 { + cfg.Server.WriteTimeout = 3 + changed = true + } + // Auth + if !cfg.Auth.Enabled { + cfg.Auth.Enabled = false + cfg.Auth.AllowAnonymous = true + changed = true + } + // Logging + if cfg.Logging.Level == "" { + cfg.Logging.Level = "info" + changed = true + } + // Database + if cfg.Database.Type == "" { + cfg.Database.Type = "sqlite3" + changed = true + } + if cfg.Database.File == "" { + cfg.Database.File = "meshgo.db" + changed = true + } + return changed +} + +// defaultConfig 返回一份合理的默认配置 +func defaultConfig() *Config { + return &Config{ + Server: ServerConfig{ + TCPAddr: ":1883", + WSAddr: "", + MaxConnections: 0, + WriteTimeout: 3, + }, + Auth: AuthConfig{ + Enabled: false, + AllowAnonymous: true, + Users: []UserEntry{}, + }, + Logging: LoggingConfig{ + Level: "info", + File: "", + }, + Database: DatabaseConfig{ + Enabled: false, + Type: "sqlite3", + File: "meshgo.db", + DSN: "", + }, + } +} diff --git a/database/database.go b/database/database.go new file mode 100644 index 0000000..3494f90 --- /dev/null +++ b/database/database.go @@ -0,0 +1,198 @@ +package database + +import ( + "fmt" + "log" + "meshgo/config" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "gorm.io/driver/mysql" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +// --------------------------------------------------------------------------- +// PayloadLog — payload 日志表(仅记录 msh/# 主题) +// --------------------------------------------------------------------------- + +type PayloadLog struct { + ID uint64 `gorm:"primaryKey;autoIncrement"` // 自增 ID + Client string `gorm:"type:varchar(255);index"` // 客户端 ID + Topic string `gorm:"type:varchar(512);index"` // 完整主题 + Qos byte // QoS 等级 + Payload []byte // 原始 payload + CreatedAt int64 `gorm:"index"` // 发送时间(Unix 秒) + SenderIP string `gorm:"type:varchar(64)"` // 发送者 IP +} + +// TableName 指定表名 +func (PayloadLog) TableName() string { return "payload_log" } + +// --------------------------------------------------------------------------- +// DB 全局单例 +// --------------------------------------------------------------------------- + +var ( + db *gorm.DB + dbMu sync.RWMutex +) + +// Get 返回当前数据库实例(只读) +func Get() *gorm.DB { + dbMu.RLock() + defer dbMu.RUnlock() + return db +} + +// Set 设置数据库实例(仅供内部和测试使用) +func Set(d *gorm.DB) { + dbMu.Lock() + db = d + dbMu.Unlock() +} + +// Init 根据配置初始化数据库连接,完成自动迁移 +// dbType: "sqlite3" | "mysql" +func Init(cfg *config.DatabaseConfig, dataDir string) error { + if !cfg.Enabled { + log.Println("[db] 数据库未启用,跳过初始化") + return nil + } + + var ( + db *gorm.DB + err error + ) + + switch strings.ToLower(cfg.Type) { + case "mysql": + db, err = initMySQL(cfg.DSN) + case "sqlite3", "": + db, err = initSQLite(cfg, dataDir) + default: + return fmt.Errorf("[db] 不支持的数据库类型: %s(支持: sqlite3, mysql)", cfg.Type) + } + if err != nil { + return fmt.Errorf("[db] 初始化失败: %w", err) + } + + // 自动迁移表结构(仅新增列,不会删列) + if err = db.AutoMigrate(&PayloadLog{}); err != nil { + return fmt.Errorf("[db] 表迁移失败: %w", err) + } + + Set(db) + log.Printf("[db] 已连接 %s", cfg.Type) + return nil +} + +// initSQLite 构建 SQLite 连接,dbFile 相对于 dataDir +func initSQLite(cfg *config.DatabaseConfig, dataDir string) (*gorm.DB, error) { + path := filepath.Join(dataDir, cfg.File) + // 确保父目录存在 + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return nil, fmt.Errorf("创建数据库目录失败: %w", err) + } + log.Printf("[db] SQLite 数据库路径: %s", path) + + // 默认 foreign_keys=ON;sqlite3 driver 支持 _loc=Local + dsn := fmt.Sprintf("%s?_foreign_keys=on&_loc=Local", path) + + gormDB, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + if err != nil { + return nil, err + } + + // 复用底层 sql.DB 设置连接池 + sqlDB, err := gormDB.DB() + if err != nil { + return nil, err + } + sqlDB.SetMaxOpenConns(1) // SQLite 建议单连接 + sqlDB.SetMaxIdleConns(1) + sqlDB.SetConnMaxLifetime(time.Hour) + return gormDB, nil +} + +// initMySQL 构建 MySQL 连接 +func initMySQL(dsn string) (*gorm.DB, error) { + if dsn == "" { + return nil, fmt.Errorf("MySQL DSN 未配置(请填写 database.dsn 字段)") + } + gormDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + if err != nil { + return nil, err + } + sqlDB, err := gormDB.DB() + if err != nil { + return nil, err + } + sqlDB.SetMaxOpenConns(25) + sqlDB.SetMaxIdleConns(5) + sqlDB.SetConnMaxLifetime(5 * time.Minute) + return gormDB, nil +} + +// --------------------------------------------------------------------------- +// Close 关闭数据库连接 +// --------------------------------------------------------------------------- + +func Close() error { + gormDB := Get() + if gormDB == nil { + return nil + } + sqlDB, err := gormDB.DB() + if err != nil { + return err + } + return sqlDB.Close() +} + +// --------------------------------------------------------------------------- +// WritePayloadLog 异步写入 payload 日志(丢到 channel 不阻塞主流程) +// --------------------------------------------------------------------------- + +var WriteCh = make(chan *PayloadLog, 1000) // 有缓冲 channel,导出供 main.go 关闭 + +// StartWriter 启动异步写入 worker +func StartWriter() { + go func() { + for entry := range WriteCh { + if err := insertPayloadLog(entry); err != nil { + log.Printf("[db] 写入 payload_log 失败: %v", err) + } + } + }() +} + +// Insert 将待写入对象推入队列(非阻塞) +// 数据库未初始化时静默跳过(由 main.go 保证 Serve 之前已 Init,此为安全兜底) +func Insert(entry *PayloadLog) { + // 安全兜底:若 WriteCh 未初始化(极端情况),直接丢弃 + if WriteCh == nil { + return + } + select { + case WriteCh <- entry: + default: + log.Printf("[db] 写入队列已满,丢弃日志: topic=%s", entry.Topic) + } +} + +func insertPayloadLog(entry *PayloadLog) error { + gormDB := Get() + if gormDB == nil { + return nil // Init 尚未完成,静默跳过 + } + return gormDB.Create(entry).Error +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1720f38 --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module meshgo + +go 1.25.5 + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/go-sql-driver/mysql v1.8.1 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/mochi-mqtt/server/v2 v2.7.9 // indirect + github.com/rs/xid v1.4.0 // indirect + golang.org/x/text v0.21.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/driver/mysql v1.6.0 // indirect + gorm.io/driver/sqlite v1.6.0 // indirect + gorm.io/gorm v1.31.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2584cff --- /dev/null +++ b/go.sum @@ -0,0 +1,27 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI= +github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc= +github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= +github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg= +gorm.io/driver/mysql v1.6.0/go.mod h1:D/oCC2GWK3M/dqoLxnOlaNKmXz8WNTfcS9y5ovaSqKo= +gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ= +gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8= +gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= +gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= diff --git a/hooks.go b/hooks.go new file mode 100644 index 0000000..935790e --- /dev/null +++ b/hooks.go @@ -0,0 +1,217 @@ +package main + +import ( + "bytes" + "fmt" + "log" + "meshgo/database" + "strings" + "time" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/packets" +) + +// meshLogHook 实现所有可打印日志的 Hook 接口 +type meshLogHook struct { + mqtt.HookBase +} + +func (h *meshLogHook) ID() string { return "meshgo-log" } + +func (h *meshLogHook) Provides(b byte) bool { + return bytes.Contains([]byte{ + mqtt.OnStarted, + mqtt.OnStopped, + mqtt.OnConnect, + mqtt.OnDisconnect, + mqtt.OnSessionEstablished, + mqtt.OnPublish, + mqtt.OnPublished, + mqtt.OnSubscribe, + mqtt.OnSubscribed, + mqtt.OnUnsubscribe, + mqtt.OnUnsubscribed, + mqtt.OnWillSent, + mqtt.OnQosComplete, + mqtt.OnQosDropped, + mqtt.OnPublishDropped, + }, []byte{b}) +} + +// OnStarted 服务启动完成 +func (h *meshLogHook) OnStarted() { + log.Println("[hook] ✓ 服务已启动") +} + +// OnStopped 服务停止 +func (h *meshLogHook) OnStopped() { + log.Println("[hook] ✓ 服务已停止") +} + +// OnConnect 客户端请求连接(认证前) +func (h *meshLogHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error { + log.Printf("[hook] CONNECT client=%s remote=%s", + safeClientID(cl), safeRemoteAddr(cl)) + return nil +} + +// OnSessionEstablished 客户端认证成功,session 建立 +func (h *meshLogHook) OnSessionEstablished(cl *mqtt.Client, pk packets.Packet) { + user := string(pk.Connect.Username) + if user == "" { + user = "(anonymous)" + } + log.Printf("[hook] CONNECTED client=%s username=%s keepalive=%d", + safeClientID(cl), user, pk.Connect.Keepalive) +} + +// OnDisconnect 客户端断开(err=nil 表示主动断开,expire=true 表示 session 过期) +func (h *meshLogHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { + cause := "主动断开" + if expire { + cause = "session 过期" + } else if err != nil { + cause = fmt.Sprintf("异常: %v", err) + } + log.Printf("[hook] DISCONNECT client=%s cause=%s", safeClientID(cl), cause) +} + +// OnPublish 收到客户端发布的原始消息(可修改 packet 后继续) +func (h *meshLogHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { + // 打印消息内容,过长截断 + body := string(pk.Payload) + if len(body) > 200 { + body = body[:200] + "...(truncated)" + } + log.Printf("[hook] PUBLISH client=%s topic=%s qos=%d retain=%t payload=%s", + safeClientID(cl), pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, body) + return pk, nil +} + +// OnPublished 消息已投递给所有订阅者 +func (h *meshLogHook) OnPublished(cl *mqtt.Client, pk packets.Packet) { + log.Printf("[hook] PUBLISHED client=%s topic=%s id=%d", + safeClientID(cl), pk.TopicName, pk.PacketID) +} + +// OnSubscribe 客户端订阅请求(可修改过滤条件) +func (h *meshLogHook) OnSubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet { + for _, sub := range pk.Filters { + log.Printf("[hook] SUBSCRIBE client=%s filter=%s qos=%d", + safeClientID(cl), sub.Filter, sub.Qos) + } + return pk +} + +// OnSubscribed 订阅成功 +func (h *meshLogHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) { + for i, sub := range pk.Filters { + code := "?" + if i < len(reasonCodes) { + code = fmt.Sprintf("%d", reasonCodes[i]) + } + log.Printf("[hook] SUBSCRIBED client=%s filter=%s reason=%s", + safeClientID(cl), sub.Filter, code) + } +} + +// OnUnsubscribe 客户端取消订阅 +func (h *meshLogHook) OnUnsubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet { + for _, sub := range pk.Filters { + log.Printf("[hook] UNSUBSCRIBE client=%s filter=%s", + safeClientID(cl), sub.Filter) + } + return pk +} + +// OnUnsubscribed 取消订阅完成 +func (h *meshLogHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) { + log.Printf("[hook] UNSUBSCRIBED client=%s", safeClientID(cl)) +} + +// OnWillSent 遗嘱消息已发送 +func (h *meshLogHook) OnWillSent(cl *mqtt.Client, pk packets.Packet) { + log.Printf("[hook] LWT_SENT client=%s topic=%s", safeClientID(cl), pk.TopicName) +} + +// OnQosComplete QoS 交付完成 +func (h *meshLogHook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) { + log.Printf("[hook] QOS_COMPLETE client=%s topic=%s id=%d", + safeClientID(cl), pk.TopicName, pk.PacketID) +} + +// OnQosDropped QoS 消息超时丢弃 +func (h *meshLogHook) OnQosDropped(cl *mqtt.Client, pk packets.Packet) { + log.Printf("[hook] QOS_DROPPED client=%s topic=%s id=%d", + safeClientID(cl), pk.TopicName, pk.PacketID) +} + +// OnPublishDropped 消息因客户端慢被丢弃 +func (h *meshLogHook) OnPublishDropped(cl *mqtt.Client, pk packets.Packet) { + log.Printf("[hook] PUBLISH_DROPPED client=%s topic=%s", + safeClientID(cl), pk.TopicName) +} + +// --------------------------------------------------------------------------- +// 辅助函数 +// --------------------------------------------------------------------------- + +func safeClientID(cl *mqtt.Client) string { + if cl == nil { + return "(nil)" + } + return cl.ID +} + +func safeRemoteAddr(cl *mqtt.Client) string { + if cl == nil { + return "(unknown)" + } + return cl.Net.Remote +} + +// 编译期接口检查 +var _ mqtt.Hook = (*meshLogHook)(nil) + +// --------------------------------------------------------------------------- +// meshDBHook — 将 msh/# 主题的 payload 写入数据库 +// --------------------------------------------------------------------------- + +// meshDBHook 拦截 msh/# 主题消息并写入 payload_log 表 +type meshDBHook struct { + mqtt.HookBase +} + +func (h *meshDBHook) ID() string { return "meshgo-db" } + +func (h *meshDBHook) Provides(b byte) bool { + return b == mqtt.OnPublish +} + +// OnPublish 收到发布消息时,检查是否为 msh/# 并异步写库 +func (h *meshDBHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { + // 仅记录 msh/ 开头的主题 + if !strings.HasPrefix(pk.TopicName, "msh/") { + return pk, nil + } + + entry := &database.PayloadLog{ + Client: safeClientID(cl), + Topic: pk.TopicName, + Qos: pk.FixedHeader.Qos, + Payload: pk.Payload, + CreatedAt: time.Now().Unix(), + SenderIP: safeRemoteAddr(cl), + } + + // 异步写入,不阻塞消息投递 + database.Insert(entry) + log.Printf("[hook] [db] queued msh payload: client=%s topic=%s qos=%d size=%d bytes", + entry.Client, entry.Topic, entry.Qos, len(entry.Payload)) + + return pk, nil +} + +// 编译期接口检查 +var _ mqtt.Hook = (*meshDBHook)(nil) diff --git a/main.go b/main.go new file mode 100644 index 0000000..225bcc3 --- /dev/null +++ b/main.go @@ -0,0 +1,181 @@ +package main + +import ( + "bytes" + "log" + "meshgo/config" + "meshgo/database" + "os" + "os/signal" + "syscall" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/listeners" + "github.com/mochi-mqtt/server/v2/packets" +) + + +func main() { + // ---------------------------------------------------------------- + // 1. 确保运行时目录存在 + // ---------------------------------------------------------------- + ensureDir(dataDir) + ensureDir(configDir) + + // ---------------------------------------------------------------- + // 2. 初始化配置(如无配置文件则写入默认值) + // ---------------------------------------------------------------- + if err := config.EnsureConfigComplete(configFile); err != nil { + log.Fatalf("[main] 无法写入默认配置文件: %v", err) + } + if err := config.Load(configFile); err != nil { + log.Fatalf("[main] 无法加载配置文件: %v", err) + } + + // ---------------------------------------------------------------- + // 3. 初始化数据库(在启动服务前完成,避免 hook 调用时未初始化) + // ---------------------------------------------------------------- + if err := database.Init(&config.Get().Database, dataDir); err != nil { + log.Fatalf("[main] 数据库初始化失败: %v", err) + } + database.StartWriter() // 启动异步写入 worker + + // ---------------------------------------------------------------- + // 4. 构建 MQTT 服务器(所有初始化在 Serve 之前完成) + // ---------------------------------------------------------------- + cfg := config.Get() + server := mqtt.New(&mqtt.Options{ + InlineClient: false, + }) + + // 注册认证 hook + if err := server.AddHook(new(meshAuthHook), nil); err != nil { + log.Fatalf("[main] 注册认证 hook 失败: %v", err) + } + // 注册日志 hook + if err := server.AddHook(new(meshLogHook), nil); err != nil { + log.Fatalf("[main] 注册日志 hook 失败: %v", err) + } + // 注册 payload 数据库日志 hook + if err := server.AddHook(new(meshDBHook), nil); err != nil { + log.Fatalf("[main] 注册 payload 数据库 hook 失败: %v", err) + } + + // 添加 TCP 监听 + tcpListener := listeners.NewTCP( + listeners.Config{ID: "tcp1", Address: cfg.Server.TCPAddr}, + ) + if err := server.AddListener(tcpListener); err != nil { + log.Fatalf("[main] 添加 TCP 监听失败 (%s): %v", cfg.Server.TCPAddr, err) + } + log.Printf("[main] TCP 监听已绑定: %s", cfg.Server.TCPAddr) + + // 可选:添加 WebSocket 监听 + if cfg.Server.WSAddr != "" { + wsListener := listeners.NewWebsocket( + listeners.Config{ID: "ws1", Address: cfg.Server.WSAddr}, + ) + if err := server.AddListener(wsListener); err != nil { + log.Fatalf("[main] 添加 WebSocket 监听失败 (%s): %v", cfg.Server.WSAddr, err) + } + log.Printf("[main] WebSocket 监听已绑定: %s", cfg.Server.WSAddr) + } + + // ---------------------------------------------------------------- + // 5. 启动服务器(阻塞,直到收到退出信号) + // ---------------------------------------------------------------- + log.Println("[main] meshgo MQTT 服务已启动") + if err := server.Serve(); err != nil { + log.Fatalf("[main] MQTT 服务器启动失败: %v", err) + } + + // ---------------------------------------------------------------- + // 5. 信号处理 + // SIGHUP → 热重载配置 + // SIGINT / SIGTERM → 优雅退出 + // ---------------------------------------------------------------- + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + + for sig := range sigCh { + switch sig { + case syscall.SIGHUP: + log.Println("[main] 收到 SIGHUP,热重载配置…") + config.Reload(configFile) + // 注:监听地址变更需要重启服务,此处仅刷新认证/日志等运行时配置 + case syscall.SIGINT, syscall.SIGTERM: + log.Println("[main] 正在优雅关闭服务…") + close(database.WriteCh) // 停止异步写入 worker + if err := database.Close(); err != nil { + log.Printf("[main] 关闭数据库时出错: %v", err) + } + if err := server.Close(); err != nil { + log.Printf("[main] 关闭服务器时出错: %v", err) + } + log.Println("[main] meshgo 已停止") + return + } + } +} + +// ensureDir 确保目录存在,不存在则创建(含父目录) +func ensureDir(path string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + if err = os.MkdirAll(path, 0755); err != nil { + log.Fatalf("[main] 无法创建目录 %s: %v", path, err) + } + log.Printf("[main] 已创建目录: %s", path) + } +} + +// ---------------------------------------------------------------- +// 认证 Hook +// ---------------------------------------------------------------- + +// meshAuthHook 实现 mochi-mqtt Hook 接口,基于配置文件进行认证 +type meshAuthHook struct { + mqtt.HookBase +} + +func (h *meshAuthHook) ID() string { return "meshgo-auth" } + +func (h *meshAuthHook) Provides(b byte) bool { + return bytes.Contains([]byte{ + mqtt.OnConnectAuthenticate, + mqtt.OnACLCheck, + }, []byte{b}) +} + +// OnConnectAuthenticate 验证客户端连接凭证 +func (h *meshAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool { + cfg := config.Get().Auth + + // 未开启认证,全部放行 + if !cfg.Enabled { + return true + } + + username := string(pk.Connect.Username) + password := string(pk.Connect.Password) + + // 匿名连接处理 + if username == "" { + return cfg.AllowAnonymous + } + + // 逐条比对用户列表 + for _, u := range cfg.Users { + if u.Username == username && u.Password == password { + return true + } + } + return false +} + +// OnACLCheck 主题级别 ACL 检查(默认全放行,可按需扩展) +func (h *meshAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool { + return true +} + +// 确保 meshAuthHook 实现了 mqtt.Hook 接口(编译期检查) +var _ mqtt.Hook = (*meshAuthHook)(nil) diff --git a/meshgo.service b/meshgo.service new file mode 100644 index 0000000..c8084ed --- /dev/null +++ b/meshgo.service @@ -0,0 +1,31 @@ +[Unit] +Description=meshgo MQTT Broker +Documentation=https://github.com/mochi-mqtt/server +After=network.target + +[Service] +Type=simple +# 以 root 运行以便绑定 1883 端口;如已设置 CAP_NET_BIND_SERVICE 可改为普通用户 +User=root +Group=root + +# 二进制路径,编译后 cp meshgo /usr/local/bin/meshgo +ExecStart=/usr/local/bin/meshgo + +# systemctl reload meshgo → 发送 SIGHUP → 热重载配置 +ExecReload=/bin/kill -HUP $MAINPID + +Restart=on-failure +RestartSec=5s + +# 日志输出交由 journald 接管 +StandardOutput=journal +StandardError=journal +SyslogIdentifier=meshgo + +# 资源保护(可选) +PrivateTmp=true +NoNewPrivileges=true + +[Install] +WantedBy=multi-user.target diff --git a/paths_unix.go b/paths_unix.go new file mode 100644 index 0000000..0e7c063 --- /dev/null +++ b/paths_unix.go @@ -0,0 +1,10 @@ +//go:build !windows + +package main + +// Linux/macOS:使用系统标准路径 +const ( + dataDir = "/var/lib/meshgo" + configDir = "/etc/meshgo" + configFile = configDir + "/config.yaml" +) diff --git a/paths_windows.go b/paths_windows.go new file mode 100644 index 0000000..83554fa --- /dev/null +++ b/paths_windows.go @@ -0,0 +1,10 @@ +//go:build windows + +package main + +// Windows:映射到当前目录下的 data/ 子目录,方便本地测试 +const ( + dataDir = `./data/var/lib/meshgo` + configDir = `./data/etc/meshgo` + configFile = configDir + `/config.yaml` +)