From 7c1b30b3a06087096dea9aa9b62827f2094b8408 Mon Sep 17 00:00:00 2001 From: kevin Date: Thu, 4 Jun 2026 00:09:16 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E7=9B=B8=E5=85=B3=E6=9B=B4?= =?UTF-8?q?=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- filter_stats.go | 34 ++++++++++++++++++++++++++++++++++ main.go | 12 ++++++++---- mqtt_status.go | 5 +++-- 3 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 filter_stats.go diff --git a/filter_stats.go b/filter_stats.go new file mode 100644 index 0000000..27ba54f --- /dev/null +++ b/filter_stats.go @@ -0,0 +1,34 @@ +package main + +import "sync/atomic" + +type meshtasticMessageStats struct { + forwarded atomic.Int64 + dropped atomic.Int64 +} + +func (s *meshtasticMessageStats) IncForwarded() { + if s != nil { + s.forwarded.Add(1) + } +} + +func (s *meshtasticMessageStats) IncDropped() { + if s != nil { + s.dropped.Add(1) + } +} + +func (s *meshtasticMessageStats) Forwarded() int64 { + if s == nil { + return 0 + } + return s.forwarded.Load() +} + +func (s *meshtasticMessageStats) Dropped() int64 { + if s == nil { + return 0 + } + return s.dropped.Load() +} diff --git a/main.go b/main.go index 3b33e3b..ff9d558 100644 --- a/main.go +++ b/main.go @@ -36,6 +36,7 @@ type meshtasticFilterHook struct { mqtt.HookBase key []byte store *store + stats *meshtasticMessageStats } // ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。 @@ -52,8 +53,10 @@ func (h *meshtasticFilterHook) Provides(b byte) bool { func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { valid, _, record := mqtpp.MQTTPP(pk.TopicName, pk.Payload, h.key) if !valid { + h.stats.IncDropped() return pk, packets.ErrRejectPacket } + h.stats.IncForwarded() switch record["type"] { case "nodeinfo": @@ -193,7 +196,8 @@ func run(cfg *config) error { return err } - server, mqttAddr, err := startMQTTServer(cfg, store) + messageStats := &meshtasticMessageStats{} + server, mqttAddr, err := startMQTTServer(cfg, store, messageStats) if err != nil { return err } @@ -205,7 +209,7 @@ func run(cfg *config) error { if err != nil { return err } - mqttStatus := mqttRuntimeStatus{server: server, address: mqttAddr, tls: cfg.MQTT.TLS.Enabled} + mqttStatus := mqttRuntimeStatus{server: server, address: mqttAddr, tls: cfg.MQTT.TLS.Enabled, stats: messageStats} httpServer = newHTTPServer(cfg.Web, store, sessions, mqttStatus) go func() { if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { @@ -237,12 +241,12 @@ func run(cfg *config) error { return runErr } -func startMQTTServer(cfg *config, store *store) (*mqtt.Server, string, error) { +func startMQTTServer(cfg *config, store *store, stats *meshtasticMessageStats) (*mqtt.Server, string, error) { server := mqtt.New(nil) if err := server.AddHook(new(auth.AllowHook), nil); err != nil { return nil, "", err } - if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, store: store}, nil); err != nil { + if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, store: store, stats: stats}, nil); err != nil { return nil, "", err } diff --git a/mqtt_status.go b/mqtt_status.go index 0360040..642a399 100644 --- a/mqtt_status.go +++ b/mqtt_status.go @@ -12,6 +12,7 @@ type mqttRuntimeStatus struct { server *mqtt.Server address string tls bool + stats *meshtasticMessageStats } type adminMqttStatus struct { @@ -67,8 +68,8 @@ func (m mqttRuntimeStatus) Status() adminMqttStatus { ClientsMaximum: info.ClientsMaximum, ClientsTotal: info.ClientsTotal, MessagesReceived: info.MessagesReceived, - MessagesSent: info.MessagesSent, - MessagesDropped: info.MessagesDropped, + MessagesSent: m.stats.Forwarded(), + MessagesDropped: m.stats.Dropped(), Retained: info.Retained, Inflight: info.Inflight, InflightDropped: info.InflightDropped,