将mqtt客户端改成mqtt服务端

This commit is contained in:
2026-06-03 11:52:55 +08:00
parent e56262c2d3
commit 618bde456a
5 changed files with 135 additions and 132 deletions
+56 -93
View File
@@ -1,25 +1,25 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"meshtastic_mqtt_server/mqtpp"
mqtt "github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)
const (
defaultHost = "mqtt.meshtastic.org"
defaultUsername = "meshdev"
defaultPassword = "large4cats"
defaultPSK = "AQ=="
defaultTopic = "msh/US/#"
defaultHost = "0.0.0.0"
defaultPSK = "AQ=="
ansiGreenBGWhiteText = "\033[42;37m"
ansiBlueBGWhiteText = "\033[44;37m"
@@ -32,35 +32,41 @@ const (
)
type config struct {
host string
port int
username string
password string
psk string
topics topicsFlag
qos int
clientID string
key []byte
host string
port int
psk string
key []byte
}
type topicsFlag []string
type meshtasticFilterHook struct {
mqtt.HookBase
key []byte
}
// String 将已配置的 topic 列表转换为字符串,供 flag 包显示默认值或帮助信息
func (t *topicsFlag) String() string {
if t == nil {
return ""
// ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称
func (h *meshtasticFilterHook) ID() string {
return "meshtastic-filter"
}
// Provides 声明该 hook 只处理客户端发布消息。
func (h *meshtasticFilterHook) Provides(b byte) bool {
return b == mqtt.OnPublish
}
// OnPublish 在 broker 转发消息前校验 payload;无效消息会被拒绝并丢弃。
func (h *meshtasticFilterHook) OnPublish(_ *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
valid, _, record := mqtpp.MQTTPP(pk.TopicName, pk.Payload, h.key)
if !valid {
return pk, packets.ErrRejectPacket
}
b, _ := json.Marshal([]string(*t))
return string(b)
if record["type"] != "empty_packet" {
printJSON(record)
}
return pk, nil
}
// Set 追加一个 --topic 参数值,支持命令行重复传入多个订阅主题
func (t *topicsFlag) Set(value string) error {
*t = append(*t, value)
return nil
}
// main 是程序入口,负责解析参数并启动 MQTT 订阅流程。
// main 是程序入口,负责解析参数并启动 MQTT broker
func main() {
cfg, err := parseArgs()
if err != nil {
@@ -76,22 +82,11 @@ func main() {
// parseArgs 解析命令行参数,并展开 Meshtastic channel PSK。
func parseArgs() (*config, error) {
cfg := &config{}
flag.StringVar(&cfg.host, "host", defaultHost, "MQTT broker hostname")
flag.IntVar(&cfg.port, "port", 1883, "MQTT broker port")
flag.StringVar(&cfg.username, "username", defaultUsername, "MQTT username")
flag.StringVar(&cfg.password, "password", defaultPassword, "MQTT password")
flag.StringVar(&cfg.host, "host", defaultHost, "MQTT broker listen host")
flag.IntVar(&cfg.port, "port", 1883, "MQTT broker listen port")
flag.StringVar(&cfg.psk, "psk", defaultPSK, "Base64 channel PSK used to try decrypting encrypted packets")
flag.Var(&cfg.topics, "topic", "Topic to subscribe; may be repeated. Defaults to msh/US/#")
flag.IntVar(&cfg.qos, "qos", 0, "MQTT subscription QoS (0, 1, or 2)")
flag.StringVar(&cfg.clientID, "client-id", "meshtastic-nodeinfo-subscriber", "MQTT client id")
flag.Parse()
if len(cfg.topics) == 0 {
cfg.topics = topicsFlag{defaultTopic}
}
if cfg.qos < 0 || cfg.qos > 2 {
return nil, fmt.Errorf("invalid qos %d: must be 0, 1, or 2", cfg.qos)
}
key, err := mqtpp.ExpandPSK(cfg.psk)
if err != nil {
return nil, err
@@ -100,62 +95,30 @@ func parseArgs() (*config, error) {
return cfg, nil
}
// run 创建 MQTT 客户端,连接 broker,订阅 topic,并阻塞等待退出信号。
// run 创建 MQTT broker,监听传入 publish,并阻塞等待退出信号。
func run(cfg *config) error {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.host, cfg.port))
opts.SetClientID(cfg.clientID)
opts.SetKeepAlive(60 * time.Second)
if cfg.username != "" {
opts.SetUsername(cfg.username)
opts.SetPassword(cfg.password)
}
opts.OnConnect = func(client mqtt.Client) {
printJSON(map[string]any{"event": "connected", "reason_code": "0"})
for _, topic := range cfg.topics {
token := client.Subscribe(topic, byte(cfg.qos), handleMessage(cfg.key))
token.Wait()
if err := token.Error(); err != nil {
printJSON(map[string]any{"event": "subscribe_error", "topic": topic, "qos": cfg.qos, "error": err.Error()})
continue
}
printJSON(map[string]any{"event": "subscribed", "topic": topic, "qos": cfg.qos})
}
}
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()
if err := token.Error(); err != nil {
server := mqtt.New(nil)
if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
return err
}
if err := server.AddHook(&meshtasticFilterHook{key: cfg.key}, nil); err != nil {
return err
}
addr := net.JoinHostPort(cfg.host, strconv.Itoa(cfg.port))
listener := listeners.NewTCP(listeners.Config{ID: "tcp", Address: addr})
if err := server.AddListener(listener); err != nil {
return err
}
if err := server.Serve(); err != nil {
return err
}
printJSON(map[string]any{"event": "broker_started", "address": addr})
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
client.Disconnect(250)
return nil
}
// handleMessage 返回 MQTT 消息回调,把原始 payload 交给 MQTTPP 处理后按类型输出。
func handleMessage(key []byte) mqtt.MessageHandler {
return func(_ mqtt.Client, msg mqtt.Message) {
valid, _, decodedJSON := mqtpp.MQTTPP(msg.Topic(), msg.Payload(), key)
if !valid || len(decodedJSON) == 0 {
return
}
var record map[string]any
if err := json.Unmarshal(decodedJSON, &record); err != nil {
printJSON(map[string]any{"topic": msg.Topic(), "error": "json decode failed: " + err.Error(), "payload_len": len(msg.Payload())})
return
}
if record["type"] == "empty_packet" {
return
}
printJSONBytes(record, decodedJSON)
}
return server.Close()
}
// printJSON 将记录编码为 JSON 后按数据包类型着色输出。