From 6ef2a0e5d8e1975bf90f39978f20a72ba72648cf Mon Sep 17 00:00:00 2001 From: zhouhao Date: Thu, 26 Mar 2026 20:35:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E7=89=A9=E6=A8=A1=E5=9E=8B/Modbus):=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20Modbus=20=E6=94=AF=E6=8C=81=EF=BC=8C?= =?UTF-8?q?=E5=8C=85=E6=8B=AC=E5=B8=A7=E7=BC=96=E8=A7=A3=E7=A0=81=E3=80=81?= =?UTF-8?q?=E5=AF=84=E5=AD=98=E5=99=A8=E9=94=AE=E8=A7=A3=E6=9E=90=E4=BB=A5?= =?UTF-8?q?=E5=8F=8A=E5=86=99=E8=AF=B7=E6=B1=82=E6=9E=84=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../things/helper/modbus/ModbusFrame.java | 44 ++ .../helper/modbus/ModbusFrameCodec.java | 441 ++++++++++++ .../things/helper/modbus/ModbusLinkType.java | 5 + .../helper/modbus/ModbusPropertyMapping.java | 99 +++ .../helper/modbus/ModbusReadRequest.java | 38 + .../modbus/ModbusReadRequestBuilder.java | 147 ++++ .../modbus/ModbusRegisterDefinition.java | 195 ++++++ .../helper/modbus/ModbusRegisterKey.java | 79 +++ .../helper/modbus/ModbusRegisterSnapshot.java | 33 + .../helper/modbus/ModbusRegisterType.java | 120 ++++ .../helper/modbus/ModbusRegistersDecoder.java | 215 ++++++ .../helper/modbus/ModbusThingsHelper.java | 60 ++ .../helper/modbus/ModbusThingsMapping.java | 663 ++++++++++++++++++ .../helper/modbus/ModbusWriteRequest.java | 43 ++ .../modbus/ModbusWriteRequestBuilder.java | 350 +++++++++ .../helper/modbus/ThingWriteOperation.java | 28 + 16 files changed, 2560 insertions(+) create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrame.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrameCodec.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusLinkType.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusPropertyMapping.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequest.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequestBuilder.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterDefinition.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterKey.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterSnapshot.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterType.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegistersDecoder.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsHelper.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsMapping.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequest.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequestBuilder.java create mode 100644 jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ThingWriteOperation.java diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrame.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrame.java new file mode 100644 index 000000000..648405dbf --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrame.java @@ -0,0 +1,44 @@ +package org.jetlinks.community.things.helper.modbus; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * 简单的 Modbus 帧抽象, 用于在 ByteBuf 与寄存器读写请求之间做承载。 + *

+ * 这里仅关注逻辑字段, 不关心底层是 RTU 还是 TCP, 也不处理 CRC 等链路相关细节。 + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ModbusFrame { + + /** + * 从站ID + */ + private int slaveId; + + /** + * 功能码 + */ + private int functionCode; + + /** + * 起始寄存器地址 + */ + private int address; + + /** + * 寄存器数据区。 + *

+ * 对于读响应, 表示读取到的寄存器原始字节; + * 对于写请求, 表示要写入的寄存器原始字节. + */ + private ByteBuf values; + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrameCodec.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrameCodec.java new file mode 100644 index 000000000..744b21ecb --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusFrameCodec.java @@ -0,0 +1,441 @@ +package org.jetlinks.community.things.helper.modbus; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; +import reactor.core.publisher.Flux; + +import java.util.List; + +/** + * Modbus 帧与 ByteBuf 之间的简单编解码工具。 + *

+ * 支持功能码: + *

+ *

+ *

解码/编码行为由 {@link ModbusLinkType} 决定: + *

+ * 不处理粘包拆包。 + */ +public final class ModbusFrameCodec { + + private ModbusFrameCodec() { + } + + /** + * 等价于 {@link #decode(ByteBuf, ModbusLinkType)} {@link ModbusLinkType#PDU}。 + */ + public static ModbusFrame decode(ByteBuf payload) { + return decode(payload, ModbusLinkType.PDU); + } + + /** + * 从上游报文中解析出单个 Modbus 帧。 + * + * @param payload 原始缓冲区(不修改 readerIndex) + * @param linkType 链路类型 + * @return Modbus 帧, 解析失败时返回 null + */ + public static ModbusFrame decode(ByteBuf payload, ModbusLinkType linkType) { + if (payload == null || !payload.isReadable()) { + return null; + } + ModbusLinkType lt = linkType == null ? ModbusLinkType.PDU : linkType; + ByteBuf pduBuf = switch (lt) { + case TCP -> stripTcpMbap(payload); + case RTU -> stripRtuCrc(payload); + default -> payload; + }; + if (pduBuf == null || !pduBuf.isReadable()) { + return null; + } + return decodePdu(pduBuf); + } + + /** + * 剥离 Modbus TCP MBAP,返回 UnitId + 功能码 + 数据 的视图(不复制负载)。 + */ + static ByteBuf stripTcpMbap(ByteBuf raw) { + if (raw.readableBytes() < 8) { + return null; + } + int proto = raw.getUnsignedShort(raw.readerIndex() + 2); + if (proto != 0) { + return null; + } + int length = raw.getUnsignedShort(raw.readerIndex() + 4); + if (length < 2 || raw.readableBytes() < 6 + length) { + return null; + } + return raw.slice(raw.readerIndex() + 6, length); + } + + /** + * 校验并剥离 RTU CRC,返回 PDU 视图。 + */ + static ByteBuf stripRtuCrc(ByteBuf raw) { + int n = raw.readableBytes(); + if (n < 4) { + return null; + } + int crcRx = raw.getUnsignedShortLE(raw.readerIndex() + n - 2); + int crcCalc = crc16Modbus(raw, raw.readerIndex(), n - 2); + if (crcCalc != crcRx) { + return null; + } + return raw.slice(raw.readerIndex(), n - 2); + } + + /** + * 解析裸 PDU:首字节从站/单元 ID,第二字节功能码。 + */ + private static ModbusFrame decodePdu(ByteBuf payload) { + if (payload == null || !payload.isReadable()) { + return null; + } + // 不修改原始 readerIndex, 复制一份用于解析 + ByteBuf buf = payload.slice(); + if (buf.readableBytes() < 2) { + return null; + } + int slaveId = buf.readUnsignedByte(); + int functionCode = buf.readUnsignedByte(); + + switch (functionCode) { + case 0x01: // Read Coils (response) + case 0x02: { // Read Discrete Inputs (response) + if (!buf.isReadable()) { + return null; + } + int byteCount = buf.readUnsignedByte(); + if (buf.readableBytes() < byteCount) { + return null; + } + ByteBuf values = buf.readSlice(byteCount); + return new ModbusFrame(slaveId, functionCode, 0, values); + } + case 0x03: // Read Holding Registers (response) + case 0x04: { // Read Input Registers (response) + if (!buf.isReadable()) { + return null; + } + int byteCount = buf.readUnsignedByte(); + if (buf.readableBytes() < byteCount) { + return null; + } + // 寄存器数据区, 直接保留为 ByteBuf + ByteBuf values = buf.readSlice(byteCount); + // 响应报文中并未包含起始地址, 这里将 address 设为 0, + // 实际映射时通过 ModbusThingsMapping.properties 中每条映射的寄存器键完成定位。 + return new ModbusFrame(slaveId, functionCode, 0, values); + } + case 0x05: { // Write Single Coil (request/response): address(2) + value(2), 0xFF00=ON 0x0000=OFF + if (buf.readableBytes() < 4) { + return null; + } + int address = buf.readUnsignedShort(); + ByteBuf values = buf.readSlice(2); + return new ModbusFrame(slaveId, functionCode, address, values); + } + case 0x06: { // Write Single Register (request/response) + if (buf.readableBytes() < 4) { + return null; + } + int address = buf.readUnsignedShort(); + // 后续 2 字节为寄存器值 + ByteBuf values = buf.readSlice(2); + return new ModbusFrame(slaveId, functionCode, address, values); + } + case 0x0F: { // Write Multiple Coils (request): address(2) + quantity(2) + byteCount + coil bytes + if (buf.readableBytes() < 5) { + return null; + } + int address = buf.readUnsignedShort(); + int quantity = buf.readUnsignedShort(); + int byteCount = buf.readUnsignedByte(); + if (buf.readableBytes() < byteCount) { + return null; + } + ByteBuf values = buf.readSlice(byteCount); + return new ModbusFrame(slaveId, functionCode, address, values); + } + case 0x10: { // Write Multiple Registers (request) + if (buf.readableBytes() < 5) { + return null; + } + int address = buf.readUnsignedShort(); + int quantity = buf.readUnsignedShort(); + int byteCount = buf.readUnsignedByte(); + if (buf.readableBytes() < byteCount) { + return null; + } + // 写多个寄存器的数据区 + ByteBuf values = buf.readSlice(byteCount); + return new ModbusFrame(slaveId, functionCode, address, values); + } + case 0x16: { // Mask Write Register (request/response echo): address(2) + AND_Mask(2) + OR_Mask(2) + if (buf.readableBytes() < 6) { + return null; + } + int address = buf.readUnsignedShort(); + int andMask = buf.readUnsignedShort(); + int orMask = buf.readUnsignedShort(); + ByteBuf values = Unpooled.buffer(4).writeShort(andMask).writeShort(orMask); + return new ModbusFrame(slaveId, functionCode, address, values); + } + default: + // 不支持的功能码, 交由上层处理 + return null; + } + } + + /** + * 将一批 Modbus 帧编码为字节流(默认 {@link ModbusLinkType#PDU})。 + */ + public static Flux encode(List frames) { + return encode(frames, ModbusLinkType.PDU); + } + + /** + * 将一批 Modbus 帧按链路类型编码。 + */ + public static Flux encode(List frames, ModbusLinkType linkType) { + if (frames == null || frames.isEmpty()) { + return Flux.empty(); + } + ModbusLinkType lt = linkType == null ? ModbusLinkType.PDU : linkType; + return Flux.fromIterable(frames) + .map(f -> encodeFrame(f, lt)); + } + + /** + * 将单个帧编码为 PDU(无 MBAP、无 CRC)。 + */ + public static ByteBuf encodeFrame(ModbusFrame frame) { + return encodeFrame(frame, ModbusLinkType.PDU); + } + + /** + * 按链路类型封装:TCP 增加 MBAP,RTU 增加 CRC,PDU 仅输出 UnitId+功能码+数据。 + */ + public static ByteBuf encodeFrame(ModbusFrame frame, ModbusLinkType linkType) { + if (frame == null) { + return Unpooled.EMPTY_BUFFER; + } + ModbusLinkType lt = linkType == null ? ModbusLinkType.PDU : linkType; + ByteBuf pdu = encodePduInternal(frame); + if (pdu == null || !pdu.isReadable()) { + return pdu == null ? Unpooled.EMPTY_BUFFER : pdu; + } + switch (lt) { + case TCP: + return wrapTcpMbap(pdu); + case RTU: + return appendRtuCrc(pdu); + case PDU: + default: + return pdu; + } + } + + private static ByteBuf wrapTcpMbap(ByteBuf pdu) { + try { + int n = pdu.readableBytes(); + ByteBuf out = Unpooled.buffer(6 + n); + out.writeShort(0); + out.writeShort(0); + out.writeShort(n); + out.writeBytes(pdu); + return out; + } finally { + ReferenceCountUtil.safeRelease(pdu); + } + } + + private static ByteBuf appendRtuCrc(ByteBuf pdu) { + try { + int n = pdu.readableBytes(); + ByteBuf out = Unpooled.buffer(n + 2); + out.writeBytes(pdu); + int crc = crc16Modbus(out, 0, n); + out.writeByte(crc & 0xFF); + out.writeByte((crc >> 8) & 0xFF); + return out; + } finally { + ReferenceCountUtil.safeRelease(pdu); + } + } + + /** + * Modbus RTU CRC16(多项式 0xA001,初值 0xFFFF)。 + */ + public static int crc16Modbus(ByteBuf buf, int offset, int len) { + int crc = 0xFFFF; + for (int i = 0; i < len; i++) { + crc ^= (buf.getByte(offset + i) & 0xFF); + for (int j = 0; j < 8; j++) { + if ((crc & 1) != 0) { + crc = (crc >>> 1) ^ 0xA001; + } else { + crc >>>= 1; + } + } + } + return crc & 0xFFFF; + } + + private static ByteBuf encodePduInternal(ModbusFrame frame) { + if (frame == null) { + return Unpooled.EMPTY_BUFFER; + } + int functionCode = frame.getFunctionCode(); + ByteBuf values = frame.getValues(); + if (values == null) { + values = Unpooled.EMPTY_BUFFER; + } + ByteBuf buf; + switch (functionCode) { + case 0x01: + case 0x02: { + // 读线圈/离散输入请求: slaveId + functionCode + address(2) + quantity(2) + int quantity; + if (values.readableBytes() == 2) { + quantity = values.getUnsignedShort(values.readerIndex()); + } else { + quantity = values.readableBytes() / 2; + } + if (quantity <= 0) { + quantity = 1; + } + buf = Unpooled.buffer(1 + 1 + 2 + 2); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + buf.writeShort(frame.getAddress()); + buf.writeShort(quantity); + return buf; + } + case 0x03: + case 0x04: { + // 读寄存器请求: slaveId + functionCode + address(2) + quantity(2) + int quantity; + if (values.readableBytes() == 2) { + quantity = values.getUnsignedShort(values.readerIndex()); + } else { + quantity = values.readableBytes() / 2; + } + if (quantity <= 0) { + quantity = 1; + } + buf = Unpooled.buffer(1 + 1 + 2 + 2); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + buf.writeShort(frame.getAddress()); + buf.writeShort(quantity); + return buf; + } + case 0x05: { + // 写单线圈: slaveId + functionCode + address(2) + value(2), 0xFF00=ON 0x0000=OFF + buf = Unpooled.buffer(1 + 1 + 2 + 2); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + buf.writeShort(frame.getAddress()); + int v = values.readableBytes() >= 2 ? values.getUnsignedShort(values.readerIndex()) : 0; + buf.writeShort((v == 1 || v == 0xFF00) ? 0xFF00 : 0x0000); + return buf; + } + case 0x06: { + // 写单寄存器: slaveId + functionCode + address(2) + value(2) + buf = Unpooled.buffer(1 + 1 + 2 + 2); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + buf.writeShort(frame.getAddress()); + if (values.readableBytes() >= 2) { + // 不修改源 ByteBuf 的 readerIndex + buf.writeBytes(values, values.readerIndex(), 2); + } else { + buf.writeShort(0); + } + return buf; + } + case 0x0F: { + // 写多个线圈: slaveId + functionCode + address(2) + quantity(2) + byteCount + coil bytes + // values 格式: 每个线圈占 2 字节(short), 0/1 表示 off/on, 需打包为线圈字节(每字节8位,LSB优先) + int coilCount = values.readableBytes() / 2; + int byteCount = (coilCount + 7) / 8; + if (coilCount == 0) { + byteCount = 1; + coilCount = 1; + } + ByteBuf coilBytes = Unpooled.buffer(byteCount); + for (int i = 0; i < byteCount; i++) { + int b = 0; + for (int bit = 0; bit < 8 && i * 8 + bit < coilCount; bit++) { + int idx = (i * 8 + bit) * 2; + if (values.readableBytes() >= idx + 2) { + int v = values.getUnsignedShort(values.readerIndex() + idx); + if ((v & 1) != 0) { + b |= (1 << bit); + } + } + } + coilBytes.writeByte(b); + } + buf = Unpooled.buffer(1 + 1 + 2 + 2 + 1 + byteCount); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + buf.writeShort(frame.getAddress()); + buf.writeShort(coilCount); + buf.writeByte(byteCount); + buf.writeBytes(coilBytes); + return buf; + } + case 0x10: { + // 写多个寄存器: slaveId + functionCode + address(2) + quantity(2) + byteCount + values*2 + int byteCount = values.readableBytes(); + int quantity = byteCount / 2; + buf = Unpooled.buffer(1 + 1 + 2 + 2 + 1 + byteCount); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + buf.writeShort(frame.getAddress()); + buf.writeShort(quantity); + buf.writeByte(byteCount); + if (byteCount > 0) { + buf.writeBytes(values, values.readerIndex(), byteCount); + } + return buf; + } + case 0x16: { + // 掩码写: slaveId + functionCode + address(2) + AND_Mask(2) + OR_Mask(2) + buf = Unpooled.buffer(1 + 1 + 2 + 2 + 2); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + buf.writeShort(frame.getAddress()); + int andMask = values.readableBytes() >= 2 ? values.getUnsignedShort(values.readerIndex()) : 0; + int orMask = values.readableBytes() >= 4 ? values.getUnsignedShort(values.readerIndex() + 2) : 0; + buf.writeShort(andMask); + buf.writeShort(orMask); + return buf; + } + default: + // 未知功能码, 仅写入 slaveId 和 functionCode + buf = Unpooled.buffer(2); + buf.writeByte(frame.getSlaveId()); + buf.writeByte(functionCode); + return buf; + } + } + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusLinkType.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusLinkType.java new file mode 100644 index 000000000..568441f0f --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusLinkType.java @@ -0,0 +1,5 @@ +package org.jetlinks.community.things.helper.modbus; + +public enum ModbusLinkType { + PDU, RTU, TCP +} diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusPropertyMapping.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusPropertyMapping.java new file mode 100644 index 000000000..4e5aa99bd --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusPropertyMapping.java @@ -0,0 +1,99 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * 描述物模型属性与 Modbus 寄存器之间的映射关系。 + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ModbusPropertyMapping { + + /** + * 物模型属性标识 + */ + private String property; + + /** + * 所属寄存器定义 + */ + private ModbusRegisterDefinition register; + + /** + * 当 {@link ModbusRegisterDefinition} 中配置的 Codec 解码结果为数组或集合时, + * 通过该下标(从0开始)提取对应元素。 + *

+ * 未配置时, 默认使用 Codec 解码后的完整结果。 + */ + private Integer elementIndex; + + /** + * 位偏移(从0开始), 用于按位解析. + *

+ * 当未配置 {@link #elementIndex} 且未配置 {@link ModbusRegisterDefinition#codec} 时, + * 则直接按寄存器 16 位值的对应位提取. + */ + private Integer bitIndex; + + /** + * 位长度, 默认为 1. + */ + private Integer bitLength; + + /** + * 保持寄存器按位写时是否使用功能码 0x16(Mask Write Register)。 + *

+ * 默认关闭:仍合并为整字后走 0x06/0x10。开启且映射为保持寄存器按位写时,下行生成掩码写 PDU(需设备支持 FC22)。 + */ + private Boolean useMaskWrite; + + /** + * 读写方向: 是否可读 + */ + private boolean readable = true; + + /** + * 读写方向: 是否可写 + */ + private boolean writable = true; + + /** + * 可选描述信息 + */ + private String description; + + /** + * 判断是否配置了按位解析 + * + * @return true if bit parsing is configured + */ + public boolean isBitOperation() { + return bitIndex != null && bitIndex >= 0; + } + + /** + * 获取位长度,默认为 1 + * + * @return bit length + */ + public int getBitLengthSafe() { + return bitLength != null && bitLength > 0 ? bitLength : 1; + } + + /** + * 获取位掩码,用于按位提取和修改 + * + * @return bit mask + */ + public int getBitMask() { + int length = getBitLengthSafe(); + return ((1 << length) - 1) << (bitIndex != null ? bitIndex : 0); + } + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequest.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequest.java new file mode 100644 index 000000000..672a8322f --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequest.java @@ -0,0 +1,38 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * 抽象的 Modbus 读请求描述, 不关心底层报文格式。 + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ModbusReadRequest { + + /** + * 从站ID + */ + private int slaveId; + + /** + * 功能码 + */ + private int functionCode; + + /** + * 起始寄存器地址 + */ + private int address; + + /** + * 读取寄存器数量 + */ + private int quantity; + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequestBuilder.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequestBuilder.java new file mode 100644 index 000000000..0a3026274 --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusReadRequestBuilder.java @@ -0,0 +1,147 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * 将物模型要读取的属性 ID 转换为 Modbus 读请求, 并自动按从站/功能码合并连续寄存器。 + *

+ * 全部为静态方法, 可直接传入 {@link Collection} 或 {@link Map}(仅使用 key 作为属性 ID), 避免额外包装对象。 + */ +public final class ModbusReadRequestBuilder { + + private ModbusReadRequestBuilder() { + } + + /** + * 根据属性 ID 集合与映射配置生成 Modbus 读请求。 + * + * @param propertyIds 要读取的物模型属性 ID(null 或空字符串会被忽略) + * @param mapping 映射配置 + * @return Modbus 读请求列表 + */ + public static List build(Collection propertyIds, + ModbusThingsMapping mapping) { + if (propertyIds == null || propertyIds.isEmpty() || mapping == null) { + return List.of(); + } + + List units = new ArrayList<>(); + for (String propertyId : propertyIds) { + if (propertyId == null || propertyId.isEmpty()) { + continue; + } + mapping + .getProperty(propertyId) + .ifPresent(prop -> { + ModbusRegisterDefinition def = prop.getRegister(); + if (def == null || def.getKey() == null) { + return; + } + ModbusRegisterKey key = def.getKey(); + int start = key.getAddress(); + int quantity = Math.max(def.getRegisterCount(), 1); + units.add(new RegisterReadUnit(key.getSlaveId(), key.getType(), start, quantity)); + }); + } + + if (units.isEmpty()) { + return List.of(); + } + + Map> groups = units + .stream() + .collect(Collectors.groupingBy(unit -> new SlaveFunctionKey(unit.getSlaveId(), unit.getType()))); + + List requests = new ArrayList<>(); + for (Map.Entry> entry : groups.entrySet()) { + List groupUnits = entry + .getValue() + .stream() + .sorted(Comparator.comparingInt(RegisterReadUnit::getAddress)) + .collect(Collectors.toList()); + + mergeGroup(entry.getKey(), groupUnits, requests); + } + return requests; + } + + /** + * 根据属性 Map 与映射配置生成 Modbus 读请求。 + *

+ * 仅使用 {@link Map#keySet()} 作为要读取的属性 ID,忽略 value。 + * + * @param properties 属性 ID 到任意值的映射(value 不参与计算) + * @param mapping 映射配置 + * @return Modbus 读请求列表 + */ + public static List build(Map properties, + ModbusThingsMapping mapping) { + if (properties == null || properties.isEmpty()) { + return List.of(); + } + return build(properties.keySet(), mapping); + } + + private static void mergeGroup(SlaveFunctionKey key, + List units, + List output) { + if (units.isEmpty()) { + return; + } + int currentStart = units.get(0).getAddress(); + int currentEnd = currentStart + units.get(0).getQuantity() - 1; + + for (int i = 1; i < units.size(); i++) { + RegisterReadUnit unit = units.get(i); + int start = unit.getAddress(); + int end = start + unit.getQuantity() - 1; + // 地址连续则合并 + if (start <= currentEnd + 1) { + currentEnd = Math.max(currentEnd, end); + } else { + output.add(new ModbusReadRequest(key.getSlaveId(), key.getFunctionCode(), currentStart, currentEnd - currentStart + 1)); + currentStart = start; + currentEnd = end; + } + } + output.add(new ModbusReadRequest(key.getSlaveId(), key.getFunctionCode(), currentStart, currentEnd - currentStart + 1)); + } + + @Getter + @AllArgsConstructor + private static class RegisterReadUnit { + private final int slaveId; + private final ModbusRegisterType type; + private final int address; + private final int quantity; + } + + @Getter + @AllArgsConstructor + private static class SlaveFunctionKey { + private final int slaveId; + private final ModbusRegisterType type; + + public int getFunctionCode() { + return type != null ? type.getReadFunction() : 0; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SlaveFunctionKey that = (SlaveFunctionKey) o; + return slaveId == that.slaveId && type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(slaveId, type); + } + } + +} diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterDefinition.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterDefinition.java new file mode 100644 index 000000000..497788116 --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterDefinition.java @@ -0,0 +1,195 @@ +package org.jetlinks.community.things.helper.modbus; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.jetlinks.core.codec.Codec; +import org.jetlinks.core.codec.Codecs; +import org.jetlinks.core.codec.layout.ByteLayout; +import org.jetlinks.core.codec.layout.ByteLayouts; +import org.jetlinks.core.utils.NumberUtils; + +import static org.jetlinks.core.codec.layout.ByteLayout.BIG_ENDIAN; + +/** + * 描述 Modbus 寄存器的基础定义信息。 + *

+ * 通过 {@link Codec} 与 {@link ByteLayout} + * 定义数值的解析规则与字节布局, 避免在此处单独配置有符号、浮点、小端等细节。 + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ModbusRegisterDefinition { + + /** + * 寄存器键 + */ + private ModbusRegisterKey key; + + /** + * 寄存器数量(单位: 寄存器, 一个寄存器为2字节) + */ + private int registerCount = 1; + + /** + * 使用的编解码器标识, 对应 {@link Codec#getId()}. + *

+ * 当配置此字段时, 将优先使用 Codec 进行数值编解码。 + */ + private String codec; + + /** + * 字节布局标识, 对应 {@link ByteLayout#getId()}. + *

+ * 仅在使用 Codec 时生效, 用于描述大小端等字节重排规则。 + */ + private String layout; + + /** + * 缩放因子, 解码时: 原始值 * scaleFactor(经 {@link #applyScaleAfterDecode} 应用)。 + */ + private double scaleFactor = 1D; + + /** + * 小数位, <0 表示不处理精度(经 {@link #applyScaleAfterDecode} 应用)。 + */ + private int scale = -1; + + /** + * 描述信息 + */ + private String description; + + /** + * 使用当前寄存器定义, 从给定的 ByteBuf 中解码出完整数值。 + *

+ * - 若配置了 codec, 则直接使用 codec + ByteLayout 进行解码; + * - 否则按无符号 16 位整型读取原始值(缩放由上层在按位/元素提取之后或之前统一处理)。 + * + * @param source 寄存器对应的原始字节(通常长度为 registerCount * 2) + * @return 解码后的结果, 可能是标量、数组或集合; 若无法解码则返回 null + */ + public Object decode(ByteBuf source) { + if (source == null) { + return null; + } + + // 优先使用 Codec + ByteLayout 进行解析 + if (codec != null && !codec.isEmpty()) { + Codec c = Codecs.getNow(codec); + ByteLayout layoutObj = resolveLayout(layout); + ByteBuf buf; + if (layoutObj != null + && layoutObj.byteLength() > 0 + && layoutObj.byteLength() <= source.readableBytes() + && layoutObj != BIG_ENDIAN + && layoutObj != ByteLayout.AB + && layoutObj != ByteLayout.AB_CD + && layoutObj != ByteLayout.AB_CD_EF_GH) { + // 仅在需要进行字节重排时才复制, 避免修改上游缓存内容 + buf = source.copy(); + ByteBuf slice = buf.slice(0, layoutObj.byteLength()); + buf = layoutObj.reorder(slice); + } else { + // 无需字节重排时直接 duplicate, 避免不必要的 copy + buf = source.duplicate(); + } + return c.decode(buf); + } + + // 兼容旧配置: 未配置 codec 时读取无符号 16 位原始值 + if (source.readableBytes() < 2) { + return null; + } + return source.getUnsignedShort(source.readerIndex()); + } + + /** + * 对 Codec/原始整型解码结果应用 {@link #scaleFactor} / {@link #scale}。 + *

字内按位映射应在调用本方法之前使用原始解码值做位提取(见 {@link ModbusThingsMapping#decode})。

+ */ + public Object applyScaleAfterDecode(Object decoded) { + if (decoded == null) { + return null; + } + if (scaleFactor == 1D && scale < 0) { + return decoded; + } + if (decoded instanceof Number) { + return scaleOne(((Number) decoded).doubleValue()); + } + if (decoded instanceof int[]) { + int[] arr = (int[]) decoded; + if (arr.length == 0) { + return arr; + } + double[] out = new double[arr.length]; + for (int i = 0; i < arr.length; i++) { + out[i] = scaleOneDouble(arr[i]); + } + return out; + } + if (decoded instanceof long[]) { + long[] arr = (long[]) decoded; + double[] out = new double[arr.length]; + for (int i = 0; i < arr.length; i++) { + out[i] = scaleOneDouble(arr[i]); + } + return out; + } + if (decoded instanceof float[]) { + float[] arr = (float[]) decoded; + double[] out = new double[arr.length]; + for (int i = 0; i < arr.length; i++) { + out[i] = scaleOneDouble(arr[i]); + } + return out; + } + if (decoded instanceof double[]) { + double[] arr = (double[]) decoded; + double[] out = new double[arr.length]; + for (int i = 0; i < arr.length; i++) { + out[i] = scaleOneDouble(arr[i]); + } + return out; + } + return decoded; + } + + private Object scaleOne(double v) { + if (scaleFactor != 1D) { + v = v * scaleFactor; + } + if (scale >= 0) { + return NumberUtils.convertEffectiveScale(v, scale); + } + return v; + } + + private double scaleOneDouble(double v) { + if (scaleFactor != 1D) { + v = v * scaleFactor; + } + if (scale >= 0) { + Object o = NumberUtils.convertEffectiveScale(v, scale); + return o instanceof Number ? ((Number) o).doubleValue() : Double.parseDouble(String.valueOf(o)); + } + return v; + } + + /** + * 根据布局标识解析为 {@link ByteLayout}, 当前简单支持 2 字节场景(AB/BA)。 + */ + private ByteLayout resolveLayout(String id) { + if (id == null || id.isEmpty()) { + return null; + } + return ByteLayouts.getNow(id); + } + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterKey.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterKey.java new file mode 100644 index 000000000..cb1b7094f --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterKey.java @@ -0,0 +1,79 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.*; + +/** + * 表示一个 Modbus 寄存器键,用于唯一标识一个寄存器。 + *

+ * 字符串表示格式: {@code 从站ID_类型_地址},例如 {@code 1_HoldingRegisters_0} + * + * @author zhou + */ +@Getter +@Setter +@EqualsAndHashCode +@NoArgsConstructor +@AllArgsConstructor +public class ModbusRegisterKey { + + /** + * 从站ID + */ + private int slaveId; + + /** + * 寄存器类型 + */ + private ModbusRegisterType type; + + /** + * 寄存器地址 + */ + private int address; + + /** + * 根据功能码创建寄存器键 + * + * @param slaveId 从站ID + * @param functionCode 功能码 + * @param address 寄存器地址 + * @return 寄存器键 + * @throws IllegalArgumentException 如果功能码无效 + */ + public static ModbusRegisterKey of(int slaveId, int functionCode, int address) { + ModbusRegisterType registerType = ModbusRegisterType.fromFunctionCode(functionCode); + if (registerType == null) { + throw new IllegalArgumentException("Invalid function code: " + functionCode); + } + return new ModbusRegisterKey(slaveId, registerType, address); + } + + /** + * 获取读功能码 + * + * @return 读功能码 + */ + public int getReadFunctionCode() { + return type != null ? type.getReadFunction() : -1; + } + + /** + * 获取写功能码 + * + * @param writeMultiple 是否写多个 + * @return 写功能码,如果不可写则返回 -1 + */ + public int getWriteFunctionCode(boolean writeMultiple) { + if (type == null || !type.isWritable()) { + return -1; + } + return writeMultiple ? type.getWriteMultipleFunction() : type.getWriteSingleFunction(); + } + + @Override + public String toString() { + return slaveId + "_" + (type != null ? type.name() : "UNKNOWN") + "_" + address; + } + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterSnapshot.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterSnapshot.java new file mode 100644 index 000000000..6ec3771e5 --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterSnapshot.java @@ -0,0 +1,33 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * 用于在转换过程中保存寄存器的当前值快照。 + *

+ * 这里使用 int 表示寄存器值(0-65535),具体有符号处理在映射阶段完成。 + */ +@Getter +@AllArgsConstructor +public class ModbusRegisterSnapshot { + + private final Map values; + + public static ModbusRegisterSnapshot of(Map values) { + if (values == null || values.isEmpty()) { + return new ModbusRegisterSnapshot(Collections.emptyMap()); + } + return new ModbusRegisterSnapshot(Collections.unmodifiableMap(new HashMap<>(values))); + } + + public Integer get(ModbusRegisterKey key) { + return values.get(key); + } + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterType.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterType.java new file mode 100644 index 000000000..a4f056fab --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegisterType.java @@ -0,0 +1,120 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.hswebframework.web.dict.I18nEnumDict; + +import java.util.Arrays; +import java.util.Optional; + +/** + * Modbus 寄存器类型, 用于简化配置并自动推断读写功能码。 + */ +@AllArgsConstructor +@Getter +public enum ModbusRegisterType implements I18nEnumDict { + /** + * 线圈寄存器 (可读写, 1 bit) + * 读: 0x01, 写单个: 0x05, 写多个: 0x0F + */ + Coils("线圈寄存器", 0x01, 0x05, 0x0F, null), + + /** + * 离散输入寄存器 (只读, 1 bit) + * 读: 0x02 + */ + DiscreteInputs("离散输入寄存器", 0x02, null, null, null), + + /** + * 保存寄存器 (可读写, 16 bits) + * 读: 0x03, 写单个: 0x06, 写多个: 0x10, 掩码写: 0x16 + */ + HoldingRegisters("保存寄存器", 0x03, 0x06, 0x10, 0x16), + + /** + * 输入寄存器 (只读, 16 bits) + * 读: 0x04 + */ + InputRegisters("输入寄存器", 0x04, null, null, null); + + private final String text; + private final int readFunction; + private final Integer writeSingleFunction; + private final Integer writeMultipleFunction; + /** + * 掩码写寄存器 (仅保持寄存器), 功能码 0x16;与按位写且 {@link ModbusPropertyMapping#useMaskWrite} 配合使用。 + */ + private final Integer maskWriteFunction; + + @Override + public String getValue() { + return name(); + } + + public boolean isReadOnly() { + return writeSingleFunction == null; + } + + public static Optional of(String text) { + return Arrays.stream(values()) + .filter(value -> value.name().equalsIgnoreCase(text) || value.getText().equals(text)) + .findAny(); + } + + /** + * 根据功能码获取对应的寄存器类型 + * + * @param functionCode 功能码 + * @return 寄存器类型,如果功能码无效则返回 null + */ + public static ModbusRegisterType fromFunctionCode(int functionCode) { + return Arrays.stream(values()) + .filter(type -> type.readFunction == functionCode + || (type.writeSingleFunction != null && type.writeSingleFunction == functionCode) + || (type.writeMultipleFunction != null && type.writeMultipleFunction == functionCode) + || (type.maskWriteFunction != null && type.maskWriteFunction == functionCode)) + .findFirst() + .orElse(null); + } + + /** + * 判断该寄存器类型是否可写 + * + * @return true if writable + */ + public boolean isWritable() { + return writeSingleFunction != null; + } + + /** + * 判断该寄存器类型是否可读 + * + * @return true if readable + */ + public boolean isReadable() { + return readFunction > 0; + } + + /** + * 获取写功能码(优先返回写多个的功能码) + * + * @param writeMultiple 是否写多个 + * @return 写功能码,如果不可写则返回 -1 + */ + public int getWriteFunctionCode(boolean writeMultiple) { + if (!isWritable()) { + return -1; + } + return writeMultiple ? writeMultipleFunction : writeSingleFunction; + } + + /** + * 获取读功能码 + * + * @return 读功能码 + */ + public int getReadFunctionCode() { + return readFunction; + } + +} diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegistersDecoder.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegistersDecoder.java new file mode 100644 index 000000000..e7c56cd67 --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusRegistersDecoder.java @@ -0,0 +1,215 @@ +package org.jetlinks.community.things.helper.modbus; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import reactor.core.publisher.Mono; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; + +/** + * Modbus 寄存器解码器,负责将寄存器快照解析为物模型属性值。 + *

+ * 解析流程: + *

    + *
  1. 根据属性映射定位寄存器值
  2. + *
  3. 使用寄存器定义中的 Codec 解析数值
  4. + *
  5. 应用按位提取或元素索引提取
  6. + *
  7. 返回属性ID到属性值的映射
  8. + *
+ * + * @author zhou + */ +public class ModbusRegistersDecoder { + + /** + * 解码寄存器快照为物模型属性值 + * + * @param snapshot 寄存器快照 + * @param mapping 映射配置 + * @return 属性ID到属性值的映射 + */ + public static Map decode(ModbusRegisterSnapshot snapshot, + ModbusThingsMapping mapping) { + if (snapshot == null || mapping == null) { + return new HashMap<>(); + } + + Map properties = new HashMap<>(); + Map registers = snapshot.getValues(); + + if (registers == null || registers.isEmpty()) { + return properties; + } + + // 先按寄存器维度解码,避免重复解码同一寄存器 + Map decodedRegisters = new HashMap<>(); + + for (ModbusPropertyMapping propertyMapping : mapping.getProperties()) { + if (propertyMapping == null + || !propertyMapping.isReadable() + || propertyMapping.getProperty() == null + || propertyMapping.getRegister() == null) { + continue; + } + + ModbusRegisterKey key = propertyMapping.getRegister().getKey(); + if (key == null || !registers.containsKey(key)) { + continue; + } + + // 获取或解码寄存器值 + Object decodedValue = decodedRegisters.computeIfAbsent(key, k -> { + Integer registerValue = registers.get(k); + return decodeRegister(registerValue, propertyMapping.getRegister()); + }); + + ModbusRegisterDefinition def = propertyMapping.getRegister(); + Object propertyValue = propertyMapping.isBitOperation() + ? extractPropertyValue(decodedValue, propertyMapping) + : extractPropertyValue(def.applyScaleAfterDecode(decodedValue), propertyMapping); + if (propertyValue != null) { + properties.put(propertyMapping.getProperty(), propertyValue); + } + } + + return properties; + } + + /** + * 使用寄存器定义解码单个寄存器值 + * + * @param registerValue 寄存器原始值(16位无符号整数) + * @param definition 寄存器定义 + * @return 解码后的值 + */ + private static Object decodeRegister(Integer registerValue, ModbusRegisterDefinition definition) { + if (registerValue == null || definition == null) { + return null; + } + ByteBuf buf = Unpooled.buffer(2); + buf.writeShort(registerValue & 0xFFFF); + try { + return definition.decode(buf); + } catch (Exception e) { + return decodeWithScale(registerValue); + } finally { + buf.release(); + } + } + + /** + * definition.decode 失败时的回退:仅返回无符号 16 位原始值(缩放由 {@link ModbusRegisterDefinition#applyScaleAfterDecode} 统一处理)。 + */ + private static Object decodeWithScale(Integer registerValue) { + return registerValue & 0xFFFF; + } + + /** + * 从解码结果中提取属性值 + * + * @param decodedValue 解码后的寄存器值 + * @param propertyMapping 属性映射 + * @return 最终的属性值 + */ + private static Object extractPropertyValue(Object decodedValue, ModbusPropertyMapping propertyMapping) { + if (decodedValue == null) { + return null; + } + + // 优先处理按位提取 + if (propertyMapping.isBitOperation()) { + return extractBits(decodedValue, propertyMapping); + } + + // 处理元素索引提取 + if (propertyMapping.getElementIndex() != null && propertyMapping.getElementIndex() >= 0) { + return extractElement(decodedValue, propertyMapping.getElementIndex()); + } + + // 直接返回解码结果 + return decodedValue; + } + + /** + * 按位提取属性值 + * + * @param value 原始值 + * @param propertyMapping 属性映射 + * @return 提取后的位值 + */ + private static Object extractBits(Object value, ModbusPropertyMapping propertyMapping) { + int bitIndex = propertyMapping.getBitIndex(); + int bitLength = propertyMapping.getBitLengthSafe(); + + int intValue; + if (value instanceof Number) { + intValue = ((Number) value).intValue(); + } else if (value instanceof Boolean) { + intValue = ((Boolean) value) ? 1 : 0; + } else { + try { + intValue = new BigDecimal(String.valueOf(value)).intValue(); + } catch (Exception e) { + return null; + } + } + + // 提取指定位 + int mask = ((1 << bitLength) - 1) << bitIndex; + int bits = (intValue & mask) >>> bitIndex; + + // 如果是1位,返回布尔值;否则返回数值 + return bitLength == 1 ? (bits == 1) : bits; + } + + /** + * 从数组或集合中提取指定下标的元素 + * + * @param decodedValue 解码后的值 + * @param index 元素下标 + * @return 提取的元素 + */ + @SuppressWarnings("all") + private static Object extractElement(Object decodedValue, int index) { + if (decodedValue == null || index < 0) { + return null; + } + + if (decodedValue.getClass().isArray()) { + int length = java.lang.reflect.Array.getLength(decodedValue); + if (index >= length) { + return null; + } + return java.lang.reflect.Array.get(decodedValue, index); + } + + if (decodedValue instanceof Iterable) { + int i = 0; + for (Object item : (Iterable) decodedValue) { + if (i == index) { + return item; + } + i++; + } + return null; + } + + // 不是数组或集合,直接返回原值 + return decodedValue; + } + + /** + * 响应式版本的解码方法 + * + * @param snapshot 寄存器快照 + * @param mapping 映射配置 + * @return 属性ID到属性值的映射 + */ + public static Mono> decodeReactive(ModbusRegisterSnapshot snapshot, + ModbusThingsMapping mapping) { + return Mono.fromSupplier(() -> decode(snapshot, mapping)); + } + +} diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsHelper.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsHelper.java new file mode 100644 index 000000000..53d86dd8c --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsHelper.java @@ -0,0 +1,60 @@ +package org.jetlinks.community.things.helper.modbus; + +import org.jetlinks.core.monitor.Monitor; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 基于 {@link ModbusThingsMapping} 的静态入口:寄存器快照解析、读/写请求构建。 + */ +public final class ModbusThingsHelper { + + private ModbusThingsHelper() { + } + + /** + * 将寄存器当前值解析为物模型属性(内部委托 {@link ModbusThingsMapping#decodeRegisters(Map, Monitor)})。 + */ + public static Map decodeProperties(ModbusThingsMapping mapping, + Map registers, + Monitor monitor) { + if (mapping == null || registers == null || registers.isEmpty()) { + return Collections.emptyMap(); + } + return mapping.decodeRegisters(registers, monitor); + } + + public static List buildReadRequests(ModbusThingsMapping mapping, + Collection properties) { + return ModbusReadRequestBuilder.build(properties, mapping); + } + + public static List buildReadRequests(ModbusThingsMapping mapping, + Collection properties, + Monitor monitor) { + return buildReadRequests(mapping, properties); + } + + public static List buildReadRequests(ModbusThingsMapping mapping, + Map properties, + Monitor monitor) { + return ModbusReadRequestBuilder.build(properties, mapping); + } + + public static List buildWriteRequests(ModbusThingsMapping mapping, + Map properties, + ModbusRegisterSnapshot snapshot, + Monitor monitor) { + if (mapping == null || properties == null || properties.isEmpty()) { + return List.of(); + } + List ops = properties.entrySet().stream() + .map(e -> new ThingWriteOperation(e.getKey(), e.getValue())) + .collect(Collectors.toList()); + return new ModbusWriteRequestBuilder().build(ops, mapping, snapshot); + } +} diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsMapping.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsMapping.java new file mode 100644 index 000000000..cb1e65372 --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusThingsMapping.java @@ -0,0 +1,663 @@ +package org.jetlinks.community.things.helper.modbus; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.jetlinks.core.monitor.Monitor; + +import java.nio.ByteOrder; +import java.util.*; + +/** + * 描述一个物模型在 Modbus 协议下的整体映射配置。 + *

+ * 优化内存缓存结构: + *

    + *
  • 使用索引结构加速查询(属性ID、寄存器键)
  • + *
  • 懒加载索引,减少初始化开销
  • + *
  • 使用不可变集合,降低内存占用
  • + *
  • 提供快速查找方法
  • + *
+ * + * @author zhou + */ +@Getter +@Setter +@NoArgsConstructor +public class ModbusThingsMapping { + + /** + * 物模型标识(例如设备ID或物类型ID) + */ + private String thingId; + + /** + * 链路封装:TCP(含 MBAP)、RTU(含 CRC)或仅 PDU(默认,与历史配置一致)。 + */ + private ModbusLinkType modbusLinkType = ModbusLinkType.PDU; + + /** + * 物模型属性与寄存器映射集合;寄存器定义(地址、Codec、数量等)内嵌在每条 {@link ModbusPropertyMapping#getRegister()} 中。 + */ + private List properties = Collections.emptyList(); + + public void setProperties(List properties) { + if (properties == null || properties.isEmpty()) { + this.properties = Collections.emptyList(); + return; + } + List list = new ArrayList<>(properties.size()); + for (ModbusPropertyMapping property : properties) { + if (property != null && property.getProperty() != null && !property.getProperty().isEmpty()) { + list.add(property); + } + } + this.properties = Collections.unmodifiableList(list); + } + + // 索引结构(懒加载) + @JsonIgnore + private transient volatile Map propertyIndex; + @JsonIgnore + private transient volatile Map> registerIndex; + + /** + * 初始化索引,在首次查询时自动调用 + */ + private void buildIndexes() { + if (propertyIndex != null && registerIndex != null) { + return; + } + + synchronized (this) { + if (propertyIndex != null && registerIndex != null) { + return; + } + + Map propIdx = new HashMap<>(properties.size()); + Map> regIdx = new HashMap<>(); + + for (ModbusPropertyMapping property : properties) { + if (property == null || property.getProperty() == null) { + continue; + } + + // 构建属性索引 + propIdx.put(property.getProperty(), property); + + // 构建寄存器索引 + if (property.getRegister() != null && property.getRegister().getKey() != null) { + ModbusRegisterKey key = property.getRegister().getKey(); + regIdx.computeIfAbsent(key, k -> new ArrayList<>()).add(property); + } + } + + // 将列表转为不可变,减少内存占用 + Map> immutableRegIdx = new HashMap<>(regIdx.size()); + regIdx.forEach((k, v) -> immutableRegIdx.put(k, Collections.unmodifiableList(v))); + + this.propertyIndex = Collections.unmodifiableMap(propIdx); + this.registerIndex = Collections.unmodifiableMap(immutableRegIdx); + } + } + + /** + * 获取属性映射列表 + * + * @return 不可变的属性映射列表 + */ + public List getProperties() { + return properties != null ? properties : Collections.emptyList(); + } + + /** + * 根据属性ID快速查找映射配置(O(1)复杂度) + * + * @param propertyId 属性ID + * @return 属性映射,如果不存在返回 Optional.empty() + */ + public Optional getProperty(String propertyId) { + if (propertyId == null || propertyId.isEmpty()) { + return Optional.empty(); + } + ensureIndexes(); + return Optional.ofNullable(propertyIndex.get(propertyId)); + } + + /** + * 根据寄存器键快速查找所有关联的属性映射(O(1)复杂度) + * + * @param key 寄存器键 + * @return 属性映射列表,如果不存在返回空列表 + */ + public List getPropertiesByRegister(ModbusRegisterKey key) { + if (key == null) { + return Collections.emptyList(); + } + ensureIndexes(); + return registerIndex.getOrDefault(key, Collections.emptyList()); + } + + /** + * 检查是否包含指定属性 + * + * @param propertyId 属性ID + * @return true if contains + */ + public boolean containsProperty(String propertyId) { + if (propertyId == null || propertyId.isEmpty()) { + return false; + } + ensureIndexes(); + return propertyIndex.containsKey(propertyId); + } + + /** + * 获取所有属性ID + * + * @return 属性ID集合 + */ + public Set getPropertyIds() { + ensureIndexes(); + return propertyIndex.keySet(); + } + + /** + * 获取映射的寄存器键集合 + * + * @return 寄存器键集合 + */ + public Set getRegisterKeys() { + ensureIndexes(); + return registerIndex.keySet(); + } + + /** + * 获取属性映射数量 + * + * @return 属性数量 + */ + public int getPropertyCount() { + return properties != null ? properties.size() : 0; + } + + /** + * 确保索引已构建 + */ + private void ensureIndexes() { + if (propertyIndex == null || registerIndex == null) { + buildIndexes(); + } + } + + /** + * 单帧线圈/离散解析时上限,防止异常报文导致过大遍历。 + */ + private static final int MAX_COIL_BITS_PER_DECODE = 2000; + + /** + * 根据已解析的 {@link ModbusFrame},按本映射配置直接解析出物模型属性值。 + *

+ * 不再经过整帧 → {@code Map} 的中间寄存器表,而是按每个属性映射 + * 在帧数据区中定位原始字节并调用 {@link ModbusRegisterDefinition#decode(ByteBuf)}。 + * + * @param frame 逻辑 Modbus 帧(与 {@link ModbusFrameCodec#decode(ByteBuf)} 结果一致) + * @param monitor 可为 null;解码单属性失败时记录告警 + * @return 属性 id → 属性值,无匹配时返回空 Map + */ + public Map decode(ModbusFrame frame, Monitor monitor) { + if (frame == null) { + return Collections.emptyMap(); + } + ensureIndexes(); + List list = getProperties(); + if (list.isEmpty()) { + return Collections.emptyMap(); + } + Map result = new LinkedHashMap<>(); + for (ModbusPropertyMapping pm : list) { + if (pm == null || !pm.isReadable() || pm.getProperty() == null) { + continue; + } + ModbusRegisterDefinition def = pm.getRegister(); + if (def == null || def.getKey() == null) { + continue; + } + ModbusRegisterKey key = def.getKey(); + if (key.getSlaveId() != frame.getSlaveId()) { + continue; + } + if (!functionMatchesFrame(key, frame.getFunctionCode())) { + continue; + } + try { + ByteBuf raw = extractPropertyRawFromFrame(frame, def); + if (raw == null || !raw.isReadable()) { + continue; + } + Object decoded = def.decode(raw); + if (decoded == null && (def.getCodec() == null || def.getCodec().isEmpty())) { + // 兼容线圈: 未配置 codec 时,如果是线圈类型且只有1字节数据,尝试转为 Boolean + ModbusRegisterType type = key.getType(); + if ((type == ModbusRegisterType.Coils || type == ModbusRegisterType.DiscreteInputs) + && raw.readableBytes() == 1) { + decoded = raw.readByte() != 0; + } + } + Object value = pm.isBitOperation() + ? applyPropertyMapping(pm, decoded) + : applyPropertyMapping(pm, def.applyScaleAfterDecode(decoded)); + if (value != null) { + result.put(pm.getProperty(), value); + } + } catch (Exception ex) { + if (monitor != null) { + monitor.logger().warn("modbus.mapping.decode.property.failed", pm.getProperty(), ex); + } + } + } + return result.isEmpty() ? Collections.emptyMap() : result; + } + + /** + * 从寄存器键值快照解析属性(兼容基于快照的读改写等路径),语义与 {@link #decode(ModbusFrame, Monitor)} 对齐。 + * + * @param registers 寄存器当前值 + * @param monitor 可为 null + * @return 属性 id → 属性值 + */ + public Map decodeRegisters(Map registers, Monitor monitor) { + if (registers == null || registers.isEmpty()) { + return Collections.emptyMap(); + } + ensureIndexes(); + List list = getProperties(); + if (list.isEmpty()) { + return Collections.emptyMap(); + } + Map result = new LinkedHashMap<>(); + for (ModbusPropertyMapping pm : list) { + if (pm == null || !pm.isReadable() || pm.getProperty() == null) { + continue; + } + ModbusRegisterDefinition def = pm.getRegister(); + if (def == null || def.getKey() == null) { + continue; + } + try { + ByteBuf raw = extractPropertyRawFromRegisterMap(def, registers); + if (raw == null || !raw.isReadable()) { + continue; + } + Object decoded = def.decode(raw); + Object value = pm.isBitOperation() + ? applyPropertyMapping(pm, decoded) + : applyPropertyMapping(pm, def.applyScaleAfterDecode(decoded)); + if (value != null) { + result.put(pm.getProperty(), value); + } + } catch (Exception ex) { + if (monitor != null) { + monitor.logger().warn("modbus.mapping.decode.property.failed", pm.getProperty(), ex); + } + } + } + return result.isEmpty() ? Collections.emptyMap() : result; + } + + private static boolean functionMatchesFrame(ModbusRegisterKey key, int functionCode) { + ModbusRegisterType t = key.getType(); + if (t == null) { + return false; + } + return functionCode == t.getReadFunction() + || (t.getWriteSingleFunction() != null && functionCode == t.getWriteSingleFunction()) + || (t.getWriteMultipleFunction() != null && functionCode == t.getWriteMultipleFunction()) + || (t.getMaskWriteFunction() != null && functionCode == t.getMaskWriteFunction()); + } + + /** + * 数据区第一个线圈/寄存器对应的 PDU 地址(与透传场景下读响应数据区从地址 0 起算的约定一致)。 + */ + private static int pduDataStart(ModbusFrame frame) { + int fc = frame.getFunctionCode(); + switch (fc) { + case 0x01: + case 0x02: + case 0x03: + case 0x04: + return 0; + case 0x05: + case 0x06: + case 0x0F: + case 0x10: + case 0x16: + return frame.getAddress(); + default: + return 0; + } + } + + private ByteBuf extractPropertyRawFromFrame(ModbusFrame frame, ModbusRegisterDefinition def) { + ModbusRegisterKey key = def.getKey(); + ModbusRegisterType type = key.getType(); + int fc = frame.getFunctionCode(); + ByteBuf data = frame.getValues(); + int regAddr = key.getAddress(); + int count = Math.max(1, def.getRegisterCount()); + + if (type == ModbusRegisterType.Coils || type == ModbusRegisterType.DiscreteInputs) { + if (fc == 0x05) { + if (regAddr != frame.getAddress()) { + return null; + } + if (data == null || data.readableBytes() < 2) { + return null; + } + int v = data.getUnsignedShort(data.readerIndex()); + int bitVal = (v == 0xFF00) ? 1 : 0; + return Unpooled.buffer(2).writeShort(bitVal); + } + return extractCoilRegionFromFrame(frame, data, regAddr, count); + } + if (type == ModbusRegisterType.HoldingRegisters || type == ModbusRegisterType.InputRegisters) { + return extractWordRegionFromFrame(frame, data, regAddr, count); + } + return null; + } + + private static ByteBuf extractWordRegionFromFrame(ModbusFrame frame, ByteBuf data, int regAddr, int registerCount) { + if (data == null || !data.isReadable()) { + return null; + } + int pduStart = pduDataStart(frame); + int rel = regAddr - pduStart; + if (rel < 0) { + return null; + } + int need = registerCount * 2; + int off = rel * 2; + if (data.readableBytes() < off + need) { + return null; + } + return data.slice(data.readerIndex() + off, need); + } + + private static ByteBuf extractCoilRegionFromFrame(ModbusFrame frame, ByteBuf data, int coilStart, int numCoils) { + if (data == null || !data.isReadable()) { + return null; + } + int pduStart = pduDataStart(frame); + int bitStart = coilStart - pduStart; + if (bitStart < 0 || numCoils < 1 || bitStart + numCoils > MAX_COIL_BITS_PER_DECODE) { + return null; + } + if (numCoils <= 16) { + int v = 0; + for (int i = 0; i < numCoils; i++) { + if (readCoilBit(data, bitStart + i)) { + v |= (1 << i); + } + } + int nBytes = (numCoils + 7) / 8; + ByteBuf out = Unpooled.buffer(nBytes).order(ByteOrder.LITTLE_ENDIAN); + if (nBytes == 1) { + out.writeByte(v); + } else { + out.writeShort(v); + } + return out; + } + int nBytes = (numCoils + 7) / 8; + ByteBuf out = Unpooled.buffer(nBytes); + for (int b = 0; b < nBytes; b++) { + int by = 0; + for (int bit = 0; bit < 8; bit++) { + int idx = b * 8 + bit; + if (idx >= numCoils) { + break; + } + if (readCoilBit(data, bitStart + idx)) { + by |= (1 << bit); + } + } + out.writeByte(by); + } + return out; + } + + private static boolean readCoilBit(ByteBuf coilBytes, int bitIndex) { + int byteIdx = bitIndex / 8; + int bitInByte = bitIndex % 8; + if (coilBytes.readableBytes() <= byteIdx) { + return false; + } + int b = coilBytes.getUnsignedByte(coilBytes.readerIndex() + byteIdx); + return (b & (1 << bitInByte)) != 0; + } + + private ByteBuf extractPropertyRawFromRegisterMap(ModbusRegisterDefinition def, + Map registers) { + ModbusRegisterKey key = def.getKey(); + ModbusRegisterType type = key.getType(); + int count = Math.max(1, def.getRegisterCount()); + int start = key.getAddress(); + + if (type == ModbusRegisterType.HoldingRegisters || type == ModbusRegisterType.InputRegisters) { + ByteBuf out = Unpooled.buffer(count * 2); + for (int i = 0; i < count; i++) { + ModbusRegisterKey k = new ModbusRegisterKey(key.getSlaveId(), type, start + i); + Integer v = registers.get(k); + if (v == null) { + return null; + } + out.writeShort(v & 0xFFFF); + } + return out; + } + if (type == ModbusRegisterType.Coils || type == ModbusRegisterType.DiscreteInputs) { + if (count <= 16) { + int v = 0; + for (int i = 0; i < count; i++) { + ModbusRegisterKey k = new ModbusRegisterKey(key.getSlaveId(), type, start + i); + Integer bit = registers.get(k); + if (bit == null) { + return null; + } + v |= (bit & 1) << i; + } + return Unpooled.buffer(2).writeShort(v); + } + int nBytes = (count + 7) / 8; + ByteBuf out = Unpooled.buffer(nBytes); + for (int b = 0; b < nBytes; b++) { + int by = 0; + for (int bit = 0; bit < 8; bit++) { + int idx = b * 8 + bit; + if (idx >= count) { + break; + } + ModbusRegisterKey k = new ModbusRegisterKey(key.getSlaveId(), type, start + idx); + Integer val = registers.get(k); + if (val == null) { + return null; + } + if ((val & 1) != 0) { + by |= (1 << bit); + } + } + out.writeByte(by); + } + return out; + } + return null; + } + + private static Object applyPropertyMapping(ModbusPropertyMapping pm, Object decoded) { + if (decoded == null) { + return null; + } + Object current = decoded; + Integer el = pm.getElementIndex(); + if (el != null && el >= 0) { + Object elem = extractElement(current, el); + if (elem == null) { + return null; + } + current = elem; + } + if (pm.isBitOperation()) { + int raw = toUInt16(current); + int bi = pm.getBitIndex() != null ? pm.getBitIndex() : 0; + int len = pm.getBitLengthSafe(); + int mask = (1 << len) - 1; + int bits = (raw >> bi) & mask; + return len == 1 ? (bits != 0) : bits; + } + if (current != null && current.getClass().isArray() && pm.getElementIndex() == null) { + int len = java.lang.reflect.Array.getLength(current); + int count = pm.getRegister().getRegisterCount(); + ModbusRegisterKey key = pm.getRegister().getKey(); + ModbusRegisterType type = key != null ? key.getType() : null; + int expectedLen = (type == ModbusRegisterType.Coils || type == ModbusRegisterType.DiscreteInputs) + ? count : count * 16; + + // bit_array(MSB-first BitArray):Modbus 线圈报文为 LSB-first,需根据所选 codec 进行位映射。 + // lsb_bit_array:Codec 已按 LSB 展开。 + String codecId = pm.getRegister().getCodec(); + + // 对于 bit_array,由于它是按字节解码的,位长度始终是 8 的倍数,需要根据 registerCount 裁剪 + // 对于其他 array 类型,如果长度超过了预期的寄存器所能承载的长度,也进行裁剪 + if (pm.getRegister().getCodec() != null && pm.getRegister().getCodec().endsWith("_array")) { + if (len > expectedLen) { + return copyArray(current, expectedLen); + } + } + } + return current; + } + + private static Object copyArray(Object array, int length) { + Object newArray = java.lang.reflect.Array.newInstance(array.getClass().getComponentType(), length); + System.arraycopy(array, 0, newArray, 0, length); + return newArray; + } + + private static int toUInt16(Object decoded) { + if (decoded instanceof Number) { + return ((Number) decoded).intValue() & 0xFFFF; + } + if (decoded instanceof Boolean) { + return ((Boolean) decoded) ? 1 : 0; + } + return 0; + } + + private static Object extractElement(Object decoded, int index) { + if (decoded instanceof List) { + List list = (List) decoded; + return index < list.size() ? list.get(index) : null; + } + if (decoded instanceof Object[]) { + Object[] arr = (Object[]) decoded; + return index < arr.length ? arr[index] : null; + } + if (decoded instanceof boolean[]) { + boolean[] arr = (boolean[]) decoded; + return index < arr.length ? arr[index] : null; + } + if (decoded instanceof byte[]) { + byte[] arr = (byte[]) decoded; + return index < arr.length ? (arr[index] & 0xFF) : null; + } + if (decoded instanceof int[]) { + int[] arr = (int[]) decoded; + return index < arr.length ? arr[index] : null; + } + if (decoded != null && decoded.getClass().isArray()) { + int len = java.lang.reflect.Array.getLength(decoded); + if (index >= len) { + return null; + } + return java.lang.reflect.Array.get(decoded, index); + } + return null; + } + + /** + * 创建配置构建器 + * + * @param thingId 物模型ID + * @return 构建器 + */ + public static Builder builder(String thingId) { + return new Builder(thingId); + } + + /** + * 配置构建器,用于快速构建映射配置 + */ + public static class Builder { + private final String thingId; + private final List properties = new ArrayList<>(); + + public Builder(String thingId) { + this.thingId = Objects.requireNonNull(thingId, "thingId cannot be null"); + } + + /** + * 添加属性映射 + * + * @param property 属性映射 + * @return this + */ + public Builder addProperty(ModbusPropertyMapping property) { + if (property != null) { + properties.add(property); + } + return this; + } + + /** + * 批量添加属性映射 + * + * @param properties 属性映射列表 + * @return this + */ + public Builder addProperties(List properties) { + if (properties != null) { + this.properties.addAll(properties); + } + return this; + } + + /** + * 构建映射配置 + * + * @return 映射配置 + */ + public ModbusThingsMapping build() { + ModbusThingsMapping mapping = new ModbusThingsMapping(); + mapping.setThingId(thingId); + mapping.setProperties(Collections.unmodifiableList(new ArrayList<>(properties))); + return mapping; + } + } + + /** + * 克隆配置,用于设备级覆盖产品级配置 + * + * @return 克隆的配置 + */ + public ModbusThingsMapping copy() { + ModbusThingsMapping copy = new ModbusThingsMapping(); + copy.setThingId(this.thingId); + copy.setProperties(this.properties); + // 不复制索引,让克隆对象在首次使用时自行构建 + return copy; + } + +} diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequest.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequest.java new file mode 100644 index 000000000..ed566c1f5 --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequest.java @@ -0,0 +1,43 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * 抽象的 Modbus 写请求描述, 默认按寄存器为单位。 + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ModbusWriteRequest { + + /** + * 从站ID + */ + private int slaveId; + + /** + * 功能码 + */ + private int functionCode; + + /** + * 起始寄存器地址 + */ + private int address; + + /** + * 写入寄存器数量 + */ + private int quantity; + + /** + * 要写入的寄存器值, 单位: 寄存器(每个元素2字节)。 + */ + private int[] values; + +} + diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequestBuilder.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequestBuilder.java new file mode 100644 index 000000000..1759fb5b5 --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ModbusWriteRequestBuilder.java @@ -0,0 +1,350 @@ +package org.jetlinks.community.things.helper.modbus; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.jetlinks.core.codec.Codec; +import org.jetlinks.core.codec.Codecs; +import org.jetlinks.core.codec.layout.ByteLayout; +import org.jetlinks.core.codec.layout.ByteLayouts; +import org.jetlinks.core.utils.ConverterUtils; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.*; + +/** + * 将物模型写入操作转换为 Modbus 写请求。 + *

+ * 主要职责: + *

    + *
  • 按寄存器聚合多个写操作;
  • + *
  • 在按位写入时基于快照进行读改写, 保证未修改位保持最新值;
  • + *
  • 在保持寄存器按位写且 {@link ModbusPropertyMapping#getUseMaskWrite()} 为 true 时生成 FC 0x16(掩码写);
  • + *
  • 生成按寄存器为单位的写请求描述。
  • + *
+ */ +public class ModbusWriteRequestBuilder { + + /** + * 根据物模型写入操作、映射配置和当前寄存器快照构建写请求。 + * + * @param operations 写入操作 + * @param mapping 映射配置 + * @param snapshot 当前寄存器快照, 用于按位读改写 + * @return 写请求列表 + */ + public List build(List operations, + ModbusThingsMapping mapping, + ModbusRegisterSnapshot snapshot) { + if (operations == null || operations.isEmpty() || mapping == null) { + return List.of(); + } + Map states = new HashMap<>(); + if (snapshot != null && snapshot.getValues() != null) { + snapshot.getValues().forEach((k, v) -> { + WriteMergeState s = new WriteMergeState(); + s.value = v & 0xFFFF; + states.put(k, s); + }); + } + + for (ThingWriteOperation operation : operations) { + if (operation == null || operation.getProperty() == null) { + continue; + } + mapping + .getProperty(operation.getProperty()) + .ifPresent(prop -> applyWriteOperation(prop, operation.getValue(), states)); + } + + if (states.isEmpty()) { + return List.of(); + } + + List requests = new ArrayList<>(); + Map>> groups = new HashMap<>(); + for (Map.Entry e : states.entrySet()) { + ModbusRegisterKey k = e.getKey(); + groups.computeIfAbsent(new SlaveFunctionKey(k.getSlaveId(), k.getType()), x -> new ArrayList<>()) + .add(e); + } + + for (Map.Entry>> groupEntry : groups.entrySet()) { + SlaveFunctionKey groupKey = groupEntry.getKey(); + List> sortedEntries = groupEntry.getValue().stream() + .sorted(Comparator.comparingInt(en -> en.getKey().getAddress())) + .toList(); + + int i = 0; + while (i < sortedEntries.size()) { + Map.Entry e = sortedEntries.get(i); + ModbusRegisterKey regKey = e.getKey(); + WriteMergeState st = e.getValue(); + ModbusRegisterType type = regKey.getType(); + + if (shouldEmitMaskWrite(st, type)) { + int andMask = 0xFFFF & ~st.replaceMaskBits; + int orMask = (st.value & st.replaceMaskBits) & 0xFFFF; + requests.add(new ModbusWriteRequest( + groupKey.slaveId, + type.getMaskWriteFunction(), + regKey.getAddress(), + 1, + new int[]{andMask, orMask})); + i++; + } else { + int startAddr = regKey.getAddress(); + List vals = new ArrayList<>(); + vals.add(st.value & 0xFFFF); + i++; + while (i < sortedEntries.size()) { + Map.Entry e2 = sortedEntries.get(i); + if (shouldEmitMaskWrite(e2.getValue(), e2.getKey().getType())) { + break; + } + if (e2.getKey().getAddress() != startAddr + vals.size()) { + break; + } + vals.add(e2.getValue().value & 0xFFFF); + i++; + } + requests.add(createRequest(groupKey, startAddr, vals)); + } + } + } + return requests; + } + + private static boolean shouldEmitMaskWrite(WriteMergeState st, ModbusRegisterType type) { + return type == ModbusRegisterType.HoldingRegisters + && type.getMaskWriteFunction() != null + && st.maskBitActivity + && !st.nonMaskActivity + && st.replaceMaskBits != 0; + } + + private ModbusWriteRequest createRequest(SlaveFunctionKey key, int startAddress, List values) { + int[] vals = new int[values.size()]; + for (int i = 0; i < values.size(); i++) { + vals[i] = values.get(i); + } + ModbusRegisterType type = key.type; + int functionCode = type != null ? type.getReadFunction() : 0; + if (type != null && !type.isReadOnly()) { + if (vals.length > 1) { + functionCode = type.getWriteMultipleFunction(); + } else { + functionCode = type.getWriteSingleFunction(); + } + } + return new ModbusWriteRequest(key.slaveId, functionCode, startAddress, vals.length, vals); + } + + @lombok.AllArgsConstructor + @lombok.EqualsAndHashCode + private static class SlaveFunctionKey { + int slaveId; + ModbusRegisterType type; + + public int getFunctionCode() { + return type != null ? type.getReadFunction() : 0; + } + } + + private static final class WriteMergeState { + int value; + /** 是否存在仅掩码方式的按位写(无整字/非掩码按位混用) */ + boolean maskBitActivity; + /** 存在整字写或非掩码按位写 */ + boolean nonMaskActivity; + int replaceMaskBits; + } + + private void applyWriteOperation(ModbusPropertyMapping mapping, + Object value, + Map states) { + ModbusRegisterDefinition def = mapping.getRegister(); + if (def == null || def.getKey() == null) { + return; + } + ModbusRegisterKey key = def.getKey(); + WriteMergeState state = states.computeIfAbsent(key, k -> new WriteMergeState()); + int original = state.value & 0xFFFF; + + Integer elementIndex = mapping.getElementIndex(); + Integer bitIndex = mapping.getBitIndex(); + + if (def.getCodec() != null && !def.getCodec().isEmpty() && elementIndex != null && elementIndex >= 0) { + Object aggregate = decodeByDefinition(original, def); + aggregate = applyElementUpdate(aggregate, elementIndex, value); + int encoded = encodeNumericValue(aggregate, def); + state.value = encoded & 0xFFFF; + state.nonMaskActivity = true; + } else if (bitIndex != null && bitIndex >= 0) { + int bitLength = mapping.getBitLength() == null ? 1 : mapping.getBitLength(); + int mask = (1 << bitLength) - 1; + int val = toInt(value) & mask; + int encoded = (original & ~(mask << bitIndex)) | (val << bitIndex); + state.value = encoded & 0xFFFF; + boolean maskWrite = Boolean.TRUE.equals(mapping.getUseMaskWrite()) + && key.getType() == ModbusRegisterType.HoldingRegisters; + if (maskWrite) { + if (!state.nonMaskActivity) { + state.maskBitActivity = true; + state.replaceMaskBits |= mapping.getBitMask(); + } + } else { + state.nonMaskActivity = true; + } + } else { + int encoded = encodeNumericValue(value, def); + state.value = encoded & 0xFFFF; + state.nonMaskActivity = true; + } + } + + private int toInt(Object value) { + if (value == null) { + return 0; + } + if (value instanceof Number) { + return ((Number) value).intValue(); + } + if (value instanceof Boolean) { + return ((Boolean) value) ? 1 : 0; + } + return new BigDecimal(String.valueOf(value)).intValue(); + } + + private int encodeNumericValue(Object value, ModbusRegisterDefinition definition) { + if (value == null) { + return 0; + } + + // 优先使用 Codec + ByteLayout 进行编码(与 decode 侧 scale 对称:先逆缩放再编码) + if (definition.getCodec() != null && !definition.getCodec().isEmpty()) { + Codec codec = Codecs.getNow(definition.getCodec()); + ByteBuf buf = Unpooled.buffer(2); + Object rawForCodec = inverseScaleForEncode(value, definition); + Object toEncode = convertToCodecType(rawForCodec, codec.forType()); + @SuppressWarnings("unchecked") + Codec rawCodec = (Codec) codec; + rawCodec.encode(toEncode, buf); + + ByteLayout layout = resolveLayout(definition.getLayout()); + if (layout != null && layout.byteLength() <= buf.readableBytes()) { + ByteBuf slice = buf.slice(0, layout.byteLength()); + layout.reorder(slice); + } + + if (buf.readableBytes() >= 2) { + return buf.readUnsignedShort(); + } + if (buf.readableBytes() == 1) { + return buf.readUnsignedByte(); + } + return 0; + } + + // 兼容旧配置: 使用 scaleFactor/scale 进行编码(不再处理有符号位) + BigDecimal decimal = new BigDecimal(String.valueOf(value)); + if (definition.getScale() >= 0) { + decimal = decimal.setScale(definition.getScale(), RoundingMode.HALF_UP); + } + double scaled = decimal.doubleValue(); + if (definition.getScaleFactor() != 0D && definition.getScaleFactor() != 1D) { + scaled = scaled / definition.getScaleFactor(); + } + return (int) scaled; + } + + /** + * 写入值逆变换为 Codec 原始量:与 {@link ModbusRegisterDefinition#decode} 中 scale 语义对称。 + */ + private static Object inverseScaleForEncode(Object value, ModbusRegisterDefinition definition) { + if (value == null) { + return null; + } + if (definition.getScaleFactor() == 1D && definition.getScale() < 0) { + return value; + } + if (!(value instanceof Number)) { + return value; + } + BigDecimal decimal = new BigDecimal(String.valueOf(value)); + if (definition.getScale() >= 0) { + decimal = decimal.setScale(definition.getScale(), RoundingMode.HALF_UP); + } + double scaled = decimal.doubleValue(); + if (definition.getScaleFactor() != 0D && definition.getScaleFactor() != 1D) { + scaled = scaled / definition.getScaleFactor(); + } + return scaled; + } + + /** + * 根据寄存器定义, 使用 Codec 将当前寄存器值解码为完整结果。 + */ + private Object decodeByDefinition(int registerValue, ModbusRegisterDefinition definition) { + if (definition.getCodec() == null || definition.getCodec().isEmpty()) { + return registerValue; + } + ByteBuf buf = Unpooled.buffer(2); + buf.writeShort(registerValue & 0xFFFF); + try { + return definition.decode(buf); + } finally { + buf.release(); + } + } + + /** + * 在数组或集合中更新指定下标的元素。 + */ + @SuppressWarnings("all") + private Object applyElementUpdate(Object aggregate, int index, Object newValue) { + if (aggregate == null || index < 0) { + return aggregate; + } + Class type = aggregate.getClass(); + if (type.isArray()) { + int len = java.lang.reflect.Array.getLength(aggregate); + if (index >= len) { + return aggregate; + } + Class componentType = type.getComponentType(); + Object converted = convertToCodecType(newValue, componentType); + java.lang.reflect.Array.set(aggregate, index, converted); + return aggregate; + } + if (aggregate instanceof List) { + List list = (List) aggregate; + if (index >= list.size()) { + return aggregate; + } + Object converted = convertToCodecType(newValue, Object.class); + list.set(index, converted); + return aggregate; + } + return aggregate; + } + + @SuppressWarnings("all") + private Object convertToCodecType(Object value, Class type) { + if (value == null || type == null) { + return value; + } + if (type.isInstance(value)) { + return value; + } + return ConverterUtils.convert(value, type); + } + + private ByteLayout resolveLayout(String id) { + if (id == null || id.isEmpty()) { + return null; + } + return ByteLayouts.get(id).orElse(null); + } + +} diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ThingWriteOperation.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ThingWriteOperation.java new file mode 100644 index 000000000..d32b2f58f --- /dev/null +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/helper/modbus/ThingWriteOperation.java @@ -0,0 +1,28 @@ +package org.jetlinks.community.things.helper.modbus; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * 抽象的物模型写入操作定义。 + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class ThingWriteOperation { + + /** + * 物模型属性标识 + */ + private String property; + + /** + * 写入的目标值 + */ + private Object value; + +} +