feat: 初始化 meshgo MQTT 服务

- 支持 TCP / WebSocket 监听,配置热重载,systemd 集成
- meshAuthHook 实现用户名/密码认证与 ACL
- meshLogHook 打印所有 MQTT 事件(CONNECT/PUBLISH/SUBSCRIBE 等)
- meshDBHook 将 msh/# 主题 payload 异步写入数据库
- 数据库支持 SQLite(默认)和 MySQL,自动初始化并补充缺失配置
- payload_log 表字段:ID、client、topic、qos、payload、created_at、sender_ip
- 自动补充 config.yaml 缺失字段(文件存在时写回)
- .gitignore 屏蔽 data/ 和 .workbuddy/
This commit is contained in:
2026-05-15 18:09:39 +08:00
commit bf41e82a43
11 changed files with 958 additions and 0 deletions
+10
View File
@@ -0,0 +1,10 @@
# data 目录(运行时数据,勿提交)
data/
# WorkBuddy 工作目录
.workbuddy/
*.exe
*.o
meshgo
+40
View File
@@ -0,0 +1,40 @@
BINARY := meshgo
GOOS := linux
GOARCH := amd64
OUT := ./build/$(BINARY)
.PHONY: all build build-linux build-win install clean
all: build
## 本机编译(Windows CGO
build:
@mkdir -p build
CGO_ENABLED=1 go build -trimpath -ldflags="-s -w" -o $(OUT) .
@echo ">>> 已生成: $(OUT)"
## Linux 交叉编译(需要目标机器有 gcc,可选)
## 若遇 sqlite3 链接错误,请在 Linux 服务器上直接编译:
## make build-linux-native
build-linux:
@mkdir -p build
CGO_ENABLED=1 GOOS=$(GOOS) GOARCH=$(GOARCH) CC=x86_64-linux-gnu-gcc \
go build -trimpath -ldflags="-s -w" -o $(OUT) .
@echo ">>> 已生成 Linux $(GOARCH) 二进制: $(OUT)"
## Linux 服务器上原生编译(推荐生产环境使用)
build-linux-native:
@mkdir -p build
CGO_ENABLED=1 go build -trimpath -ldflags="-s -w" -o $(OUT) .
@echo ">>> 已生成: $(OUT)"
## 部署到目标机器(需要 SSH_HOST 环境变量,如 make install SSH_HOST=root@192.168.1.10
install: build
@test -n "$(SSH_HOST)" || (echo "请设置 SSH_HOST,例如: make install SSH_HOST=root@192.168.1.10" && exit 1)
scp $(OUT) $(SSH_HOST):/usr/local/bin/$(BINARY)
scp meshgo.service $(SSH_HOST):/etc/systemd/system/$(BINARY).service
ssh $(SSH_HOST) "systemctl daemon-reload && systemctl enable $(BINARY) && systemctl restart $(BINARY)"
@echo ">>> 部署完成"
clean:
rm -rf build/
+215
View File
@@ -0,0 +1,215 @@
package config
import (
"log"
"os"
"sync"
"gopkg.in/yaml.v3"
)
const DefaultConfigPath = "/etc/meshgo/config.yaml"
// Config 主配置结构
type Config struct {
Server ServerConfig `yaml:"server"`
Auth AuthConfig `yaml:"auth"`
Logging LoggingConfig `yaml:"logging"`
Database DatabaseConfig `yaml:"database"`
}
// ServerConfig MQTT 监听相关配置
type ServerConfig struct {
// TCP 监听地址,如 :1883
TCPAddr string `yaml:"tcp_addr"`
// WebSocket 监听地址,如 :8883(留空则不启动)
WSAddr string `yaml:"ws_addr"`
// 最大并发连接数,0 表示不限
MaxConnections int `yaml:"max_connections"`
// 客户端消息写入超时(秒)
WriteTimeout int `yaml:"write_timeout"`
}
// AuthConfig 认证配置
type AuthConfig struct {
// 是否开启用户名/密码认证
Enabled bool `yaml:"enabled"`
// 允许匿名连接
AllowAnonymous bool `yaml:"allow_anonymous"`
// 内置用户列表(简单场景使用)
Users []UserEntry `yaml:"users"`
}
// UserEntry 单条用户凭证
type UserEntry struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
}
// LoggingConfig 日志配置
type LoggingConfig struct {
// debug / info / warn / error
Level string `yaml:"level"`
// 日志文件路径,留空则输出到 stdout
File string `yaml:"file"`
}
// DatabaseConfig 数据库配置
type DatabaseConfig struct {
// 是否启用数据库,默认 false
Enabled bool `yaml:"enabled"`
// 数据库类型:sqlite3(默认) / mysql
Type string `yaml:"type"`
// SQLite 数据库文件路径(相对于 dataDir)
// MySQL 留空,DSN 填 DSN 字段
File string `yaml:"file"`
// MySQL DSN,如 user:password@tcp(127.0.0.1:3306)/meshgo
// SQLite 模式下此字段被忽略
DSN string `yaml:"dsn"`
}
// ---------------------------------------------------------------------------
// 全局单例
// ---------------------------------------------------------------------------
var (
current *Config
mu sync.RWMutex
)
// Get 返回当前配置的只读快照
func Get() *Config {
mu.RLock()
defer mu.RUnlock()
return current
}
// Load 从磁盘加载配置,替换全局单例
func Load(path string) error {
data, err := os.ReadFile(path)
if err != nil {
return err
}
cfg := defaultConfig()
if err = yaml.Unmarshal(data, cfg); err != nil {
return err
}
// 补充缺失字段的默认值
applyDefaults(cfg)
mu.Lock()
current = cfg
mu.Unlock()
log.Printf("[config] 已加载配置文件: %s", path)
return nil
}
// Reload 重新读取配置文件(SIGHUP 时调用)
func Reload(path string) {
log.Printf("[config] 收到重载信号,重新读取: %s", path)
if err := Load(path); err != nil {
log.Printf("[config] 重载失败,继续使用旧配置: %v", err)
}
}
// EnsureConfigComplete 确保配置文件存在且字段完整
// - 文件不存在:创建带注释的默认配置
// - 文件存在但缺失字段:补充缺失字段并写回文件
func EnsureConfigComplete(path string) error {
data, err := os.ReadFile(path)
if err != nil {
// 文件不存在,创建默认配置
cfg := defaultConfig()
return writeDefaultConfig(path, cfg)
}
// 文件存在,解析并补充缺失字段
cfg := defaultConfig()
if err = yaml.Unmarshal(data, cfg); err != nil {
return err
}
if changed := applyDefaults(cfg); changed {
log.Printf("[config] 检测到配置文件缺失字段,已自动补充: %s", path)
if err = writeDefaultConfig(path, cfg); err != nil {
return err
}
}
return nil
}
// writeDefaultConfig 写入带注释头的默认配置
func writeDefaultConfig(path string, cfg *Config) error {
data, err := yaml.Marshal(cfg)
if err != nil {
return err
}
header := `# meshgo MQTT 服务配置文件
# 修改后执行: systemctl reload meshgo 即可热重载(无需重启服务)
#
# 热重载支持的字段:auth / logging
# 需要重启才能生效的字段:server.tcp_addr / server.ws_addr / database
`
return os.WriteFile(path, append([]byte(header), data...), 0640)
}
// applyDefaults 补充零值字段的默认值,返回是否发生了修改
func applyDefaults(cfg *Config) bool {
changed := false
// Server
if cfg.Server.TCPAddr == "" {
cfg.Server.TCPAddr = ":1883"
changed = true
}
if cfg.Server.WriteTimeout == 0 {
cfg.Server.WriteTimeout = 3
changed = true
}
// Auth
if !cfg.Auth.Enabled {
cfg.Auth.Enabled = false
cfg.Auth.AllowAnonymous = true
changed = true
}
// Logging
if cfg.Logging.Level == "" {
cfg.Logging.Level = "info"
changed = true
}
// Database
if cfg.Database.Type == "" {
cfg.Database.Type = "sqlite3"
changed = true
}
if cfg.Database.File == "" {
cfg.Database.File = "meshgo.db"
changed = true
}
return changed
}
// defaultConfig 返回一份合理的默认配置
func defaultConfig() *Config {
return &Config{
Server: ServerConfig{
TCPAddr: ":1883",
WSAddr: "",
MaxConnections: 0,
WriteTimeout: 3,
},
Auth: AuthConfig{
Enabled: false,
AllowAnonymous: true,
Users: []UserEntry{},
},
Logging: LoggingConfig{
Level: "info",
File: "",
},
Database: DatabaseConfig{
Enabled: false,
Type: "sqlite3",
File: "meshgo.db",
DSN: "",
},
}
}
+198
View File
@@ -0,0 +1,198 @@
package database
import (
"fmt"
"log"
"meshgo/config"
"os"
"path/filepath"
"strings"
"sync"
"time"
"gorm.io/driver/mysql"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// ---------------------------------------------------------------------------
// PayloadLog — payload 日志表(仅记录 msh/# 主题)
// ---------------------------------------------------------------------------
type PayloadLog struct {
ID uint64 `gorm:"primaryKey;autoIncrement"` // 自增 ID
Client string `gorm:"type:varchar(255);index"` // 客户端 ID
Topic string `gorm:"type:varchar(512);index"` // 完整主题
Qos byte // QoS 等级
Payload []byte // 原始 payload
CreatedAt int64 `gorm:"index"` // 发送时间(Unix 秒)
SenderIP string `gorm:"type:varchar(64)"` // 发送者 IP
}
// TableName 指定表名
func (PayloadLog) TableName() string { return "payload_log" }
// ---------------------------------------------------------------------------
// DB 全局单例
// ---------------------------------------------------------------------------
var (
db *gorm.DB
dbMu sync.RWMutex
)
// Get 返回当前数据库实例(只读)
func Get() *gorm.DB {
dbMu.RLock()
defer dbMu.RUnlock()
return db
}
// Set 设置数据库实例(仅供内部和测试使用)
func Set(d *gorm.DB) {
dbMu.Lock()
db = d
dbMu.Unlock()
}
// Init 根据配置初始化数据库连接,完成自动迁移
// dbType: "sqlite3" | "mysql"
func Init(cfg *config.DatabaseConfig, dataDir string) error {
if !cfg.Enabled {
log.Println("[db] 数据库未启用,跳过初始化")
return nil
}
var (
db *gorm.DB
err error
)
switch strings.ToLower(cfg.Type) {
case "mysql":
db, err = initMySQL(cfg.DSN)
case "sqlite3", "":
db, err = initSQLite(cfg, dataDir)
default:
return fmt.Errorf("[db] 不支持的数据库类型: %s(支持: sqlite3, mysql", cfg.Type)
}
if err != nil {
return fmt.Errorf("[db] 初始化失败: %w", err)
}
// 自动迁移表结构(仅新增列,不会删列)
if err = db.AutoMigrate(&PayloadLog{}); err != nil {
return fmt.Errorf("[db] 表迁移失败: %w", err)
}
Set(db)
log.Printf("[db] 已连接 %s", cfg.Type)
return nil
}
// initSQLite 构建 SQLite 连接,dbFile 相对于 dataDir
func initSQLite(cfg *config.DatabaseConfig, dataDir string) (*gorm.DB, error) {
path := filepath.Join(dataDir, cfg.File)
// 确保父目录存在
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, fmt.Errorf("创建数据库目录失败: %w", err)
}
log.Printf("[db] SQLite 数据库路径: %s", path)
// 默认 foreign_keys=ONsqlite3 driver 支持 _loc=Local
dsn := fmt.Sprintf("%s?_foreign_keys=on&_loc=Local", path)
gormDB, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, err
}
// 复用底层 sql.DB 设置连接池
sqlDB, err := gormDB.DB()
if err != nil {
return nil, err
}
sqlDB.SetMaxOpenConns(1) // SQLite 建议单连接
sqlDB.SetMaxIdleConns(1)
sqlDB.SetConnMaxLifetime(time.Hour)
return gormDB, nil
}
// initMySQL 构建 MySQL 连接
func initMySQL(dsn string) (*gorm.DB, error) {
if dsn == "" {
return nil, fmt.Errorf("MySQL DSN 未配置(请填写 database.dsn 字段)")
}
gormDB, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, err
}
sqlDB, err := gormDB.DB()
if err != nil {
return nil, err
}
sqlDB.SetMaxOpenConns(25)
sqlDB.SetMaxIdleConns(5)
sqlDB.SetConnMaxLifetime(5 * time.Minute)
return gormDB, nil
}
// ---------------------------------------------------------------------------
// Close 关闭数据库连接
// ---------------------------------------------------------------------------
func Close() error {
gormDB := Get()
if gormDB == nil {
return nil
}
sqlDB, err := gormDB.DB()
if err != nil {
return err
}
return sqlDB.Close()
}
// ---------------------------------------------------------------------------
// WritePayloadLog 异步写入 payload 日志(丢到 channel 不阻塞主流程)
// ---------------------------------------------------------------------------
var WriteCh = make(chan *PayloadLog, 1000) // 有缓冲 channel,导出供 main.go 关闭
// StartWriter 启动异步写入 worker
func StartWriter() {
go func() {
for entry := range WriteCh {
if err := insertPayloadLog(entry); err != nil {
log.Printf("[db] 写入 payload_log 失败: %v", err)
}
}
}()
}
// Insert 将待写入对象推入队列(非阻塞)
// 数据库未初始化时静默跳过(由 main.go 保证 Serve 之前已 Init,此为安全兜底)
func Insert(entry *PayloadLog) {
// 安全兜底:若 WriteCh 未初始化(极端情况),直接丢弃
if WriteCh == nil {
return
}
select {
case WriteCh <- entry:
default:
log.Printf("[db] 写入队列已满,丢弃日志: topic=%s", entry.Topic)
}
}
func insertPayloadLog(entry *PayloadLog) error {
gormDB := Get()
if gormDB == nil {
return nil // Init 尚未完成,静默跳过
}
return gormDB.Create(entry).Error
}
+19
View File
@@ -0,0 +1,19 @@
module meshgo
go 1.25.5
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/mochi-mqtt/server/v2 v2.7.9 // indirect
github.com/rs/xid v1.4.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/mysql v1.6.0 // indirect
gorm.io/driver/sqlite v1.6.0 // indirect
gorm.io/gorm v1.31.1 // indirect
)
+27
View File
@@ -0,0 +1,27 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI=
github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg=
gorm.io/driver/mysql v1.6.0/go.mod h1:D/oCC2GWK3M/dqoLxnOlaNKmXz8WNTfcS9y5ovaSqKo=
gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ=
gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8=
gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg=
gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=
+217
View File
@@ -0,0 +1,217 @@
package main
import (
"bytes"
"fmt"
"log"
"meshgo/database"
"strings"
"time"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
)
// meshLogHook 实现所有可打印日志的 Hook 接口
type meshLogHook struct {
mqtt.HookBase
}
func (h *meshLogHook) ID() string { return "meshgo-log" }
func (h *meshLogHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnStarted,
mqtt.OnStopped,
mqtt.OnConnect,
mqtt.OnDisconnect,
mqtt.OnSessionEstablished,
mqtt.OnPublish,
mqtt.OnPublished,
mqtt.OnSubscribe,
mqtt.OnSubscribed,
mqtt.OnUnsubscribe,
mqtt.OnUnsubscribed,
mqtt.OnWillSent,
mqtt.OnQosComplete,
mqtt.OnQosDropped,
mqtt.OnPublishDropped,
}, []byte{b})
}
// OnStarted 服务启动完成
func (h *meshLogHook) OnStarted() {
log.Println("[hook] ✓ 服务已启动")
}
// OnStopped 服务停止
func (h *meshLogHook) OnStopped() {
log.Println("[hook] ✓ 服务已停止")
}
// OnConnect 客户端请求连接(认证前)
func (h *meshLogHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
log.Printf("[hook] CONNECT client=%s remote=%s",
safeClientID(cl), safeRemoteAddr(cl))
return nil
}
// OnSessionEstablished 客户端认证成功,session 建立
func (h *meshLogHook) OnSessionEstablished(cl *mqtt.Client, pk packets.Packet) {
user := string(pk.Connect.Username)
if user == "" {
user = "(anonymous)"
}
log.Printf("[hook] CONNECTED client=%s username=%s keepalive=%d",
safeClientID(cl), user, pk.Connect.Keepalive)
}
// OnDisconnect 客户端断开(err=nil 表示主动断开,expire=true 表示 session 过期)
func (h *meshLogHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
cause := "主动断开"
if expire {
cause = "session 过期"
} else if err != nil {
cause = fmt.Sprintf("异常: %v", err)
}
log.Printf("[hook] DISCONNECT client=%s cause=%s", safeClientID(cl), cause)
}
// OnPublish 收到客户端发布的原始消息(可修改 packet 后继续)
func (h *meshLogHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
// 打印消息内容,过长截断
body := string(pk.Payload)
if len(body) > 200 {
body = body[:200] + "...(truncated)"
}
log.Printf("[hook] PUBLISH client=%s topic=%s qos=%d retain=%t payload=%s",
safeClientID(cl), pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, body)
return pk, nil
}
// OnPublished 消息已投递给所有订阅者
func (h *meshLogHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
log.Printf("[hook] PUBLISHED client=%s topic=%s id=%d",
safeClientID(cl), pk.TopicName, pk.PacketID)
}
// OnSubscribe 客户端订阅请求(可修改过滤条件)
func (h *meshLogHook) OnSubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet {
for _, sub := range pk.Filters {
log.Printf("[hook] SUBSCRIBE client=%s filter=%s qos=%d",
safeClientID(cl), sub.Filter, sub.Qos)
}
return pk
}
// OnSubscribed 订阅成功
func (h *meshLogHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
for i, sub := range pk.Filters {
code := "?"
if i < len(reasonCodes) {
code = fmt.Sprintf("%d", reasonCodes[i])
}
log.Printf("[hook] SUBSCRIBED client=%s filter=%s reason=%s",
safeClientID(cl), sub.Filter, code)
}
}
// OnUnsubscribe 客户端取消订阅
func (h *meshLogHook) OnUnsubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet {
for _, sub := range pk.Filters {
log.Printf("[hook] UNSUBSCRIBE client=%s filter=%s",
safeClientID(cl), sub.Filter)
}
return pk
}
// OnUnsubscribed 取消订阅完成
func (h *meshLogHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
log.Printf("[hook] UNSUBSCRIBED client=%s", safeClientID(cl))
}
// OnWillSent 遗嘱消息已发送
func (h *meshLogHook) OnWillSent(cl *mqtt.Client, pk packets.Packet) {
log.Printf("[hook] LWT_SENT client=%s topic=%s", safeClientID(cl), pk.TopicName)
}
// OnQosComplete QoS 交付完成
func (h *meshLogHook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
log.Printf("[hook] QOS_COMPLETE client=%s topic=%s id=%d",
safeClientID(cl), pk.TopicName, pk.PacketID)
}
// OnQosDropped QoS 消息超时丢弃
func (h *meshLogHook) OnQosDropped(cl *mqtt.Client, pk packets.Packet) {
log.Printf("[hook] QOS_DROPPED client=%s topic=%s id=%d",
safeClientID(cl), pk.TopicName, pk.PacketID)
}
// OnPublishDropped 消息因客户端慢被丢弃
func (h *meshLogHook) OnPublishDropped(cl *mqtt.Client, pk packets.Packet) {
log.Printf("[hook] PUBLISH_DROPPED client=%s topic=%s",
safeClientID(cl), pk.TopicName)
}
// ---------------------------------------------------------------------------
// 辅助函数
// ---------------------------------------------------------------------------
func safeClientID(cl *mqtt.Client) string {
if cl == nil {
return "(nil)"
}
return cl.ID
}
func safeRemoteAddr(cl *mqtt.Client) string {
if cl == nil {
return "(unknown)"
}
return cl.Net.Remote
}
// 编译期接口检查
var _ mqtt.Hook = (*meshLogHook)(nil)
// ---------------------------------------------------------------------------
// meshDBHook — 将 msh/# 主题的 payload 写入数据库
// ---------------------------------------------------------------------------
// meshDBHook 拦截 msh/# 主题消息并写入 payload_log 表
type meshDBHook struct {
mqtt.HookBase
}
func (h *meshDBHook) ID() string { return "meshgo-db" }
func (h *meshDBHook) Provides(b byte) bool {
return b == mqtt.OnPublish
}
// OnPublish 收到发布消息时,检查是否为 msh/# 并异步写库
func (h *meshDBHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
// 仅记录 msh/ 开头的主题
if !strings.HasPrefix(pk.TopicName, "msh/") {
return pk, nil
}
entry := &database.PayloadLog{
Client: safeClientID(cl),
Topic: pk.TopicName,
Qos: pk.FixedHeader.Qos,
Payload: pk.Payload,
CreatedAt: time.Now().Unix(),
SenderIP: safeRemoteAddr(cl),
}
// 异步写入,不阻塞消息投递
database.Insert(entry)
log.Printf("[hook] [db] queued msh payload: client=%s topic=%s qos=%d size=%d bytes",
entry.Client, entry.Topic, entry.Qos, len(entry.Payload))
return pk, nil
}
// 编译期接口检查
var _ mqtt.Hook = (*meshDBHook)(nil)
+181
View File
@@ -0,0 +1,181 @@
package main
import (
"bytes"
"log"
"meshgo/config"
"meshgo/database"
"os"
"os/signal"
"syscall"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)
func main() {
// ----------------------------------------------------------------
// 1. 确保运行时目录存在
// ----------------------------------------------------------------
ensureDir(dataDir)
ensureDir(configDir)
// ----------------------------------------------------------------
// 2. 初始化配置(如无配置文件则写入默认值)
// ----------------------------------------------------------------
if err := config.EnsureConfigComplete(configFile); err != nil {
log.Fatalf("[main] 无法写入默认配置文件: %v", err)
}
if err := config.Load(configFile); err != nil {
log.Fatalf("[main] 无法加载配置文件: %v", err)
}
// ----------------------------------------------------------------
// 3. 初始化数据库(在启动服务前完成,避免 hook 调用时未初始化)
// ----------------------------------------------------------------
if err := database.Init(&config.Get().Database, dataDir); err != nil {
log.Fatalf("[main] 数据库初始化失败: %v", err)
}
database.StartWriter() // 启动异步写入 worker
// ----------------------------------------------------------------
// 4. 构建 MQTT 服务器(所有初始化在 Serve 之前完成)
// ----------------------------------------------------------------
cfg := config.Get()
server := mqtt.New(&mqtt.Options{
InlineClient: false,
})
// 注册认证 hook
if err := server.AddHook(new(meshAuthHook), nil); err != nil {
log.Fatalf("[main] 注册认证 hook 失败: %v", err)
}
// 注册日志 hook
if err := server.AddHook(new(meshLogHook), nil); err != nil {
log.Fatalf("[main] 注册日志 hook 失败: %v", err)
}
// 注册 payload 数据库日志 hook
if err := server.AddHook(new(meshDBHook), nil); err != nil {
log.Fatalf("[main] 注册 payload 数据库 hook 失败: %v", err)
}
// 添加 TCP 监听
tcpListener := listeners.NewTCP(
listeners.Config{ID: "tcp1", Address: cfg.Server.TCPAddr},
)
if err := server.AddListener(tcpListener); err != nil {
log.Fatalf("[main] 添加 TCP 监听失败 (%s): %v", cfg.Server.TCPAddr, err)
}
log.Printf("[main] TCP 监听已绑定: %s", cfg.Server.TCPAddr)
// 可选:添加 WebSocket 监听
if cfg.Server.WSAddr != "" {
wsListener := listeners.NewWebsocket(
listeners.Config{ID: "ws1", Address: cfg.Server.WSAddr},
)
if err := server.AddListener(wsListener); err != nil {
log.Fatalf("[main] 添加 WebSocket 监听失败 (%s): %v", cfg.Server.WSAddr, err)
}
log.Printf("[main] WebSocket 监听已绑定: %s", cfg.Server.WSAddr)
}
// ----------------------------------------------------------------
// 5. 启动服务器(阻塞,直到收到退出信号)
// ----------------------------------------------------------------
log.Println("[main] meshgo MQTT 服务已启动")
if err := server.Serve(); err != nil {
log.Fatalf("[main] MQTT 服务器启动失败: %v", err)
}
// ----------------------------------------------------------------
// 5. 信号处理
// SIGHUP → 热重载配置
// SIGINT / SIGTERM → 优雅退出
// ----------------------------------------------------------------
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
for sig := range sigCh {
switch sig {
case syscall.SIGHUP:
log.Println("[main] 收到 SIGHUP,热重载配置…")
config.Reload(configFile)
// 注:监听地址变更需要重启服务,此处仅刷新认证/日志等运行时配置
case syscall.SIGINT, syscall.SIGTERM:
log.Println("[main] 正在优雅关闭服务…")
close(database.WriteCh) // 停止异步写入 worker
if err := database.Close(); err != nil {
log.Printf("[main] 关闭数据库时出错: %v", err)
}
if err := server.Close(); err != nil {
log.Printf("[main] 关闭服务器时出错: %v", err)
}
log.Println("[main] meshgo 已停止")
return
}
}
}
// ensureDir 确保目录存在,不存在则创建(含父目录)
func ensureDir(path string) {
if _, err := os.Stat(path); os.IsNotExist(err) {
if err = os.MkdirAll(path, 0755); err != nil {
log.Fatalf("[main] 无法创建目录 %s: %v", path, err)
}
log.Printf("[main] 已创建目录: %s", path)
}
}
// ----------------------------------------------------------------
// 认证 Hook
// ----------------------------------------------------------------
// meshAuthHook 实现 mochi-mqtt Hook 接口,基于配置文件进行认证
type meshAuthHook struct {
mqtt.HookBase
}
func (h *meshAuthHook) ID() string { return "meshgo-auth" }
func (h *meshAuthHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnectAuthenticate,
mqtt.OnACLCheck,
}, []byte{b})
}
// OnConnectAuthenticate 验证客户端连接凭证
func (h *meshAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
cfg := config.Get().Auth
// 未开启认证,全部放行
if !cfg.Enabled {
return true
}
username := string(pk.Connect.Username)
password := string(pk.Connect.Password)
// 匿名连接处理
if username == "" {
return cfg.AllowAnonymous
}
// 逐条比对用户列表
for _, u := range cfg.Users {
if u.Username == username && u.Password == password {
return true
}
}
return false
}
// OnACLCheck 主题级别 ACL 检查(默认全放行,可按需扩展)
func (h *meshAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
return true
}
// 确保 meshAuthHook 实现了 mqtt.Hook 接口(编译期检查)
var _ mqtt.Hook = (*meshAuthHook)(nil)
+31
View File
@@ -0,0 +1,31 @@
[Unit]
Description=meshgo MQTT Broker
Documentation=https://github.com/mochi-mqtt/server
After=network.target
[Service]
Type=simple
# 以 root 运行以便绑定 1883 端口;如已设置 CAP_NET_BIND_SERVICE 可改为普通用户
User=root
Group=root
# 二进制路径,编译后 cp meshgo /usr/local/bin/meshgo
ExecStart=/usr/local/bin/meshgo
# systemctl reload meshgo → 发送 SIGHUP → 热重载配置
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
RestartSec=5s
# 日志输出交由 journald 接管
StandardOutput=journal
StandardError=journal
SyslogIdentifier=meshgo
# 资源保护(可选)
PrivateTmp=true
NoNewPrivileges=true
[Install]
WantedBy=multi-user.target
+10
View File
@@ -0,0 +1,10 @@
//go:build !windows
package main
// Linux/macOS:使用系统标准路径
const (
dataDir = "/var/lib/meshgo"
configDir = "/etc/meshgo"
configFile = configDir + "/config.yaml"
)
+10
View File
@@ -0,0 +1,10 @@
//go:build windows
package main
// Windows:映射到当前目录下的 data/ 子目录,方便本地测试
const (
dataDir = `./data/var/lib/meshgo`
configDir = `./data/etc/meshgo`
configFile = configDir + `/config.yaml`
)