diff --git a/README.md b/README.md index 8cc9be6..bcf887b 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,10 @@ meshtastic: - `nodeinfo_map`:融合 `type == "nodeinfo"` 和 `type == "map_report"` 的节点信息 - `text_message`:追加保存 `type == "text_message"` 的聊天消息 +- `position`:追加保存 `type == "position"` 的位置包 +- `telemetry`:追加保存 `type == "telemetry"` 的遥测包 +- `routing`:追加保存 `type == "routing"` 的路由控制包 +- `traceroute`:追加保存 `type == "traceroute"` 的路径追踪包 `nodeinfo_map` 规则: @@ -120,6 +124,15 @@ meshtastic: - 保存 `from_id`、`from_num`、`text`、`payload_hex`、topic、packet 元数据和完整 `content_json` - 保存 MQTT 客户端信息:`mqtt_client_id`、`mqtt_username`、`mqtt_listener`、`mqtt_remote_addr`、`mqtt_remote_host`、`mqtt_remote_port` +`position` / `telemetry` / `routing` / `traceroute` 规则: + +- 都使用自增 `id` 作为主键 +- 每条有效记录都会新增一行,不做去重 +- 保存通用 packet 元数据、MQTT 客户端信息和完整 `content_json` +- `position` 额外保存经纬度、海拔、时间、定位来源、精度、速度、卫星数等字段 +- `telemetry` 额外保存 `telemetry_type`,并把动态 `metrics` 对象保存为 `metrics_json` +- `routing` 和 `traceroute` 当前保存通用元数据和完整 JSON;后续如果解析更多 payload 字段,可继续扩展列 + 查询最近聊天消息示例: ```sql @@ -129,6 +142,24 @@ ORDER BY id DESC LIMIT 20; ``` +查询位置包示例: + +```sql +SELECT id, created_at, from_id, latitude, longitude, altitude +FROM position +ORDER BY id DESC +LIMIT 20; +``` + +查询遥测包示例: + +```sql +SELECT id, created_at, from_id, telemetry_type, metrics_json +FROM telemetry +ORDER BY id DESC +LIMIT 20; +``` + SQLite 默认路径: - Unix/Linux:`/srv/mesh_mqtt_go/mesh_mqtt_go.db` diff --git a/db.go b/db.go index 9933cf7..45ee6a2 100644 --- a/db.go +++ b/db.go @@ -32,6 +32,36 @@ type mqttClientInfo struct { RemotePort string } +type AppendPacketFields struct { + ID uint64 `gorm:"column:id;primaryKey;autoIncrement"` + FromID string `gorm:"column:from_id;not null;index"` + FromNum int64 `gorm:"column:from_num;not null;index"` + Topic string `gorm:"column:topic;not null"` + ChannelID *string `gorm:"column:channel_id"` + GatewayID *string `gorm:"column:gateway_id"` + PacketID *int64 `gorm:"column:packet_id;index"` + PacketTo *string `gorm:"column:packet_to"` + PacketToNum *int64 `gorm:"column:packet_to_num"` + Portnum *string `gorm:"column:portnum"` + PayloadLen *int64 `gorm:"column:payload_len"` + PayloadVariant *string `gorm:"column:payload_variant"` + ViaMQTT *bool `gorm:"column:via_mqtt"` + PKIEncrypted *bool `gorm:"column:pki_encrypted"` + DecryptSuccess *bool `gorm:"column:decrypt_success"` + DecryptStatus *string `gorm:"column:decrypt_status"` + ContentJSON string `gorm:"column:content_json;not null"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index"` +} + +type MQTTClientRecordFields struct { + MQTTClientID *string `gorm:"column:mqtt_client_id"` + MQTTUsername *string `gorm:"column:mqtt_username"` + MQTTListener *string `gorm:"column:mqtt_listener"` + MQTTRemoteAddr *string `gorm:"column:mqtt_remote_addr"` + MQTTRemoteHost *string `gorm:"column:mqtt_remote_host"` + MQTTRemotePort *string `gorm:"column:mqtt_remote_port"` +} + type nodeInfoMapRecord struct { NodeID string `gorm:"column:node_id;primaryKey;not null"` NodeNum int64 `gorm:"column:node_num;not null"` @@ -94,6 +124,68 @@ func (textMessageRecord) TableName() string { return "text_message" } +type positionRecord struct { + AppendPacketFields `gorm:"embedded"` + MQTTClientRecordFields `gorm:"embedded"` + Latitude *float64 `gorm:"column:latitude"` + Longitude *float64 `gorm:"column:longitude"` + Altitude *int64 `gorm:"column:altitude"` + PositionTime *int64 `gorm:"column:position_time"` + LocationSource *string `gorm:"column:location_source"` + AltitudeSource *string `gorm:"column:altitude_source"` + Timestamp *int64 `gorm:"column:timestamp"` + TimestampMillisAdjust *int64 `gorm:"column:timestamp_millis_adjust"` + AltitudeHAE *int64 `gorm:"column:altitude_hae"` + AltitudeGeoidalSeparation *int64 `gorm:"column:altitude_geoidal_separation"` + PDOP *float64 `gorm:"column:pdop"` + HDOP *float64 `gorm:"column:hdop"` + VDOP *float64 `gorm:"column:vdop"` + GPSAccuracy *int64 `gorm:"column:gps_accuracy"` + GroundSpeed *int64 `gorm:"column:ground_speed"` + GroundTrack *float64 `gorm:"column:ground_track"` + FixQuality *int64 `gorm:"column:fix_quality"` + FixType *int64 `gorm:"column:fix_type"` + SatsInView *int64 `gorm:"column:sats_in_view"` + SensorID *int64 `gorm:"column:sensor_id"` + NextUpdate *int64 `gorm:"column:next_update"` + SeqNumber *int64 `gorm:"column:seq_number"` + PrecisionBits *int64 `gorm:"column:precision_bits"` +} + +func (positionRecord) TableName() string { + return "position" +} + +type telemetryRecord struct { + AppendPacketFields `gorm:"embedded"` + MQTTClientRecordFields `gorm:"embedded"` + TelemetryTime *int64 `gorm:"column:telemetry_time"` + TelemetryType *string `gorm:"column:telemetry_type;index"` + MetricsJSON *string `gorm:"column:metrics_json"` +} + +func (telemetryRecord) TableName() string { + return "telemetry" +} + +type routingRecord struct { + AppendPacketFields `gorm:"embedded"` + MQTTClientRecordFields `gorm:"embedded"` +} + +func (routingRecord) TableName() string { + return "routing" +} + +type tracerouteRecord struct { + AppendPacketFields `gorm:"embedded"` + MQTTClientRecordFields `gorm:"embedded"` +} + +func (tracerouteRecord) TableName() string { + return "traceroute" +} + func openStore(cfg databaseConfig) (*store, error) { var dialector gorm.Dialector switch cfg.Driver { @@ -143,31 +235,49 @@ func (s *store) Close() error { func (s *store) migrate() error { return s.db.Transaction(func(tx *gorm.DB) error { migrator := tx.Migrator() - if !migrator.HasTable(&nodeInfoMapRecord{}) { - if err := migrator.CreateTable(&nodeInfoMapRecord{}); err != nil { - return fmt.Errorf("migrate nodeinfo_map table: %w", err) - } - } - if !migrator.HasTable(&textMessageRecord{}) { - if err := migrator.CreateTable(&textMessageRecord{}); err != nil { - return fmt.Errorf("migrate text_message table: %w", err) - } - } - for _, indexName := range []string{ - "idx_text_message_from_num_created_at", - "idx_text_message_created_at", - "idx_text_message_packet_id", + for _, item := range []struct { + label string + model any + }{ + {label: "nodeinfo_map", model: &nodeInfoMapRecord{}}, + {label: "text_message", model: &textMessageRecord{}}, + {label: "position", model: &positionRecord{}}, + {label: "telemetry", model: &telemetryRecord{}}, + {label: "routing", model: &routingRecord{}}, + {label: "traceroute", model: &tracerouteRecord{}}, } { - if !migrator.HasIndex(&textMessageRecord{}, indexName) { - if err := migrator.CreateIndex(&textMessageRecord{}, indexName); err != nil { - return fmt.Errorf("migrate text_message index %s: %w", indexName, err) + if !migrator.HasTable(item.model) { + if err := migrator.CreateTable(item.model); err != nil { + return fmt.Errorf("migrate %s table: %w", item.label, err) } } } + for _, item := range []struct { + label string + model any + indexes []string + }{ + {label: "text_message", model: &textMessageRecord{}, indexes: []string{"idx_text_message_from_num_created_at", "idx_text_message_created_at", "idx_text_message_packet_id"}}, + } { + if err := createMissingIndexes(migrator, item.model, item.label, item.indexes); err != nil { + return err + } + } return nil }) } +func createMissingIndexes(migrator gorm.Migrator, model any, label string, indexNames []string) error { + for _, indexName := range indexNames { + if !migrator.HasIndex(model, indexName) { + if err := migrator.CreateIndex(model, indexName); err != nil { + return fmt.Errorf("migrate %s index %s: %w", label, indexName, err) + } + } + } + return nil +} + func (s *store) UpsertNodeInfoMap(record map[string]any) error { node, err := nodeInfoMapFromRecord(record) if err != nil { @@ -238,6 +348,50 @@ func (s *store) InsertTextMessage(record map[string]any, clientInfo mqttClientIn return nil } +func (s *store) InsertPosition(record map[string]any, clientInfo mqttClientInfo) error { + position, err := positionFromRecord(record, clientInfo) + if err != nil { + return err + } + if err := s.db.Create(position).Error; err != nil { + return fmt.Errorf("insert position from %s: %w", position.FromID, err) + } + return nil +} + +func (s *store) InsertTelemetry(record map[string]any, clientInfo mqttClientInfo) error { + telemetry, err := telemetryFromRecord(record, clientInfo) + if err != nil { + return err + } + if err := s.db.Create(telemetry).Error; err != nil { + return fmt.Errorf("insert telemetry from %s: %w", telemetry.FromID, err) + } + return nil +} + +func (s *store) InsertRouting(record map[string]any, clientInfo mqttClientInfo) error { + routing, err := routingFromRecord(record, clientInfo) + if err != nil { + return err + } + if err := s.db.Create(routing).Error; err != nil { + return fmt.Errorf("insert routing from %s: %w", routing.FromID, err) + } + return nil +} + +func (s *store) InsertTraceroute(record map[string]any, clientInfo mqttClientInfo) error { + traceroute, err := tracerouteFromRecord(record, clientInfo) + if err != nil { + return err + } + if err := s.db.Create(traceroute).Error; err != nil { + return fmt.Errorf("insert traceroute from %s: %w", traceroute.FromID, err) + } + return nil +} + func nodeInfoMapFromRecord(record map[string]any) (*nodeInfoMapRecord, error) { latestType, ok := record["type"].(string) if !ok || (latestType != "nodeinfo" && latestType != "map_report") { @@ -285,49 +439,153 @@ func textMessageFromRecord(record map[string]any, clientInfo mqttClientInfo) (*t if !ok || recordType != "text_message" { return nil, fmt.Errorf("record type %v is not text_message", record["type"]) } + common, clientFields, err := AppendPacketFieldsFromRecord(record, "text_message", clientInfo) + if err != nil { + return nil, err + } + return &textMessageRecord{ + FromID: common.FromID, + FromNum: common.FromNum, + Text: nullableString(record["text"]), + PayloadHex: nullableString(record["payload_hex"]), + Topic: common.Topic, + ChannelID: common.ChannelID, + GatewayID: common.GatewayID, + PacketID: common.PacketID, + PacketTo: common.PacketTo, + PacketToNum: common.PacketToNum, + Portnum: common.Portnum, + PayloadLen: common.PayloadLen, + PayloadVariant: common.PayloadVariant, + ViaMQTT: common.ViaMQTT, + PKIEncrypted: common.PKIEncrypted, + DecryptSuccess: common.DecryptSuccess, + DecryptStatus: common.DecryptStatus, + MQTTClientID: clientFields.MQTTClientID, + MQTTUsername: clientFields.MQTTUsername, + MQTTListener: clientFields.MQTTListener, + MQTTRemoteAddr: clientFields.MQTTRemoteAddr, + MQTTRemoteHost: clientFields.MQTTRemoteHost, + MQTTRemotePort: clientFields.MQTTRemotePort, + ContentJSON: common.ContentJSON, + }, nil +} + +func positionFromRecord(record map[string]any, clientInfo mqttClientInfo) (*positionRecord, error) { + common, clientFields, err := AppendPacketFieldsFromRecord(record, "position", clientInfo) + if err != nil { + return nil, err + } + return &positionRecord{ + AppendPacketFields: common, + MQTTClientRecordFields: clientFields, + Latitude: nullableFloat64(record["latitude"]), + Longitude: nullableFloat64(record["longitude"]), + Altitude: nullableInt64(record["altitude"]), + PositionTime: nullableInt64(record["time"]), + LocationSource: nullableStringValue(record["location_source"]), + AltitudeSource: nullableStringValue(record["altitude_source"]), + Timestamp: nullableInt64(record["timestamp"]), + TimestampMillisAdjust: nullableInt64(record["timestamp_millis_adjust"]), + AltitudeHAE: nullableInt64(record["altitude_hae"]), + AltitudeGeoidalSeparation: nullableInt64(record["altitude_geoidal_separation"]), + PDOP: nullableFloat64(record["pdop"]), + HDOP: nullableFloat64(record["hdop"]), + VDOP: nullableFloat64(record["vdop"]), + GPSAccuracy: nullableInt64(record["gps_accuracy"]), + GroundSpeed: nullableInt64(record["ground_speed"]), + GroundTrack: nullableFloat64(record["ground_track"]), + FixQuality: nullableInt64(record["fix_quality"]), + FixType: nullableInt64(record["fix_type"]), + SatsInView: nullableInt64(record["sats_in_view"]), + SensorID: nullableInt64(record["sensor_id"]), + NextUpdate: nullableInt64(record["next_update"]), + SeqNumber: nullableInt64(record["seq_number"]), + PrecisionBits: nullableInt64(record["precision_bits"]), + }, nil +} + +func telemetryFromRecord(record map[string]any, clientInfo mqttClientInfo) (*telemetryRecord, error) { + common, clientFields, err := AppendPacketFieldsFromRecord(record, "telemetry", clientInfo) + if err != nil { + return nil, err + } + metricsJSON, err := nullableJSON(record["metrics"]) + if err != nil { + return nil, fmt.Errorf("encode telemetry metrics_json: %w", err) + } + return &telemetryRecord{ + AppendPacketFields: common, + MQTTClientRecordFields: clientFields, + TelemetryTime: nullableInt64(record["time"]), + TelemetryType: nullableString(record["telemetry_type"]), + MetricsJSON: metricsJSON, + }, nil +} + +func routingFromRecord(record map[string]any, clientInfo mqttClientInfo) (*routingRecord, error) { + common, clientFields, err := AppendPacketFieldsFromRecord(record, "routing", clientInfo) + if err != nil { + return nil, err + } + return &routingRecord{AppendPacketFields: common, MQTTClientRecordFields: clientFields}, nil +} + +func tracerouteFromRecord(record map[string]any, clientInfo mqttClientInfo) (*tracerouteRecord, error) { + common, clientFields, err := AppendPacketFieldsFromRecord(record, "traceroute", clientInfo) + if err != nil { + return nil, err + } + return &tracerouteRecord{AppendPacketFields: common, MQTTClientRecordFields: clientFields}, nil +} + +func AppendPacketFieldsFromRecord(record map[string]any, wantType string, clientInfo mqttClientInfo) (AppendPacketFields, MQTTClientRecordFields, error) { + recordType, ok := record["type"].(string) + if !ok || recordType != wantType { + return AppendPacketFields{}, MQTTClientRecordFields{}, fmt.Errorf("record type %v is not %s", record["type"], wantType) + } fromID, ok := record["from"].(string) if !ok || fromID == "" { - return nil, fmt.Errorf("text_message missing from") + return AppendPacketFields{}, MQTTClientRecordFields{}, fmt.Errorf("%s missing from", wantType) } fromNum, err := int64FromAny(record["from_num"]) if err != nil { - return nil, fmt.Errorf("text_message from_num: %w", err) + return AppendPacketFields{}, MQTTClientRecordFields{}, fmt.Errorf("%s from_num: %w", wantType, err) } topic, ok := record["topic"].(string) if !ok || topic == "" { - return nil, fmt.Errorf("text_message missing topic") + return AppendPacketFields{}, MQTTClientRecordFields{}, fmt.Errorf("%s missing topic", wantType) } contentJSON, err := json.Marshal(record) if err != nil { - return nil, fmt.Errorf("encode text_message content_json: %w", err) + return AppendPacketFields{}, MQTTClientRecordFields{}, fmt.Errorf("encode %s content_json: %w", wantType, err) } - return &textMessageRecord{ - FromID: fromID, - FromNum: fromNum, - Text: nullableString(record["text"]), - PayloadHex: nullableString(record["payload_hex"]), - Topic: topic, - ChannelID: nullableString(record["channel_id"]), - GatewayID: nullableString(record["gateway_id"]), - PacketID: nullableInt64(record["packet_id"]), - PacketTo: nullableString(record["packet_to"]), - PacketToNum: nullableInt64(record["packet_to_num"]), - Portnum: nullableString(record["portnum"]), - PayloadLen: nullableInt64(record["payload_len"]), - PayloadVariant: nullableString(record["payload_variant"]), - ViaMQTT: nullableBool(record["via_mqtt"]), - PKIEncrypted: nullableBool(record["pki_encrypted"]), - DecryptSuccess: nullableBool(record["decrypt_success"]), - DecryptStatus: nullableString(record["decrypt_status"]), - MQTTClientID: nullableString(clientInfo.ClientID), - MQTTUsername: nullableString(clientInfo.Username), - MQTTListener: nullableString(clientInfo.Listener), - MQTTRemoteAddr: nullableString(clientInfo.RemoteAddr), - MQTTRemoteHost: nullableString(clientInfo.RemoteHost), - MQTTRemotePort: nullableString(clientInfo.RemotePort), - ContentJSON: string(contentJSON), - }, nil + return AppendPacketFields{ + FromID: fromID, + FromNum: fromNum, + Topic: topic, + ChannelID: nullableString(record["channel_id"]), + GatewayID: nullableString(record["gateway_id"]), + PacketID: nullableInt64(record["packet_id"]), + PacketTo: nullableString(record["packet_to"]), + PacketToNum: nullableInt64(record["packet_to_num"]), + Portnum: nullableString(record["portnum"]), + PayloadLen: nullableInt64(record["payload_len"]), + PayloadVariant: nullableString(record["payload_variant"]), + ViaMQTT: nullableBool(record["via_mqtt"]), + PKIEncrypted: nullableBool(record["pki_encrypted"]), + DecryptSuccess: nullableBool(record["decrypt_success"]), + DecryptStatus: nullableString(record["decrypt_status"]), + ContentJSON: string(contentJSON), + }, MQTTClientRecordFields{ + MQTTClientID: nullableString(clientInfo.ClientID), + MQTTUsername: nullableString(clientInfo.Username), + MQTTListener: nullableString(clientInfo.Listener), + MQTTRemoteAddr: nullableString(clientInfo.RemoteAddr), + MQTTRemoteHost: nullableString(clientInfo.RemoteHost), + MQTTRemotePort: nullableString(clientInfo.RemotePort), + }, nil } func int64FromAny(value any) (int64, error) { @@ -370,6 +628,23 @@ func nullableString(value any) *string { return &s } +func nullableStringValue(value any) *string { + if value == nil { + return nil + } + if s, ok := value.(string); ok { + if s == "" { + return nil + } + return &s + } + s := fmt.Sprint(value) + if s == "" || s == "" { + return nil + } + return &s +} + func nullableBool(value any) *bool { b, ok := value.(bool) if !ok { @@ -422,6 +697,18 @@ func nullableFloat64(value any) *float64 { return &out } +func nullableJSON(value any) (*string, error) { + if value == nil { + return nil, nil + } + data, err := json.Marshal(value) + if err != nil { + return nil, err + } + s := string(data) + return &s, nil +} + func addStringUpdate(updates map[string]any, column string, value *string) { if value != nil { updates[column] = *value diff --git a/db_test.go b/db_test.go index b9f5030..e616b7f 100644 --- a/db_test.go +++ b/db_test.go @@ -11,7 +11,7 @@ func TestOpenStoreCreatesTables(t *testing.T) { st := openTestStore(t) defer st.Close() - for _, table := range []string{"nodeinfo_map", "text_message"} { + for _, table := range []string{"nodeinfo_map", "text_message", "position", "telemetry", "routing", "traceroute"} { var name string if err := rawTestDB(t, st).QueryRow("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", table).Scan(&name); err != nil { t.Fatalf("%s table missing: %v", table, err) @@ -283,6 +283,139 @@ func TestInsertTextMessageRequiresFields(t *testing.T) { } } +func TestInsertPositionAppendsRows(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + clientInfo := mqttClientInfo{ClientID: "client-1", RemoteAddr: "127.0.0.1:54321", RemoteHost: "127.0.0.1", RemotePort: "54321"} + if err := st.InsertPosition(positionTestRecord(), clientInfo); err != nil { + t.Fatalf("first InsertPosition() error = %v", err) + } + if err := st.InsertPosition(positionTestRecord(), clientInfo); err != nil { + t.Fatalf("second InsertPosition() error = %v", err) + } + + var count int + if err := rawTestDB(t, st).QueryRow("SELECT COUNT(*) FROM position WHERE from_id = ?", "!12345678").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 2 { + t.Fatalf("position count = %d, want 2", count) + } + + var latitude, longitude float64 + var altitude int64 + var locationSource, remoteHost string + if err := rawTestDB(t, st).QueryRow("SELECT latitude, longitude, altitude, location_source, mqtt_remote_host FROM position ORDER BY id LIMIT 1").Scan(&latitude, &longitude, &altitude, &locationSource, &remoteHost); err != nil { + t.Fatal(err) + } + if latitude != 42.5 || longitude != -83.1 || altitude != 200 || locationSource != "LOC_INTERNAL" || remoteHost != "127.0.0.1" { + t.Fatalf("position row = lat %v lon %v alt %v source %q remote %q", latitude, longitude, altitude, locationSource, remoteHost) + } +} + +func TestInsertTelemetryAppendsRowsAndStoresMetricsJSON(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + if err := st.InsertTelemetry(telemetryTestRecord(), mqttClientInfo{}); err != nil { + t.Fatalf("InsertTelemetry() error = %v", err) + } + + var telemetryType, metricsJSON, contentJSON string + if err := rawTestDB(t, st).QueryRow("SELECT telemetry_type, metrics_json, content_json FROM telemetry LIMIT 1").Scan(&telemetryType, &metricsJSON, &contentJSON); err != nil { + t.Fatal(err) + } + if telemetryType != "device_metrics" { + t.Fatalf("telemetry_type = %q, want device_metrics", telemetryType) + } + if !strings.Contains(metricsJSON, "battery_level") || !strings.Contains(metricsJSON, "voltage") { + t.Fatalf("metrics_json = %q, want battery_level and voltage", metricsJSON) + } + if !strings.Contains(contentJSON, "telemetry") { + t.Fatalf("content_json = %q, want telemetry", contentJSON) + } +} + +func TestInsertRoutingAndTracerouteAppendRows(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + if err := st.InsertRouting(routingTestRecord(), mqttClientInfo{}); err != nil { + t.Fatalf("first InsertRouting() error = %v", err) + } + if err := st.InsertRouting(routingTestRecord(), mqttClientInfo{}); err != nil { + t.Fatalf("second InsertRouting() error = %v", err) + } + if err := st.InsertTraceroute(tracerouteTestRecord(), mqttClientInfo{}); err != nil { + t.Fatalf("first InsertTraceroute() error = %v", err) + } + if err := st.InsertTraceroute(tracerouteTestRecord(), mqttClientInfo{}); err != nil { + t.Fatalf("second InsertTraceroute() error = %v", err) + } + + for _, table := range []string{"routing", "traceroute"} { + var count int + if err := rawTestDB(t, st).QueryRow("SELECT COUNT(*) FROM "+table+" WHERE from_id = ?", "!12345678").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 2 { + t.Fatalf("%s count = %d, want 2", table, count) + } + + var packetID int64 + var contentJSON string + if err := rawTestDB(t, st).QueryRow("SELECT packet_id, content_json FROM "+table+" ORDER BY id LIMIT 1").Scan(&packetID, &contentJSON); err != nil { + t.Fatal(err) + } + if packetID != 42 || !strings.Contains(contentJSON, table) { + t.Fatalf("%s row packet_id=%d content_json=%q", table, packetID, contentJSON) + } + } +} + +func TestInsertPacketTablesRequireFields(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + tests := []struct { + name string + insert func(map[string]any) error + record map[string]any + }{ + {name: "position", insert: func(r map[string]any) error { return st.InsertPosition(r, mqttClientInfo{}) }, record: positionTestRecord()}, + {name: "telemetry", insert: func(r map[string]any) error { return st.InsertTelemetry(r, mqttClientInfo{}) }, record: telemetryTestRecord()}, + {name: "routing", insert: func(r map[string]any) error { return st.InsertRouting(r, mqttClientInfo{}) }, record: routingTestRecord()}, + {name: "traceroute", insert: func(r map[string]any) error { return st.InsertTraceroute(r, mqttClientInfo{}) }, record: tracerouteTestRecord()}, + } + + for _, tt := range tests { + wrongType := cloneRecord(tt.record) + wrongType["type"] = "text_message" + if err := tt.insert(wrongType); err == nil || !strings.Contains(err.Error(), tt.name) { + t.Fatalf("%s wrong type error = %v, want %s", tt.name, err, tt.name) + } + + missingFrom := cloneRecord(tt.record) + delete(missingFrom, "from") + if err := tt.insert(missingFrom); err == nil || !strings.Contains(err.Error(), "from") { + t.Fatalf("%s missing from error = %v, want from error", tt.name, err) + } + + missingFromNum := cloneRecord(tt.record) + delete(missingFromNum, "from_num") + if err := tt.insert(missingFromNum); err == nil || !strings.Contains(err.Error(), "from_num") { + t.Fatalf("%s missing from_num error = %v, want from_num error", tt.name, err) + } + + missingTopic := cloneRecord(tt.record) + delete(missingTopic, "topic") + if err := tt.insert(missingTopic); err == nil || !strings.Contains(err.Error(), "topic") { + t.Fatalf("%s missing topic error = %v, want topic error", tt.name, err) + } + } +} + func openTestStore(t *testing.T) *store { t.Helper() st, err := openStore(databaseConfig{ @@ -341,21 +474,78 @@ func mapReportRecord(longName string) map[string]any { } func textMessageTestRecord(text any) map[string]any { + record := commonPacketTestRecord("text_message", "TEXT_MESSAGE_APP") + record["text"] = text + return record +} + +func positionTestRecord() map[string]any { + record := commonPacketTestRecord("position", "POSITION_APP") + record["latitude"] = 42.5 + record["longitude"] = -83.1 + record["altitude"] = int32(200) + record["time"] = uint32(123456) + record["location_source"] = "LOC_INTERNAL" + record["altitude_source"] = "ALT_INTERNAL" + record["timestamp"] = uint32(123456) + record["timestamp_millis_adjust"] = uint32(10) + record["altitude_hae"] = int32(210) + record["altitude_geoidal_separation"] = int32(20) + record["pdop"] = 1.1 + record["hdop"] = 1.2 + record["vdop"] = 1.3 + record["gps_accuracy"] = uint32(1000) + record["ground_speed"] = uint32(2) + record["ground_track"] = 180.5 + record["fix_quality"] = uint32(1) + record["fix_type"] = uint32(3) + record["sats_in_view"] = uint32(8) + record["sensor_id"] = uint32(1) + record["next_update"] = uint32(60) + record["seq_number"] = uint32(7) + record["precision_bits"] = uint32(16) + return record +} + +func telemetryTestRecord() map[string]any { + record := commonPacketTestRecord("telemetry", "TELEMETRY_APP") + record["time"] = uint32(123456) + record["telemetry_type"] = "device_metrics" + record["metrics"] = map[string]any{"battery_level": 85, "voltage": 4.1} + return record +} + +func routingTestRecord() map[string]any { + return commonPacketTestRecord("routing", "ROUTING_APP") +} + +func tracerouteTestRecord() map[string]any { + return commonPacketTestRecord("traceroute", "TRACEROUTE_APP") +} + +func commonPacketTestRecord(recordType, portnum string) map[string]any { return map[string]any{ - "type": "text_message", + "type": recordType, "topic": "msh/US/test", "channel_id": "LongFast", "gateway_id": "!gateway", "from": "!12345678", "from_num": uint32(0x12345678), - "text": text, "packet_id": uint32(42), "packet_to": "!ffffffff", "packet_to_num": uint32(0xffffffff), - "portnum": "TEXT_MESSAGE_APP", + "portnum": portnum, "payload_len": 5, "payload_variant": "decoded", "via_mqtt": true, "pki_encrypted": false, } } + +func cloneRecord(record map[string]any) map[string]any { + clone := make(map[string]any, len(record)) + for key, value := range record { + clone[key] = value + } + return clone +} diff --git a/main.go b/main.go index 0422e35..e6f22d4 100644 --- a/main.go +++ b/main.go @@ -64,6 +64,30 @@ func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (pa printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()}) } } + case "position": + if h.store != nil { + if err := h.store.InsertPosition(record, mqttClientInfoFromClient(cl)); err != nil { + printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()}) + } + } + case "telemetry": + if h.store != nil { + if err := h.store.InsertTelemetry(record, mqttClientInfoFromClient(cl)); err != nil { + printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()}) + } + } + case "routing": + if h.store != nil { + if err := h.store.InsertRouting(record, mqttClientInfoFromClient(cl)); err != nil { + printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()}) + } + } + case "traceroute": + if h.store != nil { + if err := h.store.InsertTraceroute(record, mqttClientInfoFromClient(cl)); err != nil { + printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()}) + } + } } if record["type"] != "empty_packet" { printJSON(record)