diff --git a/config/config.go b/config/config.go index d441f3f..4a9a098 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,7 @@ type Config struct { Auth AuthConfig `yaml:"auth"` Logging LoggingConfig `yaml:"logging"` Database DatabaseConfig `yaml:"database"` + Admin AdminConfig `yaml:"admin"` } // ServerConfig MQTT 监听相关配置 @@ -68,6 +69,20 @@ type DatabaseConfig struct { DSN string `yaml:"dsn"` } +// AdminConfig HTTP 管理界面配置 +type AdminConfig struct { + // 是否启用管理界面,默认 false + Enabled bool `yaml:"enabled"` + // HTTP 监听端口,留空则默认 :8080 + Port string `yaml:"port"` + // 认证类型:none(默认) / basic / token(预留) + AuthType string `yaml:"auth_type"` + // Basic 认证用户名 + Username string `yaml:"username"` + // Basic 认证密码 + Password string `yaml:"password"` +} + // --------------------------------------------------------------------------- // 全局单例 // --------------------------------------------------------------------------- @@ -184,6 +199,15 @@ func applyDefaults(cfg *Config) bool { cfg.Database.File = "meshgo.db" changed = true } + // Admin + if cfg.Admin.Port == "" { + cfg.Admin.Port = ":8080" + changed = true + } + if cfg.Admin.AuthType == "" { + cfg.Admin.AuthType = "none" + changed = true + } return changed } @@ -211,5 +235,12 @@ func defaultConfig() *Config { File: "meshgo.db", DSN: "", }, + Admin: AdminConfig{ + Enabled: false, + Port: ":8080", + AuthType: "none", + Username: "", + Password: "", + }, } } diff --git a/go.mod b/go.mod index 1720f38..4c7fabc 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,43 @@ go 1.25.5 require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic v1.15.0 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect + github.com/gabriel-vasile/mimetype v1.4.12 // indirect + github.com/gin-contrib/sse v1.1.0 // indirect + github.com/gin-gonic/gin v1.12.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.30.1 // indirect github.com/go-sql-driver/mysql v1.8.1 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/goccy/go-yaml v1.19.2 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/mochi-mqtt/server/v2 v2.7.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/quic-go/qpack v0.6.0 // indirect + github.com/quic-go/quic-go v0.59.0 // indirect github.com/rs/xid v1.4.0 // indirect - golang.org/x/text v0.21.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.3.1 // indirect + go.mongodb.org/mongo-driver/v2 v2.5.0 // indirect + golang.org/x/arch v0.22.0 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/net v0.51.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.34.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gorm.io/driver/mysql v1.6.0 // indirect gorm.io/driver/sqlite v1.6.0 // indirect diff --git a/go.sum b/go.sum index 2584cff..9dc3bdd 100644 --- a/go.sum +++ b/go.sum @@ -1,22 +1,98 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= +github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= +github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gabriel-vasile/mimetype v1.4.12 h1:e9hWvmLYvtp846tLHam2o++qitpguFiYCKbn0w9jyqw= +github.com/gabriel-vasile/mimetype v1.4.12/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= +github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w= +github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= +github.com/gin-gonic/gin v1.12.0 h1:b3YAbrZtnf8N//yjKeU2+MQsh2mY5htkZidOM7O0wG8= +github.com/gin-gonic/gin v1.12.0/go.mod h1:VxccKfsSllpKshkBWgVgRniFFAzFb9csfngsqANjnLc= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.30.1 h1:f3zDSN/zOma+w6+1Wswgd9fLkdwy06ntQJp0BBvFG0w= +github.com/go-playground/validator/v10 v10.30.1/go.mod h1:oSuBIQzuJxL//3MelwSLD5hc2Tu889bF0Idm9Dg26cM= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/goccy/go-yaml v1.19.2 h1:PmFC1S6h8ljIz6gMRBopkjP1TVT7xuwrButHID66PoM= +github.com/goccy/go-yaml v1.19.2/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI= github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8= +github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII= +github.com/quic-go/quic-go v0.59.0 h1:OLJkp1Mlm/aS7dpKgTc6cnpynnD2Xg7C1pwL6vy/SAw= +github.com/quic-go/quic-go v0.59.0/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.3.1 h1:waO7eEiFDwidsBN6agj1vJQ4AG7lh2yqXyOXqhgQuyY= +github.com/ugorji/go/codec v1.3.1/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4= +go.mongodb.org/mongo-driver/v2 v2.5.0 h1:yXUhImUjjAInNcpTcAlPHiT7bIXhshCTL3jVBkF3xaE= +go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= +golang.org/x/arch v0.22.0 h1:c/Zle32i5ttqRXjdLyyHZESLD/bB90DCU1g9l/0YBDI= +golang.org/x/arch v0.22.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= +golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg= diff --git a/http/http.go b/http/http.go new file mode 100644 index 0000000..9601d85 --- /dev/null +++ b/http/http.go @@ -0,0 +1,155 @@ +package http + +import ( + "context" + _ "embed" + "log" + "meshgo/config" + "meshgo/stats" + "net/http" + "time" + + "github.com/gin-gonic/gin" +) + +// Server HTTP 管理界面服务 +type Server struct { + srv *http.Server + enabled bool +} + +// New 创建 HTTP 服务(不启动) +func New(cfg *config.AdminConfig) *Server { + if !cfg.Enabled { + return &Server{enabled: false} + } + + gin.SetMode(gin.ReleaseMode) + r := gin.New() + r.Use(gin.Recovery()) + + // 预留认证中间件扩展点 + r.Use(AuthMiddleware(cfg)) + + // 路由 + admin := r.Group("/admin/mqtt") + { + admin.GET("/api/stats", handleStats) + admin.GET("/api/clients", handleClients) + } + r.GET("/admin/mqtt", serveIndex) + r.GET("/", serveHome) + + addr := cfg.Port + if addr == "" { + addr = ":8080" + } + + return &Server{ + enabled: true, + srv: &http.Server{ + Addr: addr, + Handler: r, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + }, + } +} + +// Start 启动 HTTP 服务(goroutine) +func (s *Server) Start() { + if !s.enabled { + return + } + go func() { + addr := s.srv.Addr + log.Printf("[http] 管理界面已启动: http://%s/", addr) + if err := s.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Printf("[http] 管理界面启动失败: %v", err) + } + }() +} + +// Close 优雅关闭 +func (s *Server) Close() error { + if !s.enabled { + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return s.srv.Shutdown(ctx) +} + +// Enabled 返回是否启用 +func (s *Server) Enabled() bool { return s.enabled } + +// --------------------------------------------------------------------------- +// 路由处理 +// --------------------------------------------------------------------------- + +// serveIndex 返回管理页面 +func serveIndex(c *gin.Context) { + c.Header("Content-Type", "text/html; charset=utf-8") + c.String(http.StatusOK, indexHTML) +} + +// handleStats 返回实时统计(JSON) +func handleStats(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "data": stats.GetStats(), + }) +} + +// handleClients 返回在线客户端列表(JSON) +func handleClients(c *gin.Context) { + st := stats.GetStats() + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "data": st.Clients, + }) +} + +// --------------------------------------------------------------------------- +// 认证中间件(预留扩展点) +// AuthMiddleware 根据 cfg.AuthType 选择对应认证策略 +// 当前支持:none +// 预留支持:basic、token +// --------------------------------------------------------------------------- + +// AuthMiddleware 认证中间件工厂 +func AuthMiddleware(cfg *config.AdminConfig) gin.HandlerFunc { + switch cfg.AuthType { + case "basic": + return basicAuth(cfg.Username, cfg.Password) + case "token": + // TODO: token 认证 + return func(c *gin.Context) { c.Next() } + default: + // none:不做认证 + return func(c *gin.Context) { c.Next() } + } +} + +// basicAuth Basic 认证 +func basicAuth(user, pass string) gin.HandlerFunc { + return func(c *gin.Context) { + u, p, ok := c.Request.BasicAuth() + if !ok || u != user || p != pass { + c.Header("WWW-Authenticate", `Basic realm="meshgo"`) + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ + "code": 401, + "msg": "未授权", + }) + return + } + c.Next() + } +} + +// --------------------------------------------------------------------------- +// 模板(go:embed 内嵌) +// --------------------------------------------------------------------------- + +//go:embed index.html +var indexHTML string diff --git a/http/index.html b/http/index.html new file mode 100644 index 0000000..63e9fed --- /dev/null +++ b/http/index.html @@ -0,0 +1,237 @@ + + + + + + meshgo 管理面板 + + + + + +
meshgo MQTT 管理面板
+ +
+ + +
+
+
当前连接
+
-
+
实时在线客户端数
+
+
+
累计消息
+
-
+
所有主题消息总数
+
+
+
msh/# 消息
+
-
+
mesh 主题消息数
+
+
+
运行时长
+
-
+
-
+
+
+ + +
+
+

消息趋势(最近 30 条)

+
+
+
+

主题分布

+
+
+
+ + +
+

在线客户端

+ + + + + + + + + + + + + + +
状态客户端 IDIP 地址用户名订阅数连接时间
暂无数据
+
每 3 秒自动刷新
+
+ +
+ + + + + diff --git a/main.go b/main.go index 225bcc3..f22a227 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,8 @@ import ( "log" "meshgo/config" "meshgo/database" + "meshgo/http" + "meshgo/stats" "os" "os/signal" "syscall" @@ -60,6 +62,10 @@ func main() { if err := server.AddHook(new(meshDBHook), nil); err != nil { log.Fatalf("[main] 注册 payload 数据库 hook 失败: %v", err) } + // 注册统计 hook + if err := server.AddHook(new(stats.Hook), nil); err != nil { + log.Fatalf("[main] 注册统计 hook 失败: %v", err) + } // 添加 TCP 监听 tcpListener := listeners.NewTCP( @@ -81,6 +87,10 @@ func main() { log.Printf("[main] WebSocket 监听已绑定: %s", cfg.Server.WSAddr) } + // 启动 HTTP 管理界面 + httpServer := http.New(&cfg.Admin) + httpServer.Start() + // ---------------------------------------------------------------- // 5. 启动服务器(阻塞,直到收到退出信号) // ---------------------------------------------------------------- @@ -105,6 +115,7 @@ func main() { // 注:监听地址变更需要重启服务,此处仅刷新认证/日志等运行时配置 case syscall.SIGINT, syscall.SIGTERM: log.Println("[main] 正在优雅关闭服务…") + httpServer.Close() // 关闭 HTTP 管理界面 close(database.WriteCh) // 停止异步写入 worker if err := database.Close(); err != nil { log.Printf("[main] 关闭数据库时出错: %v", err) diff --git a/stats/stats.go b/stats/stats.go new file mode 100644 index 0000000..588c55a --- /dev/null +++ b/stats/stats.go @@ -0,0 +1,165 @@ +package stats + +import ( + "bytes" + "sync" + "sync/atomic" + "time" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/packets" +) + +// --------------------------------------------------------------------------- +// 数据结构 +// --------------------------------------------------------------------------- + +// ClientInfo 当前在线客户端信息 +type ClientInfo struct { + ID string `json:"id"` + RemoteAddr string `json:"remote_addr"` + Username string `json:"username"` + ConnectedAt time.Time `json:"connected_at"` + SubsCount int `json:"subs_count"` +} + +// Stats 当前统计快照 +type Stats struct { + Connections int64 `json:"connections"` // 当前连接数 + MessagesTotal int64 `json:"messages_total"` // 累计消息数(所有主题) + MessagesMsh int64 `json:"messages_msh"` // msh/# 消息数 + Uptime int64 `json:"uptime"` // 服务运行时长(秒) + Clients []ClientInfo `json:"clients"` // 在线客户端列表 + Topics map[string]int64 `json:"topics"` // 各主题消息数 +} + +// --------------------------------------------------------------------------- +// 全局统计(atomic + mutex 无锁热点路径) +// --------------------------------------------------------------------------- + +var ( + connections atomic.Int64 + messagesTotal atomic.Int64 + messagesMsh atomic.Int64 + startTime = time.Now() + clientsMu sync.RWMutex + clients = make(map[string]ClientInfo) // clientID → info + subs = make(map[string][]string) // clientID → []filter + topicsMu sync.RWMutex + topics = make(map[string]int64) // topic → count +) + +// GetStats 返回当前统计快照(只读副本) +func GetStats() Stats { + clientsMu.RLock() + clientList := make([]ClientInfo, 0, len(clients)) + for id, info := range clients { + info.SubsCount = len(subs[id]) + clientList = append(clientList, info) + } + clientsMu.RUnlock() + + topicsMu.RLock() + topicsCopy := make(map[string]int64, len(topics)) + for k, v := range topics { + topicsCopy[k] = v + } + topicsMu.RUnlock() + + return Stats{ + Connections: connections.Load(), + MessagesTotal: messagesTotal.Load(), + MessagesMsh: messagesMsh.Load(), + Uptime: int64(time.Since(startTime).Seconds()), + Clients: clientList, + Topics: topicsCopy, + } +} + +// --------------------------------------------------------------------------- +// Hook 实现 +// --------------------------------------------------------------------------- + +// Hook 收集 MQTT 运行统计 +type Hook struct { + mqtt.HookBase +} + +func (h *Hook) ID() string { return "meshgo-stats" } + +func (h *Hook) Provides(b byte) bool { + return bytes.Contains([]byte{ + mqtt.OnSessionEstablished, + mqtt.OnDisconnect, + mqtt.OnPublish, + mqtt.OnSubscribe, + mqtt.OnUnsubscribe, + }, []byte{b}) +} + +// OnSessionEstablished 客户端连接成功 +func (h *Hook) OnSessionEstablished(cl *mqtt.Client, pk packets.Packet) { + username := string(pk.Connect.Username) + if username == "" { + username = "(anonymous)" + } + clientsMu.Lock() + clients[cl.ID] = ClientInfo{ + ID: cl.ID, + RemoteAddr: cl.Net.Remote, + Username: username, + ConnectedAt: time.Now(), + } + subs[cl.ID] = []string{} + clientsMu.Unlock() + connections.Add(1) +} + +// OnDisconnect 客户端断开 +func (h *Hook) OnDisconnect(cl *mqtt.Client, err error, expire bool) { + clientsMu.Lock() + delete(clients, cl.ID) + delete(subs, cl.ID) + clientsMu.Unlock() + connections.Add(-1) +} + +// OnPublish 收到发布消息 +func (h *Hook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { + messagesTotal.Add(1) + if len(pk.TopicName) >= 4 && pk.TopicName[:4] == "msh/" { + messagesMsh.Add(1) + } + topicsMu.Lock() + topics[pk.TopicName]++ + topicsMu.Unlock() + return pk, nil +} + +// OnSubscribe 客户端订阅 +func (h *Hook) OnSubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet { + clientsMu.Lock() + for _, f := range pk.Filters { + subs[cl.ID] = append(subs[cl.ID], f.Filter) + } + clientsMu.Unlock() + return pk +} + +// OnUnsubscribe 客户端取消订阅 +func (h *Hook) OnUnsubscribe(cl *mqtt.Client, pk packets.Packet) packets.Packet { + clientsMu.Lock() + for _, f := range pk.Filters { + filters := subs[cl.ID] + for i, ff := range filters { + if ff == f.Filter { + subs[cl.ID] = append(filters[:i], filters[i+1:]...) + break + } + } + } + clientsMu.Unlock() + return pk +} + +var _ mqtt.Hook = (*Hook)(nil)