屏蔽词功能ok
This commit is contained in:
@@ -34,9 +34,10 @@ const (
|
||||
|
||||
type meshtasticFilterHook struct {
|
||||
mqtt.HookBase
|
||||
key []byte
|
||||
store *store
|
||||
stats *meshtasticMessageStats
|
||||
key []byte
|
||||
store *store
|
||||
stats *meshtasticMessageStats
|
||||
blocking *blockingCache
|
||||
}
|
||||
|
||||
// ID 返回用于识别 Meshtastic payload 过滤器的 hook 名称。
|
||||
@@ -46,19 +47,31 @@ func (h *meshtasticFilterHook) ID() string {
|
||||
|
||||
// Provides 声明该 hook 只处理客户端发布消息。
|
||||
func (h *meshtasticFilterHook) Provides(b byte) bool {
|
||||
return b == mqtt.OnPublish
|
||||
return b == mqtt.OnConnect || b == mqtt.OnPublish
|
||||
}
|
||||
|
||||
// OnConnect 在 MQTT 会话建立前拒绝命中 IP 屏蔽表的客户端。
|
||||
func (h *meshtasticFilterHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
|
||||
info := mqttClientInfoFromClient(cl)
|
||||
if h.blocking != nil && h.blocking.IsIPBlocked(info.RemoteHost) {
|
||||
printJSON(map[string]any{"event": "mqtt_client_rejected", "reason": "blocked_ip", "client_id": info.ClientID, "remote_addr": info.RemoteAddr, "remote_host": info.RemoteHost})
|
||||
return packets.ErrNotAuthorized
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnPublish 在 broker 转发消息前校验 payload;无效消息会被拒绝并丢弃。
|
||||
func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
|
||||
valid, _, record := mqtpp.MQTTPP(pk.TopicName, pk.Payload, h.key)
|
||||
if !valid {
|
||||
h.stats.IncDropped()
|
||||
if h.store != nil {
|
||||
if err := h.store.InsertDiscardDetails(record, pk.Payload, mqttClientInfoFromClient(cl)); err != nil {
|
||||
printJSON(map[string]any{"event": "db_error", "type": "discard_details", "topic": pk.TopicName, "error": err.Error()})
|
||||
}
|
||||
h.rejectPublish(cl, pk, record)
|
||||
return pk, packets.ErrRejectPacket
|
||||
}
|
||||
if violation := blockingViolationForRecord(h.blocking, record); violation != nil {
|
||||
for key, value := range violation {
|
||||
record[key] = value
|
||||
}
|
||||
h.rejectPublish(cl, pk, record)
|
||||
return pk, packets.ErrRejectPacket
|
||||
}
|
||||
h.stats.IncForwarded()
|
||||
@@ -113,6 +126,39 @@ func (h *meshtasticFilterHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (pa
|
||||
return pk, nil
|
||||
}
|
||||
|
||||
func (h *meshtasticFilterHook) rejectPublish(cl *mqtt.Client, pk packets.Packet, record map[string]any) {
|
||||
if h.stats != nil {
|
||||
h.stats.IncDropped()
|
||||
}
|
||||
if h.store != nil {
|
||||
if err := h.store.InsertDiscardDetails(record, pk.Payload, mqttClientInfoFromClient(cl)); err != nil {
|
||||
printJSON(map[string]any{"event": "db_error", "type": "discard_details", "topic": pk.TopicName, "error": err.Error()})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func blockingViolationForRecord(blocking *blockingCache, record map[string]any) map[string]any {
|
||||
if blocking == nil || record == nil {
|
||||
return nil
|
||||
}
|
||||
if blocking.IsNodeBlocked(record["from"], record["from_num"]) {
|
||||
return map[string]any{"error": "blocked node", "blocking_type": "node"}
|
||||
}
|
||||
var field string
|
||||
switch record["type"] {
|
||||
case "text_message":
|
||||
field = "text"
|
||||
case "nodeinfo", "map_report":
|
||||
field = "long_name"
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
if word, ok := blocking.FindForbiddenWord(record[field]); ok {
|
||||
return map[string]any{"error": "forbidden word", "blocking_type": "forbidden_word", "blocking_field": field, "matched_word": word}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mqttClientInfoFromClient(cl *mqtt.Client) mqttClientInfo {
|
||||
if cl == nil {
|
||||
return mqttClientInfo{}
|
||||
@@ -201,8 +247,13 @@ func run(cfg *config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
blocking, err := newBlockingCache(store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
messageStats := &meshtasticMessageStats{}
|
||||
server, mqttAddr, err := startMQTTServer(cfg, store, messageStats)
|
||||
server, mqttAddr, err := startMQTTServer(cfg, store, messageStats, blocking)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -215,7 +266,7 @@ func run(cfg *config) error {
|
||||
return err
|
||||
}
|
||||
mqttStatus := mqttRuntimeStatus{server: server, address: mqttAddr, tls: cfg.MQTT.TLS.Enabled, stats: messageStats}
|
||||
httpServer = newHTTPServer(cfg.Web, store, sessions, mqttStatus)
|
||||
httpServer = newHTTPServer(cfg.Web, store, sessions, mqttStatus, blocking)
|
||||
go func() {
|
||||
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
errCh <- err
|
||||
@@ -246,12 +297,12 @@ func run(cfg *config) error {
|
||||
return runErr
|
||||
}
|
||||
|
||||
func startMQTTServer(cfg *config, store *store, stats *meshtasticMessageStats) (*mqtt.Server, string, error) {
|
||||
func startMQTTServer(cfg *config, store *store, stats *meshtasticMessageStats, blocking *blockingCache) (*mqtt.Server, string, error) {
|
||||
server := mqtt.New(nil)
|
||||
if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, store: store, stats: stats}, nil); err != nil {
|
||||
if err := server.AddHook(&meshtasticFilterHook{key: cfg.key, store: store, stats: stats, blocking: blocking}, nil); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user