Files
kevin bf41e82a43 feat: 初始化 meshgo MQTT 服务
- 支持 TCP / WebSocket 监听,配置热重载,systemd 集成
- meshAuthHook 实现用户名/密码认证与 ACL
- meshLogHook 打印所有 MQTT 事件(CONNECT/PUBLISH/SUBSCRIBE 等)
- meshDBHook 将 msh/# 主题 payload 异步写入数据库
- 数据库支持 SQLite(默认)和 MySQL,自动初始化并补充缺失配置
- payload_log 表字段:ID、client、topic、qos、payload、created_at、sender_ip
- 自动补充 config.yaml 缺失字段(文件存在时写回)
- .gitignore 屏蔽 data/ 和 .workbuddy/
2026-05-15 18:09:39 +08:00

218 lines
6.1 KiB
Go

package main
import (
"bytes"
"fmt"
"log"
"meshgo/database"
"strings"
"time"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
)
// meshLogHook is the hook type whose methods write MQTT events to the
// standard logger. It embeds mqtt.HookBase and overrides the callbacks
// defined on it in this file.
type meshLogHook struct {
mqtt.HookBase
}
// ID returns the fixed identifier of this hook, "meshgo-log".
func (h *meshLogHook) ID() string {
	return "meshgo-log"
}
// Provides reports whether b is one of the hook events that meshLogHook
// implements a logging callback for.
func (h *meshLogHook) Provides(b byte) bool {
	handled := []byte{
		mqtt.OnStarted,
		mqtt.OnStopped,
		mqtt.OnConnect,
		mqtt.OnDisconnect,
		mqtt.OnSessionEstablished,
		mqtt.OnPublish,
		mqtt.OnPublished,
		mqtt.OnSubscribe,
		mqtt.OnSubscribed,
		mqtt.OnUnsubscribe,
		mqtt.OnUnsubscribed,
		mqtt.OnWillSent,
		mqtt.OnQosComplete,
		mqtt.OnQosDropped,
		mqtt.OnPublishDropped,
	}
	// A single-byte membership test; -1 means b is not in the list.
	return bytes.IndexByte(handled, b) >= 0
}
// OnStarted logs that the server has finished starting.
func (h *meshLogHook) OnStarted() {
	const msg = "[hook] ✓ 服务已启动"
	log.Println(msg)
}
// OnStopped logs that the server has stopped.
func (h *meshLogHook) OnStopped() {
	const msg = "[hook] ✓ 服务已停止"
	log.Println(msg)
}
// OnConnect logs a client's connection request with its client ID and remote
// address. It always returns nil, so it never rejects the connection itself.
// (The original author notes this callback runs before authentication.)
func (h *meshLogHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
	id := safeClientID(cl)
	remote := safeRemoteAddr(cl)
	log.Printf("[hook] CONNECT client=%s remote=%s", id, remote)
	return nil
}
// OnSessionEstablished logs the client ID, the username from the CONNECT
// packet and the requested keepalive once a session has been established.
// An empty username is shown as "(anonymous)".
func (h *meshLogHook) OnSessionEstablished(cl *mqtt.Client, pk packets.Packet) {
	username := "(anonymous)"
	if name := string(pk.Connect.Username); name != "" {
		username = name
	}
	id := safeClientID(cl)
	log.Printf("[hook] CONNECTED client=%s username=%s keepalive=%d",
		id, username, pk.Connect.Keepalive)
}
// OnDisconnect logs a client disconnect together with its cause.
//
// The cause is chosen in this order: expire=true is reported as a session
// expiry; otherwise a non-nil err is reported as an abnormal disconnect with
// the error text; otherwise the disconnect is reported as client-initiated.
func (h *meshLogHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
	var cause string
	switch {
	case expire:
		cause = "session 过期"
	case err != nil:
		cause = fmt.Sprintf("异常: %v", err)
	default:
		cause = "主动断开"
	}
	log.Printf("[hook] DISCONNECT client=%s cause=%s", safeClientID(cl), cause)
}
// OnPublish logs every message published by a client and returns the packet
// unchanged with a nil error, so it never blocks or alters delivery.
//
// At most maxLoggedPayload bytes of the payload are written to the log. When
// the payload is longer, the cut point is moved back by up to three bytes so
// it does not land in the middle of a multi-byte UTF-8 sequence, and the
// marker "...(truncated)" is appended. Only the retained prefix is converted
// to a string, so large payloads are not copied in full just to be logged.
func (h *meshLogHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	const maxLoggedPayload = 200

	shown := pk.Payload
	suffix := ""
	if len(shown) > maxLoggedPayload {
		cut := maxLoggedPayload
		// Bytes of the form 10xxxxxx are UTF-8 continuation bytes. If the byte
		// right after the cut is one, the cut splits a character; step back to
		// the lead byte. A UTF-8 sequence is at most 4 bytes, so 3 steps are
		// enough, and the bound keeps arbitrary binary payloads from being
		// shortened further.
		for back := 0; back < 3 && cut > 0 && shown[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		shown = shown[:cut]
		suffix = "...(truncated)"
	}
	body := string(shown) + suffix

	log.Printf("[hook] PUBLISH client=%s topic=%s qos=%d retain=%t payload=%s",
		safeClientID(cl), pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, body)
	return pk, nil
}
// OnPublished logs the client ID, topic and packet ID of a published message.
// (The original author notes this fires after delivery to subscribers.)
func (h *meshLogHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
	id := safeClientID(cl)
	log.Printf("[hook] PUBLISHED client=%s topic=%s id=%d", id, pk.TopicName, pk.PacketID)
}
// OnSubscribe logs one line per filter in a subscribe request (filter and
// requested QoS) and returns the packet unmodified.
func (h *meshLogHook) OnSubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet {
	id := safeClientID(cl)
	for i := range pk.Filters {
		log.Printf("[hook] SUBSCRIBE client=%s filter=%s qos=%d",
			id, pk.Filters[i].Filter, pk.Filters[i].Qos)
	}
	return pk
}
// OnSubscribed logs one line per filter of a processed subscribe packet,
// pairing each filter with the reason code at the same index. If reasonCodes
// is shorter than the filter list, the missing code is printed as "?".
func (h *meshLogHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
	id := safeClientID(cl)
	known := len(reasonCodes)
	for i := range pk.Filters {
		reason := "?"
		if i < known {
			reason = fmt.Sprintf("%d", reasonCodes[i])
		}
		log.Printf("[hook] SUBSCRIBED client=%s filter=%s reason=%s",
			id, pk.Filters[i].Filter, reason)
	}
}
// OnUnsubscribe logs one line per filter in an unsubscribe request and
// returns the packet unmodified.
func (h *meshLogHook) OnUnsubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet {
	id := safeClientID(cl)
	for i := range pk.Filters {
		log.Printf("[hook] UNSUBSCRIBE client=%s filter=%s", id, pk.Filters[i].Filter)
	}
	return pk
}
// OnUnsubscribed logs that an unsubscribe request from the client completed.
func (h *meshLogHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
	id := safeClientID(cl)
	log.Printf("[hook] UNSUBSCRIBED client=%s", id)
}
// OnWillSent logs the client ID and topic of a will (LWT) message that has
// been sent.
func (h *meshLogHook) OnWillSent(cl *mqtt.Client, pk packets.Packet) {
	id := safeClientID(cl)
	log.Printf("[hook] LWT_SENT client=%s topic=%s", id, pk.TopicName)
}
// OnQosComplete logs the client ID, topic and packet ID when a QoS delivery
// flow has completed.
func (h *meshLogHook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
	id := safeClientID(cl)
	log.Printf("[hook] QOS_COMPLETE client=%s topic=%s id=%d", id, pk.TopicName, pk.PacketID)
}
// OnQosDropped logs the client ID, topic and packet ID of a QoS message that
// was dropped. (The original author attributes this to a timeout.)
func (h *meshLogHook) OnQosDropped(cl *mqtt.Client, pk packets.Packet) {
	id := safeClientID(cl)
	log.Printf("[hook] QOS_DROPPED client=%s topic=%s id=%d", id, pk.TopicName, pk.PacketID)
}
// OnPublishDropped logs the client ID and topic of a publish that was
// dropped. (The original author attributes this to a slow client.)
func (h *meshLogHook) OnPublishDropped(cl *mqtt.Client, pk packets.Packet) {
	id := safeClientID(cl)
	log.Printf("[hook] PUBLISH_DROPPED client=%s topic=%s", id, pk.TopicName)
}
// ---------------------------------------------------------------------------
// Helper functions
// ---------------------------------------------------------------------------

// safeClientID returns the client's ID, or the placeholder "(nil)" when cl is
// nil, so logging callbacks never dereference a nil client.
func safeClientID(cl *mqtt.Client) string {
	if cl != nil {
		return cl.ID
	}
	return "(nil)"
}
// safeRemoteAddr returns the client's remote address (cl.Net.Remote), or the
// placeholder "(unknown)" when cl is nil.
func safeRemoteAddr(cl *mqtt.Client) string {
	if cl != nil {
		return cl.Net.Remote
	}
	return "(unknown)"
}
// Compile-time check that *meshLogHook satisfies the mqtt.Hook interface.
var _ mqtt.Hook = (*meshLogHook)(nil)
// ---------------------------------------------------------------------------
// meshDBHook — writes payloads of "msh/" topics to the database
// ---------------------------------------------------------------------------
// meshDBHook is the hook type whose OnPublish callback records messages on
// topics starting with "msh/" through the database package. It embeds
// mqtt.HookBase.
type meshDBHook struct {
mqtt.HookBase
}
// ID returns the fixed identifier of this hook, "meshgo-db".
func (h *meshDBHook) ID() string {
	return "meshgo-db"
}
// Provides reports whether b is the OnPublish event, the only event this
// hook handles.
func (h *meshDBHook) Provides(b byte) bool {
	switch b {
	case mqtt.OnPublish:
		return true
	default:
		return false
	}
}
// OnPublish records published messages whose topic starts with "msh/" and
// always returns the packet unchanged with a nil error.
//
// For a matching topic it builds a database.PayloadLog (client ID, topic,
// QoS, payload, current Unix time in seconds, sender address), passes it to
// database.Insert, and logs a summary line. Other topics are ignored.
//
// NOTE(review): the prefix test does not match the bare topic "msh", which
// an MQTT filter "msh/#" would also match — confirm this is intended.
// NOTE(review): the payload slice is handed over without copying; the
// original comment says the write is asynchronous, so this presumably relies
// on nothing mutating pk.Payload afterwards — verify against database.Insert.
func (h *meshDBHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	if strings.HasPrefix(pk.TopicName, "msh/") {
		record := database.PayloadLog{
			Client:    safeClientID(cl),
			Topic:     pk.TopicName,
			Qos:       pk.FixedHeader.Qos,
			Payload:   pk.Payload,
			CreatedAt: time.Now().Unix(),
			SenderIP:  safeRemoteAddr(cl),
		}
		// Hand the record to the database layer, then log what was queued.
		database.Insert(&record)
		log.Printf("[hook] [db] queued msh payload: client=%s topic=%s qos=%d size=%d bytes",
			record.Client, record.Topic, record.Qos, len(record.Payload))
	}
	return pk, nil
}
// Compile-time check that *meshDBHook satisfies the mqtt.Hook interface.
var _ mqtt.Hook = (*meshDBHook)(nil)