package xyz.noark.redis;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import xyz.noark.core.annotation.Autowired;
import xyz.noark.core.annotation.Value;
import xyz.noark.core.event.DistributedDelayEvent;
import xyz.noark.core.event.DistributedEventManager;
import xyz.noark.core.event.Event;
import xyz.noark.core.event.EventManager;
import xyz.noark.core.thread.NamedThreadFactory;
import xyz.noark.core.util.ThreadUtils;
import xyz.noark.log.LogHelper;

/* loaded from: input_file:xyz/noark/redis/RedisDistributedEventManager.class */
public class RedisDistributedEventManager implements DistributedEventManager {
    private static final int SHUTDOWN_MAX_TIME = 10;
    private static final String KEY_DELAY_JOB = "noark:delay:job";
    private static final String KEY_DELAY_TIMER = "noark:delay:timer";
    private static final String KEY_DELAY_QUEUE = "noark:delay:queue";
    private static final String SCRIPT_PUBLISH_UNEXPIRED_EVENT = "local val = redis.call('HSETNX', KEYS[1], ARGV[1], ARGV[3]);\nif(val == 1) then\n    redis.call('ZADD', KEYS[2], ARGV[2], ARGV[1]);\n    return 1;\nend\nreturn 0";
    private static final String SCRIPT_PUBLISH_EXPIRED_EVENT = "local val = redis.call('HSETNX', KEYS[1], ARGV[1], ARGV[3]);\nif(val == 1) then\n    redis.call('ZADD', KEYS[2], ARGV[2], ARGV[1]);\n    redis.call('RPUSH', KEYS[3], ARGV[3]);\n    return 1;\nend\nreturn 0";
    private static final String SCRIPT_REMOVE_EVENT = "local val = redis.call('HDEL', KEYS[1], ARGV[1]);\nif(val == 1) then\n    redis.call('ZREM', KEYS[2], ARGV[1]);\n    return 1;\nend\nreturn 0";
    private static final String SCRIPT_CHECK_AND_TRANSPORT = "local val = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2])\nif(next(val) ~= nil) then\n    for i = 1, #val do\n        local job = redis.call('HGET', KEYS[3], val[i]);\n        if(job ~= nil) then\n            redis.call('ZINCRBY', KEYS[1], 60000, val[i]);\n            redis.call('rpush', KEYS[2], job);\n        else\n            redis.call('zrem', KEYS[1], val[i]);\n        end\n    end\n    return #val;\nend\nreturn 0";
    private static final int DOUBLE_PUBLISH_DELAY_TIME = 60000;

    @Autowired
    private EventManager eventManager;

    @Autowired
    private RedisTemplate redisTemplate;

    @Value("server.id")
    private int serverId;
    private final ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(2, (ThreadFactory) new NamedThreadFactory("distributed_event"));

    @PostConstruct
    public void init() {
        initDelayTakeTask();
        initDelayTriggerTask();
    }

    private void initDelayTriggerTask() {
        this.scheduledExecutor.scheduleWithFixedDelay(this::checkAndTransport, ((3 - (LocalTime.now().getSecond() % 3)) * this.serverId) % 3, 15L, TimeUnit.SECONDS);
    }

    private void initDelayTakeTask() {
        this.scheduledExecutor.execute(this::take);
    }

    public void take() {
        try {
            doTake();
        } catch (Throwable th) {
            LogHelper.logger.warn("延迟Take异常 {}", new Object[]{th});
            ThreadUtils.sleep(100L);
        } finally {
            initDelayTakeTask();
        }
    }

    private void doTake() {
        LogHelper.logger.debug("Take队列，进入一小时等待期...");
        List<String> brpop = this.redisTemplate.opsForList().brpop(3600, KEY_DELAY_QUEUE);
        if (brpop.size() == 2) {
            this.eventManager.publish((Event) JSON.parseObject(brpop.get(1), DistributedDelayEvent.class));
        } else {
            LogHelper.logger.warn("Take到异常任务：{}", new Object[]{brpop});
        }
    }

    public void shutdown() {
        LogHelper.logger.info("开始通知分布式延迟任务工作线程池停止服务.");
        this.scheduledExecutor.shutdown();
        try {
            if (!this.scheduledExecutor.awaitTermination(10L, TimeUnit.SECONDS)) {
                this.scheduledExecutor.shutdownNow();
            }
            LogHelper.logger.info("处理分布式延迟任务工作线程池已停止服务");
        } catch (InterruptedException e) {
            LogHelper.logger.error("停止分布式延迟任务工作线程时发生异常.", new Object[]{e});
            this.scheduledExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public void checkAndTransport() {
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(KEY_DELAY_TIMER);
        arrayList.add(KEY_DELAY_QUEUE);
        arrayList.add(KEY_DELAY_JOB);
        ArrayList arrayList2 = new ArrayList(2);
        arrayList2.add(String.valueOf(System.currentTimeMillis()));
        arrayList2.add(String.valueOf(32));
        LogHelper.logger.debug("搬运工作中，本次搬运数量={}", new Object[]{this.redisTemplate.eval(SCRIPT_CHECK_AND_TRANSPORT, arrayList, arrayList2)});
    }

    public boolean publish(DistributedDelayEvent distributedDelayEvent) {
        long currentTimeMillis = System.currentTimeMillis();
        return currentTimeMillis >= distributedDelayEvent.getEndTime().getTime() ? publishExpiredEvent(distributedDelayEvent, currentTimeMillis) : publishUnexpiredEvent(distributedDelayEvent);
    }

    private boolean publishUnexpiredEvent(DistributedDelayEvent distributedDelayEvent) {
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(KEY_DELAY_JOB);
        arrayList.add(KEY_DELAY_TIMER);
        ArrayList arrayList2 = new ArrayList(3);
        arrayList2.add(buildEventId(distributedDelayEvent));
        arrayList2.add(String.valueOf(distributedDelayEvent.getEndTime().getTime()));
        arrayList2.add(JSON.toJSONString(distributedDelayEvent, new SerializerFeature[]{SerializerFeature.WriteClassName}));
        return "1".equals(this.redisTemplate.eval(SCRIPT_PUBLISH_UNEXPIRED_EVENT, arrayList, arrayList2));
    }

    private boolean publishExpiredEvent(DistributedDelayEvent distributedDelayEvent, long j) {
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(KEY_DELAY_JOB);
        arrayList.add(KEY_DELAY_TIMER);
        arrayList.add(KEY_DELAY_QUEUE);
        ArrayList arrayList2 = new ArrayList(3);
        arrayList2.add(buildEventId(distributedDelayEvent));
        arrayList2.add(String.valueOf(j + 60000));
        arrayList2.add(JSON.toJSONString(distributedDelayEvent, new SerializerFeature[]{SerializerFeature.WriteClassName}));
        return "1".equals(this.redisTemplate.eval(SCRIPT_PUBLISH_EXPIRED_EVENT, arrayList, arrayList2));
    }

    public boolean remove(DistributedDelayEvent distributedDelayEvent) {
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(KEY_DELAY_JOB);
        arrayList.add(KEY_DELAY_TIMER);
        ArrayList arrayList2 = new ArrayList(1);
        arrayList2.add(buildEventId(distributedDelayEvent));
        return "1".equals(this.redisTemplate.eval(SCRIPT_REMOVE_EVENT, arrayList, arrayList2));
    }

    private String buildEventId(DistributedDelayEvent distributedDelayEvent) {
        String name = distributedDelayEvent.getClass().getName();
        return new StringBuilder(name.length() + 1 + 18).append(name).append(':').append(distributedDelayEvent.getId()).toString();
    }
}
