commit 6228441a23de6ed0313b706393485c82f270fbfb Author: kevin Date: Tue Jun 2 23:12:42 2026 +0800 Signed-off-by: kevin diff --git a/py/mqtt_nodeinfo_subscriber.py b/py/mqtt_nodeinfo_subscriber.py new file mode 100644 index 0000000..3cdef3e --- /dev/null +++ b/py/mqtt_nodeinfo_subscriber.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 +"""Subscribe to a Meshtastic MQTT broker and print public/decoded node info. + +This helper is intended for MQTT brokers and channels you are authorized to +monitor. Encrypted mesh packets are decrypted when they match the configured +channel PSK; packets that cannot be decrypted are reported as metadata. + +Dependencies: + pip install paho-mqtt meshtastic protobuf cryptography + +Example: + python pytest/mqtt_nodeinfo_subscriber.py + python pytest/mqtt_nodeinfo_subscriber.py --topic 'msh/US/#' +""" + +from __future__ import annotations + +import argparse +import base64 +import json +import sys +from typing import Any + +import paho.mqtt.client as mqtt +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from google.protobuf.message import DecodeError +from meshtastic.protobuf import mesh_pb2, mqtt_pb2, portnums_pb2 + + +DEFAULT_HOST = "mqtt.meshtastic.org" +DEFAULT_USERNAME = "meshdev" +DEFAULT_PASSWORD = "large4cats" +DEFAULT_PSK = "AQ==" +DEFAULT_TOPICS = ("msh/US/#",) +ANSI_GREEN_BG_WHITE_TEXT = "\033[42;37m" +ANSI_RESET = "\033[0m" +DEFAULT_MESHTASTIC_PSK = bytes( + [0xD4, 0xF1, 0xBB, 0x3A, 0x20, 0x29, 0x07, 0x59, 0xF0, 0xBC, 0xFF, 0xAB, 0xCF, 0x4E, 0x69, 0x01] +) + + +def node_num_to_id(node_num: int) -> str: + return f"!{node_num:08x}" + + +def enum_name(enum_type: Any, value: int) -> str | int: + try: + return enum_type.Name(value) + except ValueError: + return value + + +def xor_hash(data: bytes) -> int: + result = 0 + for byte in data: + result ^= byte + return result + + +def expand_psk(psk_base64: str) -> bytes: + psk = base64.b64decode(psk_base64) + if len(psk) == 1: + psk_index = psk[0] + if psk_index == 0: + return b"" + key = bytearray(DEFAULT_MESHTASTIC_PSK) + key[-1] = (key[-1] + psk_index - 1) & 0xFF + return bytes(key) + if 0 < len(psk) < 16: + return psk.ljust(16, b"\x00") + if 16 < len(psk) < 32: + return psk.ljust(32, b"\x00") + return psk + + +def channel_hash(channel_name: str, key: bytes) -> int: + return xor_hash(channel_name.encode()) ^ xor_hash(key) + + +def decrypt_aes_ctr(key: bytes, from_num: int, packet_id: int, ciphertext: bytes) -> bytes: + nonce = bytearray(16) + nonce[0:8] = packet_id.to_bytes(8, "little") + nonce[8:12] = from_num.to_bytes(4, "little") + cipher = Cipher(algorithms.AES(key), modes.CTR(bytes(nonce))) + decryptor = cipher.decryptor() + return decryptor.update(ciphertext) + decryptor.finalize() + + +def try_decrypt_packet(packet: mesh_pb2.MeshPacket, channel_id: str, key: bytes) -> tuple[mesh_pb2.MeshPacket | None, str]: + if not key: + return None, "psk disables encryption" + if packet.channel != channel_hash(channel_id, key): + return None, "channel hash mismatch" + + plaintext = decrypt_aes_ctr(key, mesh_packet_from_field(packet), packet.id, packet.encrypted) + decoded = mesh_pb2.Data() + try: + decoded.ParseFromString(plaintext) + except DecodeError as exc: + return None, f"decrypted bytes are not Data protobuf: {exc}" + if decoded.portnum == portnums_pb2.UNKNOWN_APP: + return None, "decrypted protobuf has UNKNOWN_APP portnum" + + decrypted_packet = mesh_pb2.MeshPacket() + decrypted_packet.CopyFrom(packet) + decrypted_packet.ClearField("encrypted") + decrypted_packet.decoded.CopyFrom(decoded) + return decrypted_packet, "success" + + +def mesh_packet_from_field(packet: mesh_pb2.MeshPacket) -> int: + # The protobuf field is named "from" in proto, but generated Python exposes + # it as "from" via getattr because "from" is a Python keyword. + return getattr(packet, "from") + + +def decode_user(packet: mesh_pb2.MeshPacket) -> dict[str, Any]: + user = mesh_pb2.User() + user.ParseFromString(packet.decoded.payload) + + return { + "type": "nodeinfo", + "from": node_num_to_id(mesh_packet_from_field(packet)), + "from_num": mesh_packet_from_field(packet), + "user_id": user.id, + "long_name": user.long_name, + "short_name": user.short_name, + "hw_model": enum_name(mesh_pb2.HardwareModel, user.hw_model), + "role": enum_name(mesh_pb2.Config.DeviceConfig.Role, user.role), + "is_licensed": user.is_licensed, + "public_key": user.public_key.hex() if user.public_key else None, + } + + +def decode_map_report(packet: mesh_pb2.MeshPacket) -> dict[str, Any]: + report = mqtt_pb2.MapReport() + report.ParseFromString(packet.decoded.payload) + + return { + "type": "map_report", + "from": node_num_to_id(mesh_packet_from_field(packet)), + "from_num": mesh_packet_from_field(packet), + "long_name": report.long_name, + "short_name": report.short_name, + "role": enum_name(mesh_pb2.Config.DeviceConfig.Role, report.role), + "hw_model": enum_name(mesh_pb2.HardwareModel, report.hw_model), + "firmware_version": report.firmware_version, + "region": enum_name(mesh_pb2.Config.LoRaConfig.RegionCode, report.region), + "modem_preset": enum_name(mesh_pb2.Config.LoRaConfig.ModemPreset, report.modem_preset), + "latitude": report.latitude_i * 1e-7 if report.latitude_i else None, + "longitude": report.longitude_i * 1e-7 if report.longitude_i else None, + "altitude": report.altitude, + "position_precision": report.position_precision, + "num_online_local_nodes": report.num_online_local_nodes, + "has_opted_report_location": report.has_opted_report_location, + } + + +def describe_packet(topic: str, env: mqtt_pb2.ServiceEnvelope, key: bytes) -> dict[str, Any]: + packet = env.packet + from_num = mesh_packet_from_field(packet) + payload_variant = packet.WhichOneof("payload_variant") + + base = { + "topic": topic, + "channel_id": env.channel_id, + "gateway_id": env.gateway_id, + "packet_from": node_num_to_id(from_num), + "packet_from_num": from_num, + "packet_to": node_num_to_id(packet.to), + "packet_to_num": packet.to, + "packet_id": packet.id, + "payload_variant": payload_variant, + "via_mqtt": packet.via_mqtt, + "pki_encrypted": packet.pki_encrypted, + } + + if payload_variant == "encrypted": + decrypted_packet, decrypt_status = try_decrypt_packet(packet, env.channel_id, key) + if decrypted_packet is None: + return { + **base, + "type": "encrypted_packet", + "encrypted_len": len(packet.encrypted), + "decrypt_success": False, + "decrypt_status": decrypt_status, + } + + decrypted_env = mqtt_pb2.ServiceEnvelope() + decrypted_env.CopyFrom(env) + decrypted_env.packet.CopyFrom(decrypted_packet) + decrypted = describe_packet(topic, decrypted_env, key) + decrypted["payload_variant"] = "decoded" + decrypted["decrypt_success"] = True + decrypted["decrypt_status"] = decrypt_status + return decrypted + + if payload_variant != "decoded": + return {**base, "type": "empty_packet"} + + portnum = packet.decoded.portnum + decoded_base = { + **base, + "portnum": enum_name(portnums_pb2.PortNum, portnum), + "payload_len": len(packet.decoded.payload), + } + + if portnum == portnums_pb2.NODEINFO_APP: + return {**decoded_base, **decode_user(packet)} + + if portnum == portnums_pb2.MAP_REPORT_APP: + return {**decoded_base, **decode_map_report(packet)} + + return {**decoded_base, "type": "decoded_packet"} + + +def print_json(record: dict[str, Any]) -> None: + text = json.dumps(record, ensure_ascii=False, sort_keys=True) + if record.get("decrypt_success") is True: + text = f"{ANSI_GREEN_BG_WHITE_TEXT}{text}{ANSI_RESET}" + print(text, flush=True) + + +def on_connect(client: mqtt.Client, userdata: argparse.Namespace, flags: Any, reason_code: Any, properties: Any = None) -> None: + print_json({"event": "connected", "reason_code": str(reason_code)}) + for topic in userdata.topics: + client.subscribe(topic, qos=userdata.qos) + print_json({"event": "subscribed", "topic": topic, "qos": userdata.qos}) + + +def on_message(client: mqtt.Client, userdata: argparse.Namespace, msg: mqtt.MQTTMessage) -> None: + try: + env = mqtt_pb2.ServiceEnvelope() + env.ParseFromString(msg.payload) + print_json(describe_packet(msg.topic, env, userdata.key)) + except DecodeError as exc: + print_json({"topic": msg.topic, "error": f"protobuf decode failed: {exc}", "payload_len": len(msg.payload)}) + except Exception as exc: # Keep the subscriber alive while reporting malformed packets. + print_json({"topic": msg.topic, "error": str(exc), "payload_len": len(msg.payload)}) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Subscribe to Meshtastic MQTT and print decoded public node info as JSONL.") + parser.add_argument("--host", default=DEFAULT_HOST, help="MQTT broker hostname") + parser.add_argument("--port", type=int, default=1883, help="MQTT broker port") + parser.add_argument("--username", default=DEFAULT_USERNAME, help="MQTT username") + parser.add_argument("--password", default=DEFAULT_PASSWORD, help="MQTT password") + parser.add_argument("--psk", default=DEFAULT_PSK, help="Base64 channel PSK used to try decrypting encrypted packets") + parser.add_argument( + "--topic", + action="append", + dest="topics", + help="Topic to subscribe; may be repeated. Defaults to msh/US/#", + ) + parser.add_argument("--qos", type=int, default=0, choices=(0, 1, 2), help="MQTT subscription QoS") + parser.add_argument("--client-id", default="meshtastic-nodeinfo-subscriber", help="MQTT client id") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + if not args.topics: + args.topics = list(DEFAULT_TOPICS) + args.key = expand_psk(args.psk) + + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=args.client_id) + client.user_data_set(args) + client.on_connect = on_connect + client.on_message = on_message + + if args.username is not None: + client.username_pw_set(args.username, args.password) + + client.connect(args.host, args.port, keepalive=60) + client.loop_forever() + return 0 + + +if __name__ == "__main__": + sys.exit(main())