- 支持 TCP / WebSocket 监听,配置热重载,systemd 集成 - meshAuthHook 实现用户名/密码认证与 ACL - meshLogHook 打印所有 MQTT 事件(CONNECT/PUBLISH/SUBSCRIBE 等) - meshDBHook 将 msh/# 主题 payload 异步写入数据库 - 数据库支持 SQLite(默认)和 MySQL,自动初始化并补充缺失配置 - payload_log 表字段:ID、client、topic、qos、payload、created_at、sender_ip - 自动补充 config.yaml 缺失字段(文件存在时写回) - .gitignore 屏蔽 data/ 和 .workbuddy/
218 lines
6.1 KiB
Go
218 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"log"
|
|
"meshgo/database"
|
|
"strings"
|
|
"time"
|
|
|
|
mqtt "github.com/mochi-mqtt/server/v2"
|
|
"github.com/mochi-mqtt/server/v2/packets"
|
|
)
|
|
|
|
// meshLogHook 实现所有可打印日志的 Hook 接口
|
|
type meshLogHook struct {
|
|
mqtt.HookBase
|
|
}
|
|
|
|
// ID returns the fixed identifier string of the logging hook.
func (h *meshLogHook) ID() string {
	return "meshgo-log"
}
|
|
|
|
func (h *meshLogHook) Provides(b byte) bool {
|
|
return bytes.Contains([]byte{
|
|
mqtt.OnStarted,
|
|
mqtt.OnStopped,
|
|
mqtt.OnConnect,
|
|
mqtt.OnDisconnect,
|
|
mqtt.OnSessionEstablished,
|
|
mqtt.OnPublish,
|
|
mqtt.OnPublished,
|
|
mqtt.OnSubscribe,
|
|
mqtt.OnSubscribed,
|
|
mqtt.OnUnsubscribe,
|
|
mqtt.OnUnsubscribed,
|
|
mqtt.OnWillSent,
|
|
mqtt.OnQosComplete,
|
|
mqtt.OnQosDropped,
|
|
mqtt.OnPublishDropped,
|
|
}, []byte{b})
|
|
}
|
|
|
|
// OnStarted 服务启动完成
|
|
func (h *meshLogHook) OnStarted() {
|
|
log.Println("[hook] ✓ 服务已启动")
|
|
}
|
|
|
|
// OnStopped 服务停止
|
|
func (h *meshLogHook) OnStopped() {
|
|
log.Println("[hook] ✓ 服务已停止")
|
|
}
|
|
|
|
// OnConnect 客户端请求连接(认证前)
|
|
func (h *meshLogHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
|
|
log.Printf("[hook] CONNECT client=%s remote=%s",
|
|
safeClientID(cl), safeRemoteAddr(cl))
|
|
return nil
|
|
}
|
|
|
|
// OnSessionEstablished 客户端认证成功,session 建立
|
|
func (h *meshLogHook) OnSessionEstablished(cl *mqtt.Client, pk packets.Packet) {
|
|
user := string(pk.Connect.Username)
|
|
if user == "" {
|
|
user = "(anonymous)"
|
|
}
|
|
log.Printf("[hook] CONNECTED client=%s username=%s keepalive=%d",
|
|
safeClientID(cl), user, pk.Connect.Keepalive)
|
|
}
|
|
|
|
// OnDisconnect 客户端断开(err=nil 表示主动断开,expire=true 表示 session 过期)
|
|
func (h *meshLogHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
|
|
cause := "主动断开"
|
|
if expire {
|
|
cause = "session 过期"
|
|
} else if err != nil {
|
|
cause = fmt.Sprintf("异常: %v", err)
|
|
}
|
|
log.Printf("[hook] DISCONNECT client=%s cause=%s", safeClientID(cl), cause)
|
|
}
|
|
|
|
// OnPublish 收到客户端发布的原始消息(可修改 packet 后继续)
|
|
func (h *meshLogHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
|
|
// 打印消息内容,过长截断
|
|
body := string(pk.Payload)
|
|
if len(body) > 200 {
|
|
body = body[:200] + "...(truncated)"
|
|
}
|
|
log.Printf("[hook] PUBLISH client=%s topic=%s qos=%d retain=%t payload=%s",
|
|
safeClientID(cl), pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, body)
|
|
return pk, nil
|
|
}
|
|
|
|
// OnPublished 消息已投递给所有订阅者
|
|
func (h *meshLogHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
|
|
log.Printf("[hook] PUBLISHED client=%s topic=%s id=%d",
|
|
safeClientID(cl), pk.TopicName, pk.PacketID)
|
|
}
|
|
|
|
// OnSubscribe 客户端订阅请求(可修改过滤条件)
|
|
func (h *meshLogHook) OnSubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet {
|
|
for _, sub := range pk.Filters {
|
|
log.Printf("[hook] SUBSCRIBE client=%s filter=%s qos=%d",
|
|
safeClientID(cl), sub.Filter, sub.Qos)
|
|
}
|
|
return pk
|
|
}
|
|
|
|
// OnSubscribed 订阅成功
|
|
func (h *meshLogHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
|
|
for i, sub := range pk.Filters {
|
|
code := "?"
|
|
if i < len(reasonCodes) {
|
|
code = fmt.Sprintf("%d", reasonCodes[i])
|
|
}
|
|
log.Printf("[hook] SUBSCRIBED client=%s filter=%s reason=%s",
|
|
safeClientID(cl), sub.Filter, code)
|
|
}
|
|
}
|
|
|
|
// OnUnsubscribe 客户端取消订阅
|
|
func (h *meshLogHook) OnUnsubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet {
|
|
for _, sub := range pk.Filters {
|
|
log.Printf("[hook] UNSUBSCRIBE client=%s filter=%s",
|
|
safeClientID(cl), sub.Filter)
|
|
}
|
|
return pk
|
|
}
|
|
|
|
// OnUnsubscribed 取消订阅完成
|
|
func (h *meshLogHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
|
|
log.Printf("[hook] UNSUBSCRIBED client=%s", safeClientID(cl))
|
|
}
|
|
|
|
// OnWillSent 遗嘱消息已发送
|
|
func (h *meshLogHook) OnWillSent(cl *mqtt.Client, pk packets.Packet) {
|
|
log.Printf("[hook] LWT_SENT client=%s topic=%s", safeClientID(cl), pk.TopicName)
|
|
}
|
|
|
|
// OnQosComplete QoS 交付完成
|
|
func (h *meshLogHook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
|
|
log.Printf("[hook] QOS_COMPLETE client=%s topic=%s id=%d",
|
|
safeClientID(cl), pk.TopicName, pk.PacketID)
|
|
}
|
|
|
|
// OnQosDropped QoS 消息超时丢弃
|
|
func (h *meshLogHook) OnQosDropped(cl *mqtt.Client, pk packets.Packet) {
|
|
log.Printf("[hook] QOS_DROPPED client=%s topic=%s id=%d",
|
|
safeClientID(cl), pk.TopicName, pk.PacketID)
|
|
}
|
|
|
|
// OnPublishDropped 消息因客户端慢被丢弃
|
|
func (h *meshLogHook) OnPublishDropped(cl *mqtt.Client, pk packets.Packet) {
|
|
log.Printf("[hook] PUBLISH_DROPPED client=%s topic=%s",
|
|
safeClientID(cl), pk.TopicName)
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 辅助函数
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func safeClientID(cl *mqtt.Client) string {
|
|
if cl == nil {
|
|
return "(nil)"
|
|
}
|
|
return cl.ID
|
|
}
|
|
|
|
func safeRemoteAddr(cl *mqtt.Client) string {
|
|
if cl == nil {
|
|
return "(unknown)"
|
|
}
|
|
return cl.Net.Remote
|
|
}
|
|
|
|
// 编译期接口检查
|
|
// Compile-time assertion that *meshLogHook satisfies the mqtt.Hook interface.
var _ mqtt.Hook = (*meshLogHook)(nil)
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// meshDBHook — 将 msh/# 主题的 payload 写入数据库
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// meshDBHook 拦截 msh/# 主题消息并写入 payload_log 表
|
|
type meshDBHook struct {
|
|
mqtt.HookBase
|
|
}
|
|
|
|
// ID returns the fixed identifier string of the database hook.
func (h *meshDBHook) ID() string {
	return "meshgo-db"
}
|
|
|
|
func (h *meshDBHook) Provides(b byte) bool {
|
|
return b == mqtt.OnPublish
|
|
}
|
|
|
|
// OnPublish 收到发布消息时,检查是否为 msh/# 并异步写库
|
|
func (h *meshDBHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
|
|
// 仅记录 msh/ 开头的主题
|
|
if !strings.HasPrefix(pk.TopicName, "msh/") {
|
|
return pk, nil
|
|
}
|
|
|
|
entry := &database.PayloadLog{
|
|
Client: safeClientID(cl),
|
|
Topic: pk.TopicName,
|
|
Qos: pk.FixedHeader.Qos,
|
|
Payload: pk.Payload,
|
|
CreatedAt: time.Now().Unix(),
|
|
SenderIP: safeRemoteAddr(cl),
|
|
}
|
|
|
|
// 异步写入,不阻塞消息投递
|
|
database.Insert(entry)
|
|
log.Printf("[hook] [db] queued msh payload: client=%s topic=%s qos=%d size=%d bytes",
|
|
entry.Client, entry.Topic, entry.Qos, len(entry.Payload))
|
|
|
|
return pk, nil
|
|
}
|
|
|
|
// 编译期接口检查
|
|
// Compile-time assertion that *meshDBHook satisfies the mqtt.Hook interface.
var _ mqtt.Hook = (*meshDBHook)(nil)
|