1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
| import cn.allbs.hj212.model.CpData; import cn.allbs.hj212.model.HjData; import cn.allbs.hj212.model.Pollution; import cn.allbs.influx.InfluxTemplate; import com.lyc.toxicharmful.config.enums.GasType; import com.lyc.toxicharmful.dto.AlarmFieldDTO; import com.lyc.toxicharmful.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA; import static com.lyc.toxicharmful.config.constant.CacheConstant.TOXIC_AND_HARMFUL_DATA_ALARM; import static com.lyc.toxicharmful.config.constant.CommonConstant.*;
@Slf4j @Service public class MessageFactoryImpl implements MessageFactory { @Resource private RedisTemplate<String, Object> redisTemplate; @Resource private InfluxTemplate influxTemplate; @Override public void hj212DataSave(HjData hjData) { log.info("接收到的212数据{}", hjData); Map<String, String> tags = new HashMap<>(); Map<String, Object> fields = new HashMap<>(); tags.put("mn", Optional.ofNullable(hjData).map(HjData::getMn).orElse("")); tags.put("qnTime", Optional.ofNullable(hjData).map(a -> DateUtil.timeFormatWithMs(a.getQn())).orElse("")); tags.put("dataTime", Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getDataTime).map(DateUtil::timeFormat).orElse("")); redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + Optional.ofNullable(hjData).map(HjData::getMn).orElse(""), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN))); if (Optional.ofNullable(hjData).map(HjData::getCp).map(CpData::getPollution).isPresent()) { redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), "time", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN))); hjData.getCp().getPollution().forEach((key, value) -> { Optional<BigDecimal> rtd = Optional.ofNullable(value.getRtd()); Optional<BigDecimal> avg = Optional.ofNullable(value.getAvg()); if (rtd.isPresent() || avg.isPresent()) { BigDecimal valueToCheck = rtd.orElseGet(avg::get); fields.put(key, valueToCheck); try { redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA + hjData.getMn(), key, valueToCheck); } catch (Exception e) { log.error("保存212实时数据失败,数据{},原因{}", hjData, e.getLocalizedMessage()); } checkAndHandleAlarm(key, valueToCheck); } fields.put(key + "_flag", Optional.of(value).map(Pollution::getFlag).orElse(null)); fields.put(key + "_SampleTime", Optional.of(value).map(Pollution::getSampleTime).orElse(null)); }); } influxTemplate.insert(DB_TOXIC_AND_HARMFUL_DATA, tags, fields); }
private void checkAndHandleAlarm(String key, BigDecimal value) { for (GasType factor : GasType.values()) { if (key.equals(factor.getCode())) { if (value.compareTo(BigDecimal.valueOf(factor.getLevelTwoThreshold())) >= 0) { log.warn("报警: {} 超过二级阈值 {}", key, factor.getLevelTwoThreshold()); sendAlarmAsync(key, value, 2, factor); } else if (value.compareTo(BigDecimal.valueOf(factor.getLevelOneThreshold())) >= 0) { log.warn("报警: {} 超过一级阈值 {}", key, factor.getLevelOneThreshold()); sendAlarmAsync(key, value, 1, factor); } else { log.info("正常: {} 未超过阈值", key); clearAlarmAsync(key, factor.getName()); } break; } } }
public void sendAlarmAsync(String key, BigDecimal value, Integer level, GasType factor) { String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN)); try { if (Boolean.FALSE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) { redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "startTime", time); redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "name", factor.getName()); redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "unit", factor.getUnit()); Map<String, String> tags = new HashMap<>(); Map<String, Object> fields = new HashMap<>(); tags.put("code", key); tags.put("name", factor.getName()); fields.put("value", value); fields.put("alarmTime", time); fields.put("unit", factor.getUnit()); fields.put("level", level); fields.put("type", 0); fields.put("threshold", factor.getLevelOneThreshold() * level); fields.put("content", String.format("监测点位%s编号%s于%s发生%s级报警,报警值%s%s,报警阈值%s%s", factor.getName(), key, time, level, value, factor.getUnit(), factor.getLevelOneThreshold() * level, factor.getUnit())); influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields); } redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "level", level); redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "value", value); redisTemplate.opsForHash().put(TOXIC_AND_HARMFUL_DATA_ALARM + key, "threshold", factor.getLevelOneThreshold() * level); } catch (Exception e) { log.error("发送报警数据失败, key: {}, value: {}, level: {}, 原因: {}", key, value, level, e.getLocalizedMessage()); } }
public void clearAlarmAsync(String key, String name) { if (Boolean.TRUE.equals(redisTemplate.hasKey(TOXIC_AND_HARMFUL_DATA_ALARM + key))) { log.info("移除{}报警缓存,添加消警时间", key); redisTemplate.delete(TOXIC_AND_HARMFUL_DATA_ALARM + key); String command = "SELECT * FROM " + DB_TOXIC_AND_HARMFUL_ALARM + "\n" + "WHERE \"code\" = '" + key + "' AND \"type\" = 0\n" + "ORDER BY time DESC\n" + "LIMIT 1"; List<AlarmFieldDTO> dataList = influxTemplate.queryBeanList(command, AlarmFieldDTO.class); if (!dataList.isEmpty()) { AlarmFieldDTO fieldDTO = dataList.get(0); Map<String, String> tags = new HashMap<>(); tags.put("code", key); tags.put("name", name); Map<String, Object> fields = new HashMap<>(); fields.put("value", fieldDTO.getValue()); fields.put("alarmTime", fieldDTO.getAlarmTime()); fields.put("unit", fieldDTO.getUnit()); fields.put("level", fieldDTO.getLevel()); fields.put("threshold", fieldDTO.getThreshold()); fields.put("content", fieldDTO.getContent()); fields.put("type", 1); fields.put("clearTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern(NORM_DATETIME_PATTERN))); influxTemplate.query("DELETE FROM " + DB_TOXIC_AND_HARMFUL_ALARM + " WHERE time = '" + new SimpleDateFormat(UTC_MS_WITH_ZONE_OFFSET_PATTERN).format(fieldDTO.getTime()) + "'"); influxTemplate.insert(DB_TOXIC_AND_HARMFUL_ALARM, tags, fields, fieldDTO.getTime().getTime(), TimeUnit.MILLISECONDS, ZoneOffset.of("+08:00")); } } } @Override public void systemAction(HjData hjData) { log.info("执行系统指令{}", hjData); } }
|