更新机器人接收功能
This commit is contained in:
@@ -35,11 +35,12 @@ const (
|
||||
|
||||
type meshtasticFilterHook struct {
|
||||
mqtt.HookBase
|
||||
key []byte
|
||||
dbQueue *dbWriteQueue
|
||||
stats *meshtasticMessageStats
|
||||
blocking *blockingCache
|
||||
settings *runtimeSettingsCache
|
||||
key []byte
|
||||
dbQueue *dbWriteQueue
|
||||
stats *meshtasticMessageStats
|
||||
blocking *blockingCache
|
||||
settings *runtimeSettingsCache
|
||||
pkiResolver func(toNodeNum, fromNodeNum uint32) ([]byte, []byte, bool)
|
||||
}
|
||||
|
||||
// ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。
|
||||
@@ -64,7 +65,10 @@ func (h *meshtasticFilterHook) OnConnect(cl *mqtt.Client, pk packets.Packet) err
|
||||
|
||||
// OnPublish 在 broker 转发消息前校验 payload;无效消息会被拒绝并丢弃。
|
||||
func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
|
||||
valid, _, record := mqtpp.MQTTPP(pk.TopicName, pk.Payload, h.key, mqtpp.Options{AllowEncryptedForwarding: h.settings.AllowEncryptedForwarding()})
|
||||
valid, _, record := mqtpp.MQTTPP(pk.TopicName, pk.Payload, h.key, mqtpp.Options{
|
||||
AllowEncryptedForwarding: h.settings.AllowEncryptedForwarding(),
|
||||
PKIKeyResolver: h.pkiResolver,
|
||||
})
|
||||
if !valid {
|
||||
h.rejectPublish(cl, pk, record)
|
||||
return pk, packets.ErrRejectPacket
|
||||
@@ -221,7 +225,7 @@ func run(cfg *config) error {
|
||||
}
|
||||
|
||||
messageStats := &meshtasticMessageStats{}
|
||||
server, mqttAddr, err := startMQTTServer(cfg, dbQueue, messageStats, blocking, settings)
|
||||
server, mqttAddr, err := startMQTTServer(cfg, store, dbQueue, messageStats, blocking, settings)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -285,12 +289,20 @@ func run(cfg *config) error {
|
||||
return runErr
|
||||
}
|
||||
|
||||
func startMQTTServer(cfg *config, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache, settings *runtimeSettingsCache) (*mqtt.Server, string, error) {
|
||||
func startMQTTServer(cfg *config, store *store, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache, settings *runtimeSettingsCache) (*mqtt.Server, string, error) {
|
||||
server := mqtt.New(&mqtt.Options{InlineClient: true})
|
||||
if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, dbQueue: dbQueue, stats: stats, blocking: blocking, settings: settings}, nil); err != nil {
|
||||
hook := &meshtasticFilterHook{
|
||||
key: cfg.key,
|
||||
dbQueue: dbQueue,
|
||||
stats: stats,
|
||||
blocking: blocking,
|
||||
settings: settings,
|
||||
pkiResolver: newPKIKeyResolver(store),
|
||||
}
|
||||
if err := server.AddHook(hook, nil); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user