异步保存sql消息

This commit is contained in:
2026-06-05 13:34:54 +08:00
parent 91eb43d00c
commit 081600eff0
6 changed files with 245 additions and 54 deletions
+123
View File
@@ -0,0 +1,123 @@
package main
import "sync"
type dbWriteQueue struct {
store *store
jobs chan dbWriteJob
wg sync.WaitGroup
}
type dbWriteJob struct {
typeName string
from any
run func() error
errorEvent map[string]any
}
func newDBWriteQueue(store *store) *dbWriteQueue {
if store == nil {
return nil
}
q := &dbWriteQueue{
store: store,
jobs: make(chan dbWriteJob, 1024),
}
q.wg.Add(1)
go q.run()
return q
}
func (q *dbWriteQueue) EnqueueRecord(record map[string]any, clientInfo mqttClientInfo) {
if q == nil {
return
}
record = cloneDBWriteRecord(record)
switch record["type"] {
case "nodeinfo":
q.enqueue(dbWriteJob{typeName: "nodeinfo", from: record["from"], run: func() error {
return q.store.UpsertNodeInfo(record)
}})
case "map_report":
q.enqueue(dbWriteJob{typeName: "map_report", from: record["from"], run: func() error {
return q.store.UpsertMapReport(record)
}})
case "text_message":
q.enqueue(dbWriteJob{typeName: "text_message", from: record["from"], run: func() error {
return q.store.InsertTextMessage(record, clientInfo)
}})
case "position":
q.enqueue(dbWriteJob{typeName: "position", from: record["from"], run: func() error {
return q.store.InsertPosition(record, clientInfo)
}})
case "telemetry":
q.enqueue(dbWriteJob{typeName: "telemetry", from: record["from"], run: func() error {
return q.store.InsertTelemetry(record, clientInfo)
}})
case "routing":
q.enqueue(dbWriteJob{typeName: "routing", from: record["from"], run: func() error {
return q.store.InsertRouting(record, clientInfo)
}})
case "traceroute":
q.enqueue(dbWriteJob{typeName: "traceroute", from: record["from"], run: func() error {
return q.store.InsertTraceroute(record, clientInfo)
}})
}
}
func (q *dbWriteQueue) EnqueueDiscard(record map[string]any, raw []byte, clientInfo mqttClientInfo) {
if q == nil {
return
}
record = cloneDBWriteRecord(record)
raw = append([]byte(nil), raw...)
q.enqueue(dbWriteJob{typeName: "discard_details", from: record["from"], errorEvent: map[string]any{"event": "db_error", "type": "discard_details", "topic": record["topic"]}, run: func() error {
return q.store.InsertDiscardDetails(record, raw, clientInfo)
}})
}
func (q *dbWriteQueue) Close() {
if q == nil {
return
}
close(q.jobs)
q.wg.Wait()
}
func (q *dbWriteQueue) Len() int {
if q == nil {
return 0
}
return len(q.jobs)
}
func (q *dbWriteQueue) enqueue(job dbWriteJob) {
q.jobs <- job
}
func (q *dbWriteQueue) run() {
defer q.wg.Done()
for job := range q.jobs {
if err := job.run(); err != nil {
event := job.errorEvent
if event == nil {
event = map[string]any{"event": "db_error", "type": job.typeName, "from": job.from}
} else {
event = cloneDBWriteRecord(event)
}
event["error"] = err.Error()
printJSON(event)
}
}
}
func cloneDBWriteRecord(record map[string]any) map[string]any {
if record == nil {
return nil
}
cloned := make(map[string]any, len(record))
for key, value := range record {
cloned[key] = value
}
return cloned
}
+104
View File
@@ -0,0 +1,104 @@
package main
import (
"database/sql"
"testing"
)
func TestDBWriteQueueWritesRecordsAsync(t *testing.T) {
st := openTestStore(t)
defer st.Close()
queue := newDBWriteQueue(st)
record := textMessageTestRecord("queued")
queue.EnqueueRecord(record, mqttClientInfo{ClientID: "client-1"})
record["text"] = "mutated after enqueue"
queue.Close()
var text, clientID string
if err := rawTestDB(t, st).QueryRow("SELECT text, mqtt_client_id FROM text_message WHERE from_id = ?", "!12345678").Scan(&text, &clientID); err != nil {
t.Fatal(err)
}
if text != "queued" || clientID != "client-1" {
t.Fatalf("queued row = text %q client %q, want queued/client-1", text, clientID)
}
}
func TestDBWriteQueueWritesDiscardAsync(t *testing.T) {
st := openTestStore(t)
defer st.Close()
queue := newDBWriteQueue(st)
record := map[string]any{"topic": "msh/test", "error": "bad packet"}
queue.EnqueueDiscard(record, []byte{1, 2, 3}, mqttClientInfo{RemoteAddr: "127.0.0.1:1883"})
record["error"] = "mutated after enqueue"
queue.Close()
var topic, reason, rawBase64, remoteAddr string
if err := rawTestDB(t, st).QueryRow("SELECT topic, error, raw_base64, mqtt_remote_addr FROM discard_details").Scan(&topic, &reason, &rawBase64, &remoteAddr); err != nil {
t.Fatal(err)
}
if topic != "msh/test" || reason != "bad packet" || rawBase64 != "AQID" || remoteAddr != "127.0.0.1:1883" {
t.Fatalf("discard row = %q/%q/%q/%q, want queued values", topic, reason, rawBase64, remoteAddr)
}
}
func TestDBWriteQueueLen(t *testing.T) {
queue := &dbWriteQueue{jobs: make(chan dbWriteJob, 1)}
queue.enqueue(dbWriteJob{run: func() error { return nil }})
if queue.Len() != 1 {
t.Fatalf("queue.Len() = %d, want 1", queue.Len())
}
}
func TestDBWriteQueueIgnoresUnsupportedRecordType(t *testing.T) {
st := openTestStore(t)
defer st.Close()
queue := newDBWriteQueue(st)
queue.EnqueueRecord(map[string]any{"type": "empty_packet", "from": "!12345678"}, mqttClientInfo{})
queue.Close()
var count int
if err := rawTestDB(t, st).QueryRow("SELECT COUNT(*) FROM text_message").Scan(&count); err != nil {
t.Fatal(err)
}
if count != 0 {
t.Fatalf("text_message count = %d, want 0", count)
}
}
func TestDBWriteQueueNilStore(t *testing.T) {
if queue := newDBWriteQueue(nil); queue != nil {
t.Fatalf("newDBWriteQueue(nil) = %#v, want nil", queue)
}
var queue *dbWriteQueue
queue.EnqueueRecord(textMessageTestRecord("ignored"), mqttClientInfo{})
queue.EnqueueDiscard(map[string]any{"topic": "ignored"}, []byte{1}, mqttClientInfo{})
queue.Close()
}
func TestDBWriteQueueRecordValidationErrorDoesNotStopWorker(t *testing.T) {
st := openTestStore(t)
defer st.Close()
queue := newDBWriteQueue(st)
badRecord := textMessageTestRecord("bad")
delete(badRecord, "from")
queue.EnqueueRecord(badRecord, mqttClientInfo{})
queue.EnqueueRecord(textMessageTestRecord("good"), mqttClientInfo{})
queue.Close()
var text string
if err := rawTestDB(t, st).QueryRow("SELECT text FROM text_message").Scan(&text); err != nil {
t.Fatal(err)
}
if text != "good" {
t.Fatalf("text = %q, want good", text)
}
var missing sql.NullString
if err := rawTestDB(t, st).QueryRow("SELECT text FROM text_message WHERE text = ?", "bad").Scan(&missing); err != sql.ErrNoRows {
t.Fatalf("bad row error = %v, want sql.ErrNoRows", err)
}
}
+12 -53
View File
@@ -36,7 +36,7 @@ const (
type meshtasticFilterHook struct { type meshtasticFilterHook struct {
mqtt.HookBase mqtt.HookBase
key []byte key []byte
store *store dbQueue *dbWriteQueue
stats *meshtasticMessageStats stats *meshtasticMessageStats
blocking *blockingCache blocking *blockingCache
} }
@@ -77,50 +77,7 @@ func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (pa
} }
h.stats.IncForwarded() h.stats.IncForwarded()
switch record["type"] { h.dbQueue.EnqueueRecord(record, mqttClientInfoFromClient(cl))
case "nodeinfo":
if h.store != nil {
if err := h.store.UpsertNodeInfo(record); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
case "map_report":
if h.store != nil {
if err := h.store.UpsertMapReport(record); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
case "text_message":
if h.store != nil {
if err := h.store.InsertTextMessage(record, mqttClientInfoFromClient(cl)); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
case "position":
if h.store != nil {
if err := h.store.InsertPosition(record, mqttClientInfoFromClient(cl)); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
case "telemetry":
if h.store != nil {
if err := h.store.InsertTelemetry(record, mqttClientInfoFromClient(cl)); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
case "routing":
if h.store != nil {
if err := h.store.InsertRouting(record, mqttClientInfoFromClient(cl)); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
case "traceroute":
if h.store != nil {
if err := h.store.InsertTraceroute(record, mqttClientInfoFromClient(cl)); err != nil {
printJSON(map[string]any{"event": "db_error", "type": record["type"], "from": record["from"], "error": err.Error()})
}
}
}
if record["type"] != "empty_packet" { if record["type"] != "empty_packet" {
printJSON(record) printJSON(record)
} }
@@ -131,11 +88,11 @@ func (h *meshtasticFilterHook) rejectPublish(cl *mqtt.Client, pk packets.Packet,
if h.stats != nil { if h.stats != nil {
h.stats.IncDropped() h.stats.IncDropped()
} }
if h.store != nil { if record == nil {
if err := h.store.InsertDiscardDetails(record, pk.Payload, mqttClientInfoFromClient(cl)); err != nil { record = map[string]any{}
printJSON(map[string]any{"event": "db_error", "type": "discard_details", "topic": pk.TopicName, "error": err.Error()})
}
} }
record["topic"] = pk.TopicName
h.dbQueue.EnqueueDiscard(record, pk.Payload, mqttClientInfoFromClient(cl))
} }
func blockingViolationForRecord(blocking *blockingCache, record map[string]any) map[string]any { func blockingViolationForRecord(blocking *blockingCache, record map[string]any) map[string]any {
@@ -246,6 +203,8 @@ func run(cfg *config) error {
return err return err
} }
defer store.Close() defer store.Close()
dbQueue := newDBWriteQueue(store)
defer dbQueue.Close()
if err := store.EnsureDefaultAdmin(cfg.Web.Admin.Username, cfg.Web.Admin.Password); err != nil { if err := store.EnsureDefaultAdmin(cfg.Web.Admin.Username, cfg.Web.Admin.Password); err != nil {
return err return err
} }
@@ -256,7 +215,7 @@ func run(cfg *config) error {
} }
messageStats := &meshtasticMessageStats{} messageStats := &meshtasticMessageStats{}
server, mqttAddr, err := startMQTTServer(cfg, store, messageStats, blocking) server, mqttAddr, err := startMQTTServer(cfg, dbQueue, messageStats, blocking)
if err != nil { if err != nil {
return err return err
} }
@@ -268,7 +227,7 @@ func run(cfg *config) error {
if err != nil { if err != nil {
return err return err
} }
mqttStatus := mqttRuntimeStatus{server: server, address: mqttAddr, tls: cfg.MQTT.TLS.Enabled, stats: messageStats} mqttStatus := mqttRuntimeStatus{server: server, address: mqttAddr, tls: cfg.MQTT.TLS.Enabled, stats: messageStats, dbQueue: dbQueue}
httpServer = newHTTPServer(cfg.Web, store, sessions, mqttStatus, blocking) httpServer = newHTTPServer(cfg.Web, store, sessions, mqttStatus, blocking)
webAddress := httpServer.Addr webAddress := httpServer.Addr
go func() { go func() {
@@ -310,12 +269,12 @@ func run(cfg *config) error {
return runErr return runErr
} }
func startMQTTServer(cfg *config, store *store, stats *meshtasticMessageStats, blocking *blockingCache) (*mqtt.Server, string, error) { func startMQTTServer(cfg *config, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache) (*mqtt.Server, string, error) {
server := mqtt.New(nil) server := mqtt.New(nil)
if err := server.AddHook(new(auth.AllowHook), nil); err != nil { if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
return nil, "", err return nil, "", err
} }
if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, store: store, stats: stats, blocking: blocking}, nil); err != nil { if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, dbQueue: dbQueue, stats: stats, blocking: blocking}, nil); err != nil {
return nil, "", err return nil, "", err
} }
@@ -62,6 +62,7 @@ onBeforeUnmount(() => {
<div><span>当前连接</span><strong>{{ status.clients_connected }}</strong></div> <div><span>当前连接</span><strong>{{ status.clients_connected }}</strong></div>
<div><span>订阅数</span><strong>{{ status.subscriptions }}</strong></div> <div><span>订阅数</span><strong>{{ status.subscriptions }}</strong></div>
<div><span>转发消息</span><strong>{{ status.messages_sent }}</strong></div> <div><span>转发消息</span><strong>{{ status.messages_sent }}</strong></div>
<div><span>数据库队列</span><strong>{{ status.db_write_queue_length }}</strong></div>
<a class="status-card-link" href="/admin/discard_details"><span>丢弃消息</span><strong>{{ status.messages_dropped }}</strong></a> <a class="status-card-link" href="/admin/discard_details"><span>丢弃消息</span><strong>{{ status.messages_dropped }}</strong></a>
<div><span>收到包</span><strong>{{ status.packets_received }}</strong></div> <div><span>收到包</span><strong>{{ status.packets_received }}</strong></div>
<div><span>发送包</span><strong>{{ status.packets_sent }}</strong></div> <div><span>发送包</span><strong>{{ status.packets_sent }}</strong></div>
+1
View File
@@ -220,6 +220,7 @@ export interface AdminMqttStatus {
messages_received: number messages_received: number
messages_sent: number messages_sent: number
messages_dropped: number messages_dropped: number
db_write_queue_length: number
retained: number retained: number
inflight: number inflight: number
inflight_dropped: number inflight_dropped: number
+4 -1
View File
@@ -13,6 +13,7 @@ type mqttRuntimeStatus struct {
address string address string
tls bool tls bool
stats *meshtasticMessageStats stats *meshtasticMessageStats
dbQueue *dbWriteQueue
} }
type adminMqttStatus struct { type adminMqttStatus struct {
@@ -31,6 +32,7 @@ type adminMqttStatus struct {
MessagesReceived int64 `json:"messages_received"` MessagesReceived int64 `json:"messages_received"`
MessagesSent int64 `json:"messages_sent"` MessagesSent int64 `json:"messages_sent"`
MessagesDropped int64 `json:"messages_dropped"` MessagesDropped int64 `json:"messages_dropped"`
DBWriteQueueLength int `json:"db_write_queue_length"`
Retained int64 `json:"retained"` Retained int64 `json:"retained"`
Inflight int64 `json:"inflight"` Inflight int64 `json:"inflight"`
InflightDropped int64 `json:"inflight_dropped"` InflightDropped int64 `json:"inflight_dropped"`
@@ -51,7 +53,7 @@ type adminMqttClient struct {
func (m mqttRuntimeStatus) Status() adminMqttStatus { func (m mqttRuntimeStatus) Status() adminMqttStatus {
if m.server == nil || m.server.Info == nil { if m.server == nil || m.server.Info == nil {
return adminMqttStatus{Running: false, Address: m.address, TLS: m.tls} return adminMqttStatus{Running: false, Address: m.address, TLS: m.tls, DBWriteQueueLength: m.dbQueue.Len()}
} }
info := m.server.Info.Clone() info := m.server.Info.Clone()
status := adminMqttStatus{ status := adminMqttStatus{
@@ -70,6 +72,7 @@ func (m mqttRuntimeStatus) Status() adminMqttStatus {
MessagesReceived: info.MessagesReceived, MessagesReceived: info.MessagesReceived,
MessagesSent: m.stats.Forwarded(), MessagesSent: m.stats.Forwarded(),
MessagesDropped: m.stats.Dropped(), MessagesDropped: m.stats.Dropped(),
DBWriteQueueLength: m.dbQueue.Len(),
Retained: info.Retained, Retained: info.Retained,
Inflight: info.Inflight, Inflight: info.Inflight,
InflightDropped: info.InflightDropped, InflightDropped: info.InflightDropped,