优化机器人ack
This commit is contained in:
@@ -41,6 +41,7 @@ type meshtasticFilterHook struct {
|
||||
blocking *blockingCache
|
||||
settings *runtimeSettingsCache
|
||||
pkiResolver func(toNodeNum, fromNodeNum uint32) ([]byte, []byte, bool)
|
||||
autoAcker func(record map[string]any)
|
||||
}
|
||||
|
||||
// ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。
|
||||
@@ -83,6 +84,9 @@ func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (pa
|
||||
h.stats.IncForwarded()
|
||||
|
||||
h.dbQueue.EnqueueRecord(record, mqttClientInfoFromClient(cl))
|
||||
if h.autoAcker != nil {
|
||||
h.autoAcker(record)
|
||||
}
|
||||
if record["type"] != "empty_packet" {
|
||||
printJSON(record)
|
||||
}
|
||||
@@ -225,11 +229,12 @@ func run(cfg *config) error {
|
||||
}
|
||||
|
||||
messageStats := &meshtasticMessageStats{}
|
||||
server, mqttAddr, err := startMQTTServer(cfg, store, dbQueue, messageStats, blocking, settings)
|
||||
server, mqttHook, mqttAddr, err := startMQTTServer(cfg, store, dbQueue, messageStats, blocking, settings)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
botSender := newBotService(store, server, cfg.key)
|
||||
mqttHook.autoAcker = botSender.MaybeAutoAck
|
||||
botCtx, stopBotBroadcaster := context.WithCancel(context.Background())
|
||||
defer stopBotBroadcaster()
|
||||
botSender.StartNodeInfoBroadcaster(botCtx)
|
||||
@@ -289,10 +294,10 @@ func run(cfg *config) error {
|
||||
return runErr
|
||||
}
|
||||
|
||||
func startMQTTServer(cfg *config, store *store, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache, settings *runtimeSettingsCache) (*mqtt.Server, string, error) {
|
||||
func startMQTTServer(cfg *config, store *store, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache, settings *runtimeSettingsCache) (*mqtt.Server, *meshtasticFilterHook, string, error) {
|
||||
server := mqtt.New(&mqtt.Options{InlineClient: true})
|
||||
if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, "", err
|
||||
}
|
||||
hook := &meshtasticFilterHook{
|
||||
key: cfg.key,
|
||||
@@ -303,23 +308,23 @@ func startMQTTServer(cfg *config, store *store, dbQueue *dbWriteQueue, stats *me
|
||||
pkiResolver: newPKIKeyResolver(store),
|
||||
}
|
||||
if err := server.AddHook(hook, nil); err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, "", err
|
||||
}
|
||||
|
||||
addr := net.JoinHostPort(cfg.MQTT.Host, strconv.Itoa(cfg.MQTT.Port))
|
||||
tlsConfig, err := buildTLSConfig(cfg.MQTT.TLS)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, "", err
|
||||
}
|
||||
listener := listeners.NewTCP(listeners.Config{ID: "tcp", Address: addr, TLSConfig: tlsConfig})
|
||||
if err := server.AddListener(listener); err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, "", err
|
||||
}
|
||||
if err := server.Serve(); err != nil {
|
||||
return nil, "", err
|
||||
return nil, nil, "", err
|
||||
}
|
||||
printJSON(map[string]any{"event": "broker_started", "address": addr, "tls": cfg.MQTT.TLS.Enabled})
|
||||
return server, addr, nil
|
||||
return server, hook, addr, nil
|
||||
}
|
||||
|
||||
// printJSON 将记录编码为 JSON 后按数据包类型着色输出。
|
||||
|
||||
Reference in New Issue
Block a user