From 67330d4656c31e8dab423c92c8c0201f4b444536 Mon Sep 17 00:00:00 2001 From: kevin Date: Sun, 14 Jun 2026 19:56:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=BA=E5=99=A8=E4=BA=BAac?= =?UTF-8?q?k?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot_service.go | 156 ++++++++++++++++++++++++++++++++++++++++++ main.go | 21 +++--- mqtpp/builder.go | 48 +++++++++++++ mqtpp/builder_test.go | 34 +++++++++ mqtpp/mqtpp.go | 6 ++ mqtpp/pki.go | 61 ++++++++++++++++- mqtpp/pki_test.go | 64 +++++++++++++++++ 7 files changed, 381 insertions(+), 9 deletions(-) diff --git a/bot_service.go b/bot_service.go index 7bc859f..4731417 100644 --- a/bot_service.go +++ b/bot_service.go @@ -52,6 +52,162 @@ func (s *botService) StartNodeInfoBroadcaster(ctx context.Context) { go s.runNodeInfoBroadcaster(ctx) } +// MaybeAutoAck 在收到一个发往本地 bot 的 want_ack 包时,回送一个 Routing-NONE ACK。 +// +// 与固件 ReliableRouter::sniffReceived 行为对齐: +// - PKI 加密包(pki_encrypted=true)用 X25519+AES-CCM 加密 ACK,channel_id="PKI" +// - 其它情况用原 channel + bot PSK 加密 +// +// 解析失败、目标不是受管 bot、或缺少必要的密钥时,安静返回不报错——这条路径只是“尽力”。 +func (s *botService) MaybeAutoAck(record map[string]any) { + if s == nil || s.store == nil || s.server == nil || record == nil { + return + } + wantAck, _ := record["want_ack"].(bool) + if !wantAck { + return + } + toNum, ok := uint32FromRecord(record["packet_to_num"]) + if !ok || toNum == 0 || toNum == mqtpp.NodeNumBroadcast { + return + } + fromNum, ok := uint32FromRecord(record["packet_from_num"]) + if !ok || fromNum == 0 { + return + } + requestID, ok := uint32FromRecord(record["packet_id"]) + if !ok || requestID == 0 { + return + } + bot, err := s.store.GetBotNodeByNodeNum(int64(toNum)) + if err != nil || bot == nil || !bot.Enabled { + return + } + pkiEncrypted, _ := record["pki_encrypted"].(bool) + channelID, _ := record["channel_id"].(string) + + ackPacketID, err := randomPacketID() + if err != nil { + return + } + topic := botMQTTTopic(bot.TopicPrefix, fallbackChannelID(channelID, pkiEncrypted, bot.DefaultChannelID), bot.NodeID) + + var raw []byte + if pkiEncrypted { + raw, err = s.buildPKIAck(bot, fromNum, ackPacketID, requestID) + if err == nil { + topic = botMQTTTopic(bot.TopicPrefix, mqtpp.PKIChannelID, bot.NodeID) + } + } else { + raw, err = s.buildPSKAck(bot, fromNum, ackPacketID, requestID, channelID) + } + if err != nil || raw == nil { + printJSON(map[string]any{"event": "bot_auto_ack_skipped", "bot_node_id": bot.NodeID, "to": fromNum, "request_id": requestID, "error": errString(err)}) + return + } + if err := s.server.Publish(topic, raw, false, 0); err != nil { + printJSON(map[string]any{"event": "bot_auto_ack_publish_failed", "bot_node_id": bot.NodeID, "topic": topic, "error": err.Error()}) + } +} + +func (s *botService) buildPKIAck(bot *botNodeRecord, toNum, ackPacketID, requestID uint32) ([]byte, error) { + privateKeyB64 := strings.TrimSpace(bot.PrivateKey) + if privateKeyB64 == "" { + return nil, fmt.Errorf("bot has no private key") + } + privateKey, err := base64.StdEncoding.DecodeString(privateKeyB64) + if err != nil { + return nil, err + } + senderPublic, err := decodeBotPublicKey(*bot) + if err != nil { + return nil, err + } + recipientPublic, ok := lookupNodeInfoPublicKey(s.store, toNum) + if !ok { + return nil, fmt.Errorf("recipient %s has no public key on file", mqtpp.NodeNumToID(toNum)) + } + return mqtpp.BuildPKIAckServiceEnvelope(mqtpp.PKIAckBuildOptions{ + FromNodeNum: uint32(bot.NodeNum), + ToNodeNum: toNum, + PacketID: ackPacketID, + RequestID: requestID, + GatewayID: bot.NodeID, + ViaMQTT: true, + SenderPrivate: privateKey, + RecipientPub: recipientPublic, + SenderPublic: senderPublic, + }) +} + +func (s *botService) buildPSKAck(bot *botNodeRecord, toNum, ackPacketID, requestID uint32, channelID string) ([]byte, error) { + channel := fallbackChannelID(channelID, false, bot.DefaultChannelID) + if channel == "" || channel == mqtpp.PKIChannelID { + return nil, fmt.Errorf("no channel id available for psk ack") + } + psk := strings.TrimSpace(bot.PSK) + if psk == "" { + psk = botDefaultPSK + } + key, err := mqtpp.ExpandPSK(psk) + if err != nil { + return nil, err + } + return mqtpp.BuildAckServiceEnvelope(mqtpp.AckBuildOptions{ + PacketBuildOptions: mqtpp.PacketBuildOptions{ + FromNodeNum: uint32(bot.NodeNum), + ToNodeNum: toNum, + PacketID: ackPacketID, + ChannelID: channel, + GatewayID: bot.NodeID, + PSK: key, + Encrypt: true, + ViaMQTT: true, + }, + RequestID: requestID, + }) +} + +func fallbackChannelID(channelID string, pkiEncrypted bool, defaultChannelID string) string { + channelID = strings.TrimSpace(channelID) + if pkiEncrypted { + return mqtpp.PKIChannelID + } + if channelID != "" && channelID != mqtpp.PKIChannelID { + return channelID + } + return defaultChannelID +} + +func uint32FromRecord(value any) (uint32, bool) { + switch v := value.(type) { + case uint32: + return v, true + case int: + if v >= 0 { + return uint32(v), true + } + case int64: + if v >= 0 { + return uint32(v), true + } + case uint64: + return uint32(v), true + case float64: + if v >= 0 { + return uint32(v), true + } + } + return 0, false +} + +func errString(err error) string { + if err == nil { + return "" + } + return err.Error() +} + func (s *botService) SendText(_ context.Context, req botSendTextRequest) (*botMessageRecord, error) { if s == nil || s.store == nil { return nil, fmt.Errorf("bot service is not configured") diff --git a/main.go b/main.go index 805d810..40fffde 100644 --- a/main.go +++ b/main.go @@ -41,6 +41,7 @@ type meshtasticFilterHook struct { blocking *blockingCache settings *runtimeSettingsCache pkiResolver func(toNodeNum, fromNodeNum uint32) ([]byte, []byte, bool) + autoAcker func(record map[string]any) } // ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。 @@ -83,6 +84,9 @@ func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (pa h.stats.IncForwarded() h.dbQueue.EnqueueRecord(record, mqttClientInfoFromClient(cl)) + if h.autoAcker != nil { + h.autoAcker(record) + } if record["type"] != "empty_packet" { printJSON(record) } @@ -225,11 +229,12 @@ func run(cfg *config) error { } messageStats := &meshtasticMessageStats{} - server, mqttAddr, err := startMQTTServer(cfg, store, dbQueue, messageStats, blocking, settings) + server, mqttHook, mqttAddr, err := startMQTTServer(cfg, store, dbQueue, messageStats, blocking, settings) if err != nil { return err } botSender := newBotService(store, server, cfg.key) + mqttHook.autoAcker = botSender.MaybeAutoAck botCtx, stopBotBroadcaster := context.WithCancel(context.Background()) defer stopBotBroadcaster() botSender.StartNodeInfoBroadcaster(botCtx) @@ -289,10 +294,10 @@ func run(cfg *config) error { return runErr } -func startMQTTServer(cfg *config, store *store, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache, settings *runtimeSettingsCache) (*mqtt.Server, string, error) { +func startMQTTServer(cfg *config, store *store, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache, settings *runtimeSettingsCache) (*mqtt.Server, *meshtasticFilterHook, string, error) { server := mqtt.New(&mqtt.Options{InlineClient: true}) if err := server.AddHook(new(auth.AllowHook), nil); err != nil { - return nil, "", err + return nil, nil, "", err } hook := &meshtasticFilterHook{ key: cfg.key, @@ -303,23 +308,23 @@ func startMQTTServer(cfg *config, store *store, dbQueue *dbWriteQueue, stats *me pkiResolver: newPKIKeyResolver(store), } if err := server.AddHook(hook, nil); err != nil { - return nil, "", err + return nil, nil, "", err } addr := net.JoinHostPort(cfg.MQTT.Host, strconv.Itoa(cfg.MQTT.Port)) tlsConfig, err := buildTLSConfig(cfg.MQTT.TLS) if err != nil { - return nil, "", err + return nil, nil, "", err } listener := listeners.NewTCP(listeners.Config{ID: "tcp", Address: addr, TLSConfig: tlsConfig}) if err := server.AddListener(listener); err != nil { - return nil, "", err + return nil, nil, "", err } if err := server.Serve(); err != nil { - return nil, "", err + return nil, nil, "", err } printJSON(map[string]any{"event": "broker_started", "address": addr, "tls": cfg.MQTT.TLS.Enabled}) - return server, addr, nil + return server, hook, addr, nil } // printJSON 将记录编码为 JSON 后按数据包类型着色输出。 diff --git a/mqtpp/builder.go b/mqtpp/builder.go index 40c7dbe..a41345e 100644 --- a/mqtpp/builder.go +++ b/mqtpp/builder.go @@ -38,6 +38,13 @@ type NodeInfoBuildOptions struct { PublicKey []byte } +// AckBuildOptions 描述构造一个 Routing-NONE ACK(PSK 频道路径)所需字段。 +// RequestID 是被 ACK 原始包的 packet_id(写入 Data.request_id, tag 6)。 +type AckBuildOptions struct { + PacketBuildOptions + RequestID uint32 +} + func BuildTextMessageServiceEnvelope(opts TextMessageBuildOptions) ([]byte, error) { if opts.FromNodeNum == 0 { return nil, fmt.Errorf("from node number is required") @@ -91,6 +98,47 @@ func BuildNodeInfoServiceEnvelope(opts NodeInfoBuildOptions) ([]byte, error) { return buildServiceEnvelope(packet, opts.ChannelID, opts.GatewayID), nil } +// BuildAckServiceEnvelope 构造一个 PSK 频道上的 Routing ACK(error_reason=NONE)。 +// 与固件 MeshModule::allocAckNak/Router::sendAckNak 行为对齐: +// - portnum = ROUTING_APP(5) +// - Data.request_id = 原始包 ID +// - Routing.which_variant = error_reason,error_reason = NONE(0) +// +// 用 PacketBuildOptions 中 ChannelID + PSK 加密;调用方负责把 ToNodeNum 设为原 from。 +func BuildAckServiceEnvelope(opts AckBuildOptions) ([]byte, error) { + if opts.RequestID == 0 { + return nil, fmt.Errorf("ack request_id is required") + } + if opts.ChannelID == "" { + return nil, fmt.Errorf("channel id is required") + } + data := buildAckDataPacket(opts.RequestID) + packet, err := buildMeshPacket(opts.PacketBuildOptions, data) + if err != nil { + return nil, err + } + if strings.TrimSpace(opts.GatewayID) == "" { + opts.GatewayID = NodeNumToID(opts.FromNodeNum) + } + return buildServiceEnvelope(packet, opts.ChannelID, opts.GatewayID), nil +} + +// buildAckDataPacket 构造 Data { portnum=ROUTING_APP, payload=Routing{error_reason=NONE}, request_id=req }。 +func buildAckDataPacket(requestID uint32) []byte { + // Routing payload: oneof variant=error_reason(tag 3), value=NONE(0) → 0x18 0x00 + routing := protowire.AppendTag(nil, 3, protowire.VarintType) + routing = protowire.AppendVarint(routing, 0) + + var out []byte + out = protowire.AppendTag(out, 1, protowire.VarintType) + out = protowire.AppendVarint(out, uint64(routingApp)) + out = protowire.AppendTag(out, 2, protowire.BytesType) + out = protowire.AppendBytes(out, routing) + out = protowire.AppendTag(out, 6, protowire.Fixed32Type) + out = protowire.AppendFixed32(out, requestID) + return out +} + func NodeNumToID(nodeNum uint32) string { return nodeNumToID(nodeNum) } diff --git a/mqtpp/builder_test.go b/mqtpp/builder_test.go index bfc4c29..c69d5e1 100644 --- a/mqtpp/builder_test.go +++ b/mqtpp/builder_test.go @@ -167,6 +167,40 @@ func TestBuildNodeInfoTruncatesNanopbStrings(t *testing.T) { } } +func TestBuildAckServiceEnvelopeRoundTrip(t *testing.T) { + key, err := ExpandPSK("AQ==") + if err != nil { + t.Fatalf("ExpandPSK: %v", err) + } + const requestID uint32 = 0xabcd1234 + raw, err := BuildAckServiceEnvelope(AckBuildOptions{ + PacketBuildOptions: PacketBuildOptions{ + FromNodeNum: 0x10101010, + ToNodeNum: 0x20202020, + PacketID: 0x30303030, + ChannelID: "LongFast", + GatewayID: "!10101010", + PSK: key, + Encrypt: true, + ViaMQTT: true, + }, + RequestID: requestID, + }) + if err != nil { + t.Fatalf("BuildAckServiceEnvelope: %v", err) + } + valid, _, record := MQTTPP("msh/2/e/LongFast/!10101010", raw, key, Options{}) + if !valid { + t.Fatalf("MQTTPP not valid: %#v", record) + } + if record["portnum"] != "ROUTING_APP" { + t.Fatalf("portnum = %v", record["portnum"]) + } + if record["type"] != "routing" { + t.Fatalf("type = %v", record["type"]) + } +} + func TestParseNodeID(t *testing.T) { num, err := ParseNodeID("!1234abcd") if err != nil { diff --git a/mqtpp/mqtpp.go b/mqtpp/mqtpp.go index 3ece21d..a34f881 100644 --- a/mqtpp/mqtpp.go +++ b/mqtpp/mqtpp.go @@ -53,6 +53,7 @@ type meshPacket struct { Decoded *dataPacket Encrypted []byte ID uint32 + WantAck bool ViaMQTT bool PKIEncrypted bool PayloadVariant string @@ -259,6 +260,10 @@ func parseMeshPacket(payload []byte) (*meshPacket, error) { if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type { packet.ID = v } + case 10: + if v, ok := value.(uint64); ok && typ == protowire.VarintType { + packet.WantAck = v != 0 + } case 14: if v, ok := value.(uint64); ok && typ == protowire.VarintType { packet.ViaMQTT = v != 0 @@ -684,6 +689,7 @@ func describePacket(topic string, env *serviceEnvelope, key []byte, opts Options "packet_to_num": packet.To, "packet_id": packet.ID, "payload_variant": packet.PayloadVariant, + "want_ack": packet.WantAck, "via_mqtt": packet.ViaMQTT, "pki_encrypted": packet.PKIEncrypted, } diff --git a/mqtpp/pki.go b/mqtpp/pki.go index f955739..84266c1 100644 --- a/mqtpp/pki.go +++ b/mqtpp/pki.go @@ -94,7 +94,66 @@ func BuildPKITextMessageServiceEnvelope(opts PKITextMessageBuildOptions) ([]byte return buildServiceEnvelope(packet, PKIChannelID, opts.GatewayID), nil } -// pkiSharedKey 用 X25519 计算共享密钥,再做一次 SHA-256(与固件一致)。 +// PKIAckBuildOptions 描述构造一个 PKI 加密的 Routing-NONE ACK 所需字段。 +type PKIAckBuildOptions struct { + FromNodeNum uint32 // 我们(机器人)的节点号 + ToNodeNum uint32 // 原发送者 + PacketID uint32 // 新生成的 ACK 自身的 packet id + RequestID uint32 // 被 ACK 的原始包 packet id + GatewayID string + ViaMQTT bool + SenderPrivate []byte + RecipientPub []byte + SenderPublic []byte +} + +// BuildPKIAckServiceEnvelope 构造一条 PKI 加密的 Routing-NONE ACK,与固件 +// MeshModule::allocAckNak + Router::perhapsEncode (PKI 分支) 行为对齐。 +func BuildPKIAckServiceEnvelope(opts PKIAckBuildOptions) ([]byte, error) { + if opts.FromNodeNum == 0 { + return nil, fmt.Errorf("from node number is required") + } + if opts.ToNodeNum == 0 || opts.ToNodeNum == NodeNumBroadcast { + return nil, fmt.Errorf("pki ack requires a non-broadcast destination") + } + if opts.PacketID == 0 { + return nil, fmt.Errorf("packet id is required") + } + if opts.RequestID == 0 { + return nil, fmt.Errorf("request id is required") + } + if len(opts.SenderPrivate) != 32 || len(opts.RecipientPub) != 32 { + return nil, fmt.Errorf("pki keys must be 32 bytes each") + } + if strings.TrimSpace(opts.GatewayID) == "" { + opts.GatewayID = NodeNumToID(opts.FromNodeNum) + } + + plaintext := buildAckDataPacket(opts.RequestID) + + sharedKey, err := pkiSharedKey(opts.SenderPrivate, opts.RecipientPub) + if err != nil { + return nil, err + } + var extraNonceBuf [4]byte + if _, err := rand.Read(extraNonceBuf[:]); err != nil { + return nil, err + } + extraNonce := binary.LittleEndian.Uint32(extraNonceBuf[:]) + ciphertext, auth, err := aesCCMEncrypt(sharedKey, pkiNonce(opts.PacketID, opts.FromNodeNum, extraNonce), plaintext) + if err != nil { + return nil, err + } + encrypted := make([]byte, 0, len(ciphertext)+pkcOverhead) + encrypted = append(encrypted, ciphertext...) + encrypted = append(encrypted, auth...) + encrypted = append(encrypted, extraNonceBuf[:]...) + + packet := buildPKIMeshPacket(opts.FromNodeNum, opts.ToNodeNum, opts.PacketID, opts.ViaMQTT, encrypted, opts.SenderPublic) + return buildServiceEnvelope(packet, PKIChannelID, opts.GatewayID), nil +} + + func pkiSharedKey(privateKey, publicKey []byte) ([]byte, error) { curve := ecdh.X25519() priv, err := curve.NewPrivateKey(privateKey) diff --git a/mqtpp/pki_test.go b/mqtpp/pki_test.go index 002a039..016088d 100644 --- a/mqtpp/pki_test.go +++ b/mqtpp/pki_test.go @@ -207,3 +207,67 @@ func TestMQTTPPDecryptsPKIWithResolver(t *testing.T) { t.Fatalf("pki_encrypted = %v", record["pki_encrypted"]) } } + +func TestBuildPKIAckRoundTrip(t *testing.T) { + curve := ecdh.X25519() + botPriv, _ := curve.GenerateKey(rand.Reader) + devicePriv, _ := curve.GenerateKey(rand.Reader) + + const fromNum uint32 = 0x0000beef // bot + const toNum uint32 = 0xfeed0000 // 原 device + const ackPacketID uint32 = 0xaaaa5555 + const requestID uint32 = 0xdeadbeef + + raw, err := BuildPKIAckServiceEnvelope(PKIAckBuildOptions{ + FromNodeNum: fromNum, + ToNodeNum: toNum, + PacketID: ackPacketID, + RequestID: requestID, + GatewayID: NodeNumToID(fromNum), + ViaMQTT: true, + SenderPrivate: botPriv.Bytes(), + RecipientPub: devicePriv.PublicKey().Bytes(), + SenderPublic: botPriv.PublicKey().Bytes(), + }) + if err != nil { + t.Fatalf("BuildPKIAckServiceEnvelope: %v", err) + } + + // 设备侧解密 + env, err := parseServiceEnvelope(raw) + if err != nil { + t.Fatalf("parse: %v", err) + } + if env.ChannelID != PKIChannelID { + t.Fatalf("channel_id = %q", env.ChannelID) + } + pkt := env.Packet + if !pkt.PKIEncrypted || pkt.From != fromNum || pkt.To != toNum || pkt.ID != ackPacketID { + t.Fatalf("ack header mismatch: %+v", pkt) + } + encryptedLen := len(pkt.Encrypted) - pkcOverhead + cipher := pkt.Encrypted[:encryptedLen] + auth := pkt.Encrypted[encryptedLen : encryptedLen+8] + extraNonce := binary.LittleEndian.Uint32(pkt.Encrypted[encryptedLen+8:]) + sharedKey, err := pkiSharedKey(devicePriv.Bytes(), botPriv.PublicKey().Bytes()) + if err != nil { + t.Fatalf("shared: %v", err) + } + plain, err := aesCCMDecrypt(sharedKey, pkiNonce(ackPacketID, fromNum, extraNonce), cipher, auth) + if err != nil { + t.Fatalf("decrypt: %v", err) + } + data, err := parseDataPacket(plain) + if err != nil { + t.Fatalf("data: %v", err) + } + if data.Portnum != routingApp { + t.Fatalf("portnum = %d, want ROUTING_APP(%d)", data.Portnum, routingApp) + } + + // Routing payload 解析: 期望 oneof error_reason=NONE(0),即 wire 字节 0x18 0x00 + wantRouting := []byte{0x18, 0x00} + if !bytes.Equal(data.Payload, wantRouting) { + t.Fatalf("routing payload = % x, want % x", data.Payload, wantRouting) + } +}