package cn.jdevelops.delay.redis;

import cn.jdevelops.delay.core.entity.DelayQueueMessage;
import cn.jdevelops.delay.core.factory.DelayFactory;
import cn.jdevelops.delay.core.service.DelayService;
import com.alibaba.fastjson.JSON;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Service
/* loaded from: input_file:cn/jdevelops/delay/redis/RedisDelayService.class */
public class RedisDelayService implements DelayService<DelayQueueMessage> {
    private static final Logger logger = LoggerFactory.getLogger(RedisDelayService.class);

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private DelayFactory<DelayQueueMessage> delayRunFactory;
    private static final String DELAY_QUEUE = "delay:redis_delay_queue";

    @Resource(name = "delayRedisScript")
    private DefaultRedisScript<List<String>> delayRedisScript;
    private static final String NAME = "RedisDelayMessageTask-thread-";
    private final AtomicInteger seq = new AtomicInteger(1);
    private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1, runnable -> {
        return new Thread(runnable, NAME + this.seq.getAndIncrement());
    });

    public void produce(DelayQueueMessage delayQueueMessage) {
        this.redisTemplate.opsForZSet().add(DELAY_QUEUE, JSON.toJSONString(delayQueueMessage), delayQueueMessage.getDelayTime().longValue());
    }

    public void produce(List<DelayQueueMessage> list) {
        list.forEach(delayQueueMessage -> {
            produce(delayQueueMessage);
        });
    }

    public void consumeDelay() {
        long round = Math.round((Math.random() * 10.0d) + 10.0d);
        long round2 = Math.round(Math.random() * 10.0d);
        long j = round2 == 0 ? 1L : round2;
        logger.info("开始消费redis延时队列数据...");
        this.pool.scheduleAtFixedRate(() -> {
            try {
                Set<String> runLuaScript = runLuaScript(DELAY_QUEUE);
                if (!CollectionUtils.isEmpty(runLuaScript)) {
                    runLuaScript.forEach(str -> {
                        this.delayRunFactory.delayExecute((DelayQueueMessage) JSON.toJavaObject(JSON.parseObject(str), DelayQueueMessage.class));
                    });
                }
            } catch (Throwable th) {
                logger.error("RemindMessageTask error..", th);
            }
        }, round, j, TimeUnit.SECONDS);
    }

    public Set<String> runLuaScript(String str) {
        return new HashSet((List) this.redisTemplate.execute(this.delayRedisScript, Collections.singletonList(str), new Object[]{Double.valueOf(0.0d), Double.valueOf(System.currentTimeMillis())}));
    }
}
