Files
meshtastic_mqtt_server/admin_mqtt_forward_routes.go
T
2026-06-05 18:49:30 +08:00

282 lines
10 KiB
Go

package main
import (
"errors"
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
type mqttForwarderRequest struct {
Name string `json:"name"`
Enabled bool `json:"enabled"`
SourceHost string `json:"source_host"`
SourcePort int `json:"source_port"`
SourceUsername string `json:"source_username"`
SourcePassword *string `json:"source_password"`
SourcePasswordClear bool `json:"source_password_clear"`
SourceClientID string `json:"source_client_id"`
SourceTLS bool `json:"source_tls"`
TargetHost string `json:"target_host"`
TargetPort int `json:"target_port"`
TargetUsername string `json:"target_username"`
TargetPassword *string `json:"target_password"`
TargetPasswordClear bool `json:"target_password_clear"`
TargetClientID string `json:"target_client_id"`
TargetTLS bool `json:"target_tls"`
}
type mqttForwardTopicRequest struct {
Topic string `json:"topic"`
Enabled bool `json:"enabled"`
Direction string `json:"direction"`
SourcePrefix string `json:"source_prefix"`
TargetPrefix string `json:"target_prefix"`
QoS int `json:"qos"`
Retain bool `json:"retain"`
}
func registerAdminMQTTForwardRoutes(r gin.IRouter, store *store, forwarder mqttForwardReloader) {
r.GET("/mqtt-forward/forwarders", func(c *gin.Context) {
opts, ok := parseListOptions(c)
if !ok {
return
}
rows, err := store.ListMQTTForwarders(opts)
if err != nil {
writeListResponse(c, rows, opts, err, mqttForwarderDTO)
return
}
total, err := store.CountMQTTForwarders(opts)
writeListResponseWithTotal(c, rows, opts, total, err, mqttForwarderDTO)
})
r.POST("/mqtt-forward/forwarders", func(c *gin.Context) {
var req mqttForwarderRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid mqtt forwarder request"})
return
}
input := mqttForwarderInputFromRequest(req)
row, err := store.CreateMQTTForwarder(input)
writeMQTTForwardMutationResponse(c, http.StatusCreated, row, err, func() error {
return reloadMQTTForwarder(forwarder, row.ID)
})
})
r.PUT("/mqtt-forward/forwarders/:id", func(c *gin.Context) {
id, ok := parseMQTTForwardID(c, "invalid mqtt forwarder id")
if !ok {
return
}
var req mqttForwarderRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid mqtt forwarder request"})
return
}
input := mqttForwarderInputFromRequest(req)
row, err := store.UpdateMQTTForwarder(id, input)
writeMQTTForwardMutationResponse(c, http.StatusOK, row, err, func() error {
return reloadMQTTForwarder(forwarder, id)
})
})
r.DELETE("/mqtt-forward/forwarders/:id", func(c *gin.Context) {
id, ok := parseMQTTForwardID(c, "invalid mqtt forwarder id")
if !ok {
return
}
if forwarder != nil {
forwarder.StopForwarder(id)
}
writeMQTTForwardDeleteResponse(c, store.DeleteMQTTForwarder(id), nil)
})
r.POST("/mqtt-forward/forwarders/:id/restart", func(c *gin.Context) {
id, ok := parseMQTTForwardID(c, "invalid mqtt forwarder id")
if !ok {
return
}
if err := reloadMQTTForwarder(forwarder, id); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok"})
})
r.GET("/mqtt-forward/forwarders/:id/topics", func(c *gin.Context) {
id, ok := parseMQTTForwardID(c, "invalid mqtt forwarder id")
if !ok {
return
}
opts, ok := parseListOptions(c)
if !ok {
return
}
rows, err := store.ListMQTTForwardTopics(id, opts)
if err != nil {
writeListResponse(c, rows, opts, err, mqttForwardTopicDTO)
return
}
total, err := store.CountMQTTForwardTopics(id)
writeListResponseWithTotal(c, rows, opts, total, err, mqttForwardTopicDTO)
})
r.POST("/mqtt-forward/forwarders/:id/topics", func(c *gin.Context) {
id, ok := parseMQTTForwardID(c, "invalid mqtt forwarder id")
if !ok {
return
}
var req mqttForwardTopicRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid mqtt forward topic request"})
return
}
row, err := store.CreateMQTTForwardTopic(id, mqttForwardTopicInputFromRequest(req))
writeMQTTForwardTopicMutationResponse(c, http.StatusCreated, row, err, func() error {
return reloadMQTTForwarder(forwarder, id)
})
})
r.PUT("/mqtt-forward/topics/:id", func(c *gin.Context) {
id, ok := parseMQTTForwardID(c, "invalid mqtt forward topic id")
if !ok {
return
}
var req mqttForwardTopicRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid mqtt forward topic request"})
return
}
row, err := store.UpdateMQTTForwardTopic(id, mqttForwardTopicInputFromRequest(req))
writeMQTTForwardTopicMutationResponse(c, http.StatusOK, row, err, func() error {
return reloadMQTTForwarder(forwarder, row.ForwarderID)
})
})
r.DELETE("/mqtt-forward/topics/:id", func(c *gin.Context) {
id, ok := parseMQTTForwardID(c, "invalid mqtt forward topic id")
if !ok {
return
}
row, err := store.GetMQTTForwardTopic(id)
if errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "mqtt forward topic not found"})
return
}
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
parentID := row.ForwarderID
writeMQTTForwardDeleteResponse(c, store.DeleteMQTTForwardTopic(id), func() error {
return reloadMQTTForwarder(forwarder, parentID)
})
})
r.GET("/mqtt-forward/status", func(c *gin.Context) {
items := []mqttForwardRuntimeStatus{}
if forwarder != nil {
items = forwarder.Status()
}
c.JSON(http.StatusOK, gin.H{"items": items})
})
}
func mqttForwarderInputFromRequest(req mqttForwarderRequest) mqttForwarderInput {
sourcePassword := req.SourcePassword
if req.SourcePasswordClear {
empty := ""
sourcePassword = &empty
}
targetPassword := req.TargetPassword
if req.TargetPasswordClear {
empty := ""
targetPassword = &empty
}
return mqttForwarderInput{Name: req.Name, Enabled: req.Enabled, SourceHost: req.SourceHost, SourcePort: req.SourcePort, SourceUsername: req.SourceUsername, SourcePassword: sourcePassword, SourceClientID: req.SourceClientID, SourceTLS: req.SourceTLS, TargetHost: req.TargetHost, TargetPort: req.TargetPort, TargetUsername: req.TargetUsername, TargetPassword: targetPassword, TargetClientID: req.TargetClientID, TargetTLS: req.TargetTLS}
}
func mqttForwardTopicInputFromRequest(req mqttForwardTopicRequest) mqttForwardTopicInput {
return mqttForwardTopicInput{Topic: req.Topic, Enabled: req.Enabled, Direction: req.Direction, SourcePrefix: req.SourcePrefix, TargetPrefix: req.TargetPrefix, QoS: req.QoS, Retain: req.Retain}
}
func parseMQTTForwardID(c *gin.Context, message string) (uint64, bool) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil || id == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": message})
return 0, false
}
return id, true
}
func reloadMQTTForwarder(forwarder mqttForwardReloader, id uint64) error {
if forwarder == nil {
return nil
}
return forwarder.ReloadForwarder(id)
}
func writeMQTTForwardMutationResponse(c *gin.Context, status int, row *mqttForwarderRecord, err error, afterSuccess func() error) {
if errors.Is(err, errMQTTForwarderAlreadyExists) {
c.JSON(http.StatusConflict, gin.H{"error": "mqtt forwarder already exists"})
return
}
if errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "mqtt forwarder not found"})
return
}
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if afterSuccess != nil {
if err := afterSuccess(); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "mqtt forwarder saved but reload failed: " + err.Error()})
return
}
}
c.JSON(status, gin.H{"item": mqttForwarderDTO(*row)})
}
func writeMQTTForwardTopicMutationResponse(c *gin.Context, status int, row *mqttForwardTopicRecord, err error, afterSuccess func() error) {
if errors.Is(err, errMQTTForwardTopicAlreadyExists) {
c.JSON(http.StatusConflict, gin.H{"error": "mqtt forward topic already exists"})
return
}
if errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "mqtt forward topic not found"})
return
}
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if afterSuccess != nil {
if err := afterSuccess(); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "mqtt forward topic saved but reload failed: " + err.Error()})
return
}
}
c.JSON(status, gin.H{"item": mqttForwardTopicDTO(*row)})
}
func writeMQTTForwardDeleteResponse(c *gin.Context, err error, afterSuccess func() error) {
if errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "mqtt forward item not found"})
return
}
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
if afterSuccess != nil {
if err := afterSuccess(); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "mqtt forward item deleted but reload failed: " + err.Error()})
return
}
}
c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
func mqttForwarderDTO(row mqttForwarderRecord) gin.H {
return gin.H{"id": row.ID, "name": row.Name, "enabled": row.Enabled, "source_host": row.SourceHost, "source_port": row.SourcePort, "source_username": row.SourceUsername, "source_password_set": row.SourcePassword != "", "source_client_id": row.SourceClientID, "source_tls": row.SourceTLS, "target_host": row.TargetHost, "target_port": row.TargetPort, "target_username": row.TargetUsername, "target_password_set": row.TargetPassword != "", "target_client_id": row.TargetClientID, "target_tls": row.TargetTLS, "created_at": row.CreatedAt, "updated_at": row.UpdatedAt}
}
func mqttForwardTopicDTO(row mqttForwardTopicRecord) gin.H {
return gin.H{"id": row.ID, "forwarder_id": row.ForwarderID, "topic": row.Topic, "enabled": row.Enabled, "direction": row.Direction, "source_prefix": row.SourcePrefix, "target_prefix": row.TargetPrefix, "qos": row.QoS, "retain": row.Retain, "created_at": row.CreatedAt, "updated_at": row.UpdatedAt}
}