[!前言]
本项目采用spring boot框架,使用netty做网络应用程序框架。下面做全流程分析。项目源码地址:https://github.com/chenqi92/pc-electric-fence.git
报文协议
这个应该是厂商自家定义的报文协议,发过来就是一个txt,协议原文内容如下😂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 一、通讯方式 1. 串口方式: 波特率为9600bps,采用8-N-1格式 2. 网络方式: PC端做TCP服务器,默认监听端口5000 二、数据包格式 所有数据都以回车符为结束符,数据内容以空格分隔,所有数据都是有应答。 1. 应答(布撤防才会有成功和失败之说,其他的指令都回A 1) 内容: A 结果 结果: 0: 失败; 1: 成功 2: 执行成功 注:每条指令都需要回复A 1,并以回车键结束。 2. 心跳 内容: H 设备编号 通讯方式 设备类型 设备编号: 为报警主机编号 通讯方式: 0: 串口/网络; 2: GPRS 设备类型: 0: 接警机; 1: 主机 设备类型为非接警机时,设备跟PC连接建立后,立刻发送心跳包。 心跳间隔默认为10秒。 举例: 主机编号为0时:H 0 0 1 主机编号为1时:H 1 0 1 3. 事件上传 内容: E 主机编号 防区编号 事件代码 子系统号 月-日-时-分 主机编号: 接警机上报时,为接警机编号-通讯机编号-终端设备编号;主机上传时为用户编号。 防区编号: 为0时,为非防区事件。 总线主机 事件代码参考如下: 0: 防区布防事件 1: 防区撤防事件 2: 防区报警事件 3: 防区报警恢复事件 4: 设备被撬事件 5: 设备被撬恢复事件 6: 设备欠压事件 7: 设备欠压恢复事件 8: 设备连接故障事件 9: 设备连接恢复事件 10: 设备布防 11: 设备撤防 12: 设备挟持 13: 通讯机连接故障事件 14: 通讯机连接恢复事件 15: 防区旁路事件 16: 防区旁路恢复事件 2013.11.23 17: 设备紧急事件 18: 设备紧急恢复事件 19: 设备火警事件 20: 设备火警恢复事件 21: 防区布防状态 22: 防区撤防状态 23: 防区未准备 24: 防区未准备恢复 25:留守布防 26: 防区故障 27:防区故障恢复 28: 电话线故障 29: 电话线故障恢复 30: 电池故障 31: 电池恢复 32: 交流故障 33: 交流恢复 34: 围栏断线报警 35:围栏短路报警 36: 围栏电压 子系统号:为围栏的电压值 (KV) 37:触网报警 围栏电压例子 E 0001 1 36 50 //1号主机的1号围栏 电压为5.0KV 子系统号:可选字节,当主机有子系统时,带上子系统;没有时,不用带。 E 6130-0-1 4 2 0 11-7-13-2 表示6130主机1号模块4防区在11月7号13点2分 发生报警了 主机编号: 接警机上报时,为接警机编号-通讯机编号-终端设备编号; 6130-0-1 : 接警机是6130,即总线主机的系统地址, 通讯机编号是0: 表示通讯口1或2的设备, 通讯机编号是1: 表示键盘总线的设备, 终端设备编号: 当通讯机编号为0时, 终端设备编号 0-64, 其中0表示主机主板防区, 1-64表示扩展的1-64号设备 当通讯机编程为1时,表示键盘设备, 128-160 ,128表示主键盘, 129-160表示1-31号分键盘 4:表示防区4 2: 表示防区报警事件 (具体请参考 总线主机 事件代码) 0: 子系统号 固定为0 11-7-13-2: 11月7号13点2分 E 6130-1-128 0 11 0 11-7-13-3 表示 6130号主机的128号键盘(即主键盘)撤防操作在11月7号13点3分 4. 主机控制 内容: C 主机编号 防区编号 控制类型 密码 防区编号: 为0时是设备控制,否则是防区控制 控制类型: 0: 撤防 1: 布防 2: 旁路 3: 解除旁路 4:留守布防 5:高压布防 6:低压布防 例如:主机编号为6130 , C 6130-1-128 0 0 123456 表示 控制6130号主机的128号键盘(即主键盘)撤防 C 6130-1-128 0 1 123456 表示 控制6130号主机的128号键盘(即主键盘)布防 C 6130-0-1 0 1 123456 表示 控制6130号主机的1号设备布防 C 6130-0-1 1 1 123456 表示 控制6130号主机的1号设备防区1 布防 C 6130-0-1 0 0 123456 表示 控制6130号主机的1号设备撤防 C 6130-0-1 1 0 123456 表示 控制6130号主机的1号设备防区1 撤防 5. 输出点控制 内容: O 主机编号 输出点类型 输出点号 控制类型 控制时间 输出点类型: 0: 输出点 1: 灯 2: LED 控制类型: 0:断开 1:闭合 O 6130-0-1 0 1 1 10 表示 控制6130号主机的1号设备的1号输出闭合10秒 O 6130-0-1 0 1 0 0 表示 控制6130号主机的1号设备的1号输出断开
协议接收考虑
对方给了一个现场如下配置
因为不可预知对方的主机类型,从报文来看最好的方式肯定是做一个TCP服务端来接收数据,让对方配合修改客户端的发送地址即可。
开发一个服务端
开发准备
因为服务器装的jdk1.8所有spring boot也不能选择3.0+,使用的maven作为依赖管理,pom内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > com.lyc</groupId > <artifactId > pc-electric-fence</artifactId > <version > 0.0.1</version > <name > pc-electric-fence</name > <description > 电子围网报警数据接收</description > <url /> <licenses > <license /> </licenses > <developers > <developer /> </developers > <scm > <connection /> <developerConnection /> <tag /> <url /> </scm > <properties > <java.version > 8</java.version > <influx.version > 2.24</influx.version > <allbs-influx.version > 2.0.2</allbs-influx.version > <netty.version > 4.1.111.Final</netty.version > <spring-boot.version > 2.7.18</spring-boot.version > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > <maven.compiler.version > 3.8.1</maven.compiler.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-configuration-processor</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > cn.allbs</groupId > <artifactId > allbs-influx</artifactId > <version > ${allbs-influx.version}</version > </dependency > <dependency > <groupId > org.influxdb</groupId > <artifactId > influxdb-java</artifactId > <version > ${influx.version}</version > </dependency > <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > ${netty.version}</version > </dependency > <dependency > <groupId > com.fasterxml.jackson.core</groupId > <artifactId > jackson-core</artifactId > </dependency > <dependency > <groupId > com.fasterxml.jackson.core</groupId > <artifactId > jackson-annotations</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-json</artifactId > </dependency > </dependencies > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > ${spring-boot.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <profiles > <profile > <id > dev</id > <properties > <profiles.active > dev</profiles.active > </properties > <activation > <activeByDefault > true</activeByDefault > </activation > </profile > <profile > <id > prod</id > <properties > <profiles.active > prod</profiles.active > </properties > </profile > </profiles > <build > <finalName > ${project.name}-${project.version}</finalName > <resources > <resource > <directory > src/main/resources</directory > <filtering > true</filtering > <includes > <include > **/*</include > </includes > </resource > </resources > <pluginManagement > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <version > ${spring-boot.version}</version > <configuration > <executable > true</executable > </configuration > <executions > <execution > <goals > <goal > repackage</goal > </goals > </execution > </executions > </plugin > </plugins > </pluginManagement > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <configuration > <excludes > <exclude > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </exclude > </excludes > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > ${maven.compiler.version}</version > <configuration > <target > ${maven.compiler.target}</target > <source > ${maven.compiler.source}</source > <encoding > UTF-8</encoding > <skip > true</skip > </configuration > </plugin > </plugins > </build > </project >
其中allbs-influx
这个包是我自己封装的处理influxDb存和取操作的,如果没有需要可以去除,包括下面的influxdb-java
项目结构如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 pc-electric-fence ├── src │ ├── main │ │ ├── java │ │ │ └── com │ │ │ └── lyc │ │ │ └── pcelectricfence │ │ │ ├── constant │ │ │ │ └── CommonConstant.java │ │ │ ├── enums │ │ │ │ ├── CommunicationMode.java │ │ │ │ ├── ControlType.java │ │ │ │ ├── DeviceEnum.java │ │ │ │ ├── DeviceType.java │ │ │ │ ├── EventType.java │ │ │ │ ├── OutputControlType.java │ │ │ │ ├── OutputPointType.java │ │ │ │ └── ResponseType.java │ │ │ ├── netty │ │ │ │ ├── NettyServer.java │ │ │ │ ├── NettyServerHandlerInitializer.java │ │ │ │ └── ProtocolHandler.java │ │ │ ├── properties │ │ │ │ └── NettyServerProperties.java │ │ │ ├── utils │ │ │ │ └── CommandParserUtil.java │ │ │ └── PcElectricFenceApplication.java │ │ └── resources │ │ ├── application-dev.yml │ │ ├── application-prod.yml │ │ ├── application.yml │ │ └── logback-spring.xml │ └── test │ └── java │ └── com │ └── lyc │ └── pcelectricfence │ └── PcElectricFenceApplicationTests.java ├── README.md ├── pom.xml └── 协议.md
文件内容一览
CommonConstant
一个常量定义,只有influxdb需要储存的表名和日期格式化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public interface CommonConstant { /** * 电子围栏数据表 */ String INFLUXDB_DATABASE_MEASUREMENT = "cl_electronic_patrol_alarm"; /** * 时间格式 */ String NORM_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; /** * 报警信息中的时间格式 */ String ALARM_DATETIME_PATTERN = "yyyy年MM月dd日HH时mm分"; }
所有的枚举
这个没有什么好说的就是根据报文协议抽出来的东西,下面就不列了,有需要的可以到文末的源码上去看
NettyServer
主要是用来启动项目时启动server监听指定端口,还有就是编码器和解码器加上接收数据处理的handler,详细代码为如下,基本每行都有注解,就不一一阐述了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 package com.lyc.pcelectricfence.netty; import com.lyc.pcelectricfence.properties.NettyServerProperties; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.ReadTimeoutHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; /** * 类 NettyServer * * @author ChenQi * @date 2024/6/20 */ @Slf4j @Component public class NettyServer { @Resource private NettyServerProperties nettyServerProperties; /** * 心跳超时时间 */ private static final Integer READ_TIMEOUT_SECONDS = 3 * 60; /** * boss 线程组,用于服务端接受客户端的连接 */ private final EventLoopGroup bossGroup = new NioEventLoopGroup(); /** * worker 线程组,用于服务端接受客户端的数据读写 */ private final EventLoopGroup workerGroup = new NioEventLoopGroup(); @Resource private ProtocolHandler protocolHandler; /** * Netty Server Channel */ private Channel channel; /** * 启动 Netty Server */ @PostConstruct public void start() throws InterruptedException { // 创建 ServerBootstrap 对象,用于 Netty Server 启动 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 作为分隔符的数据包,防止粘包,虽然报文说是回车分隔,实际上没有,所以未使用,下面的代码注释掉同理 // ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes()); serverBootstrap.group(bossGroup, workerGroup) // 指定 Channel 为服务端 NioServerSocketChannel .channel(NioServerSocketChannel.class) // 端口 .localAddress(new InetSocketAddress(nettyServerProperties.getPort())) // 服务端接收队列的大小 .option(ChannelOption.SO_BACKLOG, 1024) // TCP Keepalive 机制,实现 TCP 层级的心跳保活功能 .childOption(ChannelOption.SO_KEEPALIVE, true) // 允许较小的数据包的发送,降低延迟 .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { // 空闲检测 ch.pipeline().addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)); // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(16 * 1024, false, delimiter)); // 解码器,因为客户端发送的是字符串所以直接用String即可,如果是其他的需要对应的解码 ch.pipeline().addLast(new StringDecoder()); // 跟编码器同理 ch.pipeline().addLast(new StringEncoder()); // 添加连接事件处理 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("{}远程客户端连接!", ctx.channel().remoteAddress()); super.channelActive(ctx); } }); // 处理收到报文的方法 ch.pipeline().addLast(protocolHandler); } }); // 绑定端口,并同步等待成功,即启动服务端 ChannelFuture future = serverBootstrap.bind().sync(); if (future.isSuccess()) { channel = future.channel(); log.info("netty服务端已启动,启动端口为{}", nettyServerProperties.getPort()); } } @PreDestroy public void stop() { log.info("{} 服务主动断开连接!", channel.localAddress()); // 关闭 Netty Server if (channel != null) { channel.close(); } // 优雅关闭两个 EventLoopGroup 对象 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
最后是处理的handler
这个类中包含的东西不多,只是集成SimpleChannelInboundHandler
并重写channelRead0
,exceptionCaught
。一个是用来处理报文的方法,一个是用来监听异常的方法,其他代码都是根据报文内容定制。
唯一需要注意的就是注解@ChannelHandler.Sharable
这个注解的作用是标识一个 ChannelHandler
实例可以在多个ChannelPipeline
中共享使用。通常,ChannelHandler
实例是不可共享的,因为它们在处理不同的连接时可能会维护一些特定于连接的状态信息。而 @ChannelHandler.Sharable
的存在则表示这个 ChannelHandler
实例是无状态的,或者其状态对于多个连接是安全的,可以共享使用。简单点说就是为了让你这个服务端能够被多个客户端连上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 package com.lyc.pcelectricfence.netty; import cn.allbs.influx.InfluxTemplate; import com.lyc.pcelectricfence.enums.CommunicationMode; import com.lyc.pcelectricfence.enums.DeviceType; import com.lyc.pcelectricfence.enums.ResponseType; import com.lyc.pcelectricfence.utils.CommandParserUtil; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.ReadTimeoutException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import static com.lyc.pcelectricfence.constant.CommonConstant.INFLUXDB_DATABASE_MEASUREMENT; /** * 类 ProtocolHandler * * @author ChenQi * @date 2024/6/20 */ @Slf4j @Component @ChannelHandler.Sharable public class ProtocolHandler extends SimpleChannelInboundHandler<String> { @Resource private InfluxTemplate influxTemplate; @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { log.info("Received message: {}", msg); String[] parts = msg.trim().split(" "); switch (parts[0]) { case "A": handleResponse(ctx, parts); break; case "H": handleHeartbeat(ctx, parts); break; case "E": handleEventUpload(ctx, parts); break; case "C": handleControl(ctx, parts); break; case "O": handleOutputControl(ctx, parts); break; default: // Handle unknown command break; } // 发送回复消息 ctx.writeAndFlush("A 1\r\n"); } /** * 处理响应 * * @param ctx ChannelHandlerContext * @param parts 消息分割后的数组 */ private void handleResponse(ChannelHandlerContext ctx, String[] parts) { int result = Integer.parseInt(parts[1]); ResponseType responseType = ResponseType.values()[result]; log.debug("Response: {}", responseType.getDescription()); } /** * 处理心跳 * * @param ctx ChannelHandlerContext * @param parts 消息分割后的数组 */ private void handleHeartbeat(ChannelHandlerContext ctx, String[] parts) { int deviceNumber = Integer.parseInt(parts[1]); CommunicationMode communicationMode = CommunicationMode.values()[Integer.parseInt(parts[2])]; DeviceType deviceType = DeviceType.values()[Integer.parseInt(parts[3])]; log.debug("Heartbeat - Device Number: {}, Communication Mode: {}, Device Type: {}", deviceNumber, communicationMode.getDescription(), deviceType.getDescription()); } /** * 处理事件上传 * * @param ctx ChannelHandlerContext * @param parts 消息分割后的数组 */ private void handleEventUpload(ChannelHandlerContext ctx, String[] parts) { // 事件上传,储存至influxDb String command = String.join(" ", parts); Map<String, Object> map = CommandParserUtil.parseEventUploadCommand(command); log.debug("Event Upload Command: {}", map); Map<String, String> tags = new HashMap<>(); tags.put("type", "E"); tags.put("typeName", "事件上传"); // 储存至influxDb influxTemplate.insert(INFLUXDB_DATABASE_MEASUREMENT, tags, map); } /** * 处理控制 * * @param ctx ChannelHandlerContext * @param parts 消息分割后的数组 */ private void handleControl(ChannelHandlerContext ctx, String[] parts) { // 处理控制 TODO String command = String.join(" ", parts); Map<String, Object> map = CommandParserUtil.parseHostControlCommand(command); log.debug("Control Command: {}", map); Map<String, String> tags = new HashMap<>(); tags.put("type", "C"); tags.put("typeName", "主机控制"); // 储存至influxDb influxTemplate.insert(INFLUXDB_DATABASE_MEASUREMENT, tags, map); } /** * 处理输出控制 * * @param ctx ChannelHandlerContext * @param parts 消息分割后的数组 */ private void handleOutputControl(ChannelHandlerContext ctx, String[] parts) { // 输出控制 String command = String.join(" ", parts); Map<String, Object> map = CommandParserUtil.parseOutputControlCommand(command); log.debug("Output Control Command: {}", map); // 储存至influxDb Map<String, String> tags = new HashMap<>(); tags.put("type", "O"); tags.put("typeName", "输出点控制"); influxTemplate.insert(INFLUXDB_DATABASE_MEASUREMENT, tags, map); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ReadTimeoutException) { log.info("Read timeout occurred. {}", ctx.channel()); } else { log.info(cause.getLocalizedMessage()); } ctx.close(); } }
最后需要注意的问题
因为这个项目中并未使用spring-boot-start-web,也就是没有tomcat
或者undertow
等web框架,所以实际server.port
本来设置的6767并未使用,后来我一想,不如直接当作netty server的监听地址吧,所以NettyServerProperties
直接使用了本来配给netty server的5000端口作为监听端口,所以你会看到项目的md中有6767这个端口但又未实际使用。如果你的项目有web框架,肯定不能这么干,需要将netty sever重新配置一个另外的端口。