diff --git a/admin_bot_routes.go b/admin_bot_routes.go index 443c8bf..e58e1cf 100644 --- a/admin_bot_routes.go +++ b/admin_bot_routes.go @@ -139,12 +139,10 @@ func registerAdminBotRoutes(r gin.IRouter, store *store, sender botTextSender) { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid bot id"}) return } - bot, err := store.GetBotNode(botID) - if errors.Is(err, gorm.ErrRecordNotFound) { + if _, err := store.GetBotNode(botID); errors.Is(err, gorm.ErrRecordNotFound) { c.JSON(http.StatusNotFound, gin.H{"error": "bot node not found"}) return - } - if err != nil { + } else if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } @@ -153,8 +151,14 @@ func registerAdminBotRoutes(r gin.IRouter, store *store, sender botTextSender) { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid target node num"}) return } - rows, err := store.ListBotDirectTextMessages(bot.NodeNum, target, opts) - writeListResponse(c, rows, opts, err, textMessageDTO) + dmOpts := botDirectMessageListOptions{listOptions: opts, BotID: botID, PeerNodeNum: target, Direction: c.Query("direction")} + rows, err := store.ListBotDirectMessagesByConversation(dmOpts) + if err != nil { + writeListResponse(c, rows, opts, err, botDirectMessageDTO) + return + } + total, err := store.CountBotDirectMessagesByConversation(dmOpts) + writeListResponseWithTotal(c, rows, opts, total, err, botDirectMessageDTO) }) r.POST("/bot/messages", func(c *gin.Context) { if sender == nil { @@ -238,3 +242,29 @@ func botNodeDTO(row botNodeRecord) gin.H { func botMessageDTO(row botMessageRecord) gin.H { return gin.H{"id": row.ID, "bot_id": row.BotID, "bot_node_id": row.BotNodeID, "bot_node_num": row.BotNodeNum, "message_type": row.MessageType, "channel_id": row.ChannelID, "to_node_id": row.ToNodeID, "to_node_num": row.ToNodeNum, "topic": row.Topic, "packet_id": row.PacketID, "text": row.Text, "payload_len": row.PayloadLen, "encrypted": row.Encrypted, "status": row.Status, "error": row.Error, "published_at": row.PublishedAt, "created_by": row.CreatedBy, "created_at": row.CreatedAt} } + +func botDirectMessageDTO(row botDirectMessageRecord) gin.H { + return gin.H{ + "id": row.ID, + "bot_id": row.BotID, + "bot_node_id": row.BotNodeID, + "bot_node_num": row.BotNodeNum, + "peer_node_id": row.PeerNodeID, + "peer_node_num": row.PeerNodeNum, + "direction": row.Direction, + "topic": row.Topic, + "packet_id": row.PacketID, + "text": row.Text, + "payload_len": row.PayloadLen, + "pki_encrypted": row.PKIEncrypted, + "want_ack": row.WantAck, + "gateway_id": row.GatewayID, + "status": row.Status, + "error": row.Error, + "bot_message_id": row.BotMessageID, + "created_by": row.CreatedBy, + "published_at": row.PublishedAt, + "received_at": row.ReceivedAt, + "created_at": row.CreatedAt, + } +} diff --git a/bot_direct_message_store.go b/bot_direct_message_store.go new file mode 100644 index 0000000..588848b --- /dev/null +++ b/bot_direct_message_store.go @@ -0,0 +1,218 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "gorm.io/gorm" +) + +type botDirectMessageListOptions struct { + listOptions + BotID uint64 + PeerNodeNum int64 + Direction string +} + +// InsertBotDirectMessage 把一条机器人 DM(出向或入向)写入 bot_direct_messages 表。 +func (s *store) InsertBotDirectMessage(row *botDirectMessageRecord) error { + if s == nil || s.db == nil { + return fmt.Errorf("store is not configured") + } + if row == nil { + return fmt.Errorf("bot direct message is required") + } + if row.Direction == "" { + return fmt.Errorf("bot direct message direction is required") + } + return s.db.Create(row).Error +} + +// UpdateBotDirectMessageStatus 更新一条出向 DM 的发送状态(pending → published/failed)。 +func (s *store) UpdateBotDirectMessageStatus(id uint64, status, errText string, publishedAt *time.Time) error { + if s == nil || s.db == nil { + return fmt.Errorf("store is not configured") + } + if id == 0 { + return fmt.Errorf("bot direct message id is required") + } + updates := map[string]any{ + "status": status, + "error": strings.TrimSpace(errText), + "published_at": publishedAt, + } + result := s.db.Model(&botDirectMessageRecord{}).Where("id = ?", id).Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil +} + +// ListBotDirectMessagesByConversation 按 (bot, peer) 反序拉取 DM 历史,给 /admin/bot/direct 页面。 +func (s *store) ListBotDirectMessagesByConversation(opts botDirectMessageListOptions) ([]botDirectMessageRecord, error) { + if s == nil || s.db == nil { + return nil, fmt.Errorf("store is not configured") + } + if opts.BotID == 0 { + return nil, fmt.Errorf("bot id is required") + } + if opts.PeerNodeNum == 0 { + return nil, fmt.Errorf("peer node num is required") + } + opts.listOptions = normalizeListOptions(opts.listOptions) + var rows []botDirectMessageRecord + q := s.db.Model(&botDirectMessageRecord{}). + Where("bot_id = ? AND peer_node_num = ?", opts.BotID, opts.PeerNodeNum). + Order("created_at DESC"). + Order("id DESC"). + Limit(opts.Limit). + Offset(opts.Offset) + if opts.Direction != "" { + q = q.Where("direction = ?", opts.Direction) + } + if opts.Since != nil { + q = q.Where("created_at >= ?", *opts.Since) + } + if opts.Until != nil { + q = q.Where("created_at <= ?", *opts.Until) + } + return rows, q.Find(&rows).Error +} + +// CountBotDirectMessagesByConversation 返回会话总条数(前端无限滚动可用,可选)。 +func (s *store) CountBotDirectMessagesByConversation(opts botDirectMessageListOptions) (int64, error) { + if s == nil || s.db == nil { + return 0, fmt.Errorf("store is not configured") + } + if opts.BotID == 0 || opts.PeerNodeNum == 0 { + return 0, fmt.Errorf("bot id and peer node num are required") + } + var total int64 + q := s.db.Model(&botDirectMessageRecord{}). + Where("bot_id = ? AND peer_node_num = ?", opts.BotID, opts.PeerNodeNum) + if opts.Direction != "" { + q = q.Where("direction = ?", opts.Direction) + } + if opts.Since != nil { + q = q.Where("created_at >= ?", *opts.Since) + } + if opts.Until != nil { + q = q.Where("created_at <= ?", *opts.Until) + } + return total, q.Count(&total).Error +} + +// FindBotForIncomingPKIPacket 在 bot_direct_messages 写入路径上判断接收方是否为受管 bot。 +// 返回的 bot 用于填充 BotID/BotNodeID/BotNodeNum;不命中时返回 ErrRecordNotFound。 +func (s *store) FindBotForIncomingPKIPacket(toNodeNum int64) (*botNodeRecord, error) { + if s == nil || s.db == nil { + return nil, fmt.Errorf("store is not configured") + } + bot, err := s.GetBotNodeByNodeNum(toNodeNum) + if err != nil { + return nil, err + } + if !bot.Enabled { + return nil, errors.New("bot disabled") + } + return bot, nil +} + +// isInboundBotDirectMessage 判断 record 是否是“PKI 加密、发往受管 bot”的入向 DM。 +// 仅在 type=text_message、pki_encrypted=true、packet_to_num 命中受管 bot 时返回 true。 +// 任何步骤失败都返回 false,让记录回落到 text_message 表(与之前行为兼容)。 +func isInboundBotDirectMessage(s *store, record map[string]any) bool { + if s == nil || record == nil { + return false + } + if pki, _ := record["pki_encrypted"].(bool); !pki { + return false + } + toNum, ok := uint32FromRecord(record["packet_to_num"]) + if !ok || toNum == 0 { + return false + } + bot, err := s.FindBotForIncomingPKIPacket(int64(toNum)) + if err != nil || bot == nil { + return false + } + return true +} + +// insertInboundBotDirectMessage 把一条入向 PKI DM 转写入 bot_direct_messages 表。 +// 失败时返回错误,由 dbWriteQueue 统一打印 db_error 事件。 +func insertInboundBotDirectMessage(s *store, record map[string]any, clientInfo mqttClientInfo) error { + if s == nil { + return fmt.Errorf("store is not configured") + } + if record == nil { + return fmt.Errorf("record is required") + } + toNum, ok := uint32FromRecord(record["packet_to_num"]) + if !ok || toNum == 0 { + return fmt.Errorf("missing packet_to_num") + } + bot, err := s.FindBotForIncomingPKIPacket(int64(toNum)) + if err != nil { + return fmt.Errorf("lookup bot for inbound DM: %w", err) + } + peerNum, ok := uint32FromRecord(record["from_num"]) + if !ok || peerNum == 0 { + return fmt.Errorf("missing from_num") + } + peerNodeID, _ := record["from"].(string) + if peerNodeID == "" { + return fmt.Errorf("missing from") + } + packetID, _ := uint32FromRecord(record["packet_id"]) + topic, _ := record["topic"].(string) + gateway, _ := record["gateway_id"].(string) + var gatewayPtr *string + if gw := strings.TrimSpace(gateway); gw != "" { + gatewayPtr = &gw + } + text, _ := record["text"].(string) + wantAck, _ := record["want_ack"].(bool) + payloadLen, _ := record["payload_len"].(int) + if payloadLen == 0 { + if v, ok := record["payload_len"].(int64); ok { + payloadLen = int(v) + } + } + contentJSON, encodeErr := json.Marshal(record) + var contentPtr *string + if encodeErr == nil { + s := string(contentJSON) + contentPtr = &s + } + now := time.Now() + dm := &botDirectMessageRecord{ + BotID: bot.ID, + BotNodeID: bot.NodeID, + BotNodeNum: bot.NodeNum, + PeerNodeID: peerNodeID, + PeerNodeNum: int64(peerNum), + Direction: botDirectMessageDirectionInbound, + Topic: topic, + PacketID: int64(packetID), + Text: text, + PayloadLen: int64(payloadLen), + PKIEncrypted: true, + WantAck: wantAck, + GatewayID: gatewayPtr, + Status: botMessageStatusPublished, + ReceivedAt: &now, + ContentJSON: contentPtr, + } + if err := s.InsertBotDirectMessage(dm); err != nil { + return fmt.Errorf("insert bot direct message from %s: %w", peerNodeID, err) + } + _ = clientInfo // mqtt 元数据已经记录在 content_json 里,这里保留参数以保持队列签名一致 + return nil +} diff --git a/bot_service.go b/bot_service.go index 4731417..65b3881 100644 --- a/bot_service.go +++ b/bot_service.go @@ -355,7 +355,75 @@ func (s *botService) sendPKIDirect(bot *botNodeRecord, fromNodeNum, toNodeNum ui Status: botMessageStatusPending, CreatedBy: strings.TrimSpace(createdBy), } - return s.persistAndPublish(row, topic, raw) + result, err := s.persistAndPublish(row, topic, raw) + // 不论发送结果如何,都把 DM 镜像写入 bot_direct_messages 以驱动 /admin/bot/direct 渲染。 + // 这里把发送结果(status/error/published_at)同步过去——成功时 status=published, + // 失败时 status=failed,前端就能看到本地视图与发送日志一致。 + s.recordOutboundDirectMessage(bot, row, *toNodeID, toNodeNum, text, len(raw), err) + return result, err +} + +// recordOutboundDirectMessage 把出向 PKI DM 写入 bot_direct_messages。失败仅打日志。 +func (s *botService) recordOutboundDirectMessage(bot *botNodeRecord, msg *botMessageRecord, peerNodeID string, peerNodeNum uint32, text string, payloadLen int, sendErr error) { + if s == nil || s.store == nil || msg == nil || bot == nil { + return + } + status := msg.Status + if status == "" { + if sendErr != nil { + status = botMessageStatusFailed + } else { + status = botMessageStatusPublished + } + } + errText := msg.Error + if errText == "" && sendErr != nil { + errText = sendErr.Error() + } + createdBy := strings.TrimSpace(msg.CreatedBy) + var createdByPtr *string + if createdBy != "" { + createdByPtr = &createdBy + } + gateway := strings.TrimSpace(bot.NodeID) + var gatewayPtr *string + if gateway != "" { + gatewayPtr = &gateway + } + var botMessageID *uint64 + if msg.ID != 0 { + id := msg.ID + botMessageID = &id + } + dm := &botDirectMessageRecord{ + BotID: bot.ID, + BotNodeID: bot.NodeID, + BotNodeNum: bot.NodeNum, + PeerNodeID: peerNodeID, + PeerNodeNum: int64(peerNodeNum), + Direction: botDirectMessageDirectionOutbound, + Topic: msg.Topic, + PacketID: msg.PacketID, + Text: text, + PayloadLen: int64(payloadLen), + PKIEncrypted: true, + WantAck: false, // 我们当前发送的 DM 默认不显式请求 ack + GatewayID: gatewayPtr, + Status: status, + Error: strings.TrimSpace(errText), + BotMessageID: botMessageID, + CreatedBy: createdByPtr, + PublishedAt: msg.PublishedAt, + } + if err := s.store.InsertBotDirectMessage(dm); err != nil { + printJSON(map[string]any{ + "event": "bot_direct_message_outbound_persist_failed", + "bot_node_id": bot.NodeID, + "peer_node_id": peerNodeID, + "bot_message_id": msg.ID, + "error": err.Error(), + }) + } } // lookupRecipientPublicKey 从 nodeinfo 表中按 node_id 查询目标节点的 X25519 公钥(hex 编码)。 diff --git a/db.go b/db.go index a6582bd..82980a3 100644 --- a/db.go +++ b/db.go @@ -286,6 +286,47 @@ func (botMessageRecord) TableName() string { return "bot_messages" } +// botDirectMessageRecord 专门保存机器人参与的 PKI 私聊(DM)。 +// +// - 设计原因:text_message 表只存频道消息;DM 是端到端的,逻辑上属于 “一对会话”,需要按 +// bot+对端聚合渲染,与 text_message 全表浏览的形态不一样。 +// - direction = "outbound" 表示 bot → device;"inbound" 表示 device → bot。 +// - 出向消息在发送时插入 status=pending,发送成功后更新为 published;入向消息默认直接 +// published。两种方向都通过 bot_id/peer_node_num 索引快速回放会话。 +type botDirectMessageRecord struct { + ID uint64 `gorm:"column:id;primaryKey;autoIncrement"` + BotID uint64 `gorm:"column:bot_id;not null;index:idx_bot_dm_bot_peer,priority:1;index:idx_bot_dm_bot_created_at,priority:1"` + BotNodeID string `gorm:"column:bot_node_id;not null;index"` + BotNodeNum int64 `gorm:"column:bot_node_num;not null;index"` + PeerNodeID string `gorm:"column:peer_node_id;not null;index:idx_bot_dm_bot_peer,priority:2"` + PeerNodeNum int64 `gorm:"column:peer_node_num;not null;index"` + Direction string `gorm:"column:direction;not null;index"` + Topic string `gorm:"column:topic;not null"` + PacketID int64 `gorm:"column:packet_id;not null;index"` + Text string `gorm:"column:text;type:text;not null"` + PayloadLen int64 `gorm:"column:payload_len;not null"` + PKIEncrypted bool `gorm:"column:pki_encrypted;not null"` + WantAck bool `gorm:"column:want_ack;not null"` + GatewayID *string `gorm:"column:gateway_id"` + Status string `gorm:"column:status;not null;index"` + Error string `gorm:"column:error;type:text"` + BotMessageID *uint64 `gorm:"column:bot_message_id;index"` + CreatedBy *string `gorm:"column:created_by"` + PublishedAt *time.Time `gorm:"column:published_at;index"` + ReceivedAt *time.Time `gorm:"column:received_at;index"` + ContentJSON *string `gorm:"column:content_json;type:text"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime;index:idx_bot_dm_bot_created_at,priority:2"` +} + +func (botDirectMessageRecord) TableName() string { + return "bot_direct_messages" +} + +const ( + botDirectMessageDirectionInbound = "inbound" + botDirectMessageDirectionOutbound = "outbound" +) + type nodeInfoRecord struct { NodeID string `gorm:"column:node_id;primaryKey;not null"` NodeNum int64 `gorm:"column:node_num;not null;index"` @@ -491,6 +532,7 @@ func (s *store) migrate() error { {label: "mqtt_forward_topics", model: &mqttForwardTopicRecord{}}, {label: "bot_nodes", model: &botNodeRecord{}}, {label: "bot_messages", model: &botMessageRecord{}}, + {label: "bot_direct_messages", model: &botDirectMessageRecord{}}, {label: "nodeinfo", model: &nodeInfoRecord{}}, {label: "map_report", model: &mapReportRecord{}}, {label: "text_message", model: &textMessageRecord{}}, @@ -511,6 +553,7 @@ func (s *store) migrate() error { indexes []string }{ {label: "text_message", model: &textMessageRecord{}, indexes: []string{"idx_text_message_from_num_created_at", "idx_text_message_created_at", "idx_text_message_packet_id"}}, + {label: "bot_direct_messages", model: &botDirectMessageRecord{}, indexes: []string{"idx_bot_dm_bot_peer", "idx_bot_dm_bot_created_at"}}, } { if err := createMissingIndexes(migrator, item.model, item.label, item.indexes); err != nil { return err diff --git a/db_write_queue.go b/db_write_queue.go index 5fc76c1..074cc72 100644 --- a/db_write_queue.go +++ b/db_write_queue.go @@ -43,6 +43,14 @@ func (q *dbWriteQueue) EnqueueRecord(record map[string]any, clientInfo mqttClien return q.store.UpsertMapReport(record) }}) case "text_message": + // 私聊(PKI 加密、发往受管 bot)单独走 bot_direct_messages 表, + // 不再写入 text_message 以避免和频道消息混在一起。 + if isInboundBotDirectMessage(q.store, record) { + q.enqueue(dbWriteJob{typeName: "bot_direct_message_inbound", from: record["from"], run: func() error { + return insertInboundBotDirectMessage(q.store, record, clientInfo) + }}) + return + } q.enqueue(dbWriteJob{typeName: "text_message", from: record["from"], run: func() error { return q.store.InsertTextMessage(record, clientInfo) }}) diff --git a/meshmap_frontend/src/api.ts b/meshmap_frontend/src/api.ts index 17843a4..27b607f 100644 --- a/meshmap_frontend/src/api.ts +++ b/meshmap_frontend/src/api.ts @@ -42,6 +42,7 @@ import type { PublicMapTileSourcesResponse, TelemetryRecord, TextMessage, + BotDirectMessage, } from './types' async function requestJSON(path: string, init?: RequestInit): Promise { @@ -377,12 +378,12 @@ export function getBotMessages(botId = 0, limit = 100, offset = 0): Promise>(`/api/admin/bot/messages?${params.toString()}`) } -export function getBotDirectTextMessages(botId: number, targetNodeNum: number, limit = 100, offset = 0, channelId = ''): Promise> { +export function getBotDirectMessages(botId: number, targetNodeNum: number, limit = 100, offset = 0, direction = ''): Promise> { const params = new URLSearchParams({ bot_id: String(botId), target_node_num: String(targetNodeNum), limit: String(limit), offset: String(offset) }) - if (channelId) { - params.set('channel_id', channelId) + if (direction) { + params.set('direction', direction) } - return getJSON>(`/api/admin/bot/direct-messages?${params.toString()}`) + return getJSON>(`/api/admin/bot/direct-messages?${params.toString()}`) } export function sendBotMessage(payload: BotSendMessagePayload): Promise { diff --git a/meshmap_frontend/src/components/AdminBotDirect.vue b/meshmap_frontend/src/components/AdminBotDirect.vue index 6d22e8f..b643087 100644 --- a/meshmap_frontend/src/components/AdminBotDirect.vue +++ b/meshmap_frontend/src/components/AdminBotDirect.vue @@ -1,18 +1,16 @@