package org.springnewfiber.dataadapter.xf.service;

import cn.hutool.core.util.NumberUtil;
import com.alibaba.fastjson.JSONObject;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import org.springnewfiber.dataadapter.entity.MqNodeData;
import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel;
import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer;
import org.springnewfiber.dataadapter.xf.XfDataEnum;
import org.springnewfiber.dataadapter.xf.entity.BaseXfInterfaceEntity;
import org.springnewfiber.dataadapter.xf.entity.monitor.MonitorChildenData;
import org.springnewfiber.dataadapter.xf.entity.monitor.MonitorDataHistoryMongo;
import org.springnewfiber.dataadapter.xf.enums.EDataAccessType;

/**
 * Ingests iFlytek ("讯飞") historical monitoring data: records the raw payload, parses it,
 * groups the parsed records by station code and inserts them into MongoDB.
 *
 * @author : X.K
 * @since : 2022/6/28 4:00 PM
 */
@Slf4j
@Service
@SuppressWarnings("FieldCanBeLocal")
public class HistoryDataService {

    /** Layout of one abnormal-data marker: station code, record time, data key. */
    private static final String ABNORMAL_FORMAT = "%s-%s-%s";

    @Resource
    private DataAccessRecordService dataAccessRecordService;

    @Resource
    private MongoTemplate mongoTemplate;

    /**
     * Ingests one batch of iFlytek historical data.
     *
     * <p>The raw payload is first stored through {@code dataAccessRecordService}, then parsed,
     * grouped by station code ({@code getStcd()}) and inserted into MongoDB using the station
     * code as the collection name. Values that are missing, blank or not parseable as a number
     * are not stored; they are listed in the access result instead. Finally the access record
     * is refreshed with the elapsed time and a JSON summary of the run.
     *
     * @param data       raw payload as received from the interface
     * @param xfDataEnum interface type; supplies the record code and selects the parser
     * @param <T>        entity type produced by the parser
     * @return {@code true} when the whole batch was processed, {@code false} when any exception
     *         occurred (the exception is logged, not rethrown)
     */
    public <T extends BaseXfInterfaceEntity> Boolean xfInterface(String data, XfDataEnum xfDataEnum) {
        long startTime = System.currentTimeMillis();
        try {
            // Persist the raw payload first (original comment: saved to MySQL).
            Integer accessRecordId =
                    dataAccessRecordService.save(EDataAccessType.IflytekHistory, xfDataEnum.getCode(), data);

            // Parse the raw payload and group the records by station code.
            List<T> dataList = XfDataEnum.parse(data, xfDataEnum);
            Map<String, List<T>> stDataMap =
                    dataList.stream().collect(Collectors.groupingBy(BaseXfInterfaceEntity::getStcd));

            List<String> stList = new ArrayList<>();
            List<String> abnormalData = new ArrayList<>();
            int monitorDataCount = 0;

            // One MongoDB bulk insert per station; the station code is the collection name.
            for (Entry<String, List<T>> entry : stDataMap.entrySet()) {
                String st = entry.getKey();
                List<T> stDataList = entry.getValue();
                stList.add(st);
                monitorDataCount += stDataList.size();

                List<MonitorDataHistoryMongo> documents = new ArrayList<>(stDataList.size());
                for (T stData : stDataList) {
                    documents.add(toHistoryDocument(st, stData, abnormalData));
                }
                mongoTemplate.insert(documents, st);
            }

            long endTime = System.currentTimeMillis();
            String accessResult =
                    JSONObject.toJSONString(new AccessResult(stList, monitorDataCount, abnormalData));
            dataAccessRecordService.refreshResult(accessRecordId, (int) (endTime - startTime), accessResult);
            log.info("讯飞历史数据接入成功-耗时:{},接入结果:{}", endTime - startTime, accessResult);
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace lands in the log output
            // together with the message, instead of being printed to stderr.
            log.error("讯飞历史数据接入失败", e);
            return false;
        }
        return true;
    }

    /**
     * Builds the MongoDB document for a single parsed record.
     *
     * @param st           station code the record belongs to
     * @param stData       parsed record
     * @param abnormalData collector that receives a marker for every value that is skipped
     * @return document carrying the base fields plus one entry per valid value
     */
    private <T extends BaseXfInterfaceEntity> MonitorDataHistoryMongo toHistoryDocument(
            String st, T stData, List<String> abnormalData) {
        // Base monitoring fields; both time fields are filled from the record time.
        MonitorDataHistoryMongo document = new MonitorDataHistoryMongo();
        document.setSt(st);
        document.setTt(stData.getTm());
        document.setUt(stData.getTm());
        document.setCreateTime(new Date());

        // Monitoring values.
        PtReceiveBaseModel baseModel = RealTimeSerializer.xfObjectToRealMap(stData);
        for (Entry<String, MqNodeData> nodeDataEntry : baseModel.getDataMap().entrySet()) {
            MqNodeData node = nodeDataEntry.getValue();
            Object rawValue = node == null ? null : node.getValue();
            String text = rawValue == null ? null : rawValue.toString();
            BigDecimal decimalValue = parseNumeric(text);
            if (decimalValue == null) {
                // Abnormal value: record it and skip it rather than failing the whole batch.
                abnormalData.add(String.format(ABNORMAL_FORMAT, st, stData.getTm(), nodeDataEntry.getKey()));
                continue;
            }

            MonitorChildenData child = new MonitorChildenData();
            child.setSn(node.getKey());
            child.setValue(text);
            child.setValidateValue(decimalValue);
            child.setValidateValueTwo(decimalValue);
            child.setChangeRate(BigDecimal.ZERO);
            document.put(nodeDataEntry.getKey(), Collections.singleton(child));
        }
        return document;
    }

    /**
     * Converts a raw value to a {@link BigDecimal}.
     *
     * @param text textual value, may be {@code null}
     * @return the parsed number, or {@code null} when the text is {@code null}, blank, rejected
     *         by {@code NumberUtil.isNumber} or not accepted by the {@link BigDecimal} parser
     */
    private static BigDecimal parseNumeric(String text) {
        if (StringUtils.isBlank(text) || !NumberUtil.isNumber(text)) {
            return null;
        }
        try {
            return new BigDecimal(text);
        } catch (NumberFormatException ignored) {
            // The two checks do not accept exactly the same syntax; a value only BigDecimal
            // rejects is reported as abnormal like any other unparseable value.
            return null;
        }
    }

    /** Summary of one ingestion run; serialized to JSON and stored on the access record. */
    @Data
    static class AccessResult {

        /** Number of parsed records across all stations. */
        private Integer monitorDataCount;

        /** Number of distinct station codes in the batch. */
        private Integer stCount;

        /** Markers of the values that were skipped as abnormal. */
        private List<String> abnormalData;

        /** Comma-joined station codes. */
        private String stList;

        /**
         * @param stList           station codes seen in the batch
         * @param monitorDataCount number of parsed records
         * @param abnormalData     markers of skipped values
         */
        public AccessResult(List<String> stList, Integer monitorDataCount, List<String> abnormalData) {
            this.stList = String.join(",", stList);
            this.stCount = stList.size();
            this.monitorDataCount = monitorDataCount;
            this.abnormalData = abnormalData;
        }
    }
}