增加数据库支持

This commit is contained in:
2026-06-03 14:09:48 +08:00
parent 44dfb14cf4
commit d887845909
8 changed files with 635 additions and 7 deletions
+41 -2
View File
@@ -25,6 +25,8 @@ go run .
- port`1883` - port`1883`
- PSK`AQ==` - PSK`AQ==`
- TLS:关闭 - TLS:关闭
- 数据库:SQLite
- SQLite 文件:Unix/Linux 为 `/srv/mesh_mqtt_go/mesh_mqtt_go.db`Windows 测试为 `./win/etc/mesh_mqtt_go/mesh_mqtt_go.db`
首次启动会自动生成配置文件;之后每次启动都会检查配置项,缺失项会自动补全并写回。 首次启动会自动生成配置文件;之后每次启动都会检查配置项,缺失项会自动补全并写回。
@@ -45,6 +47,12 @@ mqtt:
key_file: "" key_file: ""
meshtastic: meshtastic:
psk: AQ== psk: AQ==
database:
driver: sqlite
sqlite:
path: /srv/mesh_mqtt_go/mesh_mqtt_go.db
mysql:
dsn: ""
``` ```
配置优先级: 配置优先级:
@@ -66,8 +74,11 @@ go run . --host 127.0.0.1 --port 1883 --psk AQ==
--port MQTT broker listen port --port MQTT broker listen port
--psk Base64 channel PSK used to try decrypting encrypted packets --psk Base64 channel PSK used to try decrypting encrypted packets
--tls Enable MQTT TLS listener --tls Enable MQTT TLS listener
--tls-cert MQTT TLS certificate file --tls-cert MQTT TLS certificate file
--tls-key MQTT TLS private key file --tls-key MQTT TLS private key file
--db-driver Database driver: sqlite or mysql
--sqlite-path SQLite database file path
--mysql-dsn MySQL database DSN
``` ```
## TLS 配置示例 ## TLS 配置示例
@@ -86,6 +97,34 @@ meshtastic:
启用 TLS 后,`cert_file``key_file` 必须指向可读取的证书和私钥文件。 启用 TLS 后,`cert_file``key_file` 必须指向可读取的证书和私钥文件。
## 数据库持久化
程序默认启用 SQLite,并在收到 `nodeinfo` 数据包时写入 `nodeinfo` 表。
- 当前只持久化 `type == "nodeinfo"` 的记录
- 同一节点以 `node_id`(即解析结果中的 `from`,例如 `!a8dfd867`)作为主键
- 重复收到同一节点时不会插入重复行,只更新节点字段、`content_json``updated_at`
- `first_seen_at` 保留第一次写入时间
- `content_json` 保存完整的解析结果 JSON
SQLite 默认路径:
- Unix/Linux`/srv/mesh_mqtt_go/mesh_mqtt_go.db`
- Windows 测试:`./win/etc/mesh_mqtt_go/mesh_mqtt_go.db`
MySQL 配置示例:
```yaml
database:
driver: mysql
sqlite:
path: /srv/mesh_mqtt_go/mesh_mqtt_go.db
mysql:
dsn: mesh_user:mesh_pass@tcp(127.0.0.1:3306)/mesh_mqtt_go?parseTime=true&charset=utf8mb4,utf8
```
使用 MySQL 时,需要提前创建好 database/schema。
## 转发规则 ## 转发规则
程序监听所有传入 publish。payload 能被 `mqtpp.MQTTPP` 解析时,认为 `valid == true`broker 会继续把原始 MQTT 消息转发给订阅者;解析失败时,认为 `valid == false`broker 会拒绝并丢弃该 publish。 程序监听所有传入 publish。payload 能被 `mqtpp.MQTTPP` 解析时,认为 `valid == true`broker 会继续把原始 MQTT 消息转发给订阅者;解析失败时,认为 `valid == false`broker 会拒绝并丢弃该 publish。
+82
View File
@@ -15,6 +15,7 @@ const configFileName = "config.yaml"
type config struct { type config struct {
MQTT mqttConfig `yaml:"mqtt"` MQTT mqttConfig `yaml:"mqtt"`
Meshtastic meshtasticConfig `yaml:"meshtastic"` Meshtastic meshtasticConfig `yaml:"meshtastic"`
Database databaseConfig `yaml:"database"`
key []byte key []byte
} }
@@ -34,9 +35,24 @@ type meshtasticConfig struct {
PSK string `yaml:"psk"` PSK string `yaml:"psk"`
} }
type databaseConfig struct {
Driver string `yaml:"driver"`
SQLite sqliteConfig `yaml:"sqlite"`
MySQL mysqlConfig `yaml:"mysql"`
}
type sqliteConfig struct {
Path string `yaml:"path"`
}
type mysqlConfig struct {
DSN string `yaml:"dsn"`
}
type rawConfig struct { type rawConfig struct {
MQTT *rawMQTTConfig `yaml:"mqtt"` MQTT *rawMQTTConfig `yaml:"mqtt"`
Meshtastic *rawMeshtasticConfig `yaml:"meshtastic"` Meshtastic *rawMeshtasticConfig `yaml:"meshtastic"`
Database *rawDatabaseConfig `yaml:"database"`
} }
type rawMQTTConfig struct { type rawMQTTConfig struct {
@@ -55,6 +71,20 @@ type rawMeshtasticConfig struct {
PSK *string `yaml:"psk"` PSK *string `yaml:"psk"`
} }
type rawDatabaseConfig struct {
Driver *string `yaml:"driver"`
SQLite *rawSQLiteConfig `yaml:"sqlite"`
MySQL *rawMySQLConfig `yaml:"mysql"`
}
type rawSQLiteConfig struct {
Path *string `yaml:"path"`
}
type rawMySQLConfig struct {
DSN *string `yaml:"dsn"`
}
// defaultConfig 返回内置默认配置。 // defaultConfig 返回内置默认配置。
func defaultConfig() *config { func defaultConfig() *config {
return &config{ return &config{
@@ -70,6 +100,11 @@ func defaultConfig() *config {
Meshtastic: meshtasticConfig{ Meshtastic: meshtasticConfig{
PSK: "AQ==", PSK: "AQ==",
}, },
Database: databaseConfig{
Driver: "sqlite",
SQLite: sqliteConfig{Path: defaultSQLitePath()},
MySQL: mysqlConfig{DSN: ""},
},
} }
} }
@@ -86,6 +121,17 @@ func defaultConfigPath() string {
return filepath.Join(defaultConfigDir(), configFileName) return filepath.Join(defaultConfigDir(), configFileName)
} }
func defaultSQLitePath() string {
return defaultSQLitePathForGOOS(runtime.GOOS)
}
func defaultSQLitePathForGOOS(goos string) string {
if goos == "windows" {
return filepath.Join(".", "win", "etc", "mesh_mqtt_go", "mesh_mqtt_go.db")
}
return filepath.Join(string(filepath.Separator), "srv", "mesh_mqtt_go", "mesh_mqtt_go.db")
}
// loadConfig 加载配置文件;文件不存在时生成,字段缺失时自动补全并写回。 // loadConfig 加载配置文件;文件不存在时生成,字段缺失时自动补全并写回。
func loadConfig(path string) (*config, error) { func loadConfig(path string) (*config, error) {
if path == "" { if path == "" {
@@ -175,6 +221,30 @@ func normalizeConfig(raw rawConfig) (*config, bool) {
cfg.Meshtastic.PSK = *raw.Meshtastic.PSK cfg.Meshtastic.PSK = *raw.Meshtastic.PSK
} }
if raw.Database == nil {
changed = true
} else {
if raw.Database.Driver == nil {
changed = true
} else {
cfg.Database.Driver = *raw.Database.Driver
}
if raw.Database.SQLite == nil {
changed = true
} else if raw.Database.SQLite.Path == nil {
changed = true
} else {
cfg.Database.SQLite.Path = *raw.Database.SQLite.Path
}
if raw.Database.MySQL == nil {
changed = true
} else if raw.Database.MySQL.DSN == nil {
changed = true
} else {
cfg.Database.MySQL.DSN = *raw.Database.MySQL.DSN
}
}
return cfg, changed return cfg, changed
} }
@@ -182,6 +252,18 @@ func validateConfig(cfg *config) error {
if cfg.MQTT.Port <= 0 || cfg.MQTT.Port > 65535 { if cfg.MQTT.Port <= 0 || cfg.MQTT.Port > 65535 {
return fmt.Errorf("invalid mqtt port %d: must be 1-65535", cfg.MQTT.Port) return fmt.Errorf("invalid mqtt port %d: must be 1-65535", cfg.MQTT.Port)
} }
switch cfg.Database.Driver {
case "sqlite":
if cfg.Database.SQLite.Path == "" {
return fmt.Errorf("database.sqlite.path is required when database.driver is sqlite")
}
case "mysql":
if cfg.Database.MySQL.DSN == "" {
return fmt.Errorf("database.mysql.dsn is required when database.driver is mysql")
}
default:
return fmt.Errorf("invalid database.driver %q: must be sqlite or mysql", cfg.Database.Driver)
}
return nil return nil
} }
+45 -2
View File
@@ -26,6 +26,12 @@ func TestLoadConfigCreatesDefaultFile(t *testing.T) {
if cfg.Meshtastic.PSK != "AQ==" { if cfg.Meshtastic.PSK != "AQ==" {
t.Fatalf("psk = %q, want AQ==", cfg.Meshtastic.PSK) t.Fatalf("psk = %q, want AQ==", cfg.Meshtastic.PSK)
} }
if cfg.Database.Driver != "sqlite" {
t.Fatalf("database driver = %q, want sqlite", cfg.Database.Driver)
}
if cfg.Database.SQLite.Path == "" {
t.Fatalf("sqlite path is empty")
}
if _, err := os.Stat(path); err != nil { if _, err := os.Stat(path); err != nil {
t.Fatalf("default config was not written: %v", err) t.Fatalf("default config was not written: %v", err)
} }
@@ -53,13 +59,16 @@ func TestLoadConfigFillsMissingFields(t *testing.T) {
if cfg.Meshtastic.PSK != "AQ==" { if cfg.Meshtastic.PSK != "AQ==" {
t.Fatalf("psk = %q, want AQ==", cfg.Meshtastic.PSK) t.Fatalf("psk = %q, want AQ==", cfg.Meshtastic.PSK)
} }
if cfg.Database.Driver != "sqlite" {
t.Fatalf("database driver = %q, want sqlite", cfg.Database.Driver)
}
data, err := os.ReadFile(path) data, err := os.ReadFile(path)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
text := string(data) text := string(data)
for _, want := range []string{"host:", "tls:", "enabled:", "cert_file:", "key_file:", "meshtastic:", "psk:"} { for _, want := range []string{"host:", "tls:", "enabled:", "cert_file:", "key_file:", "meshtastic:", "psk:", "database:", "driver:", "sqlite:", "mysql:", "dsn:"} {
if !strings.Contains(text, want) { if !strings.Contains(text, want) {
t.Fatalf("completed config missing %q in:\n%s", want, text) t.Fatalf("completed config missing %q in:\n%s", want, text)
} }
@@ -71,7 +80,7 @@ func TestLoadConfigPreservesExplicitFalse(t *testing.T) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
t.Fatal(err) t.Fatal(err)
} }
content := "mqtt:\n host: 127.0.0.1\n port: 1885\n tls:\n enabled: false\n cert_file: cert.pem\n key_file: key.pem\nmeshtastic:\n psk: AQ==\n" content := "mqtt:\n host: 127.0.0.1\n port: 1885\n tls:\n enabled: false\n cert_file: cert.pem\n key_file: key.pem\nmeshtastic:\n psk: AQ==\ndatabase:\n driver: sqlite\n sqlite:\n path: test.db\n mysql:\n dsn: \"\"\n"
if err := os.WriteFile(path, []byte(content), 0644); err != nil { if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -111,6 +120,40 @@ func TestLoadConfigMalformedYAMLDoesNotOverwrite(t *testing.T) {
} }
} }
func TestDefaultSQLitePathForGOOS(t *testing.T) {
windowsPath := defaultSQLitePathForGOOS("windows")
if !strings.Contains(windowsPath, filepath.Join("win", "etc", "mesh_mqtt_go", "mesh_mqtt_go.db")) {
t.Fatalf("windows sqlite path = %q", windowsPath)
}
linuxPath := defaultSQLitePathForGOOS("linux")
want := filepath.Join(string(filepath.Separator), "srv", "mesh_mqtt_go", "mesh_mqtt_go.db")
if linuxPath != want {
t.Fatalf("linux sqlite path = %q, want %q", linuxPath, want)
}
}
func TestValidateConfigDatabase(t *testing.T) {
cfg := defaultConfig()
cfg.Database.Driver = "postgres"
if err := validateConfig(cfg); err == nil || !strings.Contains(err.Error(), "database.driver") {
t.Fatalf("invalid driver error = %v, want database.driver error", err)
}
cfg = defaultConfig()
cfg.Database.SQLite.Path = ""
if err := validateConfig(cfg); err == nil || !strings.Contains(err.Error(), "database.sqlite.path") {
t.Fatalf("missing sqlite path error = %v, want database.sqlite.path error", err)
}
cfg = defaultConfig()
cfg.Database.Driver = "mysql"
cfg.Database.MySQL.DSN = ""
if err := validateConfig(cfg); err == nil || !strings.Contains(err.Error(), "database.mysql.dsn") {
t.Fatalf("missing mysql dsn error = %v, want database.mysql.dsn error", err)
}
}
func TestBuildTLSConfigDisabled(t *testing.T) { func TestBuildTLSConfigDisabled(t *testing.T) {
cfg, err := buildTLSConfig(tlsConfig{}) cfg, err := buildTLSConfig(tlsConfig{})
if err != nil { if err != nil {
+256
View File
@@ -0,0 +1,256 @@
package main
import (
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
_ "github.com/go-sql-driver/mysql"
_ "modernc.org/sqlite"
)
const (
databaseDriverSQLite = "sqlite"
databaseDriverMySQL = "mysql"
)
type store struct {
db *sql.DB
driver string
}
type nodeInfoRecord struct {
NodeID string
NodeNum int64
UserID any
LongName any
ShortName any
HWModel any
Role any
IsLicensed bool
PublicKey any
ContentJSON []byte
}
func openStore(cfg databaseConfig) (*store, error) {
var dsn string
switch cfg.Driver {
case databaseDriverSQLite:
if err := os.MkdirAll(filepath.Dir(cfg.SQLite.Path), 0755); err != nil {
return nil, fmt.Errorf("create sqlite directory %s: %w", filepath.Dir(cfg.SQLite.Path), err)
}
dsn = cfg.SQLite.Path
case databaseDriverMySQL:
dsn = cfg.MySQL.DSN
default:
return nil, fmt.Errorf("unsupported database driver %q", cfg.Driver)
}
db, err := sql.Open(cfg.Driver, dsn)
if err != nil {
return nil, fmt.Errorf("open %s database: %w", cfg.Driver, err)
}
if err := db.Ping(); err != nil {
db.Close()
return nil, fmt.Errorf("ping %s database: %w", cfg.Driver, err)
}
s := &store{db: db, driver: cfg.Driver}
if err := s.migrate(); err != nil {
db.Close()
return nil, err
}
return s, nil
}
func (s *store) Close() error {
if s == nil || s.db == nil {
return nil
}
return s.db.Close()
}
func (s *store) migrate() error {
var query string
switch s.driver {
case databaseDriverSQLite:
query = `CREATE TABLE IF NOT EXISTS nodeinfo (
node_id TEXT PRIMARY KEY,
node_num INTEGER NOT NULL,
user_id TEXT,
long_name TEXT,
short_name TEXT,
hw_model TEXT,
role TEXT,
is_licensed BOOLEAN NOT NULL DEFAULT FALSE,
public_key TEXT,
content_json TEXT NOT NULL,
first_seen_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);`
case databaseDriverMySQL:
query = `CREATE TABLE IF NOT EXISTS nodeinfo (
node_id VARCHAR(32) NOT NULL PRIMARY KEY,
node_num BIGINT UNSIGNED NOT NULL,
user_id VARCHAR(128),
long_name TEXT,
short_name VARCHAR(64),
hw_model VARCHAR(128),
role VARCHAR(128),
is_licensed BOOLEAN NOT NULL DEFAULT FALSE,
public_key TEXT,
content_json JSON NOT NULL,
first_seen_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);`
default:
return fmt.Errorf("unsupported database driver %q", s.driver)
}
if _, err := s.db.Exec(query); err != nil {
return fmt.Errorf("migrate nodeinfo table: %w", err)
}
return nil
}
func (s *store) UpsertNodeInfo(record map[string]any) error {
node, err := nodeInfoFromRecord(record)
if err != nil {
return err
}
var query string
switch s.driver {
case databaseDriverSQLite:
query = `INSERT INTO nodeinfo (
node_id, node_num, user_id, long_name, short_name,
hw_model, role, is_licensed, public_key, content_json,
first_seen_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON CONFLICT(node_id) DO UPDATE SET
node_num = excluded.node_num,
user_id = excluded.user_id,
long_name = excluded.long_name,
short_name = excluded.short_name,
hw_model = excluded.hw_model,
role = excluded.role,
is_licensed = excluded.is_licensed,
public_key = excluded.public_key,
content_json = excluded.content_json,
updated_at = CURRENT_TIMESTAMP;`
case databaseDriverMySQL:
query = `INSERT INTO nodeinfo (
node_id, node_num, user_id, long_name, short_name,
hw_model, role, is_licensed, public_key, content_json,
first_seen_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
ON DUPLICATE KEY UPDATE
node_num = VALUES(node_num),
user_id = VALUES(user_id),
long_name = VALUES(long_name),
short_name = VALUES(short_name),
hw_model = VALUES(hw_model),
role = VALUES(role),
is_licensed = VALUES(is_licensed),
public_key = VALUES(public_key),
content_json = VALUES(content_json),
updated_at = CURRENT_TIMESTAMP;`
default:
return fmt.Errorf("unsupported database driver %q", s.driver)
}
_, err = s.db.Exec(query,
node.NodeID,
node.NodeNum,
node.UserID,
node.LongName,
node.ShortName,
node.HWModel,
node.Role,
node.IsLicensed,
node.PublicKey,
string(node.ContentJSON),
)
if err != nil {
return fmt.Errorf("upsert nodeinfo %s: %w", node.NodeID, err)
}
return nil
}
func nodeInfoFromRecord(record map[string]any) (*nodeInfoRecord, error) {
if record["type"] != "nodeinfo" {
return nil, fmt.Errorf("record type %v is not nodeinfo", record["type"])
}
nodeID, ok := record["from"].(string)
if !ok || nodeID == "" {
return nil, fmt.Errorf("nodeinfo missing from")
}
nodeNum, err := int64FromAny(record["from_num"])
if err != nil {
return nil, fmt.Errorf("nodeinfo from_num: %w", err)
}
contentJSON, err := json.Marshal(record)
if err != nil {
return nil, fmt.Errorf("encode nodeinfo content_json: %w", err)
}
return &nodeInfoRecord{
NodeID: nodeID,
NodeNum: nodeNum,
UserID: nullableString(record["user_id"]),
LongName: nullableString(record["long_name"]),
ShortName: nullableString(record["short_name"]),
HWModel: nullableString(record["hw_model"]),
Role: nullableString(record["role"]),
IsLicensed: boolFromAny(record["is_licensed"]),
PublicKey: nullableString(record["public_key"]),
ContentJSON: contentJSON,
}, nil
}
func int64FromAny(value any) (int64, error) {
switch v := value.(type) {
case int:
return int64(v), nil
case int8:
return int64(v), nil
case int16:
return int64(v), nil
case int32:
return int64(v), nil
case int64:
return v, nil
case uint:
return int64(v), nil
case uint8:
return int64(v), nil
case uint16:
return int64(v), nil
case uint32:
return int64(v), nil
case uint64:
return int64(v), nil
case float64:
return int64(v), nil
default:
return 0, fmt.Errorf("unsupported value %T", value)
}
}
func nullableString(value any) any {
if value == nil {
return nil
}
s, ok := value.(string)
if !ok || s == "" {
return nil
}
return s
}
func boolFromAny(value any) bool {
b, _ := value.(bool)
return b
}
+126
View File
@@ -0,0 +1,126 @@
package main
import (
"database/sql"
"path/filepath"
"strings"
"testing"
)
func TestOpenStoreCreatesNodeInfoTable(t *testing.T) {
st := openTestStore(t)
defer st.Close()
var name string
if err := st.db.QueryRow("SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'nodeinfo'").Scan(&name); err != nil {
t.Fatalf("nodeinfo table missing: %v", err)
}
if name != "nodeinfo" {
t.Fatalf("table name = %q, want nodeinfo", name)
}
}
func TestUpsertNodeInfoInsertsAndUpdatesSameNode(t *testing.T) {
st := openTestStore(t)
defer st.Close()
first := map[string]any{
"type": "nodeinfo",
"from": "!12345678",
"from_num": uint32(0x12345678),
"user_id": "!12345678",
"long_name": "first name",
"short_name": "fst",
"hw_model": "TEST_HW",
"role": "CLIENT",
"is_licensed": true,
"public_key": "abcd",
}
if err := st.UpsertNodeInfo(first); err != nil {
t.Fatalf("first UpsertNodeInfo() error = %v", err)
}
second := map[string]any{
"type": "nodeinfo",
"from": "!12345678",
"from_num": uint32(0x12345678),
"user_id": "!12345678",
"long_name": "second name",
"short_name": "snd",
"hw_model": "TEST_HW_2",
"role": "CLIENT_MUTE",
"is_licensed": false,
"public_key": nil,
}
if err := st.UpsertNodeInfo(second); err != nil {
t.Fatalf("second UpsertNodeInfo() error = %v", err)
}
var count int
if err := st.db.QueryRow("SELECT COUNT(*) FROM nodeinfo WHERE node_id = ?", "!12345678").Scan(&count); err != nil {
t.Fatal(err)
}
if count != 1 {
t.Fatalf("node row count = %d, want 1", count)
}
var longName, content string
if err := st.db.QueryRow("SELECT long_name, content_json FROM nodeinfo WHERE node_id = ?", "!12345678").Scan(&longName, &content); err != nil {
t.Fatal(err)
}
if longName != "second name" {
t.Fatalf("long_name = %q, want second name", longName)
}
if !strings.Contains(content, "second name") {
t.Fatalf("content_json = %q, want updated content", content)
}
}
func TestUpsertNodeInfoRequiresNodeFields(t *testing.T) {
st := openTestStore(t)
defer st.Close()
if err := st.UpsertNodeInfo(map[string]any{"type": "nodeinfo", "from_num": 1}); err == nil || !strings.Contains(err.Error(), "from") {
t.Fatalf("missing from error = %v, want from error", err)
}
if err := st.UpsertNodeInfo(map[string]any{"type": "nodeinfo", "from": "!00000001"}); err == nil || !strings.Contains(err.Error(), "from_num") {
t.Fatalf("missing from_num error = %v, want from_num error", err)
}
}
func openTestStore(t *testing.T) *store {
t.Helper()
st, err := openStore(databaseConfig{
Driver: databaseDriverSQLite,
SQLite: sqliteConfig{Path: filepath.Join(t.TempDir(), "mesh_mqtt_go.db")},
})
if err != nil {
t.Fatalf("openStore() error = %v", err)
}
return st
}
func TestNodeInfoFromRecordRejectsWrongType(t *testing.T) {
_, err := nodeInfoFromRecord(map[string]any{"type": "text_message"})
if err == nil {
t.Fatalf("nodeInfoFromRecord() error = nil, want error")
}
}
func TestNodeInfoNullablePublicKey(t *testing.T) {
st := openTestStore(t)
defer st.Close()
record := map[string]any{"type": "nodeinfo", "from": "!00000001", "from_num": 1, "public_key": nil}
if err := st.UpsertNodeInfo(record); err != nil {
t.Fatalf("UpsertNodeInfo() error = %v", err)
}
var publicKey sql.NullString
if err := st.db.QueryRow("SELECT public_key FROM nodeinfo WHERE node_id = ?", "!00000001").Scan(&publicKey); err != nil {
t.Fatal(err)
}
if publicKey.Valid {
t.Fatalf("public_key valid = true, want null")
}
}
+13 -1
View File
@@ -1,14 +1,26 @@
module meshtastic_mqtt_server module meshtastic_mqtt_server
go 1.23 go 1.25.0
require ( require (
github.com/go-sql-driver/mysql v1.10.0
github.com/mochi-mqtt/server/v2 v2.7.9 github.com/mochi-mqtt/server/v2 v2.7.9
google.golang.org/protobuf v1.36.11 google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
modernc.org/sqlite v1.51.0
) )
require ( require (
filippo.io/edwards25519 v1.2.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect github.com/gorilla/websocket v1.5.3 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rs/xid v1.4.0 // indirect github.com/rs/xid v1.4.0 // indirect
golang.org/x/sys v0.42.0 // indirect
modernc.org/libc v1.72.3 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
) )
+55
View File
@@ -1,20 +1,75 @@
filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo=
filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-sql-driver/mysql v1.10.0 h1:Q+1LV8DkHJvSYAdR83XzuhDaTykuDx0l6fkXxoWCWfw=
github.com/go-sql-driver/mysql v1.10.0/go.mod h1:M+cqaI7+xxXGG9swrdeUIoPG3Y3KCkF0pZej+SK+nWk=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI= github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI=
github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc= github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.28.2 h1:3tQ0lf2ADtoby2EtSP+J7IE2SHwEJdP8ioR59wx7XpY=
modernc.org/cc/v4 v4.28.2/go.mod h1:OnovgIhbbMXMu1aISnJ0wvVD1KnW+cAUJkIrAWh+kVI=
modernc.org/ccgo/v4 v4.34.0 h1:yRLPFZieg532OT4rp4JFNIVcquwalMX26G95WQDqwCQ=
modernc.org/ccgo/v4 v4.34.0/go.mod h1:AS5WYMyBakQ+fhsHhtP8mWB82KTGPkNNJDGfGQCe0/A=
modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM=
modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo=
modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY=
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
modernc.org/libc v1.72.3 h1:ZnDF4tXn4NBXFutMMQC4vtbTFSXhhKzR73fv0beZEAU=
modernc.org/libc v1.72.3/go.mod h1:dn0dZNnnn1clLyvRxLxYExxiKRZIRENOfqQ8XEeg4Qs=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.2.0 h1:tGyef5ApycA7FSEOMraay9SaTk5zmbx7Tu+cJs4QKZg=
modernc.org/opt v0.2.0/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.51.0 h1:aH/MMSoayAIhozZ7uJbVTT9QO/VhzBf0J9tymmmuC/U=
modernc.org/sqlite v1.51.0/go.mod h1:tcNzv5p84E0skkmJn038y+hWJbLQXQqEnQfeh5r2JLM=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
+17 -2
View File
@@ -30,7 +30,8 @@ const (
type meshtasticFilterHook struct { type meshtasticFilterHook struct {
mqtt.HookBase mqtt.HookBase
key []byte key []byte
store *store
} }
// ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。 // ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。
@@ -50,6 +51,11 @@ func (h *meshtasticFilterHook) OnPublish(_ *mqtt.Client, pk packets.Packet) (pac
return pk, packets.ErrRejectPacket return pk, packets.ErrRejectPacket
} }
if record["type"] == "nodeinfo" && h.store != nil {
if err := h.store.UpsertNodeInfo(record); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
if record["type"] != "empty_packet" { if record["type"] != "empty_packet" {
printJSON(record) printJSON(record)
} }
@@ -82,6 +88,9 @@ func parseArgs() (*config, error) {
flag.BoolVar(&cfg.MQTT.TLS.Enabled, "tls", cfg.MQTT.TLS.Enabled, "Enable MQTT TLS listener") flag.BoolVar(&cfg.MQTT.TLS.Enabled, "tls", cfg.MQTT.TLS.Enabled, "Enable MQTT TLS listener")
flag.StringVar(&cfg.MQTT.TLS.CertFile, "tls-cert", cfg.MQTT.TLS.CertFile, "MQTT TLS certificate file") flag.StringVar(&cfg.MQTT.TLS.CertFile, "tls-cert", cfg.MQTT.TLS.CertFile, "MQTT TLS certificate file")
flag.StringVar(&cfg.MQTT.TLS.KeyFile, "tls-key", cfg.MQTT.TLS.KeyFile, "MQTT TLS private key file") flag.StringVar(&cfg.MQTT.TLS.KeyFile, "tls-key", cfg.MQTT.TLS.KeyFile, "MQTT TLS private key file")
flag.StringVar(&cfg.Database.Driver, "db-driver", cfg.Database.Driver, "Database driver: sqlite or mysql")
flag.StringVar(&cfg.Database.SQLite.Path, "sqlite-path", cfg.Database.SQLite.Path, "SQLite database file path")
flag.StringVar(&cfg.Database.MySQL.DSN, "mysql-dsn", cfg.Database.MySQL.DSN, "MySQL database DSN")
flag.Parse() flag.Parse()
if err := validateConfig(cfg); err != nil { if err := validateConfig(cfg); err != nil {
@@ -97,11 +106,17 @@ func parseArgs() (*config, error) {
// run 创建 MQTT broker,监听传入 publish,并阻塞等待退出信号。 // run 创建 MQTT broker,监听传入 publish,并阻塞等待退出信号。
func run(cfg *config) error { func run(cfg *config) error {
store, err := openStore(cfg.Database)
if err != nil {
return err
}
defer store.Close()
server := mqtt.New(nil) server := mqtt.New(nil)
if err := server.AddHook(new(auth.AllowHook), nil); err != nil { if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
return err return err
} }
if err := server.AddHook(&meshtasticFilterHook{key: cfg.key}, nil); err != nil { if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, store: store}, nil); err != nil {
return err return err
} }