package org.springnewfiber.dataadapter.xf.controller;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springnewfiber.dataadapter.config.BladeRedis;
import org.springnewfiber.dataadapter.config.R;
import org.springnewfiber.dataadapter.entity.MqNodeData;
import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel;
import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer;
import org.springnewfiber.dataadapter.xf.XfDataEnum;
import org.springnewfiber.dataadapter.xf.entity.*;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * REST endpoints under {@code /monitorData} that accept monitoring data as a JSON string plus a
 * {@link XfDataEnum} type tag.
 *
 * <p>Forecast payloads are forwarded over HTTP to the URL configured in
 * {@code business.hankouSaveUrl}; every other supported type is converted with
 * {@link RealTimeSerializer#xfObjectToRealMap} and published through the {@link RabbitTemplate}.
 *
 * @program: newfiber-data-adapter
 * @author: djt
 * @create: 2022-06-14 20:44
 **/
@RestController
@RequiredArgsConstructor
//@PreAuth(AuthConstant.PERMISSION_ALL)
@RequestMapping("/monitorData")
@Api(value = "讯飞数据接受接口", tags = "讯飞数据接受接口")
@Slf4j
public class MonitorDataController {

    /** Timeout, in milliseconds, passed to the forecast forwarding HTTP call. */
    private static final int FORECAST_POST_TIMEOUT_MS = 30000;

    private final RabbitTemplate rabbitTemplate;
    private final BladeRedis redis;

    /** Target URL for forecast data, injected from {@code business.hankouSaveUrl}. */
    @Value("${business.hankouSaveUrl}")
    private String hankouSaveUrl;

    /**
     * Receives a JSON <em>array</em> of records of the given type.
     *
     * <p>Forecast records are each posted to {@code hankouSaveUrl}. For all other supported
     * types every element is converted to a {@link PtReceiveBaseModel} and published to RabbitMQ;
     * precipitation records are first passed through {@link #checkSet(MonitorPptnData)}.
     *
     * @param data       JSON array text
     * @param xfDataEnum type tag selecting the entity class used for parsing
     * @return {@code R.status(true)} when every record was handled
     * @throws RuntimeException with message {@code 类型转换错误} on any failure (unsupported type,
     *                          empty/unparseable payload, downstream error); the original
     *                          exception is attached as the cause
     */
    @PostMapping("/xfInterface")
    public R xfInterface(@ApiParam(value = "数据json") @RequestParam("data") String data,
                         @ApiParam(value = "数据类型") @RequestParam("xfBaseData") XfDataEnum xfDataEnum) {
        // Routine request trace: logged at info so it does not show up as an error.
        log.info("data:{},flag:{}", JSONObject.toJSONString(data), xfDataEnum.getRemark());
        try {
            if (xfDataEnum == XfDataEnum.ForcastData) {
                List<MonitorForcastData> forecasts = JSONObject.parseArray(data, MonitorForcastData.class);
                forecasts.forEach(this::forwardForecast);
                return R.status(true);
            }

            List<? extends BaseXfInterfaceEntity> entities =
                    JSONObject.parseArray(data, resolveEntityType(xfDataEnum));
            if (CollUtil.isEmpty(entities)) {
                throw new RuntimeException("类型转换错误");
            }
            if (xfDataEnum == XfDataEnum.PptnData) {
                // Precipitation records are adjusted against the cached previous reading first.
                entities.forEach(entity -> checkSet((MonitorPptnData) entity));
            }
            entities.forEach(entity -> publish(RealTimeSerializer.xfObjectToRealMap(entity)));
            return R.status(true);
        } catch (Exception e) {
            // Pass the throwable itself so SLF4J prints the full stack trace.
            log.error("解析错误", e);
            throw new RuntimeException("类型转换错误", e);
        }
    }

    /**
     * Updates {@code pptnData} from, and writes it to, the Redis entry keyed by its station code.
     *
     * <ul>
     *   <li>If a cached record exists and the incoming {@code tm} is strictly later than the cached
     *       one, {@code pt} is set to {@code cached.pt + incoming.drp} and the cache is overwritten.</li>
     *   <li>If no cached record exists, the incoming record is cached unchanged.</li>
     *   <li>Otherwise (same or older timestamp) neither the record nor the cache is touched.</li>
     * </ul>
     *
     * <p>NOTE(review): presumably {@code pt} is a running total and {@code drp} the increment for
     * the period — confirm against {@link MonitorPptnData}. A null {@code tm}, cached {@code tm},
     * cached {@code pt} or incoming {@code drp} raises a {@link NullPointerException} here, which
     * the calling endpoints translate into their generic failure.
     * NOTE(review): the key is the bare station code and the exists/get/set sequence is not
     * atomic — verify this is acceptable for the deployment.
     *
     * @param pptnData incoming precipitation record; may be mutated
     */
    private void checkSet(MonitorPptnData pptnData) {
        String st = pptnData.getStcd();
        Date tm = pptnData.getTm();
        MonitorPptnData old;
        if (redis.exists(st) && (old = redis.get(st)) != null && tm.compareTo(old.getTm()) > 0) {
            pptnData.setPt(old.getPt().add(pptnData.getDrp()));
            redis.set(st, pptnData);
        } else if (!redis.exists(st)) {
            redis.set(st, pptnData);
        }
    }

    /**
     * Builds a {@link PtReceiveBaseModel} from a forecast record, exposing {@code q}, {@code z} and
     * the forecast time ({@code ybt}) as data nodes.
     *
     * <p>Currently not called from this class; forecast data is forwarded over HTTP instead.
     *
     * @param forecast source forecast record
     * @return model with station code, publish time, current time and the three data nodes set
     */
    private PtReceiveBaseModel build(MonitorForcastData forecast) {
        Map<String, MqNodeData> dataMap = Maps.newHashMap();

        MqNodeData dataQ = new MqNodeData();
        dataQ.setSn("q");
        dataQ.setKey("q");
        dataQ.setValue(forecast.getQ());
        dataMap.put("q", dataQ);

        MqNodeData dataZ = new MqNodeData();
        dataZ.setSn("z");
        dataZ.setKey("z");
        dataZ.setValue(forecast.getZ());
        dataMap.put("z", dataZ);

        MqNodeData dataYbt = new MqNodeData();
        dataYbt.setSn("ybt");
        dataYbt.setKey("ybt");
        dataYbt.setValue(forecast.getFocTime());
        dataMap.put("ybt", dataYbt);

        PtReceiveBaseModel model = new PtReceiveBaseModel();
        model.setDataMap(dataMap);
        model.setSt(forecast.getStcd());
        model.setTt(forecast.getPubTime());
        model.setUt(new Date());
        return model;
    }

    /**
     * Receives a single JSON <em>object</em> of the given type.
     *
     * <p>Behaves like {@link #xfInterface(String, XfDataEnum)} for one record, and additionally
     * rejects a converted model whose data map is empty.
     *
     * @param data       JSON object text
     * @param xfDataEnum type tag selecting the entity class used for parsing
     * @return {@code R.status(true)} when the record was handled
     * @throws RuntimeException with message {@code 类型转换错误} on any failure; the original
     *                          exception is attached as the cause
     */
    @PostMapping("/xfInterface/single")
    public R xfInterfaceSingle(@ApiParam(value = "数据json") @RequestParam("data") String data,
                               @ApiParam(value = "数据类型") @RequestParam("xfBaseData") XfDataEnum xfDataEnum) {
        // Routine request trace: logged at info so it does not show up as an error.
        log.info("data:{},flag:{}", JSONObject.toJSONString(data), xfDataEnum.getRemark());
        try {
            if (xfDataEnum == XfDataEnum.ForcastData) {
                forwardForecast(JSONObject.parseObject(data, MonitorForcastData.class));
                return R.status(true);
            }

            BaseXfInterfaceEntity entity = JSONObject.parseObject(data, resolveEntityType(xfDataEnum));
            if (xfDataEnum == XfDataEnum.PptnData) {
                checkSet((MonitorPptnData) entity);
            }
            PtReceiveBaseModel model = RealTimeSerializer.xfObjectToRealMap(entity);
            String payload = JSONObject.toJSONString(model);
            log.info("cover:{}", payload);
            if (CollUtil.isEmpty(model.getDataMap())) {
                throw new RuntimeException("数据转换错误");
            }
            rabbitTemplate.convertAndSend(payload);
            return R.status(true);
        } catch (Exception e) {
            // Pass the throwable itself so SLF4J prints the full stack trace.
            log.error("解析错误", e);
            throw new RuntimeException("类型转换错误", e);
        }
    }

    /**
     * Maps a type tag to the entity class used to parse its payload.
     *
     * @param xfDataEnum type tag; {@code ForcastData} is handled by the callers and is not mapped here
     * @return entity class for the tag
     * @throws RuntimeException with message {@code 暂未开发} for tags without a mapping
     */
    private static Class<? extends BaseXfInterfaceEntity> resolveEntityType(XfDataEnum xfDataEnum) {
        switch (xfDataEnum) {
            case ChnlData:
                return MonitorChnlDataDto.class;
            case LakeData:
                return MonitorLakeData.class;
            case Meteorological:
                return MonitorMeteorologicalData.class;
            case PptnData:
                return MonitorPptnData.class;
            case PumpData:
                return MonitorPumpData.class;
            case RiverData:
                return MonitorRiverData.class;
            case SoilData:
                return MonitorSoilData.class;
            case WasData:
                return MonitorWasData.class;
            case WetlogData:
                return MonitorWetlogData.class;
            default:
                throw new RuntimeException("暂未开发");
        }
    }

    /**
     * Posts one forecast record, serialized as JSON, to {@code hankouSaveUrl} and logs the response.
     *
     * @param forecast record to forward
     */
    private void forwardForecast(MonitorForcastData forecast) {
        String payload = JSONObject.toJSONString(forecast);
        log.info("cover:{}", payload);
        String response = HttpUtil.post(hankouSaveUrl, payload, FORECAST_POST_TIMEOUT_MS);
        log.info("发送汉口站数据:{}", response);
    }

    /**
     * Logs the converted model and sends its JSON form through the {@link RabbitTemplate}.
     *
     * @param model converted record to publish
     */
    private void publish(PtReceiveBaseModel model) {
        String payload = JSONObject.toJSONString(model);
        log.info("cover:{}", payload);
        rabbitTemplate.convertAndSend(payload);
    }
}