使用java开发了一个有毒可燃气体监测数据接收的服务端
项目简介
- 这是一个服务端源码
- 使用的是hj212-2017协议进行传输
- 监测设备通过网线接交换机,以TCP传输
- 传输设备共监测7个点,其中3个点监测的是混合气体,4个点是单一因子
- 存在212中的因子使用212编码,混合及不存在的使用自定义编码
- 阈值根据中华人民共和国国家职业卫生标准GBZ 2.1—2019来定
- 只包含实时数据传输
- 实时数据为null使用平均值
- 实时判断监测数据是否超标,超标数据写入 influxdb;消警时将该条数据的 clearTime 字段填充为消警时间。
- 实时数据和报警数据同步写入 Redis,方便使用。
- 服务监听端口为 6000,对应 yml 中的 netty.port。
- 服务默认端口 server.port 其实没啥用,因为不涉及 web;这里设置它只是因为公司的自动发布服务需要监听这个端口。
核心代码
首先是使用netty开发的TCP服务端
NettyServer
import com.lyc.toxicharmful.config.properties.NettyProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
/**
* 类 NettyServer
* * @author ChenQi
* @date 2024/7/29
*/@Slf4j
@Component
public class NettyServer {
@Resource
private NettyServerHandler nettyServerHandler;
@Resource
private NettyProperties nettyProperties;
public void start() throws InterruptedException {
// 引导辅助程序
ServerBootstrap b = new ServerBootstrap();
// 通过nio方式来接收连接和处理连接
EventLoopGroup group = new NioEventLoopGroup();
try {
b.group(group);
// 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
// 设置监听端口
b.localAddress(new InetSocketAddress(nettyProperties.getPort()));
// 有连接到达时会创建一个channel
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//解决连包问题
ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(16 * 1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
// pipeline管理channel中的Handler,在channel队列中添加一个handler来处理业务
ch.pipeline().addLast("myHandler", nettyServerHandler);
}
});
// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuture f = b.bind().sync();
log.info("{} started and listen on {}", NettyServer.class.getName(), f.channel().localAddress());
f.channel().closeFuture().sync();// 应用程序会一直等待,直到channel关闭
} catch (Exception e) {
log.error("NettyServer start error", e);
} finally {
// 关闭EventLoopGroup,释放掉所有资源包括创建的线程
group.shutdownGracefully().sync();
}
}
}
NettyServerHandler
import cn.allbs.hj212.enums.GBT16706;
import cn.allbs.hj212.format.T212Generator;
import cn.allbs.hj212.format.T212Mapper;
import cn.allbs.hj212.model.HjData;
import com.lyc.toxicharmful.service.MessageFactory;
import com.lyc.toxicharmful.utils.MsgHandleUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* 类 NettyServerHandler
* * @author ChenQi
* @date 2024/7/29
*/@Slf4j
@ChannelHandler.Sharable
@Component
@AllArgsConstructor
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final MessageFactory messageFactory;
private static final Map<String, SocketChannel> channelMap = new ConcurrentHashMap<>();
/**
* 连接后回调
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
log.info("client:{} is connected", MsgHandleUtils.getIPPortString(ctx));
}
/**
* 接收到报文回调
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// 获取企业mn号与排口code对应编号
String msgStr = new String(msg.toString().getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
log.info("{}接收到三废排口报文:{}", LocalDateTime.now().format(DateTimeFormatter.BASIC_ISO_DATE), msgStr);
T212Mapper mapper = new T212Mapper().enableDefaultParserFeatures().enableDefaultVerifyFeatures();
HjData data = mapper.readData(msgStr);
if (data == null || data.getCp() == null || !StringUtils.hasText(data.getMn())) {
log.error("缺少必要信息或格式不正确");
return;
}
if (Optional.of(data).map(HjData::getMn).isPresent()) {
channelMap.put(data.getMn(), (SocketChannel) ctx.channel());
} else {
log.info("传的报文中mn为空,关闭该通道");
ctx.channel().close();
}
if (GBT16706._91.value().equals(data.getSt())) {
log.info("数据{}", data);
messageFactory.systemAction(data);
} else {
log.info("数据储存{}", data);
messageFactory.hj212DataSave(data);
}
} catch (Exception e) {
log.error("处理报文发生异常{}", e.getLocalizedMessage());
}
}
/**
* 方法功能: 发送系统报文信息
*
* @param mn mn号
*/
public void sengMess(String mn, String ms) {
SocketChannel sc = channelMap.get(mn);
StringWriter writer = new StringWriter();
try (T212Generator generator = new T212Generator(writer)) {
generator.writeHeader();
generator.writeDataAndLenAndCrc(ms.toCharArray());
generator.writeFooter();
// 查询redis中是否有需要发送的数据
ByteBuf encoded = sc.alloc().buffer(writer.toString().length());
encoded.writeBytes(writer.toString().getBytes("GBK"));
sc.writeAndFlush(encoded).addListener((ChannelFutureListener) future -> log.info("回复:{}", writer));
} catch (Exception e) {
log.error("数据发送失败");
}
}
/**
* 数据处理完成回调
*
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
log.info("channelReadComplete{}", LocalDateTime.now());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//cause.printStackTrace();// 捕捉异常信息
// 出现异常时关闭channel
ctx.close();
}
/**
* 客户端断开连接回调
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// channel失效处理,客户端下线或者强制退出等任何情况都触发这个方法
log.info("{} 断开连接", MsgHandleUtils.getIPPortString(ctx));
ctx.channel().close();
super.channelInactive(ctx);
}
}
当然必不可少的项目启动时启动netty服务
@EnableAsync
@EnableAllbsInflux
@SpringBootApplication
public class PcToxicAndHarmfulApplication implements CommandLineRunner {

    @Resource
    private NettyServer nettyServer;

    public static void main(String[] args) {
        SpringApplication.run(PcToxicAndHarmfulApplication.class, args);
    }

    /**
     * Launches the Netty TCP server once the Spring context is fully started.
     * {@code NettyServer#start()} blocks until the server channel closes, so
     * this runner keeps the main thread alive for the lifetime of the server.
     */
    @Override
    public void run(String... args) throws Exception {
        this.nettyServer.start();
    }
}
业务逻辑
import cn.allbs.hj212.model.CpData;
import cn.allbs.hj212.model.HjData;
import cn.allbs.hj212.model.Pollution;
import cn.allbs.influx.InfluxTemplate;
import com.lyc.toxicharmful.config.enums.GasType;
import com.lyc.toxicharmful.dto.AlarmFieldDTO;
import com.lyc.toxicharmful.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA;
import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA_ALARM;
import static com.lyc.toxicharmful.config.constant.CommonConstant.*;
/**
* 类 MessageFactoryImpl
* * @author ChenQi
* @date 2024/7/29
*/@Slf4j
@Service
public class MessageFactoryImpl implements MessageFactory {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private InfluxTemplate influxTemplate;
@Override
public void hj212DataSave(HjData hjData) {
log.info("接收到的212数据{}", hjData);
// 储存influxdb
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
tags.put("mn", Optional.ofNullable(hjData).map(HjData::getMn).orElse(""));
tags.put("qnTime", Optional.ofNullable(hjData).map(a -> DateUtil.timeFormatWithMs(a.getQn())).orElse(""));
tags.put("dataTime", Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getDataTime).map(DateUtil::timeFormat).orElse(""));
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + Optional.ofNullable(hjData).map(HjData::getMn).orElse(""), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));
if (Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getPollution).isPresent()) {
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));
hjData.getCp().getPollution().forEach((key, value) -> {
Optional<BigDecimal> rtd = Optional.ofNullable(value.getRtd());
Optional<BigDecimal> avg = Optional.ofNullable(value.getAvg());
if (rtd.isPresent() || avg.isPresent()) {
BigDecimal valueToCheck = rtd.orElseGet(avg::get);
fields.put(key, valueToCheck);
// redis储存实时数据
try {
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), key, valueToCheck);
} catch (Exception e) {
log.error("保存212实时数据失败,数据{},原因{}", hjData, e.getLocalizedMessage());
}
checkAndHandleAlarm(key, valueToCheck);
}
fields.put(key + "_flag", Optional.of(value).map(Pollution::getFlag).orElse(null));
fields.put(key + "_SampleTime", Optional.of(value).map(Pollution::getSampleTime).orElse(null));
});
}
influxTemplate.insert(DB_TOXIC_AND_HARMFUL_DATA, tags, fields);
}
/**
* 检查是否报警
*
* @param key key
* @param value value
*/ private void checkAndHandleAlarm(String key, BigDecimal value) {
for (GasType factor : GasType.values()) {
if (key.equals(factor.getCode())) {
if (value.compareTo(BigDecimal.valueOf(factor.getLevelTwoThreshold())) >= 0) {
log.warn("报警: {} 超过二级阈值 {}", key, factor.getLevelTwoThreshold());
sendAlarmAsync(key, value, 2, factor);
} else if (value.compareTo(BigDecimal.valueOf(factor.getLevelOneThreshold())) >= 0) {
log.warn("报警: {} 超过一级阈值 {}", key, factor.getLevelOneThreshold());
sendAlarmAsync(key, value, 1, factor);
} else {
log.info("正常: {} 未超过阈值", key);
// 消警
clearAlarmAsync(key, factor.getName());
}
break;
}
}
}
/**
* 发送报警数据
*
* @param key 因子编码
* @param value 监测值
* @param level 等级 1级 2级
* @param factor 枚举
*/
public void sendAlarmAsync(String key, BigDecimal value, Integer level, GasType factor) {
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN));
try {
if (Boolean.FALSE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) {
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "startTime", time);
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "name", factor.getName());
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "unit", factor.getUnit());
// 不存在则新增influxdb数据
// InfluxDB报警数据
Map<String, String> tags = new HashMap<>();
Map<String, Object> fields = new HashMap<>();
tags.put("code", key);
tags.put("name", factor.getName());
fields.put("value", value);
fields.put("alarmTime", time);
fields.put("unit", factor.getUnit());
fields.put("level", level);
// 额外添加type, 0为报警 1为消警
fields.put("type", 0);
fields.put("threshold", factor.getLevelOneThreshold() * level);
fields.put("content", String.format("监测点位%s编号%s于%s发生%s级报警,报警值%s%s,报警阈值%s%s", factor.getName(), key, time, level, value, factor.getUnit(), factor.getLevelOneThreshold() * level, factor.getUnit()));
influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields);
}
// Redis报警数据
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "level", level);
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "value", value);
redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "threshold", factor.getLevelOneThreshold() * level);
} catch (Exception e) {
log.error("发送报警数据失败, key: {}, value: {}, level: {}, 原因: {}", key, value, level, e.getLocalizedMessage());
}
}
/**
* 清除报警
*
* @param key key
*/ public void clearAlarmAsync(String key, String name) {
if (Boolean.TRUE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) {
log.info("移除{}报警缓存,添加消警时间", key);
// 移除redis报警
redisTemplate.delete(TOXIC_AND_HARMFUL_DATA_ALARM + key);
// 获取当前点位未消警的报警数据,删除后插入带有结束时间的数据
String command = "SELECT * FROM " + DB_TOXIC_AND_HARMFUL_ALARM + "\n" +
"WHERE \"code\" = '" + key + "' AND \"type\" = 0\n" +
"ORDER BY time DESC\n" +
"LIMIT 1";
List<AlarmFieldDTO> dataList = influxTemplate.queryBeanList(command, AlarmFieldDTO.class);
// 修改数据
if (!dataList.isEmpty()) {
AlarmFieldDTO fieldDTO = dataList.get(0);
Map<String, String> tags = new HashMap<>();
tags.put("code", key);
tags.put("name", name);
Map<String, Object> fields = new HashMap<>();
fields.put("value", fieldDTO.getValue());
fields.put("alarmTime", fieldDTO.getAlarmTime());
fields.put("unit", fieldDTO.getUnit());
fields.put("level", fieldDTO.getLevel());
fields.put("threshold", fieldDTO.getThreshold());
fields.put("content", fieldDTO.getContent());
fields.put("type", 1);
fields.put("clearTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));
// 移除原数据
influxTemplate.query("DELETE FROM " + DB_TOXIC_AND_HARMFUL_ALARM + " WHERE time = '" + new SimpleDateFormat(UTC_MS_WITH_ZONE_OFFSET_PATTERN).format(fieldDTO.getTime()) + "'");
// 增加数据
influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields, fieldDTO.getTime().getTime(), TimeUnit.MILLISECONDS, ZoneOffset.of("+08:00"));
}
}
}
@Override
public void systemAction(HjData hjData) {
log.info("执行系统指令{}", hjData);
}
}
引入的两个处理jar包说明
- allbs-hj212 参考代码
https://github.com/tracy4262/hj212.git
自己重新做了封装,且发布到了 maven 公共库便于引入使用,用来处理 hj212 报文协议、解析数据。
- allbs-influx 自己做的 influxdb 相关操作的封装,地址
https://github.com/chenqi92/allbs-influx.git
也上传至 maven 公共库中便于引入。如果需要请使用 2.0.2 版本;2.1.0 版本未经测试,使用上有些问题,后面会修复并增加一些便捷方法。
源码地址
我上面的核心代码并不完整,可以直接去看源码。如果对你有用,给个star哦😘源码地址:
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ALLBS!
评论