package main import ( "bytes" "fmt" "log" "meshgo/database" "strings" "time" mqtt "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/packets" ) // meshLogHook 实现所有可打印日志的 Hook 接口 type meshLogHook struct { mqtt.HookBase } func (h *meshLogHook) ID() string { return "meshgo-log" } func (h *meshLogHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqtt.OnStarted, mqtt.OnStopped, mqtt.OnConnect, mqtt.OnDisconnect, mqtt.OnSessionEstablished, mqtt.OnPublish, mqtt.OnPublished, mqtt.OnSubscribe, mqtt.OnSubscribed, mqtt.OnUnsubscribe, mqtt.OnUnsubscribed, mqtt.OnWillSent, mqtt.OnQosComplete, mqtt.OnQosDropped, mqtt.OnPublishDropped, }, []byte{b}) } // OnStarted 服务启动完成 func (h *meshLogHook) OnStarted() { log.Println("[hook] ✓ 服务已启动") } // OnStopped 服务停止 func (h *meshLogHook) OnStopped() { log.Println("[hook] ✓ 服务已停止") } // OnConnect 客户端请求连接(认证前) func (h *meshLogHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error { log.Printf("[hook] CONNECT client=%s remote=%s", safeClientID(cl), safeRemoteAddr(cl)) return nil } // OnSessionEstablished 客户端认证成功,session 建立 func (h *meshLogHook) OnSessionEstablished(cl *mqtt.Client, pk packets.Packet) { user := string(pk.Connect.Username) if user == "" { user = "(anonymous)" } log.Printf("[hook] CONNECTED client=%s username=%s keepalive=%d", safeClientID(cl), user, pk.Connect.Keepalive) } // OnDisconnect 客户端断开(err=nil 表示主动断开,expire=true 表示 session 过期) func (h *meshLogHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { cause := "主动断开" if expire { cause = "session 过期" } else if err != nil { cause = fmt.Sprintf("异常: %v", err) } log.Printf("[hook] DISCONNECT client=%s cause=%s", safeClientID(cl), cause) } // OnPublish 收到客户端发布的原始消息(可修改 packet 后继续) func (h *meshLogHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { // 打印消息内容,过长截断 body := string(pk.Payload) if len(body) > 200 { body = body[:200] + "...(truncated)" } log.Printf("[hook] PUBLISH client=%s topic=%s qos=%d retain=%t payload=%s", safeClientID(cl), pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, body) return pk, nil } // OnPublished 消息已投递给所有订阅者 func (h *meshLogHook) OnPublished(cl *mqtt.Client, pk packets.Packet) { log.Printf("[hook] PUBLISHED client=%s topic=%s id=%d", safeClientID(cl), pk.TopicName, pk.PacketID) } // OnSubscribe 客户端订阅请求(可修改过滤条件) func (h *meshLogHook) OnSubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet { for _, sub := range pk.Filters { log.Printf("[hook] SUBSCRIBE client=%s filter=%s qos=%d", safeClientID(cl), sub.Filter, sub.Qos) } return pk } // OnSubscribed 订阅成功 func (h *meshLogHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) { for i, sub := range pk.Filters { code := "?" if i < len(reasonCodes) { code = fmt.Sprintf("%d", reasonCodes[i]) } log.Printf("[hook] SUBSCRIBED client=%s filter=%s reason=%s", safeClientID(cl), sub.Filter, code) } } // OnUnsubscribe 客户端取消订阅 func (h *meshLogHook) OnUnsubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet { for _, sub := range pk.Filters { log.Printf("[hook] UNSUBSCRIBE client=%s filter=%s", safeClientID(cl), sub.Filter) } return pk } // OnUnsubscribed 取消订阅完成 func (h *meshLogHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) { log.Printf("[hook] UNSUBSCRIBED client=%s", safeClientID(cl)) } // OnWillSent 遗嘱消息已发送 func (h *meshLogHook) OnWillSent(cl *mqtt.Client, pk packets.Packet) { log.Printf("[hook] LWT_SENT client=%s topic=%s", safeClientID(cl), pk.TopicName) } // OnQosComplete QoS 交付完成 func (h *meshLogHook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) { log.Printf("[hook] QOS_COMPLETE client=%s topic=%s id=%d", safeClientID(cl), pk.TopicName, pk.PacketID) } // OnQosDropped QoS 消息超时丢弃 func (h *meshLogHook) OnQosDropped(cl *mqtt.Client, pk packets.Packet) { log.Printf("[hook] QOS_DROPPED client=%s topic=%s id=%d", safeClientID(cl), pk.TopicName, pk.PacketID) } // OnPublishDropped 消息因客户端慢被丢弃 func (h *meshLogHook) OnPublishDropped(cl *mqtt.Client, pk packets.Packet) { log.Printf("[hook] PUBLISH_DROPPED client=%s topic=%s", safeClientID(cl), pk.TopicName) } // --------------------------------------------------------------------------- // 辅助函数 // --------------------------------------------------------------------------- func safeClientID(cl *mqtt.Client) string { if cl == nil { return "(nil)" } return cl.ID } func safeRemoteAddr(cl *mqtt.Client) string { if cl == nil { return "(unknown)" } return cl.Net.Remote } // 编译期接口检查 var _ mqtt.Hook = (*meshLogHook)(nil) // --------------------------------------------------------------------------- // meshDBHook — 将 msh/# 主题的 payload 写入数据库 // --------------------------------------------------------------------------- // meshDBHook 拦截 msh/# 主题消息并写入 payload_log 表 type meshDBHook struct { mqtt.HookBase } func (h *meshDBHook) ID() string { return "meshgo-db" } func (h *meshDBHook) Provides(b byte) bool { return b == mqtt.OnPublish } // OnPublish 收到发布消息时,检查是否为 msh/# 并异步写库 func (h *meshDBHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { // 仅记录 msh/ 开头的主题 if !strings.HasPrefix(pk.TopicName, "msh/") { return pk, nil } entry := &database.PayloadLog{ Client: safeClientID(cl), Topic: pk.TopicName, Qos: pk.FixedHeader.Qos, Payload: pk.Payload, CreatedAt: time.Now().Unix(), SenderIP: safeRemoteAddr(cl), } // 异步写入,不阻塞消息投递 database.Insert(entry) log.Printf("[hook] [db] queued msh payload: client=%s topic=%s qos=%d size=%d bytes", entry.Client, entry.Topic, entry.Qos, len(entry.Payload)) return pk, nil } // 编译期接口检查 var _ mqtt.Hook = (*meshDBHook)(nil)