diff --git a/README.md b/README.md index 9493d89..1fdd255 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,17 @@ # Meshtastic MQTT Server +本程序启动一个本地 MQTT broker,并在转发客户端发布的消息前校验 Meshtastic MQTT payload。 + +每条传入的 `PUBLISH` 都会先进入: + +```go +valid, _, record := mqtpp.MQTTPP(topic, payload, key) +``` + +- `valid == true`:保留原始 topic、payload、QoS、retain 等字段,正常转发给订阅匹配 topic 的客户端 +- `valid == false`:丢弃该消息,不转发给订阅客户端 + +当前不桥接到 `mqtt.meshtastic.org` 等上游 broker。 ## 运行 @@ -7,39 +19,58 @@ go run . ``` -默认连接: +默认监听: -- broker:`mqtt.meshtastic.org:1883` -- username:`meshdev` -- password:`large4cats` -- topic:`msh/US/#` +- host:`0.0.0.0` +- port:`1883` - PSK:`AQ==` -也可以指定 topic: +也可以指定监听地址和 PSK: ```bash -go run . --topic 'msh/US/#' -``` - -多个 topic 可重复传入: - -```bash -go run . --topic 'msh/US/#' --topic 'msh/EU_868/#' +go run . --host 127.0.0.1 --port 1883 --psk AQ== ``` ## 参数 ```text ---host MQTT broker hostname ---port MQTT broker port ---username MQTT username ---password MQTT password ---psk Base64 channel PSK used to try decrypting encrypted packets ---topic Topic to subscribe; may be repeated ---qos MQTT subscription QoS: 0, 1, or 2 ---client-id MQTT client id +--host MQTT broker listen host +--port MQTT broker listen port +--psk Base64 channel PSK used to try decrypting encrypted packets ``` +## 转发规则 + +程序监听所有传入 publish。payload 能被 `mqtpp.MQTTPP` 解析时,认为 `valid == true`,broker 会继续把原始 MQTT 消息转发给订阅者;解析失败时,认为 `valid == false`,broker 会拒绝并丢弃该 publish。 + +`empty_packet` 仍然属于 `valid == true`,会被转发;只是控制台默认不显示它。 + +无法解密但能解析的加密包通常会输出为 `encrypted_packet`,仍然属于 `valid == true`,因此会被转发。 + +## 本地验证 + +一个终端启动 broker: + +```bash +go run . --host 127.0.0.1 --port 1883 --psk AQ== +``` + +另一个终端订阅: + +```bash +mosquitto_sub -h 127.0.0.1 -p 1883 -t '#' +``` + +发布非法 payload: + +```bash +mosquitto_pub -h 127.0.0.1 -p 1883 -t 'msh/US/test' -m 'not protobuf' +``` + +订阅端应该收不到该消息。 + +要验证 valid 消息转发,请使用真实 Meshtastic MQTT payload 发布到本 broker;订阅匹配 topic 的客户端应收到原始消息,broker 控制台会打印解析后的 `record`。 + ## 控制台颜色说明 程序会按数据包类型使用不同背景色,方便快速区分消息类型。 diff --git a/go.mod b/go.mod index e76ceb5..9c7dd90 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,12 @@ module meshtastic_mqtt_server go 1.23 require ( - github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/mochi-mqtt/server/v2 v2.7.9 google.golang.org/protobuf v1.36.11 ) require ( github.com/gorilla/websocket v1.5.3 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sync v0.7.0 // indirect + github.com/rs/xid v1.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4226624..a2b9fe3 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,20 @@ -github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= -github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= +github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI= +github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= +github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index d826d0c..d3814ab 100644 --- a/main.go +++ b/main.go @@ -1,25 +1,25 @@ package main import ( - "encoding/json" "flag" "fmt" + "net" "os" "os/signal" + "strconv" "syscall" - "time" "meshtastic_mqtt_server/mqtpp" - mqtt "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/hooks/auth" + "github.com/mochi-mqtt/server/v2/listeners" + "github.com/mochi-mqtt/server/v2/packets" ) const ( - defaultHost = "mqtt.meshtastic.org" - defaultUsername = "meshdev" - defaultPassword = "large4cats" - defaultPSK = "AQ==" - defaultTopic = "msh/US/#" + defaultHost = "0.0.0.0" + defaultPSK = "AQ==" ansiGreenBGWhiteText = "\033[42;37m" ansiBlueBGWhiteText = "\033[44;37m" @@ -32,35 +32,41 @@ const ( ) type config struct { - host string - port int - username string - password string - psk string - topics topicsFlag - qos int - clientID string - key []byte + host string + port int + psk string + key []byte } -type topicsFlag []string +type meshtasticFilterHook struct { + mqtt.HookBase + key []byte +} -// String 将已配置的 topic 列表转换为字符串,供 flag 包显示默认值或帮助信息。 -func (t *topicsFlag) String() string { - if t == nil { - return "" +// ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。 +func (h *meshtasticFilterHook) ID() string { + return "meshtastic-filter" +} + +// Provides 声明该 hook 只处理客户端发布消息。 +func (h *meshtasticFilterHook) Provides(b byte) bool { + return b == mqtt.OnPublish +} + +// OnPublish 在 broker 转发消息前校验 payload;无效消息会被拒绝并丢弃。 +func (h *meshtasticFilterHook) OnPublish(_ *mqtt.Client, pk packets.Packet) (packets.Packet, error) { + valid, _, record := mqtpp.MQTTPP(pk.TopicName, pk.Payload, h.key) + if !valid { + return pk, packets.ErrRejectPacket } - b, _ := json.Marshal([]string(*t)) - return string(b) + + if record["type"] != "empty_packet" { + printJSON(record) + } + return pk, nil } -// Set 追加一个 --topic 参数值,支持命令行重复传入多个订阅主题。 -func (t *topicsFlag) Set(value string) error { - *t = append(*t, value) - return nil -} - -// main 是程序入口,负责解析参数并启动 MQTT 订阅流程。 +// main 是程序入口,负责解析参数并启动 MQTT broker。 func main() { cfg, err := parseArgs() if err != nil { @@ -76,22 +82,11 @@ func main() { // parseArgs 解析命令行参数,并展开 Meshtastic channel PSK。 func parseArgs() (*config, error) { cfg := &config{} - flag.StringVar(&cfg.host, "host", defaultHost, "MQTT broker hostname") - flag.IntVar(&cfg.port, "port", 1883, "MQTT broker port") - flag.StringVar(&cfg.username, "username", defaultUsername, "MQTT username") - flag.StringVar(&cfg.password, "password", defaultPassword, "MQTT password") + flag.StringVar(&cfg.host, "host", defaultHost, "MQTT broker listen host") + flag.IntVar(&cfg.port, "port", 1883, "MQTT broker listen port") flag.StringVar(&cfg.psk, "psk", defaultPSK, "Base64 channel PSK used to try decrypting encrypted packets") - flag.Var(&cfg.topics, "topic", "Topic to subscribe; may be repeated. Defaults to msh/US/#") - flag.IntVar(&cfg.qos, "qos", 0, "MQTT subscription QoS (0, 1, or 2)") - flag.StringVar(&cfg.clientID, "client-id", "meshtastic-nodeinfo-subscriber", "MQTT client id") flag.Parse() - if len(cfg.topics) == 0 { - cfg.topics = topicsFlag{defaultTopic} - } - if cfg.qos < 0 || cfg.qos > 2 { - return nil, fmt.Errorf("invalid qos %d: must be 0, 1, or 2", cfg.qos) - } key, err := mqtpp.ExpandPSK(cfg.psk) if err != nil { return nil, err @@ -100,62 +95,30 @@ func parseArgs() (*config, error) { return cfg, nil } -// run 创建 MQTT 客户端,连接 broker,订阅 topic,并阻塞等待退出信号。 +// run 创建 MQTT broker,监听传入 publish,并阻塞等待退出信号。 func run(cfg *config) error { - opts := mqtt.NewClientOptions() - opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.host, cfg.port)) - opts.SetClientID(cfg.clientID) - opts.SetKeepAlive(60 * time.Second) - if cfg.username != "" { - opts.SetUsername(cfg.username) - opts.SetPassword(cfg.password) - } - - opts.OnConnect = func(client mqtt.Client) { - printJSON(map[string]any{"event": "connected", "reason_code": "0"}) - for _, topic := range cfg.topics { - token := client.Subscribe(topic, byte(cfg.qos), handleMessage(cfg.key)) - token.Wait() - if err := token.Error(); err != nil { - printJSON(map[string]any{"event": "subscribe_error", "topic": topic, "qos": cfg.qos, "error": err.Error()}) - continue - } - printJSON(map[string]any{"event": "subscribed", "topic": topic, "qos": cfg.qos}) - } - } - - client := mqtt.NewClient(opts) - token := client.Connect() - token.Wait() - if err := token.Error(); err != nil { + server := mqtt.New(nil) + if err := server.AddHook(new(auth.AllowHook), nil); err != nil { return err } + if err := server.AddHook(&meshtasticFilterHook{key: cfg.key}, nil); err != nil { + return err + } + + addr := net.JoinHostPort(cfg.host, strconv.Itoa(cfg.port)) + listener := listeners.NewTCP(listeners.Config{ID: "tcp", Address: addr}) + if err := server.AddListener(listener); err != nil { + return err + } + if err := server.Serve(); err != nil { + return err + } + printJSON(map[string]any{"event": "broker_started", "address": addr}) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) <-sigCh - client.Disconnect(250) - return nil -} - -// handleMessage 返回 MQTT 消息回调,把原始 payload 交给 MQTTPP 处理后按类型输出。 -func handleMessage(key []byte) mqtt.MessageHandler { - return func(_ mqtt.Client, msg mqtt.Message) { - valid, _, decodedJSON := mqtpp.MQTTPP(msg.Topic(), msg.Payload(), key) - if !valid || len(decodedJSON) == 0 { - return - } - - var record map[string]any - if err := json.Unmarshal(decodedJSON, &record); err != nil { - printJSON(map[string]any{"topic": msg.Topic(), "error": "json decode failed: " + err.Error(), "payload_len": len(msg.Payload())}) - return - } - if record["type"] == "empty_packet" { - return - } - printJSONBytes(record, decodedJSON) - } + return server.Close() } // printJSON 将记录编码为 JSON 后按数据包类型着色输出。 diff --git a/mqtpp/mqtpp.go b/mqtpp/mqtpp.go index 157e055..659d1be 100644 --- a/mqtpp/mqtpp.go +++ b/mqtpp/mqtpp.go @@ -113,22 +113,21 @@ type telemetryInfo struct { Metrics map[string]any } -// MQTTPP 处理一个 MQTT 原始 payload,返回合规状态、原始数据和解码后的 JSON。 -// 第一个返回值表示数据是否合规;第二个返回值在不合规时为 nil;第三个返回值是解码结果 JSON。 -func MQTTPP(topic string, raw []byte, key []byte) (bool, []byte, []byte) { - if !isCompliantMQTTPacket(raw) { - return false, nil, nil - } +// MQTTPP 处理一个 MQTT 原始 payload,返回合规状态、原始数据和解码后的记录。 +// 第一个返回值表示数据是否合规;第二个返回值在不合规时为 nil;第三个返回值是解码结果记录。 +func MQTTPP(topic string, raw []byte, key []byte) (bool, []byte, map[string]any) { env, err := parseServiceEnvelope(raw) if err != nil { - return true, raw, MustJSON(map[string]any{"topic": topic, "error": "protobuf decode failed: " + err.Error(), "payload_len": len(raw)}) + //解包失败 + return false, nil, map[string]any{"topic": topic, "error": "protobuf decode failed: " + err.Error(), "payload_len": len(raw)} } record, err := describePacket(topic, env, key) if err != nil { - return true, raw, MustJSON(map[string]any{"topic": topic, "error": err.Error(), "payload_len": len(raw)}) + //解码失败 + return false, nil, map[string]any{"topic": topic, "error": err.Error(), "payload_len": len(raw)} } - return true, raw, MustJSON(record) + return true, raw, record } // ExpandPSK 展开 Base64 PSK,兼容 Meshtastic 默认索引 PSK 和短 key 补零规则。