分表
This commit is contained in:
@@ -62,33 +62,48 @@ type MQTTClientRecordFields struct {
|
||||
MQTTRemotePort *string `gorm:"column:mqtt_remote_port"`
|
||||
}
|
||||
|
||||
type nodeInfoMapRecord struct {
|
||||
type nodeInfoRecord struct {
|
||||
NodeID string `gorm:"column:node_id;primaryKey;not null"`
|
||||
NodeNum int64 `gorm:"column:node_num;not null;index"`
|
||||
UserID *string `gorm:"column:user_id"`
|
||||
LongName *string `gorm:"column:long_name"`
|
||||
ShortName *string `gorm:"column:short_name"`
|
||||
HWModel *string `gorm:"column:hw_model"`
|
||||
Role *string `gorm:"column:role"`
|
||||
IsLicensed *bool `gorm:"column:is_licensed"`
|
||||
PublicKey *string `gorm:"column:public_key"`
|
||||
ContentJSON string `gorm:"column:content_json;not null"`
|
||||
FirstSeenAt time.Time `gorm:"column:first_seen_at;autoCreateTime"`
|
||||
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime;index"`
|
||||
}
|
||||
|
||||
func (nodeInfoRecord) TableName() string {
|
||||
return "nodeinfo"
|
||||
}
|
||||
|
||||
type mapReportRecord struct {
|
||||
NodeID string `gorm:"column:node_id;primaryKey;not null"`
|
||||
NodeNum int64 `gorm:"column:node_num;not null"`
|
||||
LatestType string `gorm:"column:latest_type;not null"`
|
||||
UserID *string `gorm:"column:user_id"`
|
||||
NodeNum int64 `gorm:"column:node_num;not null;index"`
|
||||
LongName *string `gorm:"column:long_name"`
|
||||
ShortName *string `gorm:"column:short_name"`
|
||||
HWModel *string `gorm:"column:hw_model"`
|
||||
Role *string `gorm:"column:role"`
|
||||
IsLicensed *bool `gorm:"column:is_licensed"`
|
||||
PublicKey *string `gorm:"column:public_key"`
|
||||
FirmwareVersion *string `gorm:"column:firmware_version"`
|
||||
Region *string `gorm:"column:region"`
|
||||
ModemPreset *string `gorm:"column:modem_preset"`
|
||||
Latitude *float64 `gorm:"column:latitude"`
|
||||
Longitude *float64 `gorm:"column:longitude"`
|
||||
Latitude *float64 `gorm:"column:latitude;index"`
|
||||
Longitude *float64 `gorm:"column:longitude;index"`
|
||||
Altitude *int64 `gorm:"column:altitude"`
|
||||
PositionPrecision *int64 `gorm:"column:position_precision"`
|
||||
NumOnlineLocalNodes *int64 `gorm:"column:num_online_local_nodes"`
|
||||
HasOptedReportLocation *bool `gorm:"column:has_opted_report_location"`
|
||||
ContentJSON string `gorm:"column:content_json;not null"`
|
||||
FirstSeenAt time.Time `gorm:"column:first_seen_at;autoCreateTime"`
|
||||
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
|
||||
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime;index"`
|
||||
}
|
||||
|
||||
func (nodeInfoMapRecord) TableName() string {
|
||||
return "nodeinfo_map"
|
||||
func (mapReportRecord) TableName() string {
|
||||
return "map_report"
|
||||
}
|
||||
|
||||
type textMessageRecord struct {
|
||||
@@ -239,7 +254,8 @@ func (s *store) migrate() error {
|
||||
label string
|
||||
model any
|
||||
}{
|
||||
{label: "nodeinfo_map", model: &nodeInfoMapRecord{}},
|
||||
{label: "nodeinfo", model: &nodeInfoRecord{}},
|
||||
{label: "map_report", model: &mapReportRecord{}},
|
||||
{label: "text_message", model: &textMessageRecord{}},
|
||||
{label: "position", model: &positionRecord{}},
|
||||
{label: "telemetry", model: &telemetryRecord{}},
|
||||
@@ -278,43 +294,75 @@ func createMissingIndexes(migrator gorm.Migrator, model any, label string, index
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) UpsertNodeInfoMap(record map[string]any) error {
|
||||
node, err := nodeInfoMapFromRecord(record)
|
||||
func (s *store) UpsertNodeInfo(record map[string]any) error {
|
||||
node, err := nodeInfoFromRecord(record)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.upsertNodeInfoMapRecord(node); err != nil {
|
||||
return fmt.Errorf("upsert nodeinfo_map %s: %w", node.NodeID, err)
|
||||
if err := s.upsertNodeInfoRecord(node); err != nil {
|
||||
return fmt.Errorf("upsert nodeinfo %s: %w", node.NodeID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) upsertNodeInfoMapRecord(node *nodeInfoMapRecord) error {
|
||||
func (s *store) UpsertMapReport(record map[string]any) error {
|
||||
report, err := mapReportFromRecord(record)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.upsertMapReportRecord(report); err != nil {
|
||||
return fmt.Errorf("upsert map_report %s: %w", report.NodeID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) upsertNodeInfoRecord(node *nodeInfoRecord) error {
|
||||
return s.db.Transaction(func(tx *gorm.DB) error {
|
||||
var existing nodeInfoMapRecord
|
||||
var existing nodeInfoRecord
|
||||
err := tx.Where("node_id = ?", node.NodeID).Take(&existing).Error
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
if err := tx.Create(node).Error; err != nil {
|
||||
return s.updateNodeInfoMapRecord(tx, node)
|
||||
return s.updateNodeInfoRecord(tx, node)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.updateNodeInfoMapRecord(tx, node)
|
||||
return s.updateNodeInfoRecord(tx, node)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *store) updateNodeInfoMapRecord(tx *gorm.DB, node *nodeInfoMapRecord) error {
|
||||
updates := nodeInfoMapUpdates(node)
|
||||
return tx.Model(&nodeInfoMapRecord{}).Where("node_id = ?", node.NodeID).Updates(updates).Error
|
||||
func (s *store) upsertMapReportRecord(report *mapReportRecord) error {
|
||||
return s.db.Transaction(func(tx *gorm.DB) error {
|
||||
var existing mapReportRecord
|
||||
err := tx.Where("node_id = ?", report.NodeID).Take(&existing).Error
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
if err := tx.Create(report).Error; err != nil {
|
||||
return s.updateMapReportRecord(tx, report)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.updateMapReportRecord(tx, report)
|
||||
})
|
||||
}
|
||||
|
||||
func nodeInfoMapUpdates(node *nodeInfoMapRecord) map[string]any {
|
||||
func (s *store) updateNodeInfoRecord(tx *gorm.DB, node *nodeInfoRecord) error {
|
||||
updates := nodeInfoUpdates(node)
|
||||
return tx.Model(&nodeInfoRecord{}).Where("node_id = ?", node.NodeID).Updates(updates).Error
|
||||
}
|
||||
|
||||
func (s *store) updateMapReportRecord(tx *gorm.DB, report *mapReportRecord) error {
|
||||
updates := mapReportUpdates(report)
|
||||
return tx.Model(&mapReportRecord{}).Where("node_id = ?", report.NodeID).Updates(updates).Error
|
||||
}
|
||||
|
||||
func nodeInfoUpdates(node *nodeInfoRecord) map[string]any {
|
||||
updates := map[string]any{
|
||||
"node_num": node.NodeNum,
|
||||
"latest_type": node.LatestType,
|
||||
"content_json": node.ContentJSON,
|
||||
"updated_at": time.Now(),
|
||||
}
|
||||
@@ -325,15 +373,28 @@ func nodeInfoMapUpdates(node *nodeInfoMapRecord) map[string]any {
|
||||
addStringUpdate(updates, "role", node.Role)
|
||||
addBoolUpdate(updates, "is_licensed", node.IsLicensed)
|
||||
addStringUpdate(updates, "public_key", node.PublicKey)
|
||||
addStringUpdate(updates, "firmware_version", node.FirmwareVersion)
|
||||
addStringUpdate(updates, "region", node.Region)
|
||||
addStringUpdate(updates, "modem_preset", node.ModemPreset)
|
||||
addFloat64Update(updates, "latitude", node.Latitude)
|
||||
addFloat64Update(updates, "longitude", node.Longitude)
|
||||
addInt64Update(updates, "altitude", node.Altitude)
|
||||
addInt64Update(updates, "position_precision", node.PositionPrecision)
|
||||
addInt64Update(updates, "num_online_local_nodes", node.NumOnlineLocalNodes)
|
||||
addBoolUpdate(updates, "has_opted_report_location", node.HasOptedReportLocation)
|
||||
return updates
|
||||
}
|
||||
|
||||
func mapReportUpdates(report *mapReportRecord) map[string]any {
|
||||
updates := map[string]any{
|
||||
"node_num": report.NodeNum,
|
||||
"content_json": report.ContentJSON,
|
||||
"updated_at": time.Now(),
|
||||
}
|
||||
addStringUpdate(updates, "long_name", report.LongName)
|
||||
addStringUpdate(updates, "short_name", report.ShortName)
|
||||
addStringUpdate(updates, "hw_model", report.HWModel)
|
||||
addStringUpdate(updates, "role", report.Role)
|
||||
addStringUpdate(updates, "firmware_version", report.FirmwareVersion)
|
||||
addStringUpdate(updates, "region", report.Region)
|
||||
addStringUpdate(updates, "modem_preset", report.ModemPreset)
|
||||
addFloat64Update(updates, "latitude", report.Latitude)
|
||||
addFloat64Update(updates, "longitude", report.Longitude)
|
||||
addInt64Update(updates, "altitude", report.Altitude)
|
||||
addInt64Update(updates, "position_precision", report.PositionPrecision)
|
||||
addInt64Update(updates, "num_online_local_nodes", report.NumOnlineLocalNodes)
|
||||
addBoolUpdate(updates, "has_opted_report_location", report.HasOptedReportLocation)
|
||||
return updates
|
||||
}
|
||||
|
||||
@@ -392,35 +453,47 @@ func (s *store) InsertTraceroute(record map[string]any, clientInfo mqttClientInf
|
||||
return nil
|
||||
}
|
||||
|
||||
func nodeInfoMapFromRecord(record map[string]any) (*nodeInfoMapRecord, error) {
|
||||
latestType, ok := record["type"].(string)
|
||||
if !ok || (latestType != "nodeinfo" && latestType != "map_report") {
|
||||
return nil, fmt.Errorf("record type %v is not nodeinfo or map_report", record["type"])
|
||||
func nodeInfoFromRecord(record map[string]any) (*nodeInfoRecord, error) {
|
||||
recordType, ok := record["type"].(string)
|
||||
if !ok || recordType != "nodeinfo" {
|
||||
return nil, fmt.Errorf("record type %v is not nodeinfo", record["type"])
|
||||
}
|
||||
nodeID, ok := record["from"].(string)
|
||||
if !ok || nodeID == "" {
|
||||
return nil, fmt.Errorf("nodeinfo_map missing from")
|
||||
}
|
||||
nodeNum, err := int64FromAny(record["from_num"])
|
||||
nodeID, nodeNum, contentJSON, err := nodeRecordBase(record, "nodeinfo")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("nodeinfo_map from_num: %w", err)
|
||||
}
|
||||
contentJSON, err := json.Marshal(record)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encode nodeinfo_map content_json: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &nodeInfoMapRecord{
|
||||
return &nodeInfoRecord{
|
||||
NodeID: nodeID,
|
||||
NodeNum: nodeNum,
|
||||
UserID: nullableString(record["user_id"]),
|
||||
LongName: nullableString(record["long_name"]),
|
||||
ShortName: nullableString(record["short_name"]),
|
||||
HWModel: nullableString(record["hw_model"]),
|
||||
Role: nullableString(record["role"]),
|
||||
IsLicensed: nullableBool(record["is_licensed"]),
|
||||
PublicKey: nullableString(record["public_key"]),
|
||||
ContentJSON: contentJSON,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func mapReportFromRecord(record map[string]any) (*mapReportRecord, error) {
|
||||
recordType, ok := record["type"].(string)
|
||||
if !ok || recordType != "map_report" {
|
||||
return nil, fmt.Errorf("record type %v is not map_report", record["type"])
|
||||
}
|
||||
nodeID, nodeNum, contentJSON, err := nodeRecordBase(record, "map_report")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &mapReportRecord{
|
||||
NodeID: nodeID,
|
||||
NodeNum: nodeNum,
|
||||
LatestType: latestType,
|
||||
UserID: nullableString(record["user_id"]),
|
||||
LongName: nullableString(record["long_name"]),
|
||||
ShortName: nullableString(record["short_name"]),
|
||||
HWModel: nullableString(record["hw_model"]),
|
||||
Role: nullableString(record["role"]),
|
||||
IsLicensed: nullableBool(record["is_licensed"]),
|
||||
PublicKey: nullableString(record["public_key"]),
|
||||
FirmwareVersion: nullableString(record["firmware_version"]),
|
||||
Region: nullableString(record["region"]),
|
||||
ModemPreset: nullableString(record["modem_preset"]),
|
||||
@@ -430,10 +503,26 @@ func nodeInfoMapFromRecord(record map[string]any) (*nodeInfoMapRecord, error) {
|
||||
PositionPrecision: nullableInt64(record["position_precision"]),
|
||||
NumOnlineLocalNodes: nullableInt64(record["num_online_local_nodes"]),
|
||||
HasOptedReportLocation: nullableBool(record["has_opted_report_location"]),
|
||||
ContentJSON: string(contentJSON),
|
||||
ContentJSON: contentJSON,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func nodeRecordBase(record map[string]any, label string) (string, int64, string, error) {
|
||||
nodeID, ok := record["from"].(string)
|
||||
if !ok || nodeID == "" {
|
||||
return "", 0, "", fmt.Errorf("%s missing from", label)
|
||||
}
|
||||
nodeNum, err := int64FromAny(record["from_num"])
|
||||
if err != nil {
|
||||
return "", 0, "", fmt.Errorf("%s from_num: %w", label, err)
|
||||
}
|
||||
contentJSON, err := json.Marshal(record)
|
||||
if err != nil {
|
||||
return "", 0, "", fmt.Errorf("encode %s content_json: %w", label, err)
|
||||
}
|
||||
return nodeID, nodeNum, string(contentJSON), nil
|
||||
}
|
||||
|
||||
func textMessageFromRecord(record map[string]any, clientInfo mqttClientInfo) (*textMessageRecord, error) {
|
||||
recordType, ok := record["type"].(string)
|
||||
if !ok || recordType != "text_message" {
|
||||
|
||||
Reference in New Issue
Block a user