项目简介

  • 这是一个服务端源码
  • 使用的是hj212-2017协议进行传输
  • 监测设备通过网线接交换机,以TCP传输
  • 传输设备共监测7个点,其中3个点监测的是混合气体,4个点是单一因子
  • 存在212中的因子使用212编码,混合及不存在的使用自定义编码
  • 阈值根据中华人民共和国国家职业卫生标准GBZ 2.1—2019来定
  • 只包含实时数据传输
  • 实时数据(Rtd)为null时,使用平均值(Avg)代替
  • 实时判断是否监测数据超标,超标数据写入influxdb,消警时将该条数据的clearTime以消警时间填充。
  • 实时数据和报警数据同步写入Redis,方便使用。
  • 服务监听端口6000,为yml中的netty.port
  • 服务默认端口server.port其实没啥用,因为不涉及web。我这边设置它,完全是因为公司开发的自动发布服务需要监听这个端口。

核心代码

首先是使用netty开发的TCP服务端

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import com.lyc.toxicharmful.config.properties.NettyProperties;  
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.InetSocketAddress;

/**
 * Netty-based TCP server that receives the device messages.
 *
 * <p>The listen port is read from {@link NettyProperties#getPort()}. Inbound bytes are split
 * into frames on {@code \r\n} (max 16 KiB per frame), decoded to {@code String} and handed to
 * the shared {@link NettyServerHandler}.
 *
 * @author ChenQi
 * @date 2024/7/29
 */
@Slf4j
@Component
public class NettyServer {

    /** Upper bound, in bytes, of a single delimited frame. */
    private static final int MAX_FRAME_LENGTH = 16 * 1024;

    @Resource
    private NettyServerHandler nettyServerHandler;

    @Resource
    private NettyProperties nettyProperties;

    /**
     * Binds the server socket and blocks the calling thread until the server channel closes.
     *
     * <p>Start-up failures are logged, not rethrown. If the calling thread is interrupted while
     * waiting, the interrupt flag is restored so the caller can observe it, and the event loop
     * group is still shut down before this method returns.
     *
     * @throws InterruptedException kept in the signature for backward compatibility
     */
    public void start() throws InterruptedException {
        // Bootstrap helper for the server side
        ServerBootstrap b = new ServerBootstrap();
        // A single NIO event loop group accepts and serves the connections
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            b.group(group);
            b.channel(NioServerSocketChannel.class);
            b.localAddress(new InetSocketAddress(nettyProperties.getPort()));
            // One pipeline is built per accepted connection
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Split the byte stream on CRLF so each handler call sees one whole message.
                    // The delimiter is given as raw bytes so it does not depend on the platform charset.
                    ByteBuf delimiter = Unpooled.copiedBuffer(new byte[] {'\r', '\n'});
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
                    ch.pipeline().addLast(new StringDecoder());
                    // Business handler; it is @Sharable, so the same instance serves every channel
                    ch.pipeline().addLast("myHandler", nettyServerHandler);
                }
            });
            // Block until the bind has completed
            ChannelFuture f = b.bind().sync();
            log.info("{} started and listen on {}", NettyServer.class.getName(), f.channel().localAddress());
            // Block until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.warn("NettyServer interrupted, shutting down", e);
            // Restore the flag: swallowing it would hide the shutdown request from the caller
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("NettyServer start error", e);
        } finally {
            // Release the event loop threads. The uninterruptible wait is used because the
            // interrupt flag may already be set here, which would make sync() fail immediately.
            group.shutdownGracefully().syncUninterruptibly();
        }
    }
}

NettyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import cn.allbs.hj212.enums.GBT16706;  
import cn.allbs.hj212.format.T212Generator;
import cn.allbs.hj212.format.T212Mapper;
import cn.allbs.hj212.model.HjData;
import com.lyc.toxicharmful.service.MessageFactory;
import com.lyc.toxicharmful.utils.MsgHandleUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
* 类 NettyServerHandler
* * @author ChenQi
* &#064;date 2024/7/29
*/@Slf4j
@ChannelHandler.Sharable
@Component
@AllArgsConstructor
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

private final MessageFactory messageFactory;

private static final Map<String, SocketChannel> channelMap = new ConcurrentHashMap<>();

/**
* 连接后回调
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
log.info("client:{} is connected", MsgHandleUtils.getIPPortString(ctx));
}

/**
* 接收到报文回调
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// 获取企业mn号与排口code对应编号
String msgStr = new String(msg.toString().getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
log.info("{}接收到三废排口报文:{}", LocalDateTime.now().format(DateTimeFormatter.BASIC_ISO_DATE), msgStr);
T212Mapper mapper = new T212Mapper().enableDefaultParserFeatures().enableDefaultVerifyFeatures();
HjData data = mapper.readData(msgStr);
if (data == null || data.getCp() == null || !StringUtils.hasText(data.getMn())) {
log.error("缺少必要信息或格式不正确");
return;
}
if (Optional.of(data).map(HjData::getMn).isPresent()) {
channelMap.put(data.getMn(), (SocketChannel) ctx.channel());
} else {
log.info("传的报文中mn为空,关闭该通道");
ctx.channel().close();
}
if (GBT16706._91.value().equals(data.getSt())) {
log.info("数据{}", data);
messageFactory.systemAction(data);
} else {
log.info("数据储存{}", data);
messageFactory.hj212DataSave(data);
}
} catch (Exception e) {
log.error("处理报文发生异常{}", e.getLocalizedMessage());
}
}

/**
* 方法功能: 发送系统报文信息
*
* @param mn mn号
*/
public void sengMess(String mn, String ms) {
SocketChannel sc = channelMap.get(mn);
StringWriter writer = new StringWriter();
try (T212Generator generator = new T212Generator(writer)) {
generator.writeHeader();
generator.writeDataAndLenAndCrc(ms.toCharArray());
generator.writeFooter();
// 查询redis中是否有需要发送的数据
ByteBuf encoded = sc.alloc().buffer(writer.toString().length());
encoded.writeBytes(writer.toString().getBytes("GBK"));
sc.writeAndFlush(encoded).addListener((ChannelFutureListener) future -> log.info("回复:{}", writer));
} catch (Exception e) {
log.error("数据发送失败");
}

}

/**
* 数据处理完成回调
*
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
log.info("channelReadComplete{}", LocalDateTime.now());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//cause.printStackTrace();// 捕捉异常信息
// 出现异常时关闭channel
ctx.close();
}

/**
* 客户端断开连接回调
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// channel失效处理,客户端下线或者强制退出等任何情况都触发这个方法
log.info("{} 断开连接", MsgHandleUtils.getIPPortString(ctx));
ctx.channel().close();
super.channelInactive(ctx);
}
}

当然,必不可少的一步是在项目启动时启动netty服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Application entry point. Once the Spring context is up, the {@link CommandLineRunner}
 * callback starts the Netty TCP server.
 */
@EnableAsync
@EnableAllbsInflux
@SpringBootApplication
public class PcToxicAndHarmfulApplication implements CommandLineRunner {

    /** TCP server started from {@link #run(String...)}. */
    @Resource
    private NettyServer nettyServer;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(PcToxicAndHarmfulApplication.class, args);
    }

    /**
     * Starts the Netty server. {@code NettyServer.start()} waits for the server channel to
     * close, so this callback does not return while the server is running.
     *
     * @param args command-line arguments (unused)
     * @throws Exception whatever {@code NettyServer.start()} throws
     */
    @Override
    public void run(String... args) throws Exception {
        nettyServer.start();
    }

}

业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import cn.allbs.hj212.model.CpData;  
import cn.allbs.hj212.model.HjData;
import cn.allbs.hj212.model.Pollution;
import cn.allbs.influx.InfluxTemplate;
import com.lyc.toxicharmful.config.enums.GasType;
import com.lyc.toxicharmful.dto.AlarmFieldDTO;
import com.lyc.toxicharmful.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA;
import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA_ALARM;
import static com.lyc.toxicharmful.config.constant.CommonConstant.*;

/**
* 类 MessageFactoryImpl
* * @author ChenQi
* &#064;date 2024/7/29
*/@Slf4j
@Service
public class MessageFactoryImpl implements MessageFactory {

@Resource
private RedisTemplate<String, Object> redisTemplate;

@Resource
private InfluxTemplate influxTemplate;

@Override
public void hj212DataSave(HjData hjData) {
log.info("接收到的212数据{}", hjData);
// 储存influxdb
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
tags.put("mn", Optional.ofNullable(hjData).map(HjData::getMn).orElse(""));
tags.put("qnTime", Optional.ofNullable(hjData).map(a -> DateUtil.timeFormatWithMs(a.getQn())).orElse(""));
tags.put("dataTime", Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getDataTime).map(DateUtil::timeFormat).orElse(""));
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + Optional.ofNullable(hjData).map(HjData::getMn).orElse(""), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));
if (Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getPollution).isPresent()) {
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));
hjData.getCp().getPollution().forEach((key, value) -> {
Optional<BigDecimal> rtd = Optional.ofNullable(value.getRtd());
Optional<BigDecimal> avg = Optional.ofNullable(value.getAvg());
if (rtd.isPresent() || avg.isPresent()) {
BigDecimal valueToCheck = rtd.orElseGet(avg::get);
fields.put(key, valueToCheck);
// redis储存实时数据
try {
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), key, valueToCheck);
} catch (Exception e) {
log.error("保存212实时数据失败,数据{},原因{}", hjData, e.getLocalizedMessage());
}
checkAndHandleAlarm(key, valueToCheck);
}
fields.put(key + "_flag", Optional.of(value).map(Pollution::getFlag).orElse(null));
fields.put(key + "_SampleTime", Optional.of(value).map(Pollution::getSampleTime).orElse(null));
});
}
influxTemplate.insert(DB_TOXIC_AND_HARMFUL_DATA, tags, fields);
}

/**
* 检查是否报警
*
* @param key key
* @param value value
*/ private void checkAndHandleAlarm(String key, BigDecimal value) {
for (GasType factor : GasType.values()) {
if (key.equals(factor.getCode())) {
if (value.compareTo(BigDecimal.valueOf(factor.getLevelTwoThreshold())) >= 0) {
log.warn("报警: {} 超过二级阈值 {}", key, factor.getLevelTwoThreshold());
sendAlarmAsync(key, value, 2, factor);
} else if (value.compareTo(BigDecimal.valueOf(factor.getLevelOneThreshold())) >= 0) {
log.warn("报警: {} 超过一级阈值 {}", key, factor.getLevelOneThreshold());
sendAlarmAsync(key, value, 1, factor);
} else {
log.info("正常: {} 未超过阈值", key);
// 消警
clearAlarmAsync(key, factor.getName());
}
break;
}
}
}

/**
* 发送报警数据
*
* @param key 因子编码
* @param value 监测值
* @param level 等级 1级 2级
* @param factor 枚举
*/
public void sendAlarmAsync(String key, BigDecimal value, Integer level, GasType factor) {
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN));
try {
if (Boolean.FALSE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) {
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "startTime", time);
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "name", factor.getName());
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "unit", factor.getUnit());
// 不存在则新增influxdb数据
// InfluxDB报警数据
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
tags.put("code", key);
tags.put("name", factor.getName());
fields.put("value", value);
fields.put("alarmTime", time);
fields.put("unit", factor.getUnit());
fields.put("level", level);
// 额外添加type, 0为报警 1为消警
fields.put("type", 0);
fields.put("threshold", factor.getLevelOneThreshold() * level);
fields.put("content", String.format("监测点位%s编号%s于%s发生%s级报警,报警值%s%s,报警阈值%s%s", factor.getName(), key, time, level, value, factor.getUnit(), factor.getLevelOneThreshold() * level, factor.getUnit()));
influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields);
}
// Redis报警数据
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "level", level);
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "value", value);
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "threshold", factor.getLevelOneThreshold() * level);

} catch (Exception e) {
log.error("发送报警数据失败, key: {}, value: {}, level: {}, 原因: {}", key, value, level, e.getLocalizedMessage());
}
}

/**
* 清除报警
*
* @param key key
*/ public void clearAlarmAsync(String key, String name) {
if (Boolean.TRUE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) {
log.info("移除{}报警缓存,添加消警时间", key);
// 移除redis报警
redisTemplate.delete(TOXIC_AND_HARMFUL_DATA_ALARM + key);
// 获取当前点位未消警的报警数据,删除后插入带有结束时间的数据
String command = "SELECT * FROM " + DB_TOXIC_AND_HARMFUL_ALARM + "\n" +
"WHERE \"code\" = '" + key + "' AND \"type\" = 0\n" +
"ORDER BY time DESC\n" +
"LIMIT 1";
List<AlarmFieldDTO> dataList = influxTemplate.queryBeanList(command, AlarmFieldDTO.class);
// 修改数据
if (!dataList.isEmpty()) {
AlarmFieldDTO fieldDTO = dataList.get(0);
Map<String, String> tags = new HashMap<>();
tags.put("code", key);
tags.put("name", name);
Map<String, Object> fields = new HashMap<>();
fields.put("value", fieldDTO.getValue());
fields.put("alarmTime", fieldDTO.getAlarmTime());
fields.put("unit", fieldDTO.getUnit());
fields.put("level", fieldDTO.getLevel());
fields.put("threshold", fieldDTO.getThreshold());
fields.put("content", fieldDTO.getContent());
fields.put("type", 1);
fields.put("clearTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));
// 移除原数据
influxTemplate.query("DELETE FROM " + DB_TOXIC_AND_HARMFUL_ALARM + " WHERE time = '" + new SimpleDateFormat(UTC_MS_WITH_ZONE_OFFSET_PATTERN).format(fieldDTO.getTime()) + "'");
// 增加数据
influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields, fieldDTO.getTime().getTime(), TimeUnit.MILLISECONDS, ZoneOffset.of("+08:00"));
}
}
}

@Override
public void systemAction(HjData hjData) {
log.info("执行系统指令{}", hjData);
}
}

引入的两个处理jar包说明

  • allbs-hj212 参考代码https://github.com/tracy4262/hj212.git自己重新做了封装,且发布到了maven公共库便于引入使用。用来处理hj212报文协议,解析数据
  • allbs-influx 自己做的influxdb相关操作的封装,地址https://github.com/chenqi92/allbs-influx.git,也已上传至maven公共库便于引入。如果你们需要,可以使用2.0.2版本;2.1.0版本尚未经过充分测试,使用时存在一些问题,后面会修改并增加一些便捷方法。

源码地址

我上面的核心代码并不完整,可以直接去看源码。如果对你有用,给个star哦😘源码地址:

https://github.com/chenqi92/pc-toxic-and-harmful.git