// newfiber-data-adapter/src/main/java/org/springnewfiber/dataadapter/sswj/core/SswjCoreJob.java
package org.springnewfiber.dataadapter.sswj.core;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springnewfiber.dataadapter.config.BladeRedis;
import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel;
import org.springnewfiber.dataadapter.sswj.entity.*;
import org.springnewfiber.dataadapter.sswj.mapper.ZgRainDataMapper;
import org.springnewfiber.dataadapter.sswj.service.impl.*;
import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer;

import java.math.BigDecimal;
import java.util.List;

/**
 * Polling job that forwards rows from the SSWJ tables to RabbitMQ.
 *
 * <p>For every distinct point (a {@code DATA_UP_UUID}, or an {@code STCD} for the rain table)
 * the job keeps the last forwarded row in Redis under the key {@code <EntitySimpleName>:<id>}.
 * On each pass it selects the rows whose time column is greater than the cached row's time
 * (or all rows when nothing usable is cached), converts each one with
 * {@link RealTimeSerializer#objectToRealMap}, sends the JSON to RabbitMQ and then stores the
 * row back in Redis as the new cursor.
 *
 * <p>NOTE(review): both {@code @Component} and the {@code @Scheduled} trigger are commented
 * out in this file, so nothing visible here registers or schedules the job.
 *
 * @program: newfiber-data-adapter
 * @description: forwards newly arrived SSWJ rows to RabbitMQ, using Redis as a per-point cursor
 * @author: djt
 * @create: 2022-04-21 19:06
 **/
//@Component
@Slf4j
@AllArgsConstructor
public class SswjCoreJob {
    /** Column holding the point identifier in the {@link BaseSwjEntity} tables. */
    private static final String DATA_UP_UUID_COLUMN = "DATA_UP_UUID";
    /** Column holding the station code in the rain table. */
    private static final String STATION_CODE_COLUMN = "STCD";
    /** Time column of the rain table, used for ordering and as the cursor. */
    private static final String RAIN_TIME_COLUMN = "TM";

    private final BladeRedis redis;
    private final RabbitTemplate rabbitTemplate;
    private final BzLiveServiceImpl bzLiveService;
    private final SqGqRServiceImpl sqGqRService;
    private final SqHdSrServiceImpl sqHdSrService;
    private final SqHpRServiceImpl sqHpRService;
    private final SqHzRServiceImpl sqHzRService;
    private final SqSkRServiceImpl sqSkRService;
    private final SqZsRServiceImpl sqZsRService;
    private final SzGssydServiceImpl szGssydService;
    private final SzyRServiceImpl szyRService;
    private final TrsqTjhsRServiceImpl trsqTjhsRService;
    private final ZgRainDataMapper zgRainDataMapper;

    /**
     * Runs one polling pass over every supported table, in a fixed order, finishing with the
     * rain table. Each table is processed independently: a failure in one is logged by the
     * callee and does not prevent the following tables from being processed.
     */
//    @Scheduled(cron = "0 0/1 * * * ?")
    public void selectSwj() {
        checkHasRealDate(new BzLive(), bzLiveService);
        checkHasRealDate(new SqGqR(), sqGqRService);
        checkHasRealDate(new SqHdSr(), sqHdSrService);
        checkHasRealDate(new SqHpR(), sqHpRService);
        checkHasRealDate(new SqHzR(), sqHzRService);
        checkHasRealDate(new SqSkR(), sqSkRService);
        checkHasRealDate(new SqZsR(), sqZsRService);
        checkHasRealDate(new SzGssyd(), szGssydService);
        checkHasRealDate(new SzyR(), szyRService);
        checkHasRealDate(new TrsqTjhsR(), trsqTjhsRService);
        zgRainData();
    }

    /**
     * Forwards the not-yet-sent rows of one {@link BaseSwjEntity} table.
     *
     * <p>Any exception aborts the pass for this table and is logged; it is never propagated.
     * Because the Redis cursor is only advanced after a row has been sent, an aborted pass is
     * resumed from the first unsent row on the next run (a row may therefore be sent twice if
     * the cursor update itself fails).
     *
     * @param baseSwjEntity prototype instance; supplies the entity name used in the Redis key
     *                      and, through {@code getTTFileName()}, the time column to order and
     *                      filter by
     * @param service       MyBatis-Plus service used to query the entity's table
     */
    // The raw ServiceImpl parameter is kept so existing callers compile unchanged; the
    // unchecked conversions below are confined to this method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void checkHasRealDate(BaseSwjEntity baseSwjEntity, ServiceImpl service) {
        String name = baseSwjEntity.getClass().getSimpleName();
        try {
            // One row per distinct point id.
            QueryWrapper<BaseSwjEntity> qw = new QueryWrapper<>();
            qw.groupBy(DATA_UP_UUID_COLUMN);
            qw.select(DATA_UP_UUID_COLUMN);
            List<BaseSwjEntity> points = service.list(qw);
            if (CollUtil.isEmpty(points)) {
                return;
            }
            String timeColumn = baseSwjEntity.getTTFileName();
            for (BaseSwjEntity point : points) {
                // A NULL id would make the key concatenation below throw and abort the
                // whole table, so such groups are skipped.
                if (point == null || point.getDataUpUuid() == null) {
                    continue;
                }
                String uuid = point.getDataUpUuid();
                String key = name.concat(StringPool.COLON).concat(uuid);
                // A missing key yields null, which selects the first-run path below.
                BaseSwjEntity cursor = redis.get(key);

                QueryWrapper<BaseSwjEntity> rowQuery = new QueryWrapper<>();
                rowQuery.orderByAsc(timeColumn);
                // Filter on the uuid string itself (the first-run path used to pass the
                // whole entity as the comparison value).
                rowQuery.eq(DATA_UP_UUID_COLUMN, uuid);
                if (cursor != null && cursor.getTTDate() != null) {
                    // Incremental run: only rows newer than the last forwarded one.
                    rowQuery.gt(timeColumn, cursor.getTTDate());
                }
                List<BaseSwjEntity> rows = service.list(rowQuery);
                if (CollUtil.isEmpty(rows)) {
                    continue;
                }
                for (BaseSwjEntity row : rows) {
                    PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(row);
                    publish(name, row.getDataUpUuid(), model);
                    // Advance the cursor only after the send succeeded.
                    redis.set(key, row);
                }
            }
        } catch (Exception e) {
            // Throwable goes last so SLF4J prints the full stack trace.
            log.error("发生错误:{},e:{}", name, e.getMessage(), e);
        }
    }

    /**
     * Forwards the not-yet-sent rows of the rain table, one station ({@code STCD}) at a time.
     *
     * <p>Before a row is sent its {@code pt} is set to the previously forwarded row's
     * {@code pt} plus the row's own {@code DRP}, i.e. a running total per station that starts
     * from zero when nothing is cached. A {@code null} {@code pt} or {@code DRP} counts as zero.
     *
     * <p>Error handling and cursor semantics match {@link #checkHasRealDate}.
     */
    private void zgRainData() {
        try {
            String name = ZgRainData.class.getSimpleName();
            // One row per distinct station code.
            QueryWrapper<ZgRainData> qw = new QueryWrapper<>();
            qw.groupBy(STATION_CODE_COLUMN);
            qw.select(STATION_CODE_COLUMN);
            List<ZgRainData> stations = zgRainDataMapper.selectList(qw);
            if (CollUtil.isEmpty(stations)) {
                return;
            }
            for (ZgRainData station : stations) {
                if (station == null || station.getSTCD() == null) {
                    continue;
                }
                String stcd = station.getSTCD();
                String key = name.concat(StringPool.COLON).concat(stcd);
                ZgRainData cached = redis.get(key);

                QueryWrapper<ZgRainData> rowQuery = new QueryWrapper<>();
                rowQuery.orderByAsc(RAIN_TIME_COLUMN);
                rowQuery.eq(STATION_CODE_COLUMN, stcd);
                if (cached != null && cached.getTM() != null) {
                    // Incremental run: only rows newer than the last forwarded one.
                    rowQuery.gt(RAIN_TIME_COLUMN, cached.getTM());
                }
                List<ZgRainData> rows = zgRainDataMapper.selectList(rowQuery);
                if (CollUtil.isEmpty(rows)) {
                    continue;
                }
                // Carry the total locally instead of re-reading from Redis the row that
                // was written on the previous iteration.
                BigDecimal runningTotal = cached == null ? BigDecimal.ZERO : zeroIfNull(cached.getPt());
                for (ZgRainData row : rows) {
                    runningTotal = runningTotal.add(zeroIfNull(row.getDRP()));
                    row.setPt(runningTotal);
                    PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(row);
                    publish(name, row.getSTCD(), model);
                    // Advance the cursor only after the send succeeded.
                    redis.set(key, row);
                }
            }
        } catch (Exception e) {
            // Throwable goes last so SLF4J prints the full stack trace.
            log.error("发生错误:{},e:{}", "紫光雨量数据", e.getMessage(), e);
        }

    }

    /**
     * Serializes {@code model} to JSON once, logs it and sends it with the template's
     * single-argument {@code convertAndSend} (default exchange and routing key).
     *
     * @param name    entity name, used only in the log line
     * @param pointId point or station identifier, used only in the log line
     * @param model   payload to serialize and send
     */
    private void publish(String name, String pointId, PtReceiveBaseModel model) {
        String payload = JSONObject.toJSONString(model);
        log.info("发送数据:{},点位:{},具体数据:{}", name, pointId, payload);
        rabbitTemplate.convertAndSend(payload);
    }

    /** Returns {@code value}, or {@link BigDecimal#ZERO} when it is {@code null}. */
    private static BigDecimal zeroIfNull(BigDecimal value) {
        return value == null ? BigDecimal.ZERO : value;
    }

}