数据库改GORM 操作

This commit is contained in:
2026-06-03 15:33:56 +08:00
parent 8c1e1ef414
commit f0d6c5a96a
5 changed files with 243 additions and 352 deletions
+198 -337
View File
@@ -1,14 +1,16 @@
package main
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"time"
_ "github.com/go-sql-driver/mysql"
_ "modernc.org/sqlite"
"github.com/glebarez/sqlite"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
const (
@@ -17,15 +19,10 @@ const (
)
type store struct {
db *sql.DB
db *gorm.DB
driver string
}
type migrationQuery struct {
name string
query string
}
type mqttClientInfo struct {
ClientID string
Username string
@@ -36,81 +33,97 @@ type mqttClientInfo struct {
}
type nodeInfoMapRecord struct {
NodeID string
NodeNum int64
LatestType string
UserID any
LongName any
ShortName any
HWModel any
Role any
IsLicensed any
PublicKey any
FirmwareVersion any
Region any
ModemPreset any
Latitude any
Longitude any
Altitude any
PositionPrecision any
NumOnlineLocalNodes any
HasOptedReportLocation any
ContentJSON []byte
NodeID string `gorm:"column:node_id;primaryKey;not null"`
NodeNum int64 `gorm:"column:node_num;not null"`
LatestType string `gorm:"column:latest_type;not null"`
UserID *string `gorm:"column:user_id"`
LongName *string `gorm:"column:long_name"`
ShortName *string `gorm:"column:short_name"`
HWModel *string `gorm:"column:hw_model"`
Role *string `gorm:"column:role"`
IsLicensed *bool `gorm:"column:is_licensed"`
PublicKey *string `gorm:"column:public_key"`
FirmwareVersion *string `gorm:"column:firmware_version"`
Region *string `gorm:"column:region"`
ModemPreset *string `gorm:"column:modem_preset"`
Latitude *float64 `gorm:"column:latitude"`
Longitude *float64 `gorm:"column:longitude"`
Altitude *int64 `gorm:"column:altitude"`
PositionPrecision *int64 `gorm:"column:position_precision"`
NumOnlineLocalNodes *int64 `gorm:"column:num_online_local_nodes"`
HasOptedReportLocation *bool `gorm:"column:has_opted_report_location"`
ContentJSON string `gorm:"column:content_json;not null"`
FirstSeenAt time.Time `gorm:"column:first_seen_at;autoCreateTime"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (nodeInfoMapRecord) TableName() string {
return "nodeinfo_map"
}
type textMessageRecord struct {
FromID string
FromNum int64
Text any
PayloadHex any
Topic string
ChannelID any
GatewayID any
PacketID any
PacketTo any
PacketToNum any
Portnum any
PayloadLen any
PayloadVariant any
ViaMQTT any
PKIEncrypted any
DecryptSuccess any
DecryptStatus any
MQTTClientID any
MQTTUsername any
MQTTListener any
MQTTRemoteAddr any
MQTTRemoteHost any
MQTTRemotePort any
ContentJSON []byte
ID uint64 `gorm:"column:id;primaryKey;autoIncrement"`
FromID string `gorm:"column:from_id;not null"`
FromNum int64 `gorm:"column:from_num;not null;index:idx_text_message_from_num_created_at,priority:1"`
Text *string `gorm:"column:text"`
PayloadHex *string `gorm:"column:payload_hex"`
Topic string `gorm:"column:topic;not null"`
ChannelID *string `gorm:"column:channel_id"`
GatewayID *string `gorm:"column:gateway_id"`
PacketID *int64 `gorm:"column:packet_id;index:idx_text_message_packet_id"`
PacketTo *string `gorm:"column:packet_to"`
PacketToNum *int64 `gorm:"column:packet_to_num"`
Portnum *string `gorm:"column:portnum"`
PayloadLen *int64 `gorm:"column:payload_len"`
PayloadVariant *string `gorm:"column:payload_variant"`
ViaMQTT *bool `gorm:"column:via_mqtt"`
PKIEncrypted *bool `gorm:"column:pki_encrypted"`
DecryptSuccess *bool `gorm:"column:decrypt_success"`
DecryptStatus *string `gorm:"column:decrypt_status"`
MQTTClientID *string `gorm:"column:mqtt_client_id"`
MQTTUsername *string `gorm:"column:mqtt_username"`
MQTTListener *string `gorm:"column:mqtt_listener"`
MQTTRemoteAddr *string `gorm:"column:mqtt_remote_addr"`
MQTTRemoteHost *string `gorm:"column:mqtt_remote_host"`
MQTTRemotePort *string `gorm:"column:mqtt_remote_port"`
ContentJSON string `gorm:"column:content_json;not null"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index:idx_text_message_from_num_created_at,priority:2;index:idx_text_message_created_at"`
}
func (textMessageRecord) TableName() string {
return "text_message"
}
func openStore(cfg databaseConfig) (*store, error) {
var dsn string
var dialector gorm.Dialector
switch cfg.Driver {
case databaseDriverSQLite:
if err := os.MkdirAll(filepath.Dir(cfg.SQLite.Path), 0755); err != nil {
return nil, fmt.Errorf("create sqlite directory %s: %w", filepath.Dir(cfg.SQLite.Path), err)
}
dsn = cfg.SQLite.Path
dialector = sqlite.Open(cfg.SQLite.Path)
case databaseDriverMySQL:
dsn = cfg.MySQL.DSN
dialector = mysql.Open(cfg.MySQL.DSN)
default:
return nil, fmt.Errorf("unsupported database driver %q", cfg.Driver)
}
db, err := sql.Open(cfg.Driver, dsn)
db, err := gorm.Open(dialector, &gorm.Config{})
if err != nil {
return nil, fmt.Errorf("open %s database: %w", cfg.Driver, err)
}
if err := db.Ping(); err != nil {
db.Close()
sqlDB, err := db.DB()
if err != nil {
return nil, fmt.Errorf("get %s database handle: %w", cfg.Driver, err)
}
if err := sqlDB.Ping(); err != nil {
sqlDB.Close()
return nil, fmt.Errorf("ping %s database: %w", cfg.Driver, err)
}
s := &store{db: db, driver: cfg.Driver}
if err := s.migrate(); err != nil {
db.Close()
sqlDB.Close()
return nil, err
}
return s, nil
@@ -120,143 +133,39 @@ func (s *store) Close() error {
if s == nil || s.db == nil {
return nil
}
return s.db.Close()
}
func (s *store) migrate() error {
queries, err := s.migrationQueries()
sqlDB, err := s.db.DB()
if err != nil {
return err
}
for _, q := range queries {
if _, err := s.db.Exec(q.query); err != nil {
return fmt.Errorf("migrate %s: %w", q.name, err)
}
}
return nil
return sqlDB.Close()
}
func (s *store) migrationQueries() ([]migrationQuery, error) {
switch s.driver {
case databaseDriverSQLite:
return []migrationQuery{
{name: "nodeinfo_map table", query: `CREATE TABLE IF NOT EXISTS nodeinfo_map (
node_id TEXT PRIMARY KEY,
node_num INTEGER NOT NULL,
latest_type TEXT NOT NULL,
user_id TEXT,
long_name TEXT,
short_name TEXT,
hw_model TEXT,
role TEXT,
is_licensed BOOLEAN,
public_key TEXT,
firmware_version TEXT,
region TEXT,
modem_preset TEXT,
latitude REAL,
longitude REAL,
altitude INTEGER,
position_precision INTEGER,
num_online_local_nodes INTEGER,
has_opted_report_location BOOLEAN,
content_json TEXT NOT NULL,
first_seen_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);`},
{name: "text_message table", query: `CREATE TABLE IF NOT EXISTS text_message (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_id TEXT NOT NULL,
from_num INTEGER NOT NULL,
text TEXT,
payload_hex TEXT,
topic TEXT NOT NULL,
channel_id TEXT,
gateway_id TEXT,
packet_id INTEGER,
packet_to TEXT,
packet_to_num INTEGER,
portnum TEXT,
payload_len INTEGER,
payload_variant TEXT,
via_mqtt BOOLEAN,
pki_encrypted BOOLEAN,
decrypt_success BOOLEAN,
decrypt_status TEXT,
mqtt_client_id TEXT,
mqtt_username TEXT,
mqtt_listener TEXT,
mqtt_remote_addr TEXT,
mqtt_remote_host TEXT,
mqtt_remote_port TEXT,
content_json TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);`},
{name: "text_message from_num index", query: `CREATE INDEX IF NOT EXISTS idx_text_message_from_num_created_at ON text_message (from_num, created_at);`},
{name: "text_message created_at index", query: `CREATE INDEX IF NOT EXISTS idx_text_message_created_at ON text_message (created_at);`},
{name: "text_message packet_id index", query: `CREATE INDEX IF NOT EXISTS idx_text_message_packet_id ON text_message (packet_id);`},
}, nil
case databaseDriverMySQL:
return []migrationQuery{
{name: "nodeinfo_map table", query: `CREATE TABLE IF NOT EXISTS nodeinfo_map (
node_id VARCHAR(32) NOT NULL PRIMARY KEY,
node_num BIGINT UNSIGNED NOT NULL,
latest_type VARCHAR(32) NOT NULL,
user_id VARCHAR(128),
long_name TEXT,
short_name VARCHAR(64),
hw_model VARCHAR(128),
role VARCHAR(128),
is_licensed BOOLEAN,
public_key TEXT,
firmware_version VARCHAR(128),
region VARCHAR(128),
modem_preset VARCHAR(128),
latitude DOUBLE,
longitude DOUBLE,
altitude INT,
position_precision INT UNSIGNED,
num_online_local_nodes INT UNSIGNED,
has_opted_report_location BOOLEAN,
content_json JSON NOT NULL,
first_seen_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);`},
{name: "text_message table", query: `CREATE TABLE IF NOT EXISTS text_message (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
from_id VARCHAR(32) NOT NULL,
from_num BIGINT UNSIGNED NOT NULL,
text TEXT,
payload_hex TEXT,
topic TEXT NOT NULL,
channel_id VARCHAR(128),
gateway_id VARCHAR(128),
packet_id BIGINT UNSIGNED,
packet_to VARCHAR(32),
packet_to_num BIGINT UNSIGNED,
portnum VARCHAR(64),
payload_len INT UNSIGNED,
payload_variant VARCHAR(32),
via_mqtt BOOLEAN,
pki_encrypted BOOLEAN,
decrypt_success BOOLEAN,
decrypt_status VARCHAR(255),
mqtt_client_id VARCHAR(255),
mqtt_username VARCHAR(255),
mqtt_listener VARCHAR(128),
mqtt_remote_addr VARCHAR(255),
mqtt_remote_host VARCHAR(255),
mqtt_remote_port VARCHAR(16),
content_json JSON NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_text_message_from_num_created_at (from_num, created_at),
INDEX idx_text_message_created_at (created_at),
INDEX idx_text_message_packet_id (packet_id)
);`},
}, nil
default:
return nil, fmt.Errorf("unsupported database driver %q", s.driver)
}
func (s *store) migrate() error {
return s.db.Transaction(func(tx *gorm.DB) error {
migrator := tx.Migrator()
if !migrator.HasTable(&nodeInfoMapRecord{}) {
if err := migrator.CreateTable(&nodeInfoMapRecord{}); err != nil {
return fmt.Errorf("migrate nodeinfo_map table: %w", err)
}
}
if !migrator.HasTable(&textMessageRecord{}) {
if err := migrator.CreateTable(&textMessageRecord{}); err != nil {
return fmt.Errorf("migrate text_message table: %w", err)
}
}
for _, indexName := range []string{
"idx_text_message_from_num_created_at",
"idx_text_message_created_at",
"idx_text_message_packet_id",
} {
if !migrator.HasIndex(&textMessageRecord{}, indexName) {
if err := migrator.CreateIndex(&textMessageRecord{}, indexName); err != nil {
return fmt.Errorf("migrate text_message index %s: %w", indexName, err)
}
}
}
return nil
})
}
func (s *store) UpsertNodeInfoMap(record map[string]any) error {
@@ -264,140 +173,66 @@ func (s *store) UpsertNodeInfoMap(record map[string]any) error {
if err != nil {
return err
}
var query string
switch s.driver {
case databaseDriverSQLite:
query = `INSERT INTO nodeinfo_map (
node_id, node_num, latest_type, user_id, long_name, short_name,
hw_model, role, is_licensed, public_key, firmware_version,
region, modem_preset, latitude, longitude, altitude,
position_precision, num_online_local_nodes, has_opted_report_location,
content_json, first_seen_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT(node_id) DO UPDATE SET
node_num = excluded.node_num,
latest_type = excluded.latest_type,
user_id = COALESCE(excluded.user_id, nodeinfo_map.user_id),
long_name = COALESCE(excluded.long_name, nodeinfo_map.long_name),
short_name = COALESCE(excluded.short_name, nodeinfo_map.short_name),
hw_model = COALESCE(excluded.hw_model, nodeinfo_map.hw_model),
role = COALESCE(excluded.role, nodeinfo_map.role),
is_licensed = COALESCE(excluded.is_licensed, nodeinfo_map.is_licensed),
public_key = COALESCE(excluded.public_key, nodeinfo_map.public_key),
firmware_version = COALESCE(excluded.firmware_version, nodeinfo_map.firmware_version),
region = COALESCE(excluded.region, nodeinfo_map.region),
modem_preset = COALESCE(excluded.modem_preset, nodeinfo_map.modem_preset),
latitude = COALESCE(excluded.latitude, nodeinfo_map.latitude),
longitude = COALESCE(excluded.longitude, nodeinfo_map.longitude),
altitude = COALESCE(excluded.altitude, nodeinfo_map.altitude),
position_precision = COALESCE(excluded.position_precision, nodeinfo_map.position_precision),
num_online_local_nodes = COALESCE(excluded.num_online_local_nodes, nodeinfo_map.num_online_local_nodes),
has_opted_report_location = COALESCE(excluded.has_opted_report_location, nodeinfo_map.has_opted_report_location),
content_json = excluded.content_json,
updated_at = CURRENT_TIMESTAMP;`
case databaseDriverMySQL:
query = `INSERT INTO nodeinfo_map (
node_id, node_num, latest_type, user_id, long_name, short_name,
hw_model, role, is_licensed, public_key, firmware_version,
region, modem_preset, latitude, longitude, altitude,
position_precision, num_online_local_nodes, has_opted_report_location,
content_json, first_seen_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON DUPLICATE KEY UPDATE
node_num = VALUES(node_num),
latest_type = VALUES(latest_type),
user_id = COALESCE(VALUES(user_id), user_id),
long_name = COALESCE(VALUES(long_name), long_name),
short_name = COALESCE(VALUES(short_name), short_name),
hw_model = COALESCE(VALUES(hw_model), hw_model),
role = COALESCE(VALUES(role), role),
is_licensed = COALESCE(VALUES(is_licensed), is_licensed),
public_key = COALESCE(VALUES(public_key), public_key),
firmware_version = COALESCE(VALUES(firmware_version), firmware_version),
region = COALESCE(VALUES(region), region),
modem_preset = COALESCE(VALUES(modem_preset), modem_preset),
latitude = COALESCE(VALUES(latitude), latitude),
longitude = COALESCE(VALUES(longitude), longitude),
altitude = COALESCE(VALUES(altitude), altitude),
position_precision = COALESCE(VALUES(position_precision), position_precision),
num_online_local_nodes = COALESCE(VALUES(num_online_local_nodes), num_online_local_nodes),
has_opted_report_location = COALESCE(VALUES(has_opted_report_location), has_opted_report_location),
content_json = VALUES(content_json),
updated_at = CURRENT_TIMESTAMP;`
default:
return fmt.Errorf("unsupported database driver %q", s.driver)
}
_, err = s.db.Exec(query,
node.NodeID,
node.NodeNum,
node.LatestType,
node.UserID,
node.LongName,
node.ShortName,
node.HWModel,
node.Role,
node.IsLicensed,
node.PublicKey,
node.FirmwareVersion,
node.Region,
node.ModemPreset,
node.Latitude,
node.Longitude,
node.Altitude,
node.PositionPrecision,
node.NumOnlineLocalNodes,
node.HasOptedReportLocation,
string(node.ContentJSON),
)
if err != nil {
if err := s.upsertNodeInfoMapRecord(node); err != nil {
return fmt.Errorf("upsert nodeinfo_map %s: %w", node.NodeID, err)
}
return nil
}
func (s *store) upsertNodeInfoMapRecord(node *nodeInfoMapRecord) error {
return s.db.Transaction(func(tx *gorm.DB) error {
var existing nodeInfoMapRecord
err := tx.Where("node_id = ?", node.NodeID).Take(&existing).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
if err := tx.Create(node).Error; err != nil {
return s.updateNodeInfoMapRecord(tx, node)
}
return nil
}
if err != nil {
return err
}
return s.updateNodeInfoMapRecord(tx, node)
})
}
func (s *store) updateNodeInfoMapRecord(tx *gorm.DB, node *nodeInfoMapRecord) error {
updates := nodeInfoMapUpdates(node)
return tx.Model(&nodeInfoMapRecord{}).Where("node_id = ?", node.NodeID).Updates(updates).Error
}
func nodeInfoMapUpdates(node *nodeInfoMapRecord) map[string]any {
updates := map[string]any{
"node_num": node.NodeNum,
"latest_type": node.LatestType,
"content_json": node.ContentJSON,
"updated_at": time.Now(),
}
addStringUpdate(updates, "user_id", node.UserID)
addStringUpdate(updates, "long_name", node.LongName)
addStringUpdate(updates, "short_name", node.ShortName)
addStringUpdate(updates, "hw_model", node.HWModel)
addStringUpdate(updates, "role", node.Role)
addBoolUpdate(updates, "is_licensed", node.IsLicensed)
addStringUpdate(updates, "public_key", node.PublicKey)
addStringUpdate(updates, "firmware_version", node.FirmwareVersion)
addStringUpdate(updates, "region", node.Region)
addStringUpdate(updates, "modem_preset", node.ModemPreset)
addFloat64Update(updates, "latitude", node.Latitude)
addFloat64Update(updates, "longitude", node.Longitude)
addInt64Update(updates, "altitude", node.Altitude)
addInt64Update(updates, "position_precision", node.PositionPrecision)
addInt64Update(updates, "num_online_local_nodes", node.NumOnlineLocalNodes)
addBoolUpdate(updates, "has_opted_report_location", node.HasOptedReportLocation)
return updates
}
func (s *store) InsertTextMessage(record map[string]any, clientInfo mqttClientInfo) error {
message, err := textMessageFromRecord(record, clientInfo)
if err != nil {
return err
}
query := `INSERT INTO text_message (
from_id, from_num, text, payload_hex, topic, channel_id, gateway_id,
packet_id, packet_to, packet_to_num, portnum, payload_len,
payload_variant, via_mqtt, pki_encrypted, decrypt_success, decrypt_status,
mqtt_client_id, mqtt_username, mqtt_listener, mqtt_remote_addr,
mqtt_remote_host, mqtt_remote_port, content_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`
_, err = s.db.Exec(query,
message.FromID,
message.FromNum,
message.Text,
message.PayloadHex,
message.Topic,
message.ChannelID,
message.GatewayID,
message.PacketID,
message.PacketTo,
message.PacketToNum,
message.Portnum,
message.PayloadLen,
message.PayloadVariant,
message.ViaMQTT,
message.PKIEncrypted,
message.DecryptSuccess,
message.DecryptStatus,
message.MQTTClientID,
message.MQTTUsername,
message.MQTTListener,
message.MQTTRemoteAddr,
message.MQTTRemoteHost,
message.MQTTRemotePort,
string(message.ContentJSON),
)
if err != nil {
if err := s.db.Create(message).Error; err != nil {
return fmt.Errorf("insert text_message from %s: %w", message.FromID, err)
}
return nil
@@ -441,7 +276,7 @@ func nodeInfoMapFromRecord(record map[string]any) (*nodeInfoMapRecord, error) {
PositionPrecision: nullableInt64(record["position_precision"]),
NumOnlineLocalNodes: nullableInt64(record["num_online_local_nodes"]),
HasOptedReportLocation: nullableBool(record["has_opted_report_location"]),
ContentJSON: contentJSON,
ContentJSON: string(contentJSON),
}, nil
}
@@ -491,7 +326,7 @@ func textMessageFromRecord(record map[string]any, clientInfo mqttClientInfo) (*t
MQTTRemoteAddr: nullableString(clientInfo.RemoteAddr),
MQTTRemoteHost: nullableString(clientInfo.RemoteHost),
MQTTRemotePort: nullableString(clientInfo.RemotePort),
ContentJSON: contentJSON,
ContentJSON: string(contentJSON),
}, nil
}
@@ -524,7 +359,7 @@ func int64FromAny(value any) (int64, error) {
}
}
func nullableString(value any) any {
func nullableString(value any) *string {
if value == nil {
return nil
}
@@ -532,18 +367,18 @@ func nullableString(value any) any {
if !ok || s == "" {
return nil
}
return s
return &s
}
func nullableBool(value any) any {
func nullableBool(value any) *bool {
b, ok := value.(bool)
if !ok {
return nil
}
return b
return &b
}
func nullableInt64(value any) any {
func nullableInt64(value any) *int64 {
if value == nil {
return nil
}
@@ -551,36 +386,62 @@ func nullableInt64(value any) any {
if err != nil {
return nil
}
return v
return &v
}
func nullableFloat64(value any) any {
func nullableFloat64(value any) *float64 {
var out float64
switch v := value.(type) {
case float32:
return float64(v)
out = float64(v)
case float64:
return v
out = v
case int:
return float64(v)
out = float64(v)
case int8:
return float64(v)
out = float64(v)
case int16:
return float64(v)
out = float64(v)
case int32:
return float64(v)
out = float64(v)
case int64:
return float64(v)
out = float64(v)
case uint:
return float64(v)
out = float64(v)
case uint8:
return float64(v)
out = float64(v)
case uint16:
return float64(v)
out = float64(v)
case uint32:
return float64(v)
out = float64(v)
case uint64:
return float64(v)
out = float64(v)
default:
return nil
}
return &out
}
func addStringUpdate(updates map[string]any, column string, value *string) {
if value != nil {
updates[column] = *value
}
}
func addBoolUpdate(updates map[string]any, column string, value *bool) {
if value != nil {
updates[column] = *value
}
}
func addInt64Update(updates map[string]any, column string, value *int64) {
if value != nil {
updates[column] = *value
}
}
func addFloat64Update(updates map[string]any, column string, value *float64) {
if value != nil {
updates[column] = *value
}
}