diff --git a/admin_bot_routes.go b/admin_bot_routes.go new file mode 100644 index 0000000..c08c8b2 --- /dev/null +++ b/admin_bot_routes.go @@ -0,0 +1,174 @@ +package main + +import ( + "errors" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "gorm.io/gorm" +) + +type botNodeRequest struct { + NodeNum *int64 `json:"node_num"` + LongName string `json:"long_name"` + ShortName string `json:"short_name"` + Enabled bool `json:"enabled"` + DefaultChannelID string `json:"default_channel_id"` + TopicPrefix string `json:"topic_prefix"` +} + +type botSendMessageRequest struct { + BotID uint64 `json:"bot_id"` + MessageType string `json:"message_type"` + ChannelID string `json:"channel_id"` + ToNodeID string `json:"to_node_id"` + ToNodeNum *int64 `json:"to_node_num"` + Text string `json:"text"` +} + +func registerAdminBotRoutes(r gin.IRouter, store *store, sender botTextSender) { + r.GET("/bot/nodes", func(c *gin.Context) { + opts, ok := parseListOptions(c) + if !ok { + return + } + rows, err := store.ListBotNodes(opts) + if err != nil { + writeListResponse(c, rows, opts, err, botNodeDTO) + return + } + total, err := store.CountBotNodes(opts) + writeListResponseWithTotal(c, rows, opts, total, err, botNodeDTO) + }) + r.POST("/bot/nodes", func(c *gin.Context) { + var req botNodeRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid bot node request"}) + return + } + row, err := store.CreateBotNode(botNodeInputFromRequest(req)) + writeBotNodeMutationResponse(c, http.StatusCreated, row, err) + }) + r.PUT("/bot/nodes/:id", func(c *gin.Context) { + id, ok := parseBotID(c, "invalid bot node id") + if !ok { + return + } + var req botNodeRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid bot node request"}) + return + } + row, err := store.UpdateBotNode(id, botNodeInputFromRequest(req)) + writeBotNodeMutationResponse(c, http.StatusOK, row, err) + }) + r.DELETE("/bot/nodes/:id", func(c *gin.Context) { + id, ok := parseBotID(c, "invalid bot node id") + if !ok { + return + } + if err := store.DeleteBotNode(id); errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "bot node not found"}) + return + } else if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "ok"}) + }) + r.GET("/bot/messages", func(c *gin.Context) { + opts, ok := parseBotMessageListOptions(c) + if !ok { + return + } + rows, err := store.ListBotMessages(opts) + if err != nil { + writeListResponse(c, rows, opts.listOptions, err, botMessageDTO) + return + } + total, err := store.CountBotMessages(opts) + writeListResponseWithTotal(c, rows, opts.listOptions, total, err, botMessageDTO) + }) + r.POST("/bot/messages", func(c *gin.Context) { + if sender == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "bot sender is not configured"}) + return + } + var req botSendMessageRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid bot message request"}) + return + } + claims := c.MustGet("admin_claims").(*sessionClaims) + row, err := sender.SendText(c.Request.Context(), botSendTextRequest{BotID: req.BotID, MessageType: req.MessageType, ChannelID: req.ChannelID, ToNodeID: req.ToNodeID, ToNodeNum: req.ToNodeNum, Text: req.Text, CreatedBy: claims.Username}) + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "bot node not found"}) + return + } + if err != nil { + status := http.StatusBadRequest + if row != nil && row.ID != 0 { + c.JSON(http.StatusAccepted, gin.H{"item": botMessageDTO(*row), "error": err.Error()}) + return + } + c.JSON(status, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusCreated, gin.H{"item": botMessageDTO(*row)}) + }) +} + +func botNodeInputFromRequest(req botNodeRequest) botNodeInput { + return botNodeInput{NodeNum: req.NodeNum, LongName: req.LongName, ShortName: req.ShortName, Enabled: req.Enabled, DefaultChannelID: req.DefaultChannelID, TopicPrefix: req.TopicPrefix} +} + +func parseBotID(c *gin.Context, message string) (uint64, bool) { + id, err := strconv.ParseUint(c.Param("id"), 10, 64) + if err != nil || id == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": message}) + return 0, false + } + return id, true +} + +func parseBotMessageListOptions(c *gin.Context) (botMessageListOptions, bool) { + listOpts, ok := parseListOptions(c) + if !ok { + return botMessageListOptions{}, false + } + opts := botMessageListOptions{listOptions: listOpts, MessageType: c.Query("message_type"), ChannelID: c.Query("channel_id")} + if value := c.Query("bot_id"); value != "" { + id, err := strconv.ParseUint(value, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid bot id"}) + return botMessageListOptions{}, false + } + opts.BotID = id + } + return opts, true +} + +func writeBotNodeMutationResponse(c *gin.Context, status int, row *botNodeRecord, err error) { + if errors.Is(err, errBotNodeAlreadyExists) { + c.JSON(http.StatusConflict, gin.H{"error": "bot node already exists or conflicts with existing node"}) + return + } + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "bot node not found"}) + return + } + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + c.JSON(status, gin.H{"item": botNodeDTO(*row)}) +} + +func botNodeDTO(row botNodeRecord) gin.H { + return gin.H{"id": row.ID, "node_id": row.NodeID, "node_num": row.NodeNum, "long_name": row.LongName, "short_name": row.ShortName, "enabled": row.Enabled, "default_channel_id": row.DefaultChannelID, "topic_prefix": row.TopicPrefix, "created_at": row.CreatedAt, "updated_at": row.UpdatedAt} +} + +func botMessageDTO(row botMessageRecord) gin.H { + return gin.H{"id": row.ID, "bot_id": row.BotID, "bot_node_id": row.BotNodeID, "bot_node_num": row.BotNodeNum, "message_type": row.MessageType, "channel_id": row.ChannelID, "to_node_id": row.ToNodeID, "to_node_num": row.ToNodeNum, "topic": row.Topic, "packet_id": row.PacketID, "text": row.Text, "payload_len": row.PayloadLen, "encrypted": row.Encrypted, "status": row.Status, "error": row.Error, "published_at": row.PublishedAt, "created_by": row.CreatedBy, "created_at": row.CreatedAt} +} diff --git a/bot_service.go b/bot_service.go new file mode 100644 index 0000000..1f060fa --- /dev/null +++ b/bot_service.go @@ -0,0 +1,196 @@ +package main + +import ( + "context" + "crypto/rand" + "encoding/binary" + "fmt" + "strings" + "time" + "unicode/utf8" + + "meshtastic_mqtt_server/mqtpp" + + mqtt "github.com/mochi-mqtt/server/v2" +) + +const botMaxTextBytes = 200 + +type botSendTextRequest struct { + BotID uint64 + MessageType string + ChannelID string + ToNodeID string + ToNodeNum *int64 + Text string + CreatedBy string +} + +type botTextSender interface { + SendText(ctx context.Context, req botSendTextRequest) (*botMessageRecord, error) +} + +type botService struct { + store *store + server *mqtt.Server + key []byte +} + +func newBotService(store *store, server *mqtt.Server, key []byte) *botService { + return &botService{store: store, server: server, key: key} +} + +func (s *botService) SendText(_ context.Context, req botSendTextRequest) (*botMessageRecord, error) { + if s == nil || s.store == nil { + return nil, fmt.Errorf("bot service is not configured") + } + bot, err := s.store.GetBotNode(req.BotID) + if err != nil { + return nil, err + } + if !bot.Enabled { + return nil, fmt.Errorf("bot node is disabled") + } + messageType, err := normalizeBotMessageType(req.MessageType) + if err != nil { + return nil, err + } + text := strings.TrimSpace(req.Text) + if text == "" { + return nil, fmt.Errorf("text is required") + } + if !utf8.ValidString(text) { + return nil, fmt.Errorf("text must be valid utf-8") + } + if len([]byte(text)) > botMaxTextBytes { + return nil, fmt.Errorf("text is too long, max %d bytes", botMaxTextBytes) + } + channelID := strings.TrimSpace(req.ChannelID) + if channelID == "" { + channelID = bot.DefaultChannelID + } + if channelID == "" { + return nil, fmt.Errorf("channel id is required") + } + toNodeNum, toNodeID, err := botMessageTarget(messageType, req) + if err != nil { + return nil, err + } + packetID, err := randomPacketID() + if err != nil { + return nil, err + } + fromNodeNum := uint32(bot.NodeNum) + raw, err := mqtpp.BuildTextMessageServiceEnvelope(mqtpp.TextMessageBuildOptions{ + FromNodeNum: fromNodeNum, + ToNodeNum: uint32(toNodeNum), + PacketID: packetID, + ChannelID: channelID, + GatewayID: bot.NodeID, + Text: text, + PSK: s.key, + Encrypt: true, + ViaMQTT: true, + }) + if err != nil { + return nil, err + } + topic := strings.Trim(bot.TopicPrefix, "/") + "/" + channelID + "/" + bot.NodeID + row := &botMessageRecord{ + BotID: bot.ID, + BotNodeID: bot.NodeID, + BotNodeNum: bot.NodeNum, + MessageType: messageType, + ChannelID: channelID, + ToNodeID: toNodeID, + ToNodeNum: int64PtrOrNil(toNodeNum, messageType == botMessageTypeDirect), + Topic: topic, + PacketID: int64(packetID), + Text: text, + PayloadLen: int64(len(raw)), + Encrypted: true, + Status: botMessageStatusPending, + CreatedBy: strings.TrimSpace(req.CreatedBy), + } + if err := s.store.InsertBotMessage(row); err != nil { + return nil, err + } + if s.server == nil { + _ = s.store.UpdateBotMessageStatus(row.ID, botMessageStatusFailed, "mqtt server is not configured", nil) + row.Status = botMessageStatusFailed + row.Error = "mqtt server is not configured" + return row, fmt.Errorf("mqtt server is not configured") + } + if err := s.server.Publish(topic, raw, false, 0); err != nil { + _ = s.store.UpdateBotMessageStatus(row.ID, botMessageStatusFailed, err.Error(), nil) + row.Status = botMessageStatusFailed + row.Error = err.Error() + return row, err + } + now := time.Now() + if err := s.store.UpdateBotMessageStatus(row.ID, botMessageStatusPublished, "", &now); err != nil { + return nil, err + } + row.Status = botMessageStatusPublished + row.Error = "" + row.PublishedAt = &now + return row, nil +} + +func normalizeBotMessageType(value string) (string, error) { + switch strings.TrimSpace(value) { + case "", botMessageTypeChannel: + return botMessageTypeChannel, nil + case botMessageTypeDirect: + return botMessageTypeDirect, nil + default: + return "", fmt.Errorf("message type must be channel or direct") + } +} + +func botMessageTarget(messageType string, req botSendTextRequest) (int64, *string, error) { + if messageType == botMessageTypeChannel { + return int64(mqtpp.NodeNumBroadcast), nil, nil + } + if req.ToNodeNum != nil && *req.ToNodeNum > 0 { + if err := validateBotNodeNum(*req.ToNodeNum); err != nil { + return 0, nil, err + } + nodeID := mqtpp.NodeNumToID(uint32(*req.ToNodeNum)) + return *req.ToNodeNum, &nodeID, nil + } + toNodeID := strings.TrimSpace(req.ToNodeID) + if toNodeID == "" { + return 0, nil, fmt.Errorf("target node is required for direct message") + } + nodeNum, err := mqtpp.ParseNodeID(toNodeID) + if err != nil { + return 0, nil, err + } + if err := validateBotNodeNum(int64(nodeNum)); err != nil { + return 0, nil, err + } + normalized := mqtpp.NodeNumToID(nodeNum) + return int64(nodeNum), &normalized, nil +} + +func randomPacketID() (uint32, error) { + for i := 0; i < 8; i++ { + var buf [4]byte + if _, err := rand.Read(buf[:]); err != nil { + return 0, err + } + id := binary.LittleEndian.Uint32(buf[:]) + if id != 0 { + return id, nil + } + } + return 0, fmt.Errorf("generate packet id failed") +} + +func int64PtrOrNil(value int64, ok bool) *int64 { + if !ok { + return nil + } + return &value +} diff --git a/bot_store.go b/bot_store.go new file mode 100644 index 0000000..67070d8 --- /dev/null +++ b/bot_store.go @@ -0,0 +1,280 @@ +package main + +import ( + "crypto/rand" + "encoding/binary" + "errors" + "fmt" + "strings" + "time" + "unicode/utf8" + + "meshtastic_mqtt_server/mqtpp" + + "gorm.io/gorm" +) + +const ( + botDefaultTopicPrefix = "msh/2/e" + botMessageTypeChannel = "channel" + botMessageTypeDirect = "direct" + botMessageStatusPending = "pending" + botMessageStatusPublished = "published" + botMessageStatusFailed = "failed" +) + +var errBotNodeAlreadyExists = errors.New("bot node already exists") + +type botNodeInput struct { + NodeNum *int64 + LongName string + ShortName string + Enabled bool + DefaultChannelID string + TopicPrefix string +} + +type botMessageListOptions struct { + listOptions + BotID uint64 + MessageType string + ChannelID string +} + +func (s *store) ListBotNodes(opts listOptions) ([]botNodeRecord, error) { + opts = normalizeListOptions(opts) + var rows []botNodeRecord + q := s.db.Model(&botNodeRecord{}). + Order("updated_at DESC"). + Order("id DESC"). + Limit(opts.Limit). + Offset(opts.Offset) + return rows, q.Find(&rows).Error +} + +func (s *store) CountBotNodes(opts listOptions) (int64, error) { + var total int64 + return total, s.db.Model(&botNodeRecord{}).Count(&total).Error +} + +func (s *store) GetBotNode(id uint64) (*botNodeRecord, error) { + var row botNodeRecord + if err := s.db.Where("id = ?", id).Take(&row).Error; err != nil { + return nil, err + } + return &row, nil +} + +func (s *store) CreateBotNode(input botNodeInput) (*botNodeRecord, error) { + row, err := s.normalizedBotNodeRecord(input) + if err != nil { + return nil, err + } + if err := s.ensureBotNodeUnique(0, row.NodeID, row.NodeNum); err != nil { + return nil, err + } + if err := s.ensureBotNodeDoesNotConflictWithNodeInfo(row.NodeNum); err != nil { + return nil, err + } + if err := s.db.Create(row).Error; err != nil { + return nil, err + } + return row, nil +} + +func (s *store) UpdateBotNode(id uint64, input botNodeInput) (*botNodeRecord, error) { + if id == 0 { + return nil, fmt.Errorf("bot node id is required") + } + if _, err := s.GetBotNode(id); err != nil { + return nil, err + } + row, err := s.normalizedBotNodeRecord(input) + if err != nil { + return nil, err + } + if err := s.ensureBotNodeUnique(id, row.NodeID, row.NodeNum); err != nil { + return nil, err + } + if err := s.ensureBotNodeDoesNotConflictWithNodeInfo(row.NodeNum); err != nil { + return nil, err + } + updates := map[string]any{ + "node_id": row.NodeID, + "node_num": row.NodeNum, + "long_name": row.LongName, + "short_name": row.ShortName, + "enabled": row.Enabled, + "default_channel_id": row.DefaultChannelID, + "topic_prefix": row.TopicPrefix, + "updated_at": time.Now(), + } + if err := s.db.Model(&botNodeRecord{}).Where("id = ?", id).Updates(updates).Error; err != nil { + return nil, err + } + return s.GetBotNode(id) +} + +func (s *store) DeleteBotNode(id uint64) error { + result := s.db.Where("id = ?", id).Delete(&botNodeRecord{}) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil +} + +func (s *store) InsertBotMessage(row *botMessageRecord) error { + return s.db.Create(row).Error +} + +func (s *store) UpdateBotMessageStatus(id uint64, status, errText string, publishedAt *time.Time) error { + updates := map[string]any{"status": status, "error": strings.TrimSpace(errText), "published_at": publishedAt} + result := s.db.Model(&botMessageRecord{}).Where("id = ?", id).Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil +} + +func (s *store) ListBotMessages(opts botMessageListOptions) ([]botMessageRecord, error) { + opts.listOptions = normalizeListOptions(opts.listOptions) + var rows []botMessageRecord + q := applyBotMessageFilters(s.db.Model(&botMessageRecord{}), opts). + Order("created_at DESC"). + Order("id DESC"). + Limit(opts.Limit). + Offset(opts.Offset) + return rows, q.Find(&rows).Error +} + +func (s *store) CountBotMessages(opts botMessageListOptions) (int64, error) { + var total int64 + q := applyBotMessageFilters(s.db.Model(&botMessageRecord{}), opts) + return total, q.Count(&total).Error +} + +func applyBotMessageFilters(q *gorm.DB, opts botMessageListOptions) *gorm.DB { + if opts.BotID != 0 { + q = q.Where("bot_id = ?", opts.BotID) + } + if opts.MessageType != "" { + q = q.Where("message_type = ?", opts.MessageType) + } + if opts.ChannelID != "" { + q = q.Where("channel_id = ?", opts.ChannelID) + } + if opts.Since != nil { + q = q.Where("created_at >= ?", *opts.Since) + } + if opts.Until != nil { + q = q.Where("created_at <= ?", *opts.Until) + } + return q +} + +func (s *store) normalizedBotNodeRecord(input botNodeInput) (*botNodeRecord, error) { + longName := strings.TrimSpace(input.LongName) + shortName := strings.TrimSpace(input.ShortName) + channelID := strings.TrimSpace(input.DefaultChannelID) + topicPrefix := strings.Trim(strings.TrimSpace(input.TopicPrefix), "/") + if topicPrefix == "" { + topicPrefix = botDefaultTopicPrefix + } + if longName == "" { + return nil, fmt.Errorf("long name is required") + } + if !utf8.ValidString(longName) { + return nil, fmt.Errorf("long name must be valid utf-8") + } + if shortName == "" { + return nil, fmt.Errorf("short name is required") + } + if !utf8.ValidString(shortName) { + return nil, fmt.Errorf("short name must be valid utf-8") + } + if channelID == "" { + return nil, fmt.Errorf("default channel id is required") + } + var nodeNum int64 + if input.NodeNum == nil || *input.NodeNum == 0 { + generated, err := s.generateBotNodeNum() + if err != nil { + return nil, err + } + nodeNum = generated + } else { + nodeNum = *input.NodeNum + } + if err := validateBotNodeNum(nodeNum); err != nil { + return nil, err + } + return &botNodeRecord{NodeID: mqtpp.NodeNumToID(uint32(nodeNum)), NodeNum: nodeNum, LongName: longName, ShortName: shortName, Enabled: input.Enabled, DefaultChannelID: channelID, TopicPrefix: topicPrefix}, nil +} + +func validateBotNodeNum(nodeNum int64) error { + if nodeNum <= 0 || nodeNum >= int64(mqtpp.NodeNumBroadcast) { + return fmt.Errorf("node num must be between 1 and 4294967294") + } + return nil +} + +func (s *store) generateBotNodeNum() (int64, error) { + for i := 0; i < 32; i++ { + var buf [4]byte + if _, err := rand.Read(buf[:]); err != nil { + return 0, err + } + nodeNum := int64(binary.LittleEndian.Uint32(buf[:]) & 0x7fffffff) + if err := validateBotNodeNum(nodeNum); err != nil { + continue + } + if err := s.ensureBotNodeUnique(0, mqtpp.NodeNumToID(uint32(nodeNum)), nodeNum); err != nil { + if errors.Is(err, errBotNodeAlreadyExists) { + continue + } + return 0, err + } + if err := s.ensureBotNodeDoesNotConflictWithNodeInfo(nodeNum); err != nil { + if errors.Is(err, errBotNodeAlreadyExists) { + continue + } + return 0, err + } + return nodeNum, nil + } + return 0, fmt.Errorf("generate bot node num failed") +} + +func (s *store) ensureBotNodeUnique(id uint64, nodeID string, nodeNum int64) error { + var existing botNodeRecord + q := s.db.Where("node_id = ? OR node_num = ?", nodeID, nodeNum) + if id != 0 { + q = q.Where("id <> ?", id) + } + err := q.Take(&existing).Error + if err == nil { + return errBotNodeAlreadyExists + } + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err +} + +func (s *store) ensureBotNodeDoesNotConflictWithNodeInfo(nodeNum int64) error { + var existing nodeInfoRecord + err := s.db.Where("node_num = ?", nodeNum).Take(&existing).Error + if err == nil { + return errBotNodeAlreadyExists + } + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err +} diff --git a/db.go b/db.go index 705e9d5..890f4b4 100644 --- a/db.go +++ b/db.go @@ -219,6 +219,49 @@ func (mqttForwardTopicRecord) TableName() string { return "mqtt_forward_topics" } +type botNodeRecord struct { + ID uint64 `gorm:"column:id;primaryKey;autoIncrement"` + NodeID string `gorm:"column:node_id;not null;uniqueIndex"` + NodeNum int64 `gorm:"column:node_num;not null;uniqueIndex"` + LongName string `gorm:"column:long_name;not null"` + ShortName string `gorm:"column:short_name;not null"` + Enabled bool `gorm:"column:enabled;not null;index"` + DefaultChannelID string `gorm:"column:default_channel_id;not null;index"` + TopicPrefix string `gorm:"column:topic_prefix;not null"` + LastPacketID int64 `gorm:"column:last_packet_id;not null"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime"` + UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime;index"` +} + +func (botNodeRecord) TableName() string { + return "bot_nodes" +} + +type botMessageRecord struct { + ID uint64 `gorm:"column:id;primaryKey;autoIncrement"` + BotID uint64 `gorm:"column:bot_id;not null;index:idx_bot_message_bot_created_at,priority:1"` + BotNodeID string `gorm:"column:bot_node_id;not null;index"` + BotNodeNum int64 `gorm:"column:bot_node_num;not null;index"` + MessageType string `gorm:"column:message_type;not null;index"` + ChannelID string `gorm:"column:channel_id;not null;index"` + ToNodeID *string `gorm:"column:to_node_id;index"` + ToNodeNum *int64 `gorm:"column:to_node_num;index"` + Topic string `gorm:"column:topic;not null"` + PacketID int64 `gorm:"column:packet_id;not null;index"` + Text string `gorm:"column:text;type:text;not null"` + PayloadLen int64 `gorm:"column:payload_len;not null"` + Encrypted bool `gorm:"column:encrypted;not null;index"` + Status string `gorm:"column:status;not null;index"` + Error string `gorm:"column:error;type:text"` + PublishedAt *time.Time `gorm:"column:published_at;index"` + CreatedBy string `gorm:"column:created_by;index"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index:idx_bot_message_bot_created_at,priority:2"` +} + +func (botMessageRecord) TableName() string { + return "bot_messages" +} + type nodeInfoRecord struct { NodeID string `gorm:"column:node_id;primaryKey;not null"` NodeNum int64 `gorm:"column:node_num;not null;index"` @@ -421,6 +464,8 @@ func (s *store) migrate() error { {label: "forbidden_word_blocking", model: &forbiddenWordBlockingRecord{}}, {label: "mqtt_forwarders", model: &mqttForwarderRecord{}}, {label: "mqtt_forward_topics", model: &mqttForwardTopicRecord{}}, + {label: "bot_nodes", model: &botNodeRecord{}}, + {label: "bot_messages", model: &botMessageRecord{}}, {label: "nodeinfo", model: &nodeInfoRecord{}}, {label: "map_report", model: &mapReportRecord{}}, {label: "text_message", model: &textMessageRecord{}}, diff --git a/main.go b/main.go index 9ce5b51..3ff7752 100644 --- a/main.go +++ b/main.go @@ -224,6 +224,7 @@ func run(cfg *config) error { if err != nil { return err } + botSender := newBotService(store, server, cfg.key) forwardManager := newMQTTForwardManager(store) if err := forwardManager.StartFromStore(); err != nil { server.Close() @@ -239,7 +240,7 @@ func run(cfg *config) error { return err } mqttStatus := mqttRuntimeStatus{server: server, address: mqttAddr, tls: cfg.MQTT.TLS.Enabled, stats: messageStats, dbQueue: dbQueue} - httpServer = newHTTPServer(cfg.Web, store, sessions, mqttStatus, blocking, forwardManager, settings) + httpServer = newHTTPServer(cfg.Web, store, sessions, mqttStatus, blocking, forwardManager, settings, botSender) webAddress := httpServer.Addr go func() { if cfg.Web.SocketPath != "" { @@ -281,7 +282,7 @@ func run(cfg *config) error { } func startMQTTServer(cfg *config, dbQueue *dbWriteQueue, stats *meshtasticMessageStats, blocking *blockingCache, settings *runtimeSettingsCache) (*mqtt.Server, string, error) { - server := mqtt.New(nil) + server := mqtt.New(&mqtt.Options{InlineClient: true}) if err := server.AddHook(new(auth.AllowHook), nil); err != nil { return nil, "", err } diff --git a/meshmap_frontend/src/App.vue b/meshmap_frontend/src/App.vue index 9403c7e..35aab7c 100644 --- a/meshmap_frontend/src/App.vue +++ b/meshmap_frontend/src/App.vue @@ -2,6 +2,7 @@ import { computed, onBeforeUnmount, onMounted, ref } from 'vue' import { adminLogout, createNodeBlockingRule, deleteNode, deleteTextMessage, getAdminMe, getHealth, getMapReportViewport, getNodeInfo, getPositions, getTextMessages } from './api' import AdminBlockingManagement from './components/AdminBlockingManagement.vue' +import AdminBot from './components/AdminBot.vue' import AdminDashboard from './components/AdminDashboard.vue' import AdminDiscardDetails from './components/AdminDiscardDetails.vue' import AdminHelpEdit from './components/AdminHelpEdit.vue' @@ -21,6 +22,7 @@ const currentPath = window.location.pathname const adminPath = currentPath const isAdminPage = adminPath.startsWith('/admin') const isMqttForwardAdminPage = adminPath === '/admin/mqtt_forward' || adminPath === '/admin/mqtt_forward/' +const isBotAdminPage = adminPath === '/admin/bot' || adminPath === '/admin/bot/' const detailMatch = currentPath.match(/^\/detailed\/(.+)$/) const detailedNodeId = detailMatch ? decodeURIComponent(detailMatch[1]) : '' const isDetailedPage = !!detailedNodeId @@ -493,6 +495,7 @@ onBeforeUnmount(() => { 用户管理 屏蔽管理 MQTT转发 + 机器人 帮助编辑 登录日志 丢弃数据 @@ -532,6 +535,7 @@ onBeforeUnmount(() => { + diff --git a/meshmap_frontend/src/api.ts b/meshmap_frontend/src/api.ts index cc4f425..5f48c57 100644 --- a/meshmap_frontend/src/api.ts +++ b/meshmap_frontend/src/api.ts @@ -6,6 +6,12 @@ import type { AdminRuntimeSettingsPayload, AdminRuntimeSettingsResponse, AdminUsersResponse, + BotMessage, + BotMessageMutationResponse, + BotNode, + BotNodeMutationResponse, + BotNodePayload, + BotSendMessagePayload, BlockingRuleResponse, DiscardDetails, ForbiddenWordBlockingRule, @@ -288,3 +294,31 @@ export function deleteMQTTForwardTopic(id: number): Promise<{ status: string }> export function getMQTTForwardStatus(): Promise { return getJSON('/api/admin/mqtt-forward/status') } + +export function getBotNodes(limit = 100, offset = 0): Promise> { + return getJSON>(listPath('/api/admin/bot/nodes', limit, offset)) +} + +export function createBotNode(payload: BotNodePayload): Promise { + return postJSON('/api/admin/bot/nodes', payload) +} + +export function updateBotNode(id: number, payload: BotNodePayload): Promise { + return putJSON(`/api/admin/bot/nodes/${id}`, payload) +} + +export function deleteBotNode(id: number): Promise<{ status: string }> { + return deleteJSON<{ status: string }>(`/api/admin/bot/nodes/${id}`) +} + +export function getBotMessages(botId = 0, limit = 100, offset = 0): Promise> { + const params = new URLSearchParams({ limit: String(limit), offset: String(offset) }) + if (botId > 0) { + params.set('bot_id', String(botId)) + } + return getJSON>(`/api/admin/bot/messages?${params.toString()}`) +} + +export function sendBotMessage(payload: BotSendMessagePayload): Promise { + return postJSON('/api/admin/bot/messages', payload) +} diff --git a/meshmap_frontend/src/components/AdminBot.vue b/meshmap_frontend/src/components/AdminBot.vue new file mode 100644 index 0000000..60a7638 --- /dev/null +++ b/meshmap_frontend/src/components/AdminBot.vue @@ -0,0 +1,461 @@ + + + + + diff --git a/meshmap_frontend/src/types.ts b/meshmap_frontend/src/types.ts index aa842d6..6d2e066 100644 --- a/meshmap_frontend/src/types.ts +++ b/meshmap_frontend/src/types.ts @@ -401,3 +401,67 @@ export interface MQTTForwardMutationResponse { export interface MQTTForwardStatusResponse { items: MQTTForwardRuntimeStatus[] } + +export type BotMessageType = 'channel' | 'direct' +export type BotMessageStatus = 'pending' | 'published' | 'failed' + +export interface BotNode { + id: number + node_id: string + node_num: number + long_name: string + short_name: string + enabled: boolean + default_channel_id: string + topic_prefix: string + created_at: string + updated_at: string +} + +export interface BotNodePayload { + node_num?: number | null + long_name: string + short_name: string + enabled: boolean + default_channel_id: string + topic_prefix?: string +} + +export interface BotNodeMutationResponse { + item: BotNode +} + +export interface BotMessage { + id: number + bot_id: number + bot_node_id: string + bot_node_num: number + message_type: BotMessageType + channel_id: string + to_node_id: string | null + to_node_num: number | null + topic: string + packet_id: number + text: string + payload_len: number + encrypted: boolean + status: BotMessageStatus + error: string + published_at: string | null + created_by: string + created_at: string +} + +export interface BotSendMessagePayload { + bot_id: number + message_type: BotMessageType + channel_id: string + to_node_id?: string + to_node_num?: number | null + text: string +} + +export interface BotMessageMutationResponse { + item: BotMessage + error?: string +} diff --git a/mqtpp/builder.go b/mqtpp/builder.go new file mode 100644 index 0000000..418f15d --- /dev/null +++ b/mqtpp/builder.go @@ -0,0 +1,125 @@ +package mqtpp + +import ( + "fmt" + "strconv" + "strings" + "unicode/utf8" + + "google.golang.org/protobuf/encoding/protowire" +) + +const NodeNumBroadcast uint32 = 0xffffffff + +type TextMessageBuildOptions struct { + FromNodeNum uint32 + ToNodeNum uint32 + PacketID uint32 + ChannelID string + GatewayID string + Text string + PSK []byte + Encrypt bool + ViaMQTT bool +} + +func BuildTextMessageServiceEnvelope(opts TextMessageBuildOptions) ([]byte, error) { + if opts.FromNodeNum == 0 { + return nil, fmt.Errorf("from node number is required") + } + if opts.PacketID == 0 { + return nil, fmt.Errorf("packet id is required") + } + if opts.ChannelID == "" { + return nil, fmt.Errorf("channel id is required") + } + if strings.TrimSpace(opts.GatewayID) == "" { + opts.GatewayID = NodeNumToID(opts.FromNodeNum) + } + if opts.Text == "" { + return nil, fmt.Errorf("text is required") + } + if !utf8.ValidString(opts.Text) { + return nil, fmt.Errorf("text must be valid utf-8") + } + + data := buildDataPacket(textMessageApp, []byte(opts.Text)) + packet, err := buildMeshPacket(opts, data) + if err != nil { + return nil, err + } + return buildServiceEnvelope(packet, opts.ChannelID, opts.GatewayID), nil +} + +func NodeNumToID(nodeNum uint32) string { + return nodeNumToID(nodeNum) +} + +func ParseNodeID(nodeID string) (uint32, error) { + value := strings.TrimSpace(nodeID) + if value == "" { + return 0, fmt.Errorf("node id is required") + } + value = strings.TrimPrefix(value, "!") + if len(value) != 8 { + return 0, fmt.Errorf("node id must be !xxxxxxxx") + } + num, err := strconv.ParseUint(value, 16, 32) + if err != nil { + return 0, fmt.Errorf("invalid node id: %w", err) + } + return uint32(num), nil +} + +func buildDataPacket(portnum uint32, payload []byte) []byte { + var out []byte + out = protowire.AppendTag(out, 1, protowire.VarintType) + out = protowire.AppendVarint(out, uint64(portnum)) + out = protowire.AppendTag(out, 2, protowire.BytesType) + out = protowire.AppendBytes(out, payload) + return out +} + +func buildMeshPacket(opts TextMessageBuildOptions, data []byte) ([]byte, error) { + var out []byte + out = protowire.AppendTag(out, 1, protowire.Fixed32Type) + out = protowire.AppendFixed32(out, opts.FromNodeNum) + out = protowire.AppendTag(out, 2, protowire.Fixed32Type) + out = protowire.AppendFixed32(out, opts.ToNodeNum) + + if opts.Encrypt { + if len(opts.PSK) == 0 { + return nil, fmt.Errorf("psk is required for encrypted text message") + } + ciphertext, err := cryptAESCTR(opts.PSK, opts.FromNodeNum, opts.PacketID, data) + if err != nil { + return nil, err + } + out = protowire.AppendTag(out, 3, protowire.VarintType) + out = protowire.AppendVarint(out, uint64(channelHash(opts.ChannelID, opts.PSK))) + out = protowire.AppendTag(out, 5, protowire.BytesType) + out = protowire.AppendBytes(out, ciphertext) + } else { + out = protowire.AppendTag(out, 4, protowire.BytesType) + out = protowire.AppendBytes(out, data) + } + + out = protowire.AppendTag(out, 6, protowire.Fixed32Type) + out = protowire.AppendFixed32(out, opts.PacketID) + if opts.ViaMQTT { + out = protowire.AppendTag(out, 14, protowire.VarintType) + out = protowire.AppendVarint(out, 1) + } + return out, nil +} + +func buildServiceEnvelope(packet []byte, channelID string, gatewayID string) []byte { + var out []byte + out = protowire.AppendTag(out, 1, protowire.BytesType) + out = protowire.AppendBytes(out, packet) + out = protowire.AppendTag(out, 2, protowire.BytesType) + out = protowire.AppendBytes(out, []byte(channelID)) + out = protowire.AppendTag(out, 3, protowire.BytesType) + out = protowire.AppendBytes(out, []byte(gatewayID)) + return out +} diff --git a/mqtpp/builder_test.go b/mqtpp/builder_test.go new file mode 100644 index 0000000..e7611b3 --- /dev/null +++ b/mqtpp/builder_test.go @@ -0,0 +1,94 @@ +package mqtpp + +import "testing" + +func TestBuildTextMessageServiceEnvelopeRoundTrip(t *testing.T) { + key, err := ExpandPSK("AQ==") + if err != nil { + t.Fatalf("ExpandPSK() error = %v", err) + } + + raw, err := BuildTextMessageServiceEnvelope(TextMessageBuildOptions{ + FromNodeNum: 0x12345678, + ToNodeNum: NodeNumBroadcast, + PacketID: 0x87654321, + ChannelID: "LongFast", + GatewayID: "!12345678", + Text: "hello from bot", + PSK: key, + Encrypt: true, + ViaMQTT: true, + }) + if err != nil { + t.Fatalf("BuildTextMessageServiceEnvelope() error = %v", err) + } + + valid, _, record := MQTTPP("msh/2/e/LongFast/!12345678", raw, key, Options{}) + if !valid { + t.Fatalf("MQTTPP() valid = false, record = %#v", record) + } + if record["type"] != "text_message" { + t.Fatalf("record type = %v", record["type"]) + } + if record["text"] != "hello from bot" { + t.Fatalf("text = %v", record["text"]) + } + if record["from_num"] != uint32(0x12345678) { + t.Fatalf("from_num = %v", record["from_num"]) + } + if record["packet_to_num"] != uint32(NodeNumBroadcast) { + t.Fatalf("packet_to_num = %v", record["packet_to_num"]) + } + if record["decrypt_success"] != true { + t.Fatalf("decrypt_success = %v", record["decrypt_success"]) + } +} + +func TestBuildTextMessageServiceEnvelopeDirectRoundTrip(t *testing.T) { + key, err := ExpandPSK("AQ==") + if err != nil { + t.Fatalf("ExpandPSK() error = %v", err) + } + + raw, err := BuildTextMessageServiceEnvelope(TextMessageBuildOptions{ + FromNodeNum: 0x12345678, + ToNodeNum: 0x10203040, + PacketID: 0x11111111, + ChannelID: "LongFast", + GatewayID: "!12345678", + Text: "direct hello", + PSK: key, + Encrypt: true, + ViaMQTT: true, + }) + if err != nil { + t.Fatalf("BuildTextMessageServiceEnvelope() error = %v", err) + } + + valid, _, record := MQTTPP("msh/2/e/LongFast/!12345678", raw, key, Options{}) + if !valid { + t.Fatalf("MQTTPP() valid = false, record = %#v", record) + } + if record["text"] != "direct hello" { + t.Fatalf("text = %v", record["text"]) + } + if record["packet_to"] != "!10203040" { + t.Fatalf("packet_to = %v", record["packet_to"]) + } + if record["packet_to_num"] != uint32(0x10203040) { + t.Fatalf("packet_to_num = %v", record["packet_to_num"]) + } +} + +func TestParseNodeID(t *testing.T) { + num, err := ParseNodeID("!1234abcd") + if err != nil { + t.Fatalf("ParseNodeID() error = %v", err) + } + if num != 0x1234abcd { + t.Fatalf("num = %#x", num) + } + if NodeNumToID(num) != "!1234abcd" { + t.Fatalf("NodeNumToID() = %s", NodeNumToID(num)) + } +} diff --git a/mqtpp/mqtpp.go b/mqtpp/mqtpp.go index 95ea5c8..6256ab2 100644 --- a/mqtpp/mqtpp.go +++ b/mqtpp/mqtpp.go @@ -944,6 +944,11 @@ func channelHash(channelName string, key []byte) byte { // decryptAESCTR 按 Meshtastic nonce 规则使用 AES-CTR 解密 payload。 func decryptAESCTR(key []byte, fromNum, packetID uint32, ciphertext []byte) ([]byte, error) { + return cryptAESCTR(key, fromNum, packetID, ciphertext) +} + +// cryptAESCTR 按 Meshtastic nonce 规则执行 AES-CTR;CTR 加密和解密是同一个 XOR 流操作。 +func cryptAESCTR(key []byte, fromNum, packetID uint32, input []byte) ([]byte, error) { block, err := aes.NewCipher(key) if err != nil { return nil, err @@ -951,9 +956,9 @@ func decryptAESCTR(key []byte, fromNum, packetID uint32, ciphertext []byte) ([]b nonce := make([]byte, aes.BlockSize) binary.LittleEndian.PutUint64(nonce[0:8], uint64(packetID)) binary.LittleEndian.PutUint32(nonce[8:12], fromNum) - plaintext := make([]byte, len(ciphertext)) - cipher.NewCTR(block, nonce).XORKeyStream(plaintext, ciphertext) - return plaintext, nil + output := make([]byte, len(input)) + cipher.NewCTR(block, nonce).XORKeyStream(output, input) + return output, nil } // enumName 把已知枚举值转换成名称,未知值保留为数字。 diff --git a/web.go b/web.go index 9c90258..73cd6f3 100644 --- a/web.go +++ b/web.go @@ -14,10 +14,10 @@ import ( "gorm.io/gorm" ) -func newHTTPServer(cfg webConfig, store *store, sessions *sessionManager, mqttStatus mqttStatusProvider, blocking *blockingCache, forwarder mqttForwardReloader, settings *runtimeSettingsCache) *http.Server { +func newHTTPServer(cfg webConfig, store *store, sessions *sessionManager, mqttStatus mqttStatusProvider, blocking *blockingCache, forwarder mqttForwardReloader, settings *runtimeSettingsCache, botSender botTextSender) *http.Server { return &http.Server{ Addr: net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)), - Handler: newRouter(cfg, store, sessions, mqttStatus, blocking, forwarder, settings), + Handler: newRouter(cfg, store, sessions, mqttStatus, blocking, forwarder, settings, botSender), } } @@ -47,12 +47,12 @@ func serveHTTPUnixSocket(server *http.Server, socketPath string) error { return server.Serve(listener) } -func newRouter(cfg webConfig, store *store, sessions *sessionManager, mqttStatus mqttStatusProvider, blocking *blockingCache, forwarder mqttForwardReloader, settings *runtimeSettingsCache) *gin.Engine { +func newRouter(cfg webConfig, store *store, sessions *sessionManager, mqttStatus mqttStatusProvider, blocking *blockingCache, forwarder mqttForwardReloader, settings *runtimeSettingsCache, botSender botTextSender) *gin.Engine { r := gin.New() r.Use(gin.Logger(), gin.Recovery()) api := r.Group("/api") registerAPIRoutes(api, store) - registerAdminRoutes(api.Group("/admin"), store, sessions, mqttStatus, blocking, forwarder, settings) + registerAdminRoutes(api.Group("/admin"), store, sessions, mqttStatus, blocking, forwarder, settings, botSender) registerStaticRoutes(r, cfg.StaticDir) return r } @@ -123,7 +123,7 @@ func registerAPIRoutes(r gin.IRouter, store *store) { }) } -func registerAdminRoutes(r gin.IRouter, store *store, sessions *sessionManager, mqttStatus mqttStatusProvider, blocking *blockingCache, forwarder mqttForwardReloader, settings *runtimeSettingsCache) { +func registerAdminRoutes(r gin.IRouter, store *store, sessions *sessionManager, mqttStatus mqttStatusProvider, blocking *blockingCache, forwarder mqttForwardReloader, settings *runtimeSettingsCache, botSender botTextSender) { type loginRequest struct { Username string `json:"username"` Password string `json:"password"` @@ -187,6 +187,7 @@ func registerAdminRoutes(r gin.IRouter, store *store, sessions *sessionManager, registerAdminMQTTForwardRoutes(protected, store, forwarder) registerAdminRuntimeSettingsRoutes(protected, store, settings) registerAdminHelpRoutes(protected, store) + registerAdminBotRoutes(protected, store, botSender) protected.GET("/me", func(c *gin.Context) { claims := c.MustGet("admin_claims").(*sessionClaims) c.JSON(http.StatusOK, gin.H{"user": adminUserDTO{Username: claims.Username, Role: claims.Role}})