mqtt

package module
v0.0.0-...-b25f10e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2025 License: MIT Imports: 24 Imported by: 0

README

MQTT Server Implementation

项目概述

这是一个完整的MQTT服务器实现,支持MQTT v3.1.1和v5.0协议。

协议支持

MQTT v3.1.1 (OASIS Standard, 29 October 2014)
  • 完整的14种控制报文支持
  • 三种QoS等级:0(最多一次)、1(至少一次)、2(恰好一次)
  • 遗嘱消息、用户名密码认证
  • 会话持久化
MQTT v5.0 (OASIS Standard, 7 March 2019)
  • 在v3.1.1基础上增加了属性系统
  • 新增AUTH报文用于扩展认证
  • 支持会话过期间隔、接收最大值、最大报文长度等高级特性
  • 主题别名、订阅标识符等优化功能

代码注释规范

本项目已按照MQTT协议规范为所有代码添加了详细注释,包含:

1. 协议章节索引
  • 每个字段都标注了对应的协议章节号
  • 便于开发者快速查找协议原文
  • 例如:参考章节: 3.1.2.1 Protocol Name
2. 版本差异说明
  • 明确标注v3.1.1和v5.0的区别
  • 突出新增功能和行为变化
  • 帮助开发者理解协议演进
3. 字段详细说明
  • 位置:在报文中的具体位置
  • 类型:数据类型和编码方式
  • 含义:字段的具体作用
  • 约束:协议规定的限制条件
4. 行为差异说明
  • 不同版本协议的行为差异
  • 错误处理方式的变化
  • 兼容性注意事项

已注释的核心文件

packet/packet.go
  • MQTT控制报文通用接口
  • 报文类型分发逻辑
  • 版本差异说明
packet/0x0.fixed_header.go
  • 固定报头结构
  • 标志位验证规则
  • 剩余长度编码
packet/0x1.connect.go
  • CONNECT报文完整实现
  • 连接标志位解析
  • v5.0属性系统支持

使用示例

// 创建MQTT v5.0客户端连接
connect := &packet.CONNECT{
    FixedHeader: &packet.FixedHeader{Version: packet.VERSION500},
    ClientID: "test-client",
    KeepAlive: 60,
    Props: &packet.ConnectProperties{
        SessionExpiryInterval: 3600, // 1小时会话过期
        ReceiveMaximum: 100,         // 最大接收100条消息
    },
}

// 序列化报文
var buf bytes.Buffer
err := connect.Pack(&buf)

PUBACK报文结构:

graph TD
    A[PUBACK报文] --> B[固定报头 Fixed Header]
    A --> C[可变报头 Variable Header]
    A --> D[载荷 Payload]
    
    B --> B1[报文类型: 0x04]
    B --> B2[标志位: 必须为0]
    B --> B3[剩余长度]
    
    C --> C1[报文标识符 Packet ID]
    C --> C2[原因码 Reason Code - v5.0]
    C --> C3[属性 Properties - v5.0]
    
    D --> D1[无载荷]
    
    C2 --> C2a[0x00: 成功]
    C2 --> C2b[0x10: 无匹配订阅者]
    C2 --> C2c[0x80: 未指定错误]
    C2 --> C2d[0x83: 实现特定错误]
    
    C3 --> C3a[原因字符串]
    C3 --> C3b[用户属性]

开发指南

添加新功能
  1. 参考MQTT协议文档确定功能规范
  2. 在代码中添加详细的协议章节注释
  3. 说明v3.1.1和v5.0的差异
  4. 添加字段位置、类型、含义等说明
协议兼容性
  • 优先保证v3.1.1兼容性
  • v5.0功能作为可选扩展
  • 明确标注版本差异

协议文档参考

贡献指南

欢迎提交Issue和Pull Request。在贡献代码时,请:

  1. 遵循现有的注释规范
  2. 添加完整的协议章节索引
  3. 说明版本差异
  4. 包含字段的详细说明

许可证

本项目采用MIT许可证,详见LICENSE文件。

Documentation

Index

Constants

View Source
const (
	RESERVED    byte = 0x0
	CONNECT     byte = 0x1
	CONNACK     byte = 0x2
	PUBLISH     byte = 0x3
	PUBACK      byte = 0x4
	PUBREC      byte = 0x5
	PUBREL      byte = 0x6
	PUBCOMP     byte = 0x7
	SUBSCRIBE   byte = 0x8
	SUBACK      byte = 0x9
	UNSUBSCRIBE byte = 0xA
	UNSUBACK    byte = 0xB
	PINGREQ     byte = 0xC
	PINGRESP    byte = 0xD
	DISCONNECT  byte = 0xE
	AUTH        byte = 0xF
)

Control packet types. Position: byte 1, bits 7-4

Variables

View Source
var CONFIG = &config{
	Auth: map[string]string{
		"":     "",
		"root": "admin",
	},
}
View Source
var ErrAbortHandler = errors.New("mqtt: abort Handler")

ErrAbortHandler is a sentinel panic value to abort a handler. While any panic from ServeHTTP aborts the response to the client, panicking with ErrAbortHandler also suppresses logging of a stack trace to the server's error log.

View Source
var ErrServerClosed = errors.New("mqtt: Server closed")

ErrServerClosed is returned by the Server.Serve, [ServeTLS], [ListenAndServe], and [ListenAndServeTLS] methods after a call to Server.Shutdown or [Server.Close].

Functions

func Fedstart

func Fedstart(ctx context.Context, listen string, join string) error

func IN

func IN(x string, m ...string) bool

func ServerLog

func ServerLog(ctx context.Context, stat *requests.Stat)

func Web

func Web(ctx context.Context) error

Types

type BridgeNode

type BridgeNode struct {
	Name     string `json:"name"`     // 远程节点名称
	Address  string `json:"address"`  // 远程节点地址 (如: localhost:1884)
	Username string `json:"username"` // 用户名
	Password string `json:"password"` // 密码
}

type Client

type Client struct {
	// URL specifies either the URI being requested (for server requests) or the URL to access (for client requests).
	//
	// For server requests, the URL is parsed from the URI supplied on the Request-Line as stored in RequestURI.
	// For most requests, fields other than Path and RawQuery will be empty. (See RFC 7230, Section 5.3)
	//
	// For client requests, the URL's Host specifies the server to
	// connect to, while the Request's Host field optionally
	// specifies the Host header value to send in the MQTT request.
	URL *url.URL

	// DialContext specifies the dial function for creating unencrypted TCP connections.
	// If DialContext is nil (and the deprecated Dial below is also nil), then the transport dials using package net.
	//
	// DialContext runs concurrently with calls to RoundTrip.
	// A RoundTrip call that initiates a dial may end up using
	// a connection dialed previously when the earlier connection
	// becomes idle before the later DialContext completes.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLSContext specifies an optional dial function for creating TLS connections for non-proxied HTTPS requests.
	//
	// If DialTLSContext is nil (and the deprecated DialTLS below is also nil), DialContext and TLSClientConfig are used.
	//
	// If DialTLSContext is set, the Dial and DialContext hooks are not used for HTTPS
	// requests and the TLSClientConfig and TLSHandshakeTimeout are ignored.
	// The returned net.Conn is assumed to already be past the TLS handshake.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	// If non-nil, HTTP/2 support may not be enabled by default.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout specifies the maximum amount of time to wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// Timeout specifies a time limit for requests made by this Client.
	// The timeout includes connection time, any redirects, and reading the response body.
	// The timer remains running after Get, Head, Post, or Do return and will interrupt reading of the Response.Body.
	//
	// A Timeout of zero means no timeout.
	//
	// The Client cancels requests to the underlying Transport as if the Request's Context ended.
	//
	// For compatibility, the Client will also use the deprecated CancelRequest method on Transport if found.
	// New RoundTripper implementations should use the Request's Context
	// for cancellation instead of implementing CancelRequest.
	Timeout time.Duration
	// contains filtered or unexported fields
}

A Client is an MQTT client. Its zero value ([DefaultClient]) is a usable client that uses [DefaultTransport].

The [Client.Transport] typically has internal state (cached TCP connections), so Clients should be reused instead of created as needed. Clients are safe for concurrent use by multiple goroutines.

A Client is higher-level than a [RoundTripper] (such as [Transport]) and additionally handles HTTP details such as cookies and redirects.

func New

func New(opts ...Option) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

func (*Client) ConnectAndSubscribe

func (c *Client) ConnectAndSubscribe(ctx context.Context) error

func (*Client) Disconnect

func (c *Client) Disconnect() error

func (*Client) ID

func (c *Client) ID() string

func (*Client) OnMessage

func (c *Client) OnMessage(fn func(*packet.Message))

func (*Client) RoundTrip

func (c *Client) RoundTrip(req packet.Packet) (packet.Packet, error)

RoundTrip implements the [RoundTripper] interface.

For higher-level HTTP client support (such as handling of cookies and redirects), see [Get], [Post], and the Client type.

Like the RoundTripper interface, the error types returned by RoundTrip are unspecified.

func (*Client) ServeMessage

func (c *Client) ServeMessage(ctx context.Context) error

func (*Client) ServeMessageLoop

func (c *Client) ServeMessageLoop(ctx context.Context) error

func (*Client) SubmitMessage

func (c *Client) SubmitMessage(message *packet.Message) error

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context) error

type ConnState

type ConnState int

A ConnState represents the state of a client connection to a server. It's used by the optional [Server.ConnState] hook.

const (
	// StateNew represents a new connection that is expected to
	// send a request immediately. Connections begin at this
	// state and then transition to either StateActive or
	// StateClosed.
	StateNew ConnState = iota

	// StateActive represents a connection that has read 1 or more
	// bytes of a request. The Server.ConnState hook for
	// StateActive fires before the request has entered a handler
	// and doesn't fire again until the request has been
	// handled. After the request is handled, the state
	// transitions to StateClosed, StateHijacked, or StateIdle.
	// For HTTP/2, StateActive fires on the transition from zero
	// to one active request, and only transitions away once all
	// active requests are complete. That means that ConnState
	// cannot be used to do per-request work; ConnState only notes
	// the overall state of the connection.
	StateActive

	// StateIdle represents a connection that has finished
	// handling a request and is in the keep-alive state, waiting
	// for a new request. Connections transition from StateIdle
	// to either StateActive or StateClosed.
	StateIdle

	// StateHijacked represents a hijacked connection.
	// This is a terminal state. It does not transition to StateClosed.
	StateHijacked

	// StateClosed represents a closed connection.
	// This is a terminal state. Hijacked connections do not
	// transition to StateClosed.
	StateClosed
)

type Endpoint

type Endpoint struct {
	// contains filtered or unexported fields
}

Endpoint 不提供事务能力,事务需要其他的上层协议来处理

func (*Endpoint) List

func (e *Endpoint) List() map[string]string

func (*Endpoint) Ping

func (e *Endpoint) Ping()

Ping 这里有个问题,如果一个节点退出后,立即加入集群,会存在之前的节点没有清理的问题 这里的目前解决办法就是,节点sleep5秒之后再重新进入集群

func (*Endpoint) Send

func (e *Endpoint) Send(content []byte) error

type FilterSubscription

type FilterSubscription struct {
	Filter string // 订阅filter,如 "sensor/+"
	// contains filtered or unexported fields
}

FilterSubscription 单个filter的订阅信息

type Handler

type Handler interface {
	ServeMQTT(ResponseWriter, packet.Packet)
}

A Handler responds to an MQTT request.

type HandlerFunc

type HandlerFunc func(ResponseWriter, packet.Packet)

func (HandlerFunc) ServeMQTT

func (f HandlerFunc) ServeMQTT(rw ResponseWriter, r packet.Packet)

type InFight

type InFight struct {
	// contains filtered or unexported fields
}

func (*InFight) Get

func (i *InFight) Get(id uint16) (*packet.PUBLISH, bool)

func (*InFight) Put

func (i *InFight) Put(pkt *packet.PUBLISH) bool

type Listen

type Listen struct {
	URL      string `yaml:"url"`
	CertFile string `yaml:"certFile"`
	KeyFile  string `yaml:"keyFile"`
}

type MemorySubscribed

type MemorySubscribed struct {
	// contains filtered or unexported fields
}

MemorySubscribed 高性能订阅管理器

func NewMemorySubscribed

func NewMemorySubscribed(s *Server, cfg SubscribedConfig) *MemorySubscribed

NewMemorySubscribed 创建订阅管理器

func (*MemorySubscribed) GetStats

func (m *MemorySubscribed) GetStats() map[string]float64

GetStats 获取统计信息(供外部查询) 注意:Prometheus 指标通过 /metrics 端点暴露,这里返回空map

func (*MemorySubscribed) Print

func (m *MemorySubscribed) Print()

Print 打印当前订阅信息(调试用)

func (*MemorySubscribed) Publish

func (m *MemorySubscribed) Publish(message *packet.Message, props *packet.PublishProperties, sourceConn *conn) error

Publish 发布消息(核心优化方法)

func (*MemorySubscribed) Subscribe

func (m *MemorySubscribed) Subscribe(c *conn)

Subscribe 客户端订阅(优化版)

func (*MemorySubscribed) Unsubscribe

func (m *MemorySubscribed) Unsubscribe(c *conn)

Unsubscribe 客户端断开连接时取消所有订阅

func (*MemorySubscribed) UnsubscribeTopics

func (m *MemorySubscribed) UnsubscribeTopics(c *conn, topics []string)

UnsubscribeTopics 取消订阅指定的topics(处理UNSUBSCRIBE报文时使用)

type Option

type Option func(*Options)

func ClientID

func ClientID(clientID string) Option

ClientID 设置客户端ID

func Subscription

func Subscription(subscription ...packet.Subscription) Option

func URL

func URL(url string) Option

func Version

func Version(version string) Option

type Options

type Options struct {
	URL           string // client used
	ClientID      string
	Version       byte
	Subscriptions []packet.Subscription
}

type ResponseWriter

type ResponseWriter interface {
	OnSend(request packet.Packet) error
}

type Server

type Server struct {
	Handler          Handler
	WebsocketHandler websocket.Handler

	// TLSConfig optionally provides a TLS configuration for use
	// by ServeTLS and ListenAndServeTLS. Note that this value is
	// cloned by ServeTLS and ListenAndServeTLS, so it's not
	// possible to modify the configuration with methods like
	// tls.Config.SetSessionTicketKeys. To use
	// SetSessionTicketKeys, use Server.Serve with a TLS Listener
	// instead.
	TLSConfig *tls.Config

	// ConnState specifies an optional callback function that is
	// called when a client connection changes state. See the
	// ConnState type and associated constants for details.
	ConnState func(net.Conn, ConnState)

	// ConnContext optionally specifies a function that modifies
	// the context used for a new connection c. The provided ctx
	// is derived from the base context and has a ServerContextKey
	// value.
	ConnContext func(ctx context.Context, c net.Conn) context.Context

	Federated map[string]*Client
	// contains filtered or unexported fields
}

A Server defines parameters for running an HTTP server. The zero value for Server is a valid configuration.

func NewServer

func NewServer(ctx context.Context) *Server

func (*Server) InitServer

func (s *Server) InitServer(ctx context.Context) error

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(opts ...Option) error

func (*Server) ListenAndServeTLS

func (s *Server) ListenAndServeTLS(certFile, keyFile string, opts ...Option) error

func (*Server) ListenAndServeWebsocket

func (s *Server) ListenAndServeWebsocket(opts ...Option) error

ListenAndServeWebsocket 启动基于 WebSocket 的 MQTT 服务 (ws)

func (*Server) ListenAndServeWebsocketTLS

func (s *Server) ListenAndServeWebsocketTLS(certFile, keyFile string, opts ...Option) error

ListenAndServeWebsocketTLS 启动基于 TLS 的 WebSocket 服务 (wss)

func (*Server) Serve

func (s *Server) Serve(l net.Listener) error

Serve accepts incoming connections on the Listener l, creating a new service goroutine for each. The service goroutines read requests and then call srv.Handler to reply to them.

HTTP/2 support is only enabled if the Listener returns *tls.Conn connections. and they were configured with "h2" in the TLS Config.NextProtos.

Serve always returns a non-nil error and closes l. After Server.Shutdown or [Server.Close], the returned error is ErrServerClosed.

func (*Server) ServeTLS

func (s *Server) ServeTLS(l net.Listener, certFile, keyFile string) error

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

type Stat

type Stat struct {
	Uptime            prometheus.Counter
	ActiveConnections prometheus.Gauge
	PacketReceived    prometheus.Counter
	ByteReceived      prometheus.Counter
	PacketSent        prometheus.Counter
	ByteSent          prometheus.Counter
}

func (*Stat) RefreshUptime

func (s *Stat) RefreshUptime()

func (*Stat) Register

func (s *Stat) Register()

type SubscribedConfig

type SubscribedConfig struct {
	// 是否启用缓存(默认true)
	EnableCache bool

	// 缓存大小限制(默认10000)
	MaxCacheSize int

	// 统计信息打印间隔(默认1分钟)
	StatsInterval time.Duration

	// 是否启用性能监控(默认true)
	EnableMonitoring bool
}

SubscribedConfig 配置选项

type SubscriptionManager

type SubscriptionManager interface {
	// Subscribe 客户端订阅
	Subscribe(c *conn)

	// Unsubscribe 客户端断开连接时取消所有订阅
	Unsubscribe(c *conn)

	// UnsubscribeTopics 取消订阅指定的topics(处理UNSUBSCRIBE报文时使用)
	UnsubscribeTopics(c *conn, topics []string)

	// Publish 发布消息
	// sourceConn: 消息来源的连接,nil 表示服务端自己发布的消息
	Publish(message *packet.Message, props *packet.PublishProperties, sourceConn *conn) error

	// Print 打印订阅信息(调试用)
	Print()
}

SubscriptionManager 订阅管理器接口

Directories

Path Synopsis
cmd
benchmark command
http-beanch command
mqtt-client command
mqtt-server command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL