Newer
Older
newfiber-data-adapter / src / main / java / org / springnewfiber / dataadapter / ziguang / yl / util / RainfallAccumulationUtil.java
package org.springnewfiber.dataadapter.ziguang.yl.util;

import cn.hutool.core.thread.NamedThreadFactory;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springnewfiber.dataadapter.ziguang.yl.dto.YlDto;

import javax.annotation.concurrent.ThreadSafe;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * @program: newwater-data-engine
 * @description: 累计雨量
 * @author: djt
 * @create: 2022-04-06 09:09
 **/
@Getter
@ThreadSafe
@Slf4j
public class RainfallAccumulationUtil {
    private final static Map<String, NodeQueuedGroup> map = Maps.newHashMap();
    private final ExecutorService executor;

    public Future submit(YlDto ylDto, String stKeySnKey) {
//        handle(ylDto, stKeySnKey);
        return executor.submit(() -> handle(ylDto, stKeySnKey));
    }

    private void handle(YlDto ylDto, String stKeySnKey) {
        if (map.containsKey(stKeySnKey) && map.get(stKeySnKey) != null) {
            map.get(stKeySnKey).doHandle(ylDto);
        } else {
            NodeQueuedGroup var = new NodeQueuedGroup(stKeySnKey);
            var.doHandle(ylDto);
            map.put(stKeySnKey, var);
        }
    }

    public RainfallAccumulationUtil() {
        executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("RainfallAccumulation", Boolean.FALSE));
    }

    static final class NodeQueuedGroup {
        private final NodeQueued oneNodeQueued = new NodeQueued(60 * 1 / 5);
        private final NodeQueued threeNodeQueued = new NodeQueued(60 * 3 / 5);
        private final NodeQueued sixNodeQueued = new NodeQueued(60 * 6 / 5);
        private final NodeQueued twelveNodeQueued = new NodeQueued(60 * 12 / 5);
        private final NodeQueued twentyFourNodeQueued = new NodeQueued(60 * 24 / 5);
        //todo 每日累计降雨量算法未算
        //        private final NodeQueued oneNodeQueued = new NodeQueued(10);
//        private final NodeQueued threeNodeQueued = new NodeQueued(20);
//        private final NodeQueued sixNodeQueued = new NodeQueued(30);
//        private final NodeQueued twelveNodeQueued = new NodeQueued(40);
//        private final NodeQueued twentyFourNodeQueued = new NodeQueued(50);
        List<NodeQueued> handleList = Lists.newArrayList(oneNodeQueued, threeNodeQueued, sixNodeQueued, twelveNodeQueued, twentyFourNodeQueued);
        //        List<NodeQueued> handleList = Lists.newArrayList(oneNodeQueued);
        private String stKeySnKey;

        NodeQueuedGroup(String stKeySnKey) {
            this.stKeySnKey = stKeySnKey;
        }

        private void doHandle(YlDto ylDto) {
            handleList.forEach(i -> {
                log.info("当前处理队列为:{}", i.size);
                count(ylDto, i);
                ylDto.setCountBig(i.size, ylDto, i.countBig);
                log.info("处理队列完成队列为:{}", i.size);
            });
        }
    }

    /**
     * 基础队列
     */
    static final class Node {
        volatile Node prev;
        volatile Node next;
        volatile YlDto ylDto;

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException();
            } else {
                return p;
            }
        }

        private Node(YlDto ylDto) {
            this.ylDto = ylDto;
        }

        Node() {
        }
    }

    /**
     * @author djt
     * @creed: Talk is cheap,show me the code
     * @date 2022/4/6 9:58
     * @description: 主队列类,用于计算
     */
    static final class NodeQueued {
        private volatile Node head;
        private volatile Node tail;
        /**
         * 队列限定个数
         */
        private volatile int size;
        /**
         * 队列累计值
         */
        private volatile BigDecimal countBig = BigDecimal.ZERO;

        NodeQueued(int size) {
            this.size = size;
        }

        public final int getQueueLength() {
            int n = 0;
            for (Node p = tail; p != null; p = p.prev) {
                ++n;
            }
            return n;
        }

        //入栈
        private Node enq(Node node) {
            for (; ; ) {
                Node t = tail;
                if (head == null) { // Must initialize
                    head = node;
                    return node;
                } else if (t != null) {
                    node.prev = t;
                    t.next = node;
                    tail = node;
                    return t;
                } else if (t == null) {
                    tail = node;
                    node.prev = head;
                    head.next = node;
                    return node;
                }
            }
        }

        /**
         * @param node
         * @return 返回移除的头部节点
         * @author djt
         * @creed: Talk is cheap,show me the code
         * @date 2022/4/6 11:17
         * @description: 出栈,入栈
         */
        private Node outq(Node node) {
            for (; ; ) {
                Node t = head;
                if (t == null) {
                    throw new RuntimeException("头节点为null");
                } else {
                    head = t.next;
                    head.prev = null;
                    enq(node);
                    return t;
                }
            }
        }

        //计算累计值
        private void countBig(Node node) {
            if (node != null && node.ylDto != null) {
                log.info("处理数据:{},key:{},处理前累计:{}", node.ylDto.getDRP(), node.ylDto.getSTCD(), countBig);
                if (getQueueLength() == size) {
                    //弹出头部,进入尾部,(个数相等,并计算累计,累计减去头部,加上尾部值)
                    Node oldHeadNode = outq(node);
                    countBig = countBig
                            .subtract(new BigDecimal(oldHeadNode.ylDto.getDRP()))
                            .add(new BigDecimal(node.ylDto.getDRP()));

                    log.info("满:{},去掉头部",size);
                } else {
                    //队列未满时,入栈时,累计
                    enq(node);
                    countBig = countBig.add(new BigDecimal(node.ylDto.getDRP()));
                }
                log.info("处理数据:{},key:{},处理后累计:{}", node.ylDto.getDRP(), node.ylDto.getSTCD(), countBig);
                return;
            }
            log.error("node为null:{}", JSONObject.toJSONString(node));
        }
    }

    private static boolean checkNode(YlDto ylDto) {
        if (ylDto == null || ylDto.getDRP() == null) {
            log.error("数据为null");
            return false;
        } else {
            return true;
        }
    }

    private static void count(YlDto ylDto, NodeQueued nodeQueued) {
        if (checkNode(ylDto)) {
            nodeQueued.countBig(new Node(ylDto));
        }
    }

}