package main import ( "bytes" "log" "meshgo/config" "meshgo/database" "os" "os/signal" "syscall" mqtt "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/listeners" "github.com/mochi-mqtt/server/v2/packets" ) func main() { // ---------------------------------------------------------------- // 1. 确保运行时目录存在 // ---------------------------------------------------------------- ensureDir(dataDir) ensureDir(configDir) // ---------------------------------------------------------------- // 2. 初始化配置(如无配置文件则写入默认值) // ---------------------------------------------------------------- if err := config.EnsureConfigComplete(configFile); err != nil { log.Fatalf("[main] 无法写入默认配置文件: %v", err) } if err := config.Load(configFile); err != nil { log.Fatalf("[main] 无法加载配置文件: %v", err) } // ---------------------------------------------------------------- // 3. 初始化数据库(在启动服务前完成,避免 hook 调用时未初始化) // ---------------------------------------------------------------- if err := database.Init(&config.Get().Database, dataDir); err != nil { log.Fatalf("[main] 数据库初始化失败: %v", err) } database.StartWriter() // 启动异步写入 worker // ---------------------------------------------------------------- // 4. 构建 MQTT 服务器(所有初始化在 Serve 之前完成) // ---------------------------------------------------------------- cfg := config.Get() server := mqtt.New(&mqtt.Options{ InlineClient: false, }) // 注册认证 hook if err := server.AddHook(new(meshAuthHook), nil); err != nil { log.Fatalf("[main] 注册认证 hook 失败: %v", err) } // 注册日志 hook if err := server.AddHook(new(meshLogHook), nil); err != nil { log.Fatalf("[main] 注册日志 hook 失败: %v", err) } // 注册 payload 数据库日志 hook if err := server.AddHook(new(meshDBHook), nil); err != nil { log.Fatalf("[main] 注册 payload 数据库 hook 失败: %v", err) } // 添加 TCP 监听 tcpListener := listeners.NewTCP( listeners.Config{ID: "tcp1", Address: cfg.Server.TCPAddr}, ) if err := server.AddListener(tcpListener); err != nil { log.Fatalf("[main] 添加 TCP 监听失败 (%s): %v", cfg.Server.TCPAddr, err) } log.Printf("[main] TCP 监听已绑定: %s", cfg.Server.TCPAddr) // 可选:添加 WebSocket 监听 if cfg.Server.WSAddr != "" { wsListener := listeners.NewWebsocket( listeners.Config{ID: "ws1", Address: cfg.Server.WSAddr}, ) if err := server.AddListener(wsListener); err != nil { log.Fatalf("[main] 添加 WebSocket 监听失败 (%s): %v", cfg.Server.WSAddr, err) } log.Printf("[main] WebSocket 监听已绑定: %s", cfg.Server.WSAddr) } // ---------------------------------------------------------------- // 5. 启动服务器(阻塞,直到收到退出信号) // ---------------------------------------------------------------- log.Println("[main] meshgo MQTT 服务已启动") if err := server.Serve(); err != nil { log.Fatalf("[main] MQTT 服务器启动失败: %v", err) } // ---------------------------------------------------------------- // 5. 信号处理 // SIGHUP → 热重载配置 // SIGINT / SIGTERM → 优雅退出 // ---------------------------------------------------------------- sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) for sig := range sigCh { switch sig { case syscall.SIGHUP: log.Println("[main] 收到 SIGHUP,热重载配置…") config.Reload(configFile) // 注:监听地址变更需要重启服务,此处仅刷新认证/日志等运行时配置 case syscall.SIGINT, syscall.SIGTERM: log.Println("[main] 正在优雅关闭服务…") close(database.WriteCh) // 停止异步写入 worker if err := database.Close(); err != nil { log.Printf("[main] 关闭数据库时出错: %v", err) } if err := server.Close(); err != nil { log.Printf("[main] 关闭服务器时出错: %v", err) } log.Println("[main] meshgo 已停止") return } } } // ensureDir 确保目录存在,不存在则创建(含父目录) func ensureDir(path string) { if _, err := os.Stat(path); os.IsNotExist(err) { if err = os.MkdirAll(path, 0755); err != nil { log.Fatalf("[main] 无法创建目录 %s: %v", path, err) } log.Printf("[main] 已创建目录: %s", path) } } // ---------------------------------------------------------------- // 认证 Hook // ---------------------------------------------------------------- // meshAuthHook 实现 mochi-mqtt Hook 接口,基于配置文件进行认证 type meshAuthHook struct { mqtt.HookBase } func (h *meshAuthHook) ID() string { return "meshgo-auth" } func (h *meshAuthHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqtt.OnConnectAuthenticate, mqtt.OnACLCheck, }, []byte{b}) } // OnConnectAuthenticate 验证客户端连接凭证 func (h *meshAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool { cfg := config.Get().Auth // 未开启认证,全部放行 if !cfg.Enabled { return true } username := string(pk.Connect.Username) password := string(pk.Connect.Password) // 匿名连接处理 if username == "" { return cfg.AllowAnonymous } // 逐条比对用户列表 for _, u := range cfg.Users { if u.Username == username && u.Password == password { return true } } return false } // OnACLCheck 主题级别 ACL 检查(默认全放行,可按需扩展) func (h *meshAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool { return true } // 确保 meshAuthHook 实现了 mqtt.Hook 接口(编译期检查) var _ mqtt.Hook = (*meshAuthHook)(nil)