MQTT 消息驱动 (MQTT Message Driver)
约 675 字大约 2 分钟
2026-05-20
概述 (Overview)
MQTT 消息驱动用于轻量级工业 IoT 消息传输及事件驱动的发布/订阅模式。驱动底层依赖 MQTTnet 库实现,支持 MQTT 3.1/5.0 协议,可选 SSL/TLS 安全连接。
重要:MQTT 属于 Push(推送)型协议,订阅直接映射为底层 Broker 的 Topic 订阅,不经过 ScanScheduler 主动轮询调度,而是由网络事件触发回调。因此 ReadAsync 不受支持(返回 405 Not Supported)。
驱动已通过挂载 IMqttClient.DisconnectedAsync 事件实现断线即时感知,状态流转到 DriverState.Faulted 后由 Watchdog 自动重连。
使用方法 (Usage)
通过 IDriverRegistry 工厂方法创建(推荐,DI 自动装配依赖):
// 通过别名工厂创建(DI 自动装配 ILogger, IUniconCacheProvider, INetworkMonitor)
var driver = _driverRegistry.CreateDriver("Mqtt", "Cloud_Broker");
_driverRegistry.Register(driver);
await _connectionManager.RegisterDriverAsync(driver, "server=192.168.1.200;port=1883;clientid=gateway_01");参数说明 (Parameters)
连接字符串参数
| 参数名 | 类型 | 说明 | 是否必填 | 默认值 |
|---|---|---|---|---|
| server / host | string | MQTT Broker 服务的 IP 或主机名 | 是 | 无 |
| port | int | MQTT Broker 端口号 | 否 | 1883 |
| clientid | string | 客户端唯一连接 ID | 否 | 随机 GUID |
| username / user | string | 账户授权登录用户名 | 否 | 空 |
| password / pwd | string | 账户授权登录密码 | 否 | 空 |
| cleansession | bool | 是否清除会话(MQTT 3.1.1) | 否 | true |
| keepalive | int | 心跳维持间隔时间(秒) | 否 | 15 |
| usetls / tls | bool | 是否启用 SSL/TLS 安全连接 | 否 | false |
订阅地址 (Address / Topic)
地址即为 MQTT Topic,支持 MQTT 标准通配符:
+:单层通配符,如devices/+/telemetry#:多层通配符,如factory/#
发布地址 (WriteAsync address)
WriteAsync 的 request.Address 即为发布目标 Topic,value 为消息字符串内容。
返回值 (Returns)
| 方法 | 返回值 | 说明 |
|---|---|---|
ReadAsync | UniconResponse<T> | 固定报错 405,MQTT 不支持主动单次拉取读取 |
WriteAsync<T> | UniconResponse<bool> | 成功发布消息至 Broker 则返回 true |
SubscribeAsync | Task<string> | 返回 Topic 本身作为订阅 ID |
UnsubscribeAsync | Task | 向 Broker 取消 Topic 订阅 |
使用示例 (Examples)
示例 1:订阅特定设备遥测主题(支持通配符)
var driver = _driverRegistry.CreateDriver("Mqtt", "Cloud_Broker");
_driverRegistry.Register(driver);
await _connectionManager.RegisterDriverAsync(driver, "server=192.168.1.200;port=1883;clientid=gateway_01");
// 直接传入 Func 作为回调(支持 MQTT 通配符)
await driver.SubscribeAsync("devices/+/telemetry", async dataValue =>
{
Console.WriteLine($"Topic 消息: {dataValue.Value}");
Console.WriteLine($"到达时间: {dataValue.ServerTimestamp}");
await Task.CompletedTask;
});示例 2:向云端发布消息
var req = new UniconRequest { Address = "gateway/status" };
var res = await driver.WriteAsync<string>(req, "{\"status\": \"online\", \"driverId\": \"Line1_PLC\"}");
Console.WriteLine(res.Success ? "消息发布成功" : $"发布失败: {res.Message}");示例 3:使用 SSL/TLS 安全连接
var driver = _driverRegistry.CreateDriver("Mqtt", "SecureBroker");
_driverRegistry.Register(driver);
await _connectionManager.RegisterDriverAsync(
driver,
"server=broker.example.com;port=8883;username=myuser;password=mypass;usetls=true;clientid=secure_gw_01"
);示例 4:订阅并取消订阅
// 订阅
string subId = await driver.SubscribeAsync("factory/line1/#", async dv =>
{
Console.WriteLine($"收到: {dv.Value}");
await Task.CompletedTask;
});
// 按 Topic 取消订阅
await driver.UnsubscribeAsync("factory/line1/#");