项目简介

  • 这是一个服务端源码
  • 使用的是hj212-2017协议进行传输
  • 监测设备通过网线接交换机,以TCP传输
  • 传输设备共监测7个点,其中3个点监测的是混合气体,4个点是单一因子
  • 存在212中的因子使用212编码,混合及不存在的使用自定义编码
  • 阈值根据中华人民共和国国家职业卫生标准GBZ 2.1—2019来定
  • 只包含实时数据传输
  • 实时数据为null时,使用平均值代替
  • 实时判断是否监测数据超标,超标数据写入influxdb,消警时将该条数据的clearTime以消警时间填充。
  • 实时数据和报警数据同步写入Redis,方便使用。
  • 服务监听端口6000,为yml中的netty.port
  • 服务默认端口server.port其实没啥用,因为不涉及web。我这边设置完全是公司开发的自动发布服务需要监听这个端口。

核心代码

首先是使用netty开发的TCP服务端

NettyServer

import com.lyc.toxicharmful.config.properties.NettyProperties;  
import io.netty.bootstrap.ServerBootstrap;  
import io.netty.buffer.ByteBuf;  
import io.netty.buffer.Unpooled;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.SocketChannel;  
import io.netty.channel.socket.nio.NioServerSocketChannel;  
import io.netty.handler.codec.DelimiterBasedFrameDecoder;  
import io.netty.handler.codec.string.StringDecoder;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.stereotype.Component;  
  
import javax.annotation.Resource;  
import java.net.InetSocketAddress;  
  
/**
 * TCP server built on Netty that accepts device connections and hands every
 * decoded frame to {@link NettyServerHandler}.
 * <p>
 * Incoming bytes are split into frames on {@code \r\n} and decoded to
 * {@link String} before reaching the business handler.
 *
 * @author ChenQi
 * @date 2024/7/29
 */
@Slf4j
@Component
public class NettyServer {

    /** Maximum frame length (in bytes) handed to the delimiter-based frame decoder. */
    private static final int MAX_FRAME_LENGTH = 16 * 1024;

    @Resource
    private NettyServerHandler nettyServerHandler;

    @Resource
    private NettyProperties nettyProperties;

    /**
     * Binds the server to the port from {@link NettyProperties} and blocks the
     * calling thread until the server channel is closed.
     * <p>
     * Start-up failures are logged rather than propagated. If the thread is
     * interrupted while waiting, the event loop group is still shut down and the
     * thread's interrupt status is restored before returning.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for
     *                              the event loop group to shut down
     */
    public void start() throws InterruptedException {
        // Bootstrap helper used to configure and bind the server.
        ServerBootstrap b = new ServerBootstrap();
        // One NIO event loop group both accepts and serves connections.
        EventLoopGroup group = new NioEventLoopGroup();
        boolean interrupted = false;
        try {
            b.group(group);
            // Use the NIO server channel implementation.
            b.channel(NioServerSocketChannel.class);
            // Listening port.
            b.localAddress(new InetSocketAddress(nettyProperties.getPort()));
            // A new pipeline is initialised for every accepted connection.
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Split the byte stream into frames on CR LF so that messages glued
                    // together in one TCP segment are delivered one by one. The delimiter is
                    // built from explicit bytes so it does not depend on the platform charset.
                    ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{'\r', '\n'});
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
                    ch.pipeline().addLast(new StringDecoder());
                    // Business handler (shared instance) that processes each decoded frame.
                    ch.pipeline().addLast("myHandler", nettyServerHandler);
                }
            });
            // Bind and wait until the bind has completed.
            ChannelFuture f = b.bind().sync();
            log.info("{} started and listen on {}", NettyServer.class.getName(), f.channel().localAddress());
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Remember the interruption; the flag is restored after shutdown so that the
            // graceful-shutdown wait below is not aborted immediately.
            interrupted = true;
            log.warn("NettyServer interrupted, shutting down", e);
        } catch (Exception e) {
            log.error("NettyServer start error", e);
        } finally {
            try {
                // Release all resources of the event loop group, including its threads.
                group.shutdownGracefully().sync();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

NettyServerHandler

import cn.allbs.hj212.enums.GBT16706;  
import cn.allbs.hj212.format.T212Generator;  
import cn.allbs.hj212.format.T212Mapper;  
import cn.allbs.hj212.model.HjData;  
import com.lyc.toxicharmful.service.MessageFactory;  
import com.lyc.toxicharmful.utils.MsgHandleUtils;  
import io.netty.buffer.ByteBuf;  
import io.netty.channel.ChannelFutureListener;  
import io.netty.channel.ChannelHandler;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.ChannelInboundHandlerAdapter;  
import io.netty.channel.socket.SocketChannel;  
import lombok.AllArgsConstructor;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.stereotype.Component;  
import org.springframework.util.StringUtils;  
  
import java.io.StringWriter;  
import java.nio.charset.StandardCharsets;  
import java.time.LocalDateTime;  
import java.time.format.DateTimeFormatter;  
import java.util.Map;  
import java.util.Optional;  
import java.util.concurrent.ConcurrentHashMap;  
  
/**  
 * 类 NettyServerHandler  
 * * @author ChenQi  
 * &#064;date 2024/7/29  
 */@Slf4j  
@ChannelHandler.Sharable  
@Component  
@AllArgsConstructor  
public class NettyServerHandler extends ChannelInboundHandlerAdapter {  
  
    private final MessageFactory messageFactory;  
  
    private static final Map<String, SocketChannel> channelMap = new ConcurrentHashMap<>();  
  
    /**  
     * 连接后回调  
     */  
    @Override  
    public void channelActive(ChannelHandlerContext ctx) throws Exception {  
        super.channelActive(ctx);  
        log.info("client:{} is connected", MsgHandleUtils.getIPPortString(ctx));  
    }  
  
    /**  
     * 接收到报文回调  
     */  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) {  
        try {  
            // 获取企业mn号与排口code对应编号  
            String msgStr = new String(msg.toString().getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);  
            log.info("{}接收到三废排口报文:{}", LocalDateTime.now().format(DateTimeFormatter.BASIC_ISO_DATE), msgStr);  
            T212Mapper mapper = new T212Mapper().enableDefaultParserFeatures().enableDefaultVerifyFeatures();  
            HjData data = mapper.readData(msgStr);  
            if (data == null || data.getCp() == null || !StringUtils.hasText(data.getMn())) {  
                log.error("缺少必要信息或格式不正确");  
                return;  
            }  
            if (Optional.of(data).map(HjData::getMn).isPresent()) {  
                channelMap.put(data.getMn(), (SocketChannel) ctx.channel());  
            } else {  
                log.info("传的报文中mn为空,关闭该通道");  
                ctx.channel().close();  
            }  
            if (GBT16706._91.value().equals(data.getSt())) {  
                log.info("数据{}", data);  
                messageFactory.systemAction(data);  
            } else {  
                log.info("数据储存{}", data);  
                messageFactory.hj212DataSave(data);  
            }  
        } catch (Exception e) {  
            log.error("处理报文发生异常{}", e.getLocalizedMessage());  
        }  
    }  
  
    /**  
     * 方法功能: 发送系统报文信息  
     *  
     * @param mn mn号  
     */  
    public void sengMess(String mn, String ms) {  
        SocketChannel sc = channelMap.get(mn);  
        StringWriter writer = new StringWriter();  
        try (T212Generator generator = new T212Generator(writer)) {  
            generator.writeHeader();  
            generator.writeDataAndLenAndCrc(ms.toCharArray());  
            generator.writeFooter();  
            // 查询redis中是否有需要发送的数据  
            ByteBuf encoded = sc.alloc().buffer(writer.toString().length());  
            encoded.writeBytes(writer.toString().getBytes("GBK"));  
            sc.writeAndFlush(encoded).addListener((ChannelFutureListener) future -> log.info("回复:{}", writer));  
        } catch (Exception e) {  
            log.error("数据发送失败");  
        }  
  
    }  
  
    /**  
     * 数据处理完成回调  
     *  
     * @param ctx  
     */  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) {  
        log.info("channelReadComplete{}", LocalDateTime.now());  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        //cause.printStackTrace();// 捕捉异常信息  
        // 出现异常时关闭channel  
        ctx.close();  
    }  
  
    /**  
     * 客户端断开连接回调  
     */  
    @Override  
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
        // channel失效处理,客户端下线或者强制退出等任何情况都触发这个方法  
        log.info("{} 断开连接", MsgHandleUtils.getIPPortString(ctx));  
        ctx.channel().close();  
        super.channelInactive(ctx);  
    }  
}

当然,必不可少的一步是在项目启动时启动netty服务

/**
 * Spring Boot entry point of the service.
 * <p>
 * Implements {@link CommandLineRunner} so that the Netty TCP server is started
 * as part of application start-up.
 */
@EnableAsync
@EnableAllbsInflux
@SpringBootApplication
public class PcToxicAndHarmfulApplication implements CommandLineRunner {

    /** Netty TCP server that is launched from {@link #run(String...)}. */
    @Resource
    private NettyServer nettyServer;

    /**
     * Starts the Netty server.
     *
     * @param args command line arguments (unused)
     * @throws Exception if starting the server fails
     */
    @Override
    public void run(String... args) throws Exception {
        this.nettyServer.start();
    }

    /**
     * JVM entry point; boots the Spring application.
     *
     * @param args command line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PcToxicAndHarmfulApplication.class, args);
    }

}

业务逻辑

import cn.allbs.hj212.model.CpData;  
import cn.allbs.hj212.model.HjData;  
import cn.allbs.hj212.model.Pollution;  
import cn.allbs.influx.InfluxTemplate;  
import com.lyc.toxicharmful.config.enums.GasType;  
import com.lyc.toxicharmful.dto.AlarmFieldDTO;  
import com.lyc.toxicharmful.utils.DateUtil;  
import lombok.extern.slf4j.Slf4j;  
import org.springframework.data.redis.core.RedisTemplate;  
import org.springframework.stereotype.Service;  
  
import javax.annotation.Resource;  
import java.math.BigDecimal;  
import java.text.SimpleDateFormat;  
import java.time.LocalDateTime;  
import java.time.ZoneOffset;  
import java.time.format.DateTimeFormatter;  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Optional;  
import java.util.concurrent.TimeUnit;  
  
import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA;  
import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA_ALARM;  
import static com.lyc.toxicharmful.config.constant.CommonConstant.*;  
  
/**  
 * 类 MessageFactoryImpl  
 * * @author ChenQi  
 * &#064;date 2024/7/29  
 */@Slf4j  
@Service  
public class MessageFactoryImpl implements MessageFactory {  
  
    @Resource  
    private RedisTemplate<String, Object> redisTemplate;  
  
    @Resource  
    private InfluxTemplate influxTemplate;  
  
    @Override  
    public void hj212DataSave(HjData hjData) {  
        log.info("接收到的212数据{}", hjData);  
        // 储存influxdb  
        Map<String, String> tags = new HashMap<>();  
        Map<String, Object> fields = new HashMap<>();  
        tags.put("mn", Optional.ofNullable(hjData).map(HjData::getMn).orElse(""));  
        tags.put("qnTime", Optional.ofNullable(hjData).map(a -> DateUtil.timeFormatWithMs(a.getQn())).orElse(""));  
        tags.put("dataTime", Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getDataTime).map(DateUtil::timeFormat).orElse(""));  
        redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + Optional.ofNullable(hjData).map(HjData::getMn).orElse(""), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));  
        if (Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getPollution).isPresent()) {  
            redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));  
            hjData.getCp().getPollution().forEach((key, value) -> {  
                Optional<BigDecimal> rtd = Optional.ofNullable(value.getRtd());  
                Optional<BigDecimal> avg = Optional.ofNullable(value.getAvg());  
                if (rtd.isPresent() || avg.isPresent()) {  
                    BigDecimal valueToCheck = rtd.orElseGet(avg::get);  
                    fields.put(key, valueToCheck);  
                    // redis储存实时数据  
                    try {  
                        redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), key, valueToCheck);  
                    } catch (Exception e) {  
                        log.error("保存212实时数据失败,数据{},原因{}", hjData, e.getLocalizedMessage());  
                    }  
                    checkAndHandleAlarm(key, valueToCheck);  
                }  
                fields.put(key + "_flag", Optional.of(value).map(Pollution::getFlag).orElse(null));  
                fields.put(key + "_SampleTime", Optional.of(value).map(Pollution::getSampleTime).orElse(null));  
            });  
        }  
        influxTemplate.insert(DB_TOXIC_AND_HARMFUL_DATA, tags, fields);  
    }  
  
    /**  
     * 检查是否报警  
     *  
     * @param key   key  
     * @param value value  
     */    private void checkAndHandleAlarm(String key, BigDecimal value) {  
        for (GasType factor : GasType.values()) {  
            if (key.equals(factor.getCode())) {  
                if (value.compareTo(BigDecimal.valueOf(factor.getLevelTwoThreshold())) >= 0) {  
                    log.warn("报警: {} 超过二级阈值 {}", key, factor.getLevelTwoThreshold());  
                    sendAlarmAsync(key, value, 2, factor);  
                } else if (value.compareTo(BigDecimal.valueOf(factor.getLevelOneThreshold())) >= 0) {  
                    log.warn("报警: {} 超过一级阈值 {}", key, factor.getLevelOneThreshold());  
                    sendAlarmAsync(key, value, 1, factor);  
                } else {  
                    log.info("正常: {} 未超过阈值", key);  
                    // 消警  
                    clearAlarmAsync(key, factor.getName());  
                }  
                break;  
            }  
        }  
    }  
  
    /**  
     * 发送报警数据  
     *  
     * @param key    因子编码  
     * @param value  监测值  
     * @param level  等级 1级 2级  
     * @param factor 枚举  
     */  
    public void sendAlarmAsync(String key, BigDecimal value, Integer level, GasType factor) {  
        String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN));  
        try {  
            if (Boolean.FALSE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) {  
                redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "startTime", time);  
                redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "name", factor.getName());  
                redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "unit", factor.getUnit());  
                // 不存在则新增influxdb数据  
                // InfluxDB报警数据  
                Map<String, String> tags = new HashMap<>();  
                Map<String, Object> fields = new HashMap<>();  
                tags.put("code", key);  
                tags.put("name", factor.getName());  
                fields.put("value", value);  
                fields.put("alarmTime", time);  
                fields.put("unit", factor.getUnit());  
                fields.put("level", level);  
                // 额外添加type, 0为报警 1为消警  
                fields.put("type", 0);  
                fields.put("threshold", factor.getLevelOneThreshold() * level);  
                fields.put("content", String.format("监测点位%s编号%s于%s发生%s级报警,报警值%s%s,报警阈值%s%s", factor.getName(), key, time, level, value, factor.getUnit(), factor.getLevelOneThreshold() * level, factor.getUnit()));  
                influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields);  
            }  
            // Redis报警数据  
            redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "level", level);  
            redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "value", value);  
            redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "threshold", factor.getLevelOneThreshold() * level);  
  
        } catch (Exception e) {  
            log.error("发送报警数据失败, key: {}, value: {}, level: {}, 原因: {}", key, value, level, e.getLocalizedMessage());  
        }  
    }  
  
    /**  
     * 清除报警  
     *  
     * @param key key  
     */    public void clearAlarmAsync(String key, String name) {  
        if (Boolean.TRUE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) {  
            log.info("移除{}报警缓存,添加消警时间", key);  
            // 移除redis报警  
            redisTemplate.delete(TOXIC_AND_HARMFUL_DATA_ALARM + key);  
            // 获取当前点位未消警的报警数据,删除后插入带有结束时间的数据  
            String command = "SELECT * FROM " + DB_TOXIC_AND_HARMFUL_ALARM + "\n" +  
                    "WHERE \"code\" = '" + key + "' AND \"type\" = 0\n" +  
                    "ORDER BY time DESC\n" +  
                    "LIMIT 1";  
            List<AlarmFieldDTO> dataList = influxTemplate.queryBeanList(command, AlarmFieldDTO.class);  
            // 修改数据  
            if (!dataList.isEmpty()) {  
                AlarmFieldDTO fieldDTO = dataList.get(0);  
                Map<String, String> tags = new HashMap<>();  
                tags.put("code", key);  
                tags.put("name", name);  
                Map<String, Object> fields = new HashMap<>();  
                fields.put("value", fieldDTO.getValue());  
                fields.put("alarmTime", fieldDTO.getAlarmTime());  
                fields.put("unit", fieldDTO.getUnit());  
                fields.put("level", fieldDTO.getLevel());  
                fields.put("threshold", fieldDTO.getThreshold());  
                fields.put("content", fieldDTO.getContent());  
                fields.put("type", 1);  
                fields.put("clearTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)));  
                // 移除原数据  
                influxTemplate.query("DELETE FROM " + DB_TOXIC_AND_HARMFUL_ALARM + " WHERE time = '" + new SimpleDateFormat(UTC_MS_WITH_ZONE_OFFSET_PATTERN).format(fieldDTO.getTime()) + "'");  
                // 增加数据  
                influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields, fieldDTO.getTime().getTime(), TimeUnit.MILLISECONDS, ZoneOffset.of("+08:00"));  
            }  
        }  
    }  
  
    @Override  
    public void systemAction(HjData hjData) {  
        log.info("执行系统指令{}", hjData);  
    }  
}

引入的两个处理jar包说明

  • allbs-hj212 参考代码https://github.com/tracy4262/hj212.git自己重新做了封装,且发布到了maven公共库便于引入使用。用来处理hj212报文协议,解析数据
  • allbs-influx 自己做的influxdb相关操作的封装,地址https://github.com/chenqi92/allbs-influx.git 也已上传至maven公共库中便于引入。如果你们需要,可以使用2.0.2版本;2.1.0版本尚未经过测试,使用上可能存在问题,后面会修改并且增加一些便捷方法。

源码地址

我上面的核心代码并不完整,可以直接去看源码。如果对你有用,给个star哦😘源码地址:

https://github.com/chenqi92/pc-toxic-and-harmful.git