Newer
Older
newfiber-data-adapter / src / main / java / org / springnewfiber / dataadapter / xf / service / HistoryDataService.java
@silver silver on 29 Jun 2022 4 KB 讯飞历史数据接入
package org.springnewfiber.dataadapter.xf.service;

import cn.hutool.core.util.NumberUtil;
import com.alibaba.fastjson.JSONObject;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import org.springnewfiber.dataadapter.entity.MqNodeData;
import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel;
import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer;
import org.springnewfiber.dataadapter.xf.XfDataEnum;
import org.springnewfiber.dataadapter.xf.entity.BaseXfInterfaceEntity;
import org.springnewfiber.dataadapter.xf.entity.monitor.MonitorChildenData;
import org.springnewfiber.dataadapter.xf.entity.monitor.MonitorDataHistoryMongo;
import org.springnewfiber.dataadapter.xf.enums.EDataAccessType;

/**
 * @author : X.K
 * @since : 2022/6/28 下午4:00
 */
@Slf4j
@Service
@SuppressWarnings("FieldCanBeLocal")
public class HistoryDataService {

	@Resource
	private DataAccessRecordService dataAccessRecordService;

	@Resource
	private MongoTemplate mongoTemplate;

	private final String ABNORMAL_FORMAT = "%s-%s-%s";

	/**
	 * 讯飞历史数据接入
	 */
	public <T extends BaseXfInterfaceEntity> Boolean xfInterface(String data, XfDataEnum xfDataEnum){
		Long startTime = System.currentTimeMillis();

		try{
			// 原始数据保存到MySQL
			Integer accessRecordId = dataAccessRecordService.save(EDataAccessType.IflytekHistory, xfDataEnum.getCode(), data);

			// 解析原始数据
			List<T> dataList = XfDataEnum.parse(data, xfDataEnum);

			// 根据ST分组
			Map<String, List<T>> stDataMap = dataList.stream().collect(Collectors.groupingBy(BaseXfInterfaceEntity::getStcd));

			List<String> stList = new ArrayList<>();
			int monitorDataCount = 0;
			List<String> abnormalData = new ArrayList<>();

			// 保存到MongoDB
			for(Entry<String, List<T>> entry : stDataMap.entrySet()){
				List<MonitorDataHistoryMongo> monitorDataHistoryMongoList = new ArrayList<>();
				List<T> stDataList = entry.getValue();

				stList.add(entry.getKey());
				monitorDataCount = monitorDataCount + entry.getValue().size();

				for(T stData : stDataList){
					// 监测基础数据
					MonitorDataHistoryMongo monitorDataHistoryMongo = new MonitorDataHistoryMongo();
					monitorDataHistoryMongo.setSt(entry.getKey());
					monitorDataHistoryMongo.setTt(stData.getTm());
					monitorDataHistoryMongo.setUt(stData.getTm());
					monitorDataHistoryMongo.setCreateTime(new Date());

					// 监测数据
					PtReceiveBaseModel baseModel = RealTimeSerializer.xfObjectToRealMap(stData);
					for(Entry<String, MqNodeData> nodeDataEntry : baseModel.getDataMap().entrySet()){
						// 数据异常,则记录并跳过
						if(StringUtils.isBlank(nodeDataEntry.getValue().getValue().toString()) ||
							!NumberUtil.isNumber(nodeDataEntry.getValue().getValue().toString())){
							abnormalData.add(String.format(ABNORMAL_FORMAT, entry.getKey(), stData.getTm(), nodeDataEntry.getKey()));
							continue;
						}

						BigDecimal decimalValue = new BigDecimal(nodeDataEntry.getValue().getValue().toString());
						MonitorChildenData monitorChildenData = new MonitorChildenData();
						monitorChildenData.setSn(nodeDataEntry.getValue().getKey());
						monitorChildenData.setValue(nodeDataEntry.getValue().getValue().toString());
						monitorChildenData.setValidateValue(decimalValue);
						monitorChildenData.setValidateValueTwo(decimalValue);
						monitorChildenData.setChangeRate(BigDecimal.ZERO);


						monitorDataHistoryMongo.put(nodeDataEntry.getKey(), Collections.singleton(monitorChildenData));
					}

					monitorDataHistoryMongoList.add(monitorDataHistoryMongo);
				}

				mongoTemplate.insert(monitorDataHistoryMongoList, entry.getKey());
			}

			Long endTime = System.currentTimeMillis();
			String accessResult = JSONObject.toJSONString(new AccessResult(stList, monitorDataCount, abnormalData));
			dataAccessRecordService.refreshResult(accessRecordId, (int) (endTime - startTime), accessResult);

			log.info("讯飞历史数据接入成功-耗时:{},接入结果:{}", endTime - startTime, accessResult);

		} catch (Exception e){
			log.error("讯飞历史数据接入失败");
			e.printStackTrace();
			return false;
		}

		return true;
	}

	@Data
	static class AccessResult{
		private Integer monitorDataCount;
		private Integer stCount;
		private List<String> abnormalData;
		private String stList;

		public AccessResult(List<String> stList, Integer monitorDataCount, List<String> abnormalData) {
			this.stList = String.join(",", stList);
			this.stCount = stList.size();
			this.monitorDataCount = monitorDataCount;
			this.abnormalData = abnormalData;
		}
	}
}