更新数据包处理
This commit is contained in:
+468
-2
@@ -8,6 +8,7 @@ import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"unicode/utf8"
|
||||
|
||||
"google.golang.org/protobuf/encoding/protowire"
|
||||
@@ -80,6 +81,38 @@ type mapReport struct {
|
||||
HasOptedReportLocation bool
|
||||
}
|
||||
|
||||
type positionInfo struct {
|
||||
LatitudeI *int32
|
||||
LongitudeI *int32
|
||||
Altitude *int32
|
||||
Time uint32
|
||||
LocationSource uint64
|
||||
AltitudeSource uint64
|
||||
Timestamp uint32
|
||||
TimestampMillisAdjust int32
|
||||
AltitudeHAE *int32
|
||||
AltitudeGeoidalSeparation *int32
|
||||
PDOP uint32
|
||||
HDOP uint32
|
||||
VDOP uint32
|
||||
GPSAccuracy uint32
|
||||
GroundSpeed *uint32
|
||||
GroundTrack *uint32
|
||||
FixQuality uint32
|
||||
FixType uint32
|
||||
SatsInView uint32
|
||||
SensorID uint32
|
||||
NextUpdate uint32
|
||||
SeqNumber uint32
|
||||
PrecisionBits uint32
|
||||
}
|
||||
|
||||
type telemetryInfo struct {
|
||||
Time uint32
|
||||
Type string
|
||||
Metrics map[string]any
|
||||
}
|
||||
|
||||
// MQTTPP 处理一个 MQTT 原始 payload,返回合规状态、原始数据和解码后的 JSON。
|
||||
// 第一个返回值表示数据是否合规;第二个返回值在不合规时为 nil;第三个返回值是解码结果 JSON。
|
||||
func MQTTPP(topic string, raw []byte, key []byte) (bool, []byte, []byte) {
|
||||
@@ -315,6 +348,243 @@ func parseMapReport(payload []byte) (*mapReport, error) {
|
||||
return report, err
|
||||
}
|
||||
|
||||
// parsePosition 解析 POSITION_APP 的 Position payload。
|
||||
func parsePosition(payload []byte) (*positionInfo, error) {
|
||||
position := &positionInfo{}
|
||||
err := walkFields(payload, func(num protowire.Number, typ protowire.Type, value any) error {
|
||||
switch num {
|
||||
case 1:
|
||||
if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type {
|
||||
position.LatitudeI = int32Ptr(int32(v))
|
||||
}
|
||||
case 2:
|
||||
if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type {
|
||||
position.LongitudeI = int32Ptr(int32(v))
|
||||
}
|
||||
case 3:
|
||||
if typ == protowire.VarintType {
|
||||
position.Altitude = int32Ptr(int32(varintValue(typ, value)))
|
||||
}
|
||||
case 4:
|
||||
if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type {
|
||||
position.Time = v
|
||||
}
|
||||
case 5:
|
||||
position.LocationSource = varintValue(typ, value)
|
||||
case 6:
|
||||
position.AltitudeSource = varintValue(typ, value)
|
||||
case 7:
|
||||
if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type {
|
||||
position.Timestamp = v
|
||||
}
|
||||
case 8:
|
||||
if typ == protowire.VarintType {
|
||||
position.TimestampMillisAdjust = int32(varintValue(typ, value))
|
||||
}
|
||||
case 9:
|
||||
if typ == protowire.VarintType {
|
||||
position.AltitudeHAE = int32Ptr(decodeZigZag32(varintValue(typ, value)))
|
||||
}
|
||||
case 10:
|
||||
if typ == protowire.VarintType {
|
||||
position.AltitudeGeoidalSeparation = int32Ptr(decodeZigZag32(varintValue(typ, value)))
|
||||
}
|
||||
case 11:
|
||||
position.PDOP = uint32(varintValue(typ, value))
|
||||
case 12:
|
||||
position.HDOP = uint32(varintValue(typ, value))
|
||||
case 13:
|
||||
position.VDOP = uint32(varintValue(typ, value))
|
||||
case 14:
|
||||
position.GPSAccuracy = uint32(varintValue(typ, value))
|
||||
case 15:
|
||||
position.GroundSpeed = uint32Ptr(uint32(varintValue(typ, value)))
|
||||
case 16:
|
||||
position.GroundTrack = uint32Ptr(uint32(varintValue(typ, value)))
|
||||
case 17:
|
||||
position.FixQuality = uint32(varintValue(typ, value))
|
||||
case 18:
|
||||
position.FixType = uint32(varintValue(typ, value))
|
||||
case 19:
|
||||
position.SatsInView = uint32(varintValue(typ, value))
|
||||
case 20:
|
||||
position.SensorID = uint32(varintValue(typ, value))
|
||||
case 21:
|
||||
position.NextUpdate = uint32(varintValue(typ, value))
|
||||
case 22:
|
||||
position.SeqNumber = uint32(varintValue(typ, value))
|
||||
case 23:
|
||||
position.PrecisionBits = uint32(varintValue(typ, value))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return position, err
|
||||
}
|
||||
|
||||
// parseTelemetry 解析 TELEMETRY_APP 的 Telemetry payload 和具体 telemetry variant。
|
||||
func parseTelemetry(payload []byte) (*telemetryInfo, error) {
|
||||
telemetry := &telemetryInfo{}
|
||||
err := walkFields(payload, func(num protowire.Number, typ protowire.Type, value any) error {
|
||||
switch num {
|
||||
case 1:
|
||||
if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type {
|
||||
telemetry.Time = v
|
||||
}
|
||||
case 2:
|
||||
telemetry.Type = "device_metrics"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, deviceMetricFields)
|
||||
case 3:
|
||||
telemetry.Type = "environment_metrics"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, environmentMetricFields)
|
||||
case 4:
|
||||
telemetry.Type = "air_quality_metrics"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, airQualityMetricFields)
|
||||
case 5:
|
||||
telemetry.Type = "power_metrics"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, powerMetricFields)
|
||||
case 6:
|
||||
telemetry.Type = "local_stats"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, localStatsFields)
|
||||
case 7:
|
||||
telemetry.Type = "health_metrics"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, healthMetricFields)
|
||||
case 8:
|
||||
telemetry.Type = "host_metrics"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, hostMetricFields)
|
||||
case 9:
|
||||
telemetry.Type = "traffic_management_stats"
|
||||
telemetry.Metrics = parseMetricBytes(typ, value, trafficManagementFields)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return telemetry, err
|
||||
}
|
||||
|
||||
type metricKind int
|
||||
|
||||
const (
|
||||
metricUint32 metricKind = iota
|
||||
metricUint64
|
||||
metricInt32
|
||||
metricFloat32
|
||||
metricString
|
||||
metricRepeatedFloat32
|
||||
)
|
||||
|
||||
type metricField struct {
|
||||
Name string
|
||||
Kind metricKind
|
||||
}
|
||||
|
||||
// parseMetricBytes 按字段定义表解析 telemetry variant 的指标字段。
|
||||
func parseMetricBytes(typ protowire.Type, value any, fields map[protowire.Number]metricField) map[string]any {
|
||||
metrics := map[string]any{}
|
||||
payload, ok := value.([]byte)
|
||||
if !ok || typ != protowire.BytesType {
|
||||
return metrics
|
||||
}
|
||||
_ = walkFields(payload, func(num protowire.Number, typ protowire.Type, value any) error {
|
||||
field, ok := fields[num]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
switch field.Kind {
|
||||
case metricUint32:
|
||||
metrics[field.Name] = uint32(varintValue(typ, value))
|
||||
case metricUint64:
|
||||
metrics[field.Name] = varintValue(typ, value)
|
||||
case metricInt32:
|
||||
metrics[field.Name] = int32(varintValue(typ, value))
|
||||
case metricFloat32:
|
||||
if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type {
|
||||
metrics[field.Name] = float64(math.Float32frombits(v))
|
||||
}
|
||||
case metricString:
|
||||
metrics[field.Name] = stringBytes(typ, value)
|
||||
case metricRepeatedFloat32:
|
||||
if v, ok := value.(uint32); ok && typ == protowire.Fixed32Type {
|
||||
appendMetric(metrics, field.Name, float64(math.Float32frombits(v)))
|
||||
}
|
||||
if payload, ok := value.([]byte); ok && typ == protowire.BytesType {
|
||||
for len(payload) > 0 {
|
||||
v, n := protowire.ConsumeFixed32(payload)
|
||||
if n < 0 {
|
||||
break
|
||||
}
|
||||
appendMetric(metrics, field.Name, float64(math.Float32frombits(v)))
|
||||
payload = payload[n:]
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return metrics
|
||||
}
|
||||
|
||||
// appendMetric 追加 repeated telemetry 字段值。
|
||||
func appendMetric(metrics map[string]any, name string, value any) {
|
||||
if existing, ok := metrics[name]; ok {
|
||||
metrics[name] = append(existing.([]any), value)
|
||||
return
|
||||
}
|
||||
metrics[name] = []any{value}
|
||||
}
|
||||
|
||||
// int32Ptr 返回 int32 指针,用于记录 proto optional 字段是否出现。
|
||||
func int32Ptr(v int32) *int32 {
|
||||
return &v
|
||||
}
|
||||
|
||||
// uint32Ptr 返回 uint32 指针,用于记录 proto optional 字段是否出现。
|
||||
func uint32Ptr(v uint32) *uint32 {
|
||||
return &v
|
||||
}
|
||||
|
||||
// decodeZigZag32 解码 protobuf sint32 的 zig-zag 编码。
|
||||
func decodeZigZag32(v uint64) int32 {
|
||||
return int32((v >> 1) ^ uint64(-int64(v&1)))
|
||||
}
|
||||
|
||||
// optionalInt32 把 optional int32 指针转换成 JSON 可表达的值。
|
||||
func optionalInt32(v *int32) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return *v
|
||||
}
|
||||
|
||||
// optionalUint32 把 optional uint32 指针转换成 JSON 可表达的值。
|
||||
func optionalUint32(v *uint32) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return *v
|
||||
}
|
||||
|
||||
// optionalCoordinate 把 Meshtastic 1e-7 度坐标转换成浮点经纬度。
|
||||
func optionalCoordinate(v *int32) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return float64(*v) * 1e-7
|
||||
}
|
||||
|
||||
// optionalDegrees100 把 1/100 度单位转换成度。
|
||||
func optionalDegrees100(v *uint32) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return float64(*v) / 100
|
||||
}
|
||||
|
||||
// dopValue 把 1/100 精度因子转换成浮点值,未设置时返回 nil。
|
||||
func dopValue(v uint32) any {
|
||||
if v == 0 {
|
||||
return nil
|
||||
}
|
||||
return float64(v) / 100
|
||||
}
|
||||
|
||||
// walkFields 遍历 protobuf wire 字段,并把字段号、类型和值交给回调处理。
|
||||
func walkFields(payload []byte, handle func(protowire.Number, protowire.Type, any) error) error {
|
||||
for len(payload) > 0 {
|
||||
@@ -454,9 +724,17 @@ func describePacket(topic string, env *serviceEnvelope, key []byte) (map[string]
|
||||
case textMessageApp:
|
||||
return merge(decodedBase, decodeTextMessage(packet)), nil
|
||||
case positionApp:
|
||||
return merge(decodedBase, map[string]any{"type": "position"}), nil
|
||||
record, err := decodePosition(packet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return merge(decodedBase, record), nil
|
||||
case telemetryApp:
|
||||
return merge(decodedBase, map[string]any{"type": "telemetry"}), nil
|
||||
record, err := decodeTelemetry(packet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return merge(decodedBase, record), nil
|
||||
case routingApp:
|
||||
return merge(decodedBase, map[string]any{"type": "routing"}), nil
|
||||
case tracerouteApp:
|
||||
@@ -555,6 +833,60 @@ func decodeMapReport(packet *meshPacket) (map[string]any, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// decodePosition 将 POSITION_APP payload 解码为位置 JSON 字段。
|
||||
func decodePosition(packet *meshPacket) (map[string]any, error) {
|
||||
position, err := parsePosition(packet.Decoded.Payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"type": "position",
|
||||
"from": nodeNumToID(packet.From),
|
||||
"from_num": packet.From,
|
||||
"latitude": optionalCoordinate(position.LatitudeI),
|
||||
"longitude": optionalCoordinate(position.LongitudeI),
|
||||
"altitude": optionalInt32(position.Altitude),
|
||||
"time": position.Time,
|
||||
"location_source": enumName(locationSourceNames, position.LocationSource),
|
||||
"altitude_source": enumName(altitudeSourceNames, position.AltitudeSource),
|
||||
"timestamp": position.Timestamp,
|
||||
"timestamp_millis_adjust": position.TimestampMillisAdjust,
|
||||
"altitude_hae": optionalInt32(position.AltitudeHAE),
|
||||
"altitude_geoidal_separation": optionalInt32(position.AltitudeGeoidalSeparation),
|
||||
"pdop": dopValue(position.PDOP),
|
||||
"hdop": dopValue(position.HDOP),
|
||||
"vdop": dopValue(position.VDOP),
|
||||
"gps_accuracy": position.GPSAccuracy,
|
||||
"ground_speed": optionalUint32(position.GroundSpeed),
|
||||
"ground_track": optionalDegrees100(position.GroundTrack),
|
||||
"fix_quality": position.FixQuality,
|
||||
"fix_type": position.FixType,
|
||||
"sats_in_view": position.SatsInView,
|
||||
"sensor_id": position.SensorID,
|
||||
"next_update": position.NextUpdate,
|
||||
"seq_number": position.SeqNumber,
|
||||
"precision_bits": position.PrecisionBits,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// decodeTelemetry 将 TELEMETRY_APP payload 解码为遥测 JSON 字段。
|
||||
func decodeTelemetry(packet *meshPacket) (map[string]any, error) {
|
||||
telemetry, err := parseTelemetry(packet.Decoded.Payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"type": "telemetry",
|
||||
"from": nodeNumToID(packet.From),
|
||||
"from_num": packet.From,
|
||||
"time": telemetry.Time,
|
||||
"telemetry_type": telemetry.Type,
|
||||
"metrics": telemetry.Metrics,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// decodeTextMessage 将 TEXT_MESSAGE_APP payload 解码为聊天文本 JSON 字段。
|
||||
func decodeTextMessage(packet *meshPacket) map[string]any {
|
||||
text := string(packet.Decoded.Payload)
|
||||
@@ -624,6 +956,140 @@ func enumName(names map[uint64]string, value uint64) any {
|
||||
return value
|
||||
}
|
||||
|
||||
var locationSourceNames = map[uint64]string{
|
||||
0: "LOC_UNSET",
|
||||
1: "LOC_MANUAL",
|
||||
2: "LOC_INTERNAL",
|
||||
3: "LOC_EXTERNAL",
|
||||
}
|
||||
|
||||
var altitudeSourceNames = map[uint64]string{
|
||||
0: "ALT_UNSET",
|
||||
1: "ALT_MANUAL",
|
||||
2: "ALT_INTERNAL",
|
||||
3: "ALT_EXTERNAL",
|
||||
4: "ALT_BAROMETRIC",
|
||||
}
|
||||
|
||||
var deviceMetricFields = map[protowire.Number]metricField{
|
||||
1: {"battery_level", metricUint32},
|
||||
2: {"voltage", metricFloat32},
|
||||
3: {"channel_utilization", metricFloat32},
|
||||
4: {"air_util_tx", metricFloat32},
|
||||
5: {"uptime_seconds", metricUint32},
|
||||
}
|
||||
|
||||
var environmentMetricFields = map[protowire.Number]metricField{
|
||||
1: {"temperature", metricFloat32},
|
||||
2: {"relative_humidity", metricFloat32},
|
||||
3: {"barometric_pressure", metricFloat32},
|
||||
4: {"gas_resistance", metricFloat32},
|
||||
5: {"voltage", metricFloat32},
|
||||
6: {"current", metricFloat32},
|
||||
7: {"iaq", metricUint32},
|
||||
8: {"distance", metricFloat32},
|
||||
9: {"lux", metricFloat32},
|
||||
10: {"white_lux", metricFloat32},
|
||||
11: {"ir_lux", metricFloat32},
|
||||
12: {"uv_lux", metricFloat32},
|
||||
13: {"wind_direction", metricUint32},
|
||||
14: {"wind_speed", metricFloat32},
|
||||
15: {"weight", metricFloat32},
|
||||
16: {"wind_gust", metricFloat32},
|
||||
17: {"wind_lull", metricFloat32},
|
||||
18: {"radiation", metricFloat32},
|
||||
19: {"rainfall_1h", metricFloat32},
|
||||
20: {"rainfall_24h", metricFloat32},
|
||||
21: {"soil_moisture", metricUint32},
|
||||
22: {"soil_temperature", metricFloat32},
|
||||
23: {"one_wire_temperature", metricRepeatedFloat32},
|
||||
}
|
||||
|
||||
var airQualityMetricFields = map[protowire.Number]metricField{
|
||||
1: {"pm10_standard", metricUint32},
|
||||
2: {"pm25_standard", metricUint32},
|
||||
3: {"pm100_standard", metricUint32},
|
||||
4: {"pm10_environmental", metricUint32},
|
||||
5: {"pm25_environmental", metricUint32},
|
||||
6: {"pm100_environmental", metricUint32},
|
||||
7: {"particles_03um", metricUint32},
|
||||
8: {"particles_05um", metricUint32},
|
||||
9: {"particles_10um", metricUint32},
|
||||
10: {"particles_25um", metricUint32},
|
||||
11: {"particles_50um", metricUint32},
|
||||
12: {"particles_100um", metricUint32},
|
||||
13: {"co2", metricUint32},
|
||||
14: {"co2_temperature", metricFloat32},
|
||||
15: {"co2_humidity", metricFloat32},
|
||||
16: {"form_formaldehyde", metricFloat32},
|
||||
17: {"form_humidity", metricFloat32},
|
||||
18: {"form_temperature", metricFloat32},
|
||||
19: {"pm40_standard", metricUint32},
|
||||
20: {"particles_40um", metricUint32},
|
||||
21: {"pm_temperature", metricFloat32},
|
||||
22: {"pm_humidity", metricFloat32},
|
||||
23: {"pm_voc_idx", metricFloat32},
|
||||
24: {"pm_nox_idx", metricFloat32},
|
||||
25: {"particles_tps", metricFloat32},
|
||||
}
|
||||
|
||||
var powerMetricFields = map[protowire.Number]metricField{
|
||||
1: {"ch1_voltage", metricFloat32}, 2: {"ch1_current", metricFloat32},
|
||||
3: {"ch2_voltage", metricFloat32}, 4: {"ch2_current", metricFloat32},
|
||||
5: {"ch3_voltage", metricFloat32}, 6: {"ch3_current", metricFloat32},
|
||||
7: {"ch4_voltage", metricFloat32}, 8: {"ch4_current", metricFloat32},
|
||||
9: {"ch5_voltage", metricFloat32}, 10: {"ch5_current", metricFloat32},
|
||||
11: {"ch6_voltage", metricFloat32}, 12: {"ch6_current", metricFloat32},
|
||||
13: {"ch7_voltage", metricFloat32}, 14: {"ch7_current", metricFloat32},
|
||||
15: {"ch8_voltage", metricFloat32}, 16: {"ch8_current", metricFloat32},
|
||||
}
|
||||
|
||||
var localStatsFields = map[protowire.Number]metricField{
|
||||
1: {"uptime_seconds", metricUint32},
|
||||
2: {"channel_utilization", metricFloat32},
|
||||
3: {"air_util_tx", metricFloat32},
|
||||
4: {"num_packets_tx", metricUint32},
|
||||
5: {"num_packets_rx", metricUint32},
|
||||
6: {"num_packets_rx_bad", metricUint32},
|
||||
7: {"num_online_nodes", metricUint32},
|
||||
8: {"num_total_nodes", metricUint32},
|
||||
9: {"num_rx_dupe", metricUint32},
|
||||
10: {"num_tx_relay", metricUint32},
|
||||
11: {"num_tx_relay_canceled", metricUint32},
|
||||
12: {"heap_total_bytes", metricUint32},
|
||||
13: {"heap_free_bytes", metricUint32},
|
||||
14: {"num_tx_dropped", metricUint32},
|
||||
15: {"noise_floor", metricInt32},
|
||||
}
|
||||
|
||||
var healthMetricFields = map[protowire.Number]metricField{
|
||||
1: {"heart_bpm", metricUint32},
|
||||
2: {"spO2", metricUint32},
|
||||
3: {"temperature", metricFloat32},
|
||||
}
|
||||
|
||||
var hostMetricFields = map[protowire.Number]metricField{
|
||||
1: {"uptime_seconds", metricUint32},
|
||||
2: {"freemem_bytes", metricUint64},
|
||||
3: {"diskfree1_bytes", metricUint64},
|
||||
4: {"diskfree2_bytes", metricUint64},
|
||||
5: {"diskfree3_bytes", metricUint64},
|
||||
6: {"load1", metricUint32},
|
||||
7: {"load5", metricUint32},
|
||||
8: {"load15", metricUint32},
|
||||
9: {"user_string", metricString},
|
||||
}
|
||||
|
||||
var trafficManagementFields = map[protowire.Number]metricField{
|
||||
1: {"packets_inspected", metricUint32},
|
||||
2: {"position_dedup_drops", metricUint32},
|
||||
3: {"nodeinfo_cache_hits", metricUint32},
|
||||
4: {"rate_limit_drops", metricUint32},
|
||||
5: {"unknown_packet_drops", metricUint32},
|
||||
6: {"hop_exhausted_packets", metricUint32},
|
||||
7: {"router_hops_preserved", metricUint32},
|
||||
}
|
||||
|
||||
var portNumNames = map[uint64]string{
|
||||
0: "UNKNOWN_APP", 1: "TEXT_MESSAGE_APP", 2: "REMOTE_HARDWARE_APP", 3: "POSITION_APP", 4: "NODEINFO_APP",
|
||||
5: "ROUTING_APP", 6: "ADMIN_APP", 7: "TEXT_MESSAGE_COMPRESSED_APP", 8: "WAYPOINT_APP", 9: "AUDIO_APP",
|
||||
|
||||
Reference in New Issue
Block a user