From d887845909c92f8608d75b5032eca990300a68d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=96=87=E5=B3=B0?= Date: Wed, 3 Jun 2026 14:09:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 43 ++++++++- config.go | 82 ++++++++++++++++ config_test.go | 47 ++++++++- db.go | 256 +++++++++++++++++++++++++++++++++++++++++++++++++ db_test.go | 126 ++++++++++++++++++++++++ go.mod | 14 ++- go.sum | 55 +++++++++++ main.go | 19 +++- 8 files changed, 635 insertions(+), 7 deletions(-) create mode 100644 db.go create mode 100644 db_test.go diff --git a/README.md b/README.md index 0425db3..fdadc13 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ go run . - port:`1883` - PSK:`AQ==` - TLS:关闭 +- 数据库:SQLite +- SQLite 文件:Unix/Linux 为 `/srv/mesh_mqtt_go/mesh_mqtt_go.db`,Windows 测试为 `./win/etc/mesh_mqtt_go/mesh_mqtt_go.db` 首次启动会自动生成配置文件;之后每次启动都会检查配置项,缺失项会自动补全并写回。 @@ -45,6 +47,12 @@ mqtt: key_file: "" meshtastic: psk: AQ== +database: + driver: sqlite + sqlite: + path: /srv/mesh_mqtt_go/mesh_mqtt_go.db + mysql: + dsn: "" ``` 配置优先级: @@ -66,8 +74,11 @@ go run . --host 127.0.0.1 --port 1883 --psk AQ== --port MQTT broker listen port --psk Base64 channel PSK used to try decrypting encrypted packets --tls Enable MQTT TLS listener ---tls-cert MQTT TLS certificate file ---tls-key MQTT TLS private key file +--tls-cert MQTT TLS certificate file +--tls-key MQTT TLS private key file +--db-driver Database driver: sqlite or mysql +--sqlite-path SQLite database file path +--mysql-dsn MySQL database DSN ``` ## TLS 配置示例 @@ -86,6 +97,34 @@ meshtastic: 启用 TLS 后,`cert_file` 和 `key_file` 必须指向可读取的证书和私钥文件。 +## 数据库持久化 + +程序默认启用 SQLite,并在收到 `nodeinfo` 数据包时写入 `nodeinfo` 表。 + +- 当前只持久化 `type == "nodeinfo"` 的记录 +- 同一节点以 `node_id`(即解析结果中的 `from`,例如 `!a8dfd867`)作为主键 +- 重复收到同一节点时不会插入重复行,只更新节点字段、`content_json` 和 `updated_at` +- `first_seen_at` 保留第一次写入时间 +- `content_json` 保存完整的解析结果 JSON + +SQLite 默认路径: + +- Unix/Linux:`/srv/mesh_mqtt_go/mesh_mqtt_go.db` +- Windows 测试:`./win/etc/mesh_mqtt_go/mesh_mqtt_go.db` + +MySQL 配置示例: + +```yaml +database: + driver: mysql + sqlite: + path: /srv/mesh_mqtt_go/mesh_mqtt_go.db + mysql: + dsn: mesh_user:mesh_pass@tcp(127.0.0.1:3306)/mesh_mqtt_go?parseTime=true&charset=utf8mb4,utf8 +``` + +使用 MySQL 时,需要提前创建好 database/schema。 + ## 转发规则 程序监听所有传入 publish。payload 能被 `mqtpp.MQTTPP` 解析时,认为 `valid == true`,broker 会继续把原始 MQTT 消息转发给订阅者;解析失败时,认为 `valid == false`,broker 会拒绝并丢弃该 publish。 diff --git a/config.go b/config.go index 32579be..514b624 100644 --- a/config.go +++ b/config.go @@ -15,6 +15,7 @@ const configFileName = "config.yaml" type config struct { MQTT mqttConfig `yaml:"mqtt"` Meshtastic meshtasticConfig `yaml:"meshtastic"` + Database databaseConfig `yaml:"database"` key []byte } @@ -34,9 +35,24 @@ type meshtasticConfig struct { PSK string `yaml:"psk"` } +type databaseConfig struct { + Driver string `yaml:"driver"` + SQLite sqliteConfig `yaml:"sqlite"` + MySQL mysqlConfig `yaml:"mysql"` +} + +type sqliteConfig struct { + Path string `yaml:"path"` +} + +type mysqlConfig struct { + DSN string `yaml:"dsn"` +} + type rawConfig struct { MQTT *rawMQTTConfig `yaml:"mqtt"` Meshtastic *rawMeshtasticConfig `yaml:"meshtastic"` + Database *rawDatabaseConfig `yaml:"database"` } type rawMQTTConfig struct { @@ -55,6 +71,20 @@ type rawMeshtasticConfig struct { PSK *string `yaml:"psk"` } +type rawDatabaseConfig struct { + Driver *string `yaml:"driver"` + SQLite *rawSQLiteConfig `yaml:"sqlite"` + MySQL *rawMySQLConfig `yaml:"mysql"` +} + +type rawSQLiteConfig struct { + Path *string `yaml:"path"` +} + +type rawMySQLConfig struct { + DSN *string `yaml:"dsn"` +} + // defaultConfig 返回内置默认配置。 func defaultConfig() *config { return &config{ @@ -70,6 +100,11 @@ func defaultConfig() *config { Meshtastic: meshtasticConfig{ PSK: "AQ==", }, + Database: databaseConfig{ + Driver: "sqlite", + SQLite: sqliteConfig{Path: defaultSQLitePath()}, + MySQL: mysqlConfig{DSN: ""}, + }, } } @@ -86,6 +121,17 @@ func defaultConfigPath() string { return filepath.Join(defaultConfigDir(), configFileName) } +func defaultSQLitePath() string { + return defaultSQLitePathForGOOS(runtime.GOOS) +} + +func defaultSQLitePathForGOOS(goos string) string { + if goos == "windows" { + return filepath.Join(".", "win", "etc", "mesh_mqtt_go", "mesh_mqtt_go.db") + } + return filepath.Join(string(filepath.Separator), "srv", "mesh_mqtt_go", "mesh_mqtt_go.db") +} + // loadConfig 加载配置文件;文件不存在时生成,字段缺失时自动补全并写回。 func loadConfig(path string) (*config, error) { if path == "" { @@ -175,6 +221,30 @@ func normalizeConfig(raw rawConfig) (*config, bool) { cfg.Meshtastic.PSK = *raw.Meshtastic.PSK } + if raw.Database == nil { + changed = true + } else { + if raw.Database.Driver == nil { + changed = true + } else { + cfg.Database.Driver = *raw.Database.Driver + } + if raw.Database.SQLite == nil { + changed = true + } else if raw.Database.SQLite.Path == nil { + changed = true + } else { + cfg.Database.SQLite.Path = *raw.Database.SQLite.Path + } + if raw.Database.MySQL == nil { + changed = true + } else if raw.Database.MySQL.DSN == nil { + changed = true + } else { + cfg.Database.MySQL.DSN = *raw.Database.MySQL.DSN + } + } + return cfg, changed } @@ -182,6 +252,18 @@ func validateConfig(cfg *config) error { if cfg.MQTT.Port <= 0 || cfg.MQTT.Port > 65535 { return fmt.Errorf("invalid mqtt port %d: must be 1-65535", cfg.MQTT.Port) } + switch cfg.Database.Driver { + case "sqlite": + if cfg.Database.SQLite.Path == "" { + return fmt.Errorf("database.sqlite.path is required when database.driver is sqlite") + } + case "mysql": + if cfg.Database.MySQL.DSN == "" { + return fmt.Errorf("database.mysql.dsn is required when database.driver is mysql") + } + default: + return fmt.Errorf("invalid database.driver %q: must be sqlite or mysql", cfg.Database.Driver) + } return nil } diff --git a/config_test.go b/config_test.go index 4c0f4a7..46b5825 100644 --- a/config_test.go +++ b/config_test.go @@ -26,6 +26,12 @@ func TestLoadConfigCreatesDefaultFile(t *testing.T) { if cfg.Meshtastic.PSK != "AQ==" { t.Fatalf("psk = %q, want AQ==", cfg.Meshtastic.PSK) } + if cfg.Database.Driver != "sqlite" { + t.Fatalf("database driver = %q, want sqlite", cfg.Database.Driver) + } + if cfg.Database.SQLite.Path == "" { + t.Fatalf("sqlite path is empty") + } if _, err := os.Stat(path); err != nil { t.Fatalf("default config was not written: %v", err) } @@ -53,13 +59,16 @@ func TestLoadConfigFillsMissingFields(t *testing.T) { if cfg.Meshtastic.PSK != "AQ==" { t.Fatalf("psk = %q, want AQ==", cfg.Meshtastic.PSK) } + if cfg.Database.Driver != "sqlite" { + t.Fatalf("database driver = %q, want sqlite", cfg.Database.Driver) + } data, err := os.ReadFile(path) if err != nil { t.Fatal(err) } text := string(data) - for _, want := range []string{"host:", "tls:", "enabled:", "cert_file:", "key_file:", "meshtastic:", "psk:"} { + for _, want := range []string{"host:", "tls:", "enabled:", "cert_file:", "key_file:", "meshtastic:", "psk:", "database:", "driver:", "sqlite:", "mysql:", "dsn:"} { if !strings.Contains(text, want) { t.Fatalf("completed config missing %q in:\n%s", want, text) } @@ -71,7 +80,7 @@ func TestLoadConfigPreservesExplicitFalse(t *testing.T) { if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { t.Fatal(err) } - content := "mqtt:\n host: 127.0.0.1\n port: 1885\n tls:\n enabled: false\n cert_file: cert.pem\n key_file: key.pem\nmeshtastic:\n psk: AQ==\n" + content := "mqtt:\n host: 127.0.0.1\n port: 1885\n tls:\n enabled: false\n cert_file: cert.pem\n key_file: key.pem\nmeshtastic:\n psk: AQ==\ndatabase:\n driver: sqlite\n sqlite:\n path: test.db\n mysql:\n dsn: \"\"\n" if err := os.WriteFile(path, []byte(content), 0644); err != nil { t.Fatal(err) } @@ -111,6 +120,40 @@ func TestLoadConfigMalformedYAMLDoesNotOverwrite(t *testing.T) { } } +func TestDefaultSQLitePathForGOOS(t *testing.T) { + windowsPath := defaultSQLitePathForGOOS("windows") + if !strings.Contains(windowsPath, filepath.Join("win", "etc", "mesh_mqtt_go", "mesh_mqtt_go.db")) { + t.Fatalf("windows sqlite path = %q", windowsPath) + } + + linuxPath := defaultSQLitePathForGOOS("linux") + want := filepath.Join(string(filepath.Separator), "srv", "mesh_mqtt_go", "mesh_mqtt_go.db") + if linuxPath != want { + t.Fatalf("linux sqlite path = %q, want %q", linuxPath, want) + } +} + +func TestValidateConfigDatabase(t *testing.T) { + cfg := defaultConfig() + cfg.Database.Driver = "postgres" + if err := validateConfig(cfg); err == nil || !strings.Contains(err.Error(), "database.driver") { + t.Fatalf("invalid driver error = %v, want database.driver error", err) + } + + cfg = defaultConfig() + cfg.Database.SQLite.Path = "" + if err := validateConfig(cfg); err == nil || !strings.Contains(err.Error(), "database.sqlite.path") { + t.Fatalf("missing sqlite path error = %v, want database.sqlite.path error", err) + } + + cfg = defaultConfig() + cfg.Database.Driver = "mysql" + cfg.Database.MySQL.DSN = "" + if err := validateConfig(cfg); err == nil || !strings.Contains(err.Error(), "database.mysql.dsn") { + t.Fatalf("missing mysql dsn error = %v, want database.mysql.dsn error", err) + } +} + func TestBuildTLSConfigDisabled(t *testing.T) { cfg, err := buildTLSConfig(tlsConfig{}) if err != nil { diff --git a/db.go b/db.go new file mode 100644 index 0000000..201718e --- /dev/null +++ b/db.go @@ -0,0 +1,256 @@ +package main + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + + _ "github.com/go-sql-driver/mysql" + _ "modernc.org/sqlite" +) + +const ( + databaseDriverSQLite = "sqlite" + databaseDriverMySQL = "mysql" +) + +type store struct { + db *sql.DB + driver string +} + +type nodeInfoRecord struct { + NodeID string + NodeNum int64 + UserID any + LongName any + ShortName any + HWModel any + Role any + IsLicensed bool + PublicKey any + ContentJSON []byte +} + +func openStore(cfg databaseConfig) (*store, error) { + var dsn string + switch cfg.Driver { + case databaseDriverSQLite: + if err := os.MkdirAll(filepath.Dir(cfg.SQLite.Path), 0755); err != nil { + return nil, fmt.Errorf("create sqlite directory %s: %w", filepath.Dir(cfg.SQLite.Path), err) + } + dsn = cfg.SQLite.Path + case databaseDriverMySQL: + dsn = cfg.MySQL.DSN + default: + return nil, fmt.Errorf("unsupported database driver %q", cfg.Driver) + } + + db, err := sql.Open(cfg.Driver, dsn) + if err != nil { + return nil, fmt.Errorf("open %s database: %w", cfg.Driver, err) + } + if err := db.Ping(); err != nil { + db.Close() + return nil, fmt.Errorf("ping %s database: %w", cfg.Driver, err) + } + + s := &store{db: db, driver: cfg.Driver} + if err := s.migrate(); err != nil { + db.Close() + return nil, err + } + return s, nil +} + +func (s *store) Close() error { + if s == nil || s.db == nil { + return nil + } + return s.db.Close() +} + +func (s *store) migrate() error { + var query string + switch s.driver { + case databaseDriverSQLite: + query = `CREATE TABLE IF NOT EXISTS nodeinfo ( + node_id TEXT PRIMARY KEY, + node_num INTEGER NOT NULL, + user_id TEXT, + long_name TEXT, + short_name TEXT, + hw_model TEXT, + role TEXT, + is_licensed BOOLEAN NOT NULL DEFAULT FALSE, + public_key TEXT, + content_json TEXT NOT NULL, + first_seen_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +);` + case databaseDriverMySQL: + query = `CREATE TABLE IF NOT EXISTS nodeinfo ( + node_id VARCHAR(32) NOT NULL PRIMARY KEY, + node_num BIGINT UNSIGNED NOT NULL, + user_id VARCHAR(128), + long_name TEXT, + short_name VARCHAR(64), + hw_model VARCHAR(128), + role VARCHAR(128), + is_licensed BOOLEAN NOT NULL DEFAULT FALSE, + public_key TEXT, + content_json JSON NOT NULL, + first_seen_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +);` + default: + return fmt.Errorf("unsupported database driver %q", s.driver) + } + + if _, err := s.db.Exec(query); err != nil { + return fmt.Errorf("migrate nodeinfo table: %w", err) + } + return nil +} + +func (s *store) UpsertNodeInfo(record map[string]any) error { + node, err := nodeInfoFromRecord(record) + if err != nil { + return err + } + + var query string + switch s.driver { + case databaseDriverSQLite: + query = `INSERT INTO nodeinfo ( + node_id, node_num, user_id, long_name, short_name, + hw_model, role, is_licensed, public_key, content_json, + first_seen_at, updated_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +ON CONFLICT(node_id) DO UPDATE SET + node_num = excluded.node_num, + user_id = excluded.user_id, + long_name = excluded.long_name, + short_name = excluded.short_name, + hw_model = excluded.hw_model, + role = excluded.role, + is_licensed = excluded.is_licensed, + public_key = excluded.public_key, + content_json = excluded.content_json, + updated_at = CURRENT_TIMESTAMP;` + case databaseDriverMySQL: + query = `INSERT INTO nodeinfo ( + node_id, node_num, user_id, long_name, short_name, + hw_model, role, is_licensed, public_key, content_json, + first_seen_at, updated_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) +ON DUPLICATE KEY UPDATE + node_num = VALUES(node_num), + user_id = VALUES(user_id), + long_name = VALUES(long_name), + short_name = VALUES(short_name), + hw_model = VALUES(hw_model), + role = VALUES(role), + is_licensed = VALUES(is_licensed), + public_key = VALUES(public_key), + content_json = VALUES(content_json), + updated_at = CURRENT_TIMESTAMP;` + default: + return fmt.Errorf("unsupported database driver %q", s.driver) + } + + _, err = s.db.Exec(query, + node.NodeID, + node.NodeNum, + node.UserID, + node.LongName, + node.ShortName, + node.HWModel, + node.Role, + node.IsLicensed, + node.PublicKey, + string(node.ContentJSON), + ) + if err != nil { + return fmt.Errorf("upsert nodeinfo %s: %w", node.NodeID, err) + } + return nil +} + +func nodeInfoFromRecord(record map[string]any) (*nodeInfoRecord, error) { + if record["type"] != "nodeinfo" { + return nil, fmt.Errorf("record type %v is not nodeinfo", record["type"]) + } + nodeID, ok := record["from"].(string) + if !ok || nodeID == "" { + return nil, fmt.Errorf("nodeinfo missing from") + } + nodeNum, err := int64FromAny(record["from_num"]) + if err != nil { + return nil, fmt.Errorf("nodeinfo from_num: %w", err) + } + contentJSON, err := json.Marshal(record) + if err != nil { + return nil, fmt.Errorf("encode nodeinfo content_json: %w", err) + } + + return &nodeInfoRecord{ + NodeID: nodeID, + NodeNum: nodeNum, + UserID: nullableString(record["user_id"]), + LongName: nullableString(record["long_name"]), + ShortName: nullableString(record["short_name"]), + HWModel: nullableString(record["hw_model"]), + Role: nullableString(record["role"]), + IsLicensed: boolFromAny(record["is_licensed"]), + PublicKey: nullableString(record["public_key"]), + ContentJSON: contentJSON, + }, nil +} + +func int64FromAny(value any) (int64, error) { + switch v := value.(type) { + case int: + return int64(v), nil + case int8: + return int64(v), nil + case int16: + return int64(v), nil + case int32: + return int64(v), nil + case int64: + return v, nil + case uint: + return int64(v), nil + case uint8: + return int64(v), nil + case uint16: + return int64(v), nil + case uint32: + return int64(v), nil + case uint64: + return int64(v), nil + case float64: + return int64(v), nil + default: + return 0, fmt.Errorf("unsupported value %T", value) + } +} + +func nullableString(value any) any { + if value == nil { + return nil + } + s, ok := value.(string) + if !ok || s == "" { + return nil + } + return s +} + +func boolFromAny(value any) bool { + b, _ := value.(bool) + return b +} diff --git a/db_test.go b/db_test.go new file mode 100644 index 0000000..6034dbf --- /dev/null +++ b/db_test.go @@ -0,0 +1,126 @@ +package main + +import ( + "database/sql" + "path/filepath" + "strings" + "testing" +) + +func TestOpenStoreCreatesNodeInfoTable(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + var name string + if err := st.db.QueryRow("SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'nodeinfo'").Scan(&name); err != nil { + t.Fatalf("nodeinfo table missing: %v", err) + } + if name != "nodeinfo" { + t.Fatalf("table name = %q, want nodeinfo", name) + } +} + +func TestUpsertNodeInfoInsertsAndUpdatesSameNode(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + first := map[string]any{ + "type": "nodeinfo", + "from": "!12345678", + "from_num": uint32(0x12345678), + "user_id": "!12345678", + "long_name": "first name", + "short_name": "fst", + "hw_model": "TEST_HW", + "role": "CLIENT", + "is_licensed": true, + "public_key": "abcd", + } + if err := st.UpsertNodeInfo(first); err != nil { + t.Fatalf("first UpsertNodeInfo() error = %v", err) + } + + second := map[string]any{ + "type": "nodeinfo", + "from": "!12345678", + "from_num": uint32(0x12345678), + "user_id": "!12345678", + "long_name": "second name", + "short_name": "snd", + "hw_model": "TEST_HW_2", + "role": "CLIENT_MUTE", + "is_licensed": false, + "public_key": nil, + } + if err := st.UpsertNodeInfo(second); err != nil { + t.Fatalf("second UpsertNodeInfo() error = %v", err) + } + + var count int + if err := st.db.QueryRow("SELECT COUNT(*) FROM nodeinfo WHERE node_id = ?", "!12345678").Scan(&count); err != nil { + t.Fatal(err) + } + if count != 1 { + t.Fatalf("node row count = %d, want 1", count) + } + + var longName, content string + if err := st.db.QueryRow("SELECT long_name, content_json FROM nodeinfo WHERE node_id = ?", "!12345678").Scan(&longName, &content); err != nil { + t.Fatal(err) + } + if longName != "second name" { + t.Fatalf("long_name = %q, want second name", longName) + } + if !strings.Contains(content, "second name") { + t.Fatalf("content_json = %q, want updated content", content) + } +} + +func TestUpsertNodeInfoRequiresNodeFields(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + if err := st.UpsertNodeInfo(map[string]any{"type": "nodeinfo", "from_num": 1}); err == nil || !strings.Contains(err.Error(), "from") { + t.Fatalf("missing from error = %v, want from error", err) + } + if err := st.UpsertNodeInfo(map[string]any{"type": "nodeinfo", "from": "!00000001"}); err == nil || !strings.Contains(err.Error(), "from_num") { + t.Fatalf("missing from_num error = %v, want from_num error", err) + } +} + +func openTestStore(t *testing.T) *store { + t.Helper() + st, err := openStore(databaseConfig{ + Driver: databaseDriverSQLite, + SQLite: sqliteConfig{Path: filepath.Join(t.TempDir(), "mesh_mqtt_go.db")}, + }) + if err != nil { + t.Fatalf("openStore() error = %v", err) + } + return st +} + +func TestNodeInfoFromRecordRejectsWrongType(t *testing.T) { + _, err := nodeInfoFromRecord(map[string]any{"type": "text_message"}) + if err == nil { + t.Fatalf("nodeInfoFromRecord() error = nil, want error") + } +} + +func TestNodeInfoNullablePublicKey(t *testing.T) { + st := openTestStore(t) + defer st.Close() + + record := map[string]any{"type": "nodeinfo", "from": "!00000001", "from_num": 1, "public_key": nil} + if err := st.UpsertNodeInfo(record); err != nil { + t.Fatalf("UpsertNodeInfo() error = %v", err) + } + + var publicKey sql.NullString + if err := st.db.QueryRow("SELECT public_key FROM nodeinfo WHERE node_id = ?", "!00000001").Scan(&publicKey); err != nil { + t.Fatal(err) + } + if publicKey.Valid { + t.Fatalf("public_key valid = true, want null") + } +} diff --git a/go.mod b/go.mod index 35c793d..fc60410 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,26 @@ module meshtastic_mqtt_server -go 1.23 +go 1.25.0 require ( + github.com/go-sql-driver/mysql v1.10.0 github.com/mochi-mqtt/server/v2 v2.7.9 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 + modernc.org/sqlite v1.51.0 ) require ( + filippo.io/edwards25519 v1.2.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rs/xid v1.4.0 // indirect + golang.org/x/sys v0.42.0 // indirect + modernc.org/libc v1.72.3 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index a2b9fe3..96ea3b3 100644 --- a/go.sum +++ b/go.sum @@ -1,20 +1,75 @@ +filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo= +filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/go-sql-driver/mysql v1.10.0 h1:Q+1LV8DkHJvSYAdR83XzuhDaTykuDx0l6fkXxoWCWfw= +github.com/go-sql-driver/mysql v1.10.0/go.mod h1:M+cqaI7+xxXGG9swrdeUIoPG3Y3KCkF0pZej+SK+nWk= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI= github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY= +modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI= +modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ= +modernc.org/ccgo/v4 v4.34.0/go.mod h1:AS5WYMyBakQ+fhsHhtP8mWB82KTGPkNNJDGfGQCe0/A= +modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM= +modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo= +modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.72.3 h1:ZnDF4tXn4NBXFutMMQC4vtbTFSXhhKzR73fv0beZEAU= +modernc.org/libc v1.72.3/go.mod h1:dn0dZNnnn1clLyvRxLxYExxiKRZIRENOfqQ8XEeg4Qs= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.2.0 h1:tGyef5ApycA7FSEOMraay9SaTk5zmbx7Tu+cJs4QKZg= +modernc.org/opt v0.2.0/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.51.0 h1:aH/MMSoayAIhozZ7uJbVTT9QO/VhzBf0J9tymmmuC/U= +modernc.org/sqlite v1.51.0/go.mod h1:tcNzv5p84E0skkmJn038y+hWJbLQXQqEnQfeh5r2JLM= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/main.go b/main.go index ee319ab..9546a83 100644 --- a/main.go +++ b/main.go @@ -30,7 +30,8 @@ const ( type meshtasticFilterHook struct { mqtt.HookBase - key []byte + key []byte + store *store } // ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。 @@ -50,6 +51,11 @@ func (h *meshtasticFilterHook) OnPublish(_ *mqtt.Client, pk packets.Packet) (pac return pk, packets.ErrRejectPacket } + if record["type"] == "nodeinfo" && h.store != nil { + if err := h.store.UpsertNodeInfo(record); err != nil { + printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()}) + } + } if record["type"] != "empty_packet" { printJSON(record) } @@ -82,6 +88,9 @@ func parseArgs() (*config, error) { flag.BoolVar(&cfg.MQTT.TLS.Enabled, "tls", cfg.MQTT.TLS.Enabled, "Enable MQTT TLS listener") flag.StringVar(&cfg.MQTT.TLS.CertFile, "tls-cert", cfg.MQTT.TLS.CertFile, "MQTT TLS certificate file") flag.StringVar(&cfg.MQTT.TLS.KeyFile, "tls-key", cfg.MQTT.TLS.KeyFile, "MQTT TLS private key file") + flag.StringVar(&cfg.Database.Driver, "db-driver", cfg.Database.Driver, "Database driver: sqlite or mysql") + flag.StringVar(&cfg.Database.SQLite.Path, "sqlite-path", cfg.Database.SQLite.Path, "SQLite database file path") + flag.StringVar(&cfg.Database.MySQL.DSN, "mysql-dsn", cfg.Database.MySQL.DSN, "MySQL database DSN") flag.Parse() if err := validateConfig(cfg); err != nil { @@ -97,11 +106,17 @@ func parseArgs() (*config, error) { // run 创建 MQTT broker,监听传入 publish,并阻塞等待退出信号。 func run(cfg *config) error { + store, err := openStore(cfg.Database) + if err != nil { + return err + } + defer store.Close() + server := mqtt.New(nil) if err := server.AddHook(new(auth.AllowHook), nil); err != nil { return err } - if err := server.AddHook(&meshtasticFilterHook{key: cfg.key}, nil); err != nil { + if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, store: store}, nil); err != nil { return err }