219 lines
6.6 KiB
Go
219 lines
6.6 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type botDirectMessageListOptions struct {
|
|
listOptions
|
|
BotID uint64
|
|
PeerNodeNum int64
|
|
Direction string
|
|
}
|
|
|
|
// InsertBotDirectMessage 把一条机器人 DM(出向或入向)写入 bot_direct_messages 表。
|
|
func (s *store) InsertBotDirectMessage(row *botDirectMessageRecord) error {
|
|
if s == nil || s.db == nil {
|
|
return fmt.Errorf("store is not configured")
|
|
}
|
|
if row == nil {
|
|
return fmt.Errorf("bot direct message is required")
|
|
}
|
|
if row.Direction == "" {
|
|
return fmt.Errorf("bot direct message direction is required")
|
|
}
|
|
return s.db.Create(row).Error
|
|
}
|
|
|
|
// UpdateBotDirectMessageStatus 更新一条出向 DM 的发送状态(pending → published/failed)。
|
|
func (s *store) UpdateBotDirectMessageStatus(id uint64, status, errText string, publishedAt *time.Time) error {
|
|
if s == nil || s.db == nil {
|
|
return fmt.Errorf("store is not configured")
|
|
}
|
|
if id == 0 {
|
|
return fmt.Errorf("bot direct message id is required")
|
|
}
|
|
updates := map[string]any{
|
|
"status": status,
|
|
"error": strings.TrimSpace(errText),
|
|
"published_at": publishedAt,
|
|
}
|
|
result := s.db.Model(&botDirectMessageRecord{}).Where("id = ?", id).Updates(updates)
|
|
if result.Error != nil {
|
|
return result.Error
|
|
}
|
|
if result.RowsAffected == 0 {
|
|
return gorm.ErrRecordNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListBotDirectMessagesByConversation 按 (bot, peer) 反序拉取 DM 历史,给 /admin/bot/direct 页面。
|
|
func (s *store) ListBotDirectMessagesByConversation(opts botDirectMessageListOptions) ([]botDirectMessageRecord, error) {
|
|
if s == nil || s.db == nil {
|
|
return nil, fmt.Errorf("store is not configured")
|
|
}
|
|
if opts.BotID == 0 {
|
|
return nil, fmt.Errorf("bot id is required")
|
|
}
|
|
if opts.PeerNodeNum == 0 {
|
|
return nil, fmt.Errorf("peer node num is required")
|
|
}
|
|
opts.listOptions = normalizeListOptions(opts.listOptions)
|
|
var rows []botDirectMessageRecord
|
|
q := s.db.Model(&botDirectMessageRecord{}).
|
|
Where("bot_id = ? AND peer_node_num = ?", opts.BotID, opts.PeerNodeNum).
|
|
Order("created_at DESC").
|
|
Order("id DESC").
|
|
Limit(opts.Limit).
|
|
Offset(opts.Offset)
|
|
if opts.Direction != "" {
|
|
q = q.Where("direction = ?", opts.Direction)
|
|
}
|
|
if opts.Since != nil {
|
|
q = q.Where("created_at >= ?", *opts.Since)
|
|
}
|
|
if opts.Until != nil {
|
|
q = q.Where("created_at <= ?", *opts.Until)
|
|
}
|
|
return rows, q.Find(&rows).Error
|
|
}
|
|
|
|
// CountBotDirectMessagesByConversation 返回会话总条数(前端无限滚动可用,可选)。
|
|
func (s *store) CountBotDirectMessagesByConversation(opts botDirectMessageListOptions) (int64, error) {
|
|
if s == nil || s.db == nil {
|
|
return 0, fmt.Errorf("store is not configured")
|
|
}
|
|
if opts.BotID == 0 || opts.PeerNodeNum == 0 {
|
|
return 0, fmt.Errorf("bot id and peer node num are required")
|
|
}
|
|
var total int64
|
|
q := s.db.Model(&botDirectMessageRecord{}).
|
|
Where("bot_id = ? AND peer_node_num = ?", opts.BotID, opts.PeerNodeNum)
|
|
if opts.Direction != "" {
|
|
q = q.Where("direction = ?", opts.Direction)
|
|
}
|
|
if opts.Since != nil {
|
|
q = q.Where("created_at >= ?", *opts.Since)
|
|
}
|
|
if opts.Until != nil {
|
|
q = q.Where("created_at <= ?", *opts.Until)
|
|
}
|
|
return total, q.Count(&total).Error
|
|
}
|
|
|
|
// FindBotForIncomingPKIPacket 在 bot_direct_messages 写入路径上判断接收方是否为受管 bot。
|
|
// 返回的 bot 用于填充 BotID/BotNodeID/BotNodeNum;不命中时返回 ErrRecordNotFound。
|
|
func (s *store) FindBotForIncomingPKIPacket(toNodeNum int64) (*botNodeRecord, error) {
|
|
if s == nil || s.db == nil {
|
|
return nil, fmt.Errorf("store is not configured")
|
|
}
|
|
bot, err := s.GetBotNodeByNodeNum(toNodeNum)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !bot.Enabled {
|
|
return nil, errors.New("bot disabled")
|
|
}
|
|
return bot, nil
|
|
}
|
|
|
|
// isInboundBotDirectMessage 判断 record 是否是“PKI 加密、发往受管 bot”的入向 DM。
|
|
// 仅在 type=text_message、pki_encrypted=true、packet_to_num 命中受管 bot 时返回 true。
|
|
// 任何步骤失败都返回 false,让记录回落到 text_message 表(与之前行为兼容)。
|
|
func isInboundBotDirectMessage(s *store, record map[string]any) bool {
|
|
if s == nil || record == nil {
|
|
return false
|
|
}
|
|
if pki, _ := record["pki_encrypted"].(bool); !pki {
|
|
return false
|
|
}
|
|
toNum, ok := uint32FromRecord(record["packet_to_num"])
|
|
if !ok || toNum == 0 {
|
|
return false
|
|
}
|
|
bot, err := s.FindBotForIncomingPKIPacket(int64(toNum))
|
|
if err != nil || bot == nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// insertInboundBotDirectMessage 把一条入向 PKI DM 转写入 bot_direct_messages 表。
|
|
// 失败时返回错误,由 dbWriteQueue 统一打印 db_error 事件。
|
|
func insertInboundBotDirectMessage(s *store, record map[string]any, clientInfo mqttClientInfo) error {
|
|
if s == nil {
|
|
return fmt.Errorf("store is not configured")
|
|
}
|
|
if record == nil {
|
|
return fmt.Errorf("record is required")
|
|
}
|
|
toNum, ok := uint32FromRecord(record["packet_to_num"])
|
|
if !ok || toNum == 0 {
|
|
return fmt.Errorf("missing packet_to_num")
|
|
}
|
|
bot, err := s.FindBotForIncomingPKIPacket(int64(toNum))
|
|
if err != nil {
|
|
return fmt.Errorf("lookup bot for inbound DM: %w", err)
|
|
}
|
|
peerNum, ok := uint32FromRecord(record["from_num"])
|
|
if !ok || peerNum == 0 {
|
|
return fmt.Errorf("missing from_num")
|
|
}
|
|
peerNodeID, _ := record["from"].(string)
|
|
if peerNodeID == "" {
|
|
return fmt.Errorf("missing from")
|
|
}
|
|
packetID, _ := uint32FromRecord(record["packet_id"])
|
|
topic, _ := record["topic"].(string)
|
|
gateway, _ := record["gateway_id"].(string)
|
|
var gatewayPtr *string
|
|
if gw := strings.TrimSpace(gateway); gw != "" {
|
|
gatewayPtr = &gw
|
|
}
|
|
text, _ := record["text"].(string)
|
|
wantAck, _ := record["want_ack"].(bool)
|
|
payloadLen, _ := record["payload_len"].(int)
|
|
if payloadLen == 0 {
|
|
if v, ok := record["payload_len"].(int64); ok {
|
|
payloadLen = int(v)
|
|
}
|
|
}
|
|
contentJSON, encodeErr := json.Marshal(record)
|
|
var contentPtr *string
|
|
if encodeErr == nil {
|
|
s := string(contentJSON)
|
|
contentPtr = &s
|
|
}
|
|
now := time.Now()
|
|
dm := &botDirectMessageRecord{
|
|
BotID: bot.ID,
|
|
BotNodeID: bot.NodeID,
|
|
BotNodeNum: bot.NodeNum,
|
|
PeerNodeID: peerNodeID,
|
|
PeerNodeNum: int64(peerNum),
|
|
Direction: botDirectMessageDirectionInbound,
|
|
Topic: topic,
|
|
PacketID: int64(packetID),
|
|
Text: text,
|
|
PayloadLen: int64(payloadLen),
|
|
PKIEncrypted: true,
|
|
WantAck: wantAck,
|
|
GatewayID: gatewayPtr,
|
|
Status: botMessageStatusPublished,
|
|
ReceivedAt: &now,
|
|
ContentJSON: contentPtr,
|
|
}
|
|
if err := s.InsertBotDirectMessage(dm); err != nil {
|
|
return fmt.Errorf("insert bot direct message from %s: %w", peerNodeID, err)
|
|
}
|
|
_ = clientInfo // mqtt 元数据已经记录在 content_json 里,这里保留参数以保持队列签名一致
|
|
return nil
|
|
}
|