统一主动轮询订阅引擎 (Active Polling Subscription Engine v2)
约 1040 字大约 3 分钟
2026-05-19
概述 (Overview)
对于 S7、Modbus 等不具备主动上报能力的南向协议,UniCon v2 在 UniCon.Core 层实现了完整重构的主动采集订阅引擎。
核心架构改进(v2 vs v1)
| 能力 | v1 旧版 | v2 当前版 |
|---|---|---|
| 调度方式 | while(true)+Delay(50) 全量扫描 | 最小等待时间调度,仅调度到期 ScanGroup |
| 地址去重 | 运行时 GroupBy(address) | 注册阶段 ScanGroupRegistry 预分组 |
| Callback 执行 | 扫描线程同步调用 | Channel<T> 投递,独立线程异步消费 |
| 缓存比对 | 在 Subscription 层 | 在 TagEntry(Tag 层) |
| 扫描模式扩展 | if/else 硬编码 | IScanStrategy 策略模式 |
| CacheProvider | static 全局赋值 | 构造函数 DI 注入 |
| Callback 签名 | Action<DataValue<object>> | Func<DataValue<object>, Task>(异步) |
使用方法 (Usage)
方法 1:极简快捷订阅
// 使用驱动默认扫描周期与 ExceptionBased 模式
var subId = await driver.SubscribeAsync("DB1.DBD0", async dataValue =>
{
Console.WriteLine($"变更通知: {dataValue.Value} [{dataValue.Quality}]");
await Task.CompletedTask;
});
// 取消该地址下的所有订阅
await driver.UnsubscribeAsync("DB1.DBD0");方法 2:结构化精准订阅(推荐)
var subId = await driver.SubscribeAsync(new UniconSubscription
{
Address = "DB1.DBD0",
ScanRateMs = 500,
ScanMode = UniconScanMode.ExceptionBased,
// 可选:Tag 元数据(死区过滤、单位描述等)
Metadata = new TagMetadata
{
Name = "反应釜温度",
Unit = "℃",
Deadband = 0.5, // 变化 < 0.5℃ 时不触发通知
DataType = UniconDataType.Float
},
// 异步 Func callback,在独立线程中被 NotificationDispatcher await
Callback = async dataValue =>
{
Console.WriteLine($"[{dataValue.ServerTimestamp:HH:mm:ss.fff}] 温度: {dataValue.Value} ℃");
await _myService.SaveAsync(dataValue);
},
// Backpressure 控制
MaxQueueLength = 256,
OverflowPolicy = OverflowPolicy.DropOldest
});
// 按 ID 精准取消
await driver.UnsubscribeByIdAsync(subId);查询活动订阅
IEnumerable<UniconSubscription> activeSubs = driver.GetSubscriptions();查询扫描统计
var stats = driver.GetStatistics(500, UniconScanMode.ExceptionBased);
if (stats != null)
{
Console.WriteLine($"扫描次数: {stats.ScanCount}");
Console.WriteLine($"通知次数: {stats.NotifyCount}");
Console.WriteLine($"平均读取: {stats.AverageReadDurationMs:F1}ms");
Console.WriteLine($"错误率: {stats.ErrorRate:P1}");
}监听驱动状态变更
driver.StateChanged += (_, e) =>
{
Console.WriteLine($"[{e.DriverId}] {e.OldState} → {e.NewState}");
};参数说明 (Parameters)
UniconSubscription 属性
| 属性名 | 类型 | 说明 | 必填 | 默认值 |
|---|---|---|---|---|
Id | string | 唯一订阅 ID,自动生成 Guid | 否 | 随机 Guid |
Address | string | 寄存器地址,如 "DB1.DBD0" | 是 | 无 |
ScanRateMs | int | 轮询周期(毫秒) | 否 | 1000 |
ScanMode | UniconScanMode | ExceptionBased / Polled | 否 | ExceptionBased |
Callback | Func<DataValue<object>, Task> | 异步通知回调 | 是 | 无 |
Metadata | TagMetadata? | Tag 元数据(死区/单位/类型) | 否 | null |
MaxQueueLength | int | 通知队列最大容量(Backpressure 控制) | 否 | 128 |
OverflowPolicy | OverflowPolicy | DropOldest / DropNewest | 否 | DropOldest |
TagMetadata 属性
| 属性名 | 类型 | 说明 |
|---|---|---|
Deadband | double | 数值变化死区(工程单位),ExceptionBased 时生效;0 禁用 |
Unit | string | 工程单位,如 "℃"、"bar" |
DataType | UniconDataType | 期望的数据类型 |
ScalingFactor / ScalingOffset | double | 线性缩放:物理值 = Raw × Factor + Offset |
AccessMode | TagAccessMode | ReadOnly / WriteOnly / ReadWrite |
DataValue<T> 字段
| 字段 | 类型 | 说明 |
|---|---|---|
Value | T? | 实际数据值 |
Status | DataStatus | Good / Bad / Uncertain / Timeout |
Quality | QualityCode | OPC UA 兼容质量码(16 位) |
SourceTimestamp | DateTime | 数据在设备端产生的时间(Driver 填充) |
ServerTimestamp | DateTime | 数据到达网关的时间(ScanScheduler 填充) |
返回值 (Returns)
| 方法 | 返回值 | 说明 |
|---|---|---|
SubscribeAsync(UniconSubscription) | Task<string> | 成功后返回订阅 ID |
UnsubscribeByIdAsync(string) | Task | 按 ID 精准注销 |
GetStatistics(rateMs, mode) | ScanStatistics? | 该 ScanGroup 的统计对象,无活跃 Tag 时返回 null |
使用示例 (Examples)
示例 1:S7 点位变化订阅(含死区过滤)
var s7Driver = driverRegistry.Get("S7_PLC_01");
await s7Driver.SubscribeAsync(new UniconSubscription
{
Address = "DB1.DBD0",
ScanRateMs = 500,
Metadata = new TagMetadata { Unit = "℃", Deadband = 0.3 },
Callback = async dv =>
{
Console.WriteLine($"温度: {dv.Value} ℃ @ {dv.ServerTimestamp:HH:mm:ss.fff}");
await Task.CompletedTask;
}
});示例 2:自定义 Redis 缓存提供者
// 实现 IUniconCacheProvider
public class RedisCacheProvider : IUniconCacheProvider
{
private readonly IDatabase _db;
public RedisCacheProvider(IConnectionMultiplexer redis)
=> _db = redis.GetDatabase();
public async Task<DataValue<object>?> GetAsync(string driverId, string address, CancellationToken ct = default)
{
var json = await _db.StringGetAsync($"unicon:{driverId}:{address}");
return json.HasValue ? JsonSerializer.Deserialize<DataValue<object>>(json!) : null;
}
public async Task SetAsync(string driverId, string address, DataValue<object> dataValue, CancellationToken ct = default)
{
var json = JsonSerializer.Serialize(dataValue);
await _db.StringSetAsync($"unicon:{driverId}:{address}", json);
}
public async Task RemoveAsync(string driverId, string address, CancellationToken ct = default)
=> await _db.KeyDeleteAsync($"unicon:{driverId}:{address}");
}
// 在 DI 注册(替换内置 MemoryCacheProvider)
builder.Services.AddSingleton<IUniconCacheProvider, RedisCacheProvider>();示例 3:多地址共享同一 ScanGroup(零重复 IO)
// 相同 ScanRate + ScanMode 的订阅自动共享一个 ScanGroup
// 两个地址只发生一次 ReadBatch IO,再分别通知各自订阅者
await driver.SubscribeAsync(new UniconSubscription { Address = "DB1.DBD0", ScanRateMs = 100, Callback = ... });
await driver.SubscribeAsync(new UniconSubscription { Address = "DB1.DBD4", ScanRateMs = 100, Callback = ... });
await driver.SubscribeAsync(new UniconSubscription { Address = "DB1.DBD8", ScanRateMs = 100, Callback = ... });
// → 同一 100ms ScanGroup,一次 ReadBatch 读取三个地址