package com.redis.spring.batch.reader;

import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.util.CodecUtils;
import com.redis.spring.batch.util.SetBlockingQueue;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;

/* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader.class */
public class KeyspaceNotificationItemReader<K, V> extends AbstractItemStreamItemReader<K> implements PollableItemReader<K>, RedisPubSubListener<String, String>, RedisClusterPubSubListener<String, String> {
    public static final String QUEUE_METER = "redis.batch.notification.queue.size";
    private static final String SEPARATOR = ":";
    private final AbstractRedisClient client;
    private final Function<String, K> stringKeyEncoder;
    private String keyType;
    private BlockingQueue<KeyspaceNotification> queue;
    private StatefulRedisPubSubConnection<String, String> connection;
    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100);
    public static final OrderingStrategy DEFAULT_ORDERING = OrderingStrategy.PRIORITY;
    private static final Map<String, KeyEvent> keyEvents = (Map) Stream.of((Object[]) KeyEvent.values()).collect(Collectors.toMap((v0) -> {
        return v0.getString();
    }, Function.identity()));
    private static final KeyspaceNotificationComparator NOTIFICATION_COMPARATOR = new KeyspaceNotificationComparator();
    private final Log log = LogFactory.getLog(getClass());
    protected String pattern = RedisItemReader.DEFAULT_PUBSUB_PATTERN;
    private OrderingStrategy orderingStrategy = DEFAULT_ORDERING;
    private int queueCapacity = 10000;
    private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;

    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$OrderingStrategy.class */
    public enum OrderingStrategy {
        FIFO,
        PRIORITY
    }

    public KeyspaceNotificationItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        this.client = abstractRedisClient;
        this.stringKeyEncoder = CodecUtils.stringKeyFunction(redisCodec);
    }

    public Duration getPollTimeout() {
        return this.pollTimeout;
    }

    public void setPollTimeout(Duration duration) {
        this.pollTimeout = duration;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    public String getPattern() {
        return this.pattern;
    }

    public void setPattern(String str) {
        this.pattern = str;
    }

    public String getKeyType() {
        return this.keyType;
    }

    public void setKeyType(String str) {
        this.keyType = str;
    }

    public OrderingStrategy getOrderingStrategy() {
        return this.orderingStrategy;
    }

    public void setOrderingStrategy(OrderingStrategy orderingStrategy) {
        this.orderingStrategy = orderingStrategy;
    }

    public BlockingQueue<KeyspaceNotification> getQueue() {
        return this.queue;
    }

    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (isOpen()) {
            return;
        }
        this.queue = new SetBlockingQueue(notificationQueue(), this.queueCapacity);
        Metrics.globalRegistry.gaugeCollectionSize(QUEUE_METER, Collections.emptyList(), this.queue);
        this.connection = RedisModulesUtils.pubSubConnection(this.client);
        if (!(this.connection instanceof StatefulRedisClusterPubSubConnection)) {
            this.connection.sync().psubscribe(new String[]{this.pattern});
            this.connection.addListener(this);
        } else {
            StatefulRedisClusterPubSubConnection statefulRedisClusterPubSubConnection = this.connection;
            statefulRedisClusterPubSubConnection.addListener(this);
            statefulRedisClusterPubSubConnection.setNodeMessagePropagation(true);
            ((NodeSelectionPubSubCommands) statefulRedisClusterPubSubConnection.sync().upstream().commands()).psubscribe(new String[]{this.pattern});
        }
    }

    private BlockingQueue<KeyspaceNotification> notificationQueue() {
        return this.orderingStrategy == OrderingStrategy.PRIORITY ? new PriorityBlockingQueue(this.queueCapacity, NOTIFICATION_COMPARATOR) : new LinkedBlockingQueue(this.queueCapacity);
    }

    public boolean isOpen() {
        return this.connection != null;
    }

    protected void notification(String str, String str2) {
        if (str == null) {
            return;
        }
        String substring = str.substring(str.indexOf(":") + 1);
        KeyEvent keyEvent = keyEvent(str2);
        if (this.keyType == null || this.keyType.equalsIgnoreCase(keyEvent.getType())) {
            KeyspaceNotification keyspaceNotification = new KeyspaceNotification();
            keyspaceNotification.setKey(substring);
            keyspaceNotification.setEvent(keyEvent);
            if (enqueue(keyspaceNotification)) {
                return;
            }
            this.log.debug("Keyspace notification queue is full");
        }
    }

    private boolean enqueue(KeyspaceNotification keyspaceNotification) {
        if (this.queue.remainingCapacity() > 0) {
            return this.queue.offer(keyspaceNotification);
        }
        return false;
    }

    private KeyEvent keyEvent(String str) {
        return keyEvents.getOrDefault(str, KeyEvent.UNKNOWN);
    }

    public synchronized void close() {
        if (isOpen()) {
            if (this.connection instanceof StatefulRedisClusterPubSubConnection) {
                StatefulRedisClusterPubSubConnection statefulRedisClusterPubSubConnection = this.connection;
                ((NodeSelectionPubSubCommands) statefulRedisClusterPubSubConnection.sync().upstream().commands()).punsubscribe(new String[]{this.pattern});
                statefulRedisClusterPubSubConnection.removeListener(this);
            } else {
                this.connection.sync().punsubscribe(new String[]{this.pattern});
                this.connection.removeListener(this);
            }
            this.connection.close();
            this.connection = null;
        }
        super.close();
    }

    public K read() throws Exception {
        return poll(this.pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public K poll(long j, TimeUnit timeUnit) throws InterruptedException {
        KeyspaceNotification poll = this.queue.poll(j, timeUnit);
        if (poll == null) {
            return null;
        }
        return this.stringKeyEncoder.apply(poll.getKey());
    }

    public void message(String str, String str2) {
        notification(str, str2);
    }

    public void message(String str, String str2, String str3) {
        notification(str2, str3);
    }

    public void subscribed(String str, long j) {
    }

    public void psubscribed(String str, long j) {
    }

    public void unsubscribed(String str, long j) {
    }

    public void punsubscribed(String str, long j) {
    }

    public void message(RedisClusterNode redisClusterNode, String str, String str2) {
        notification(str, str2);
    }

    public void message(RedisClusterNode redisClusterNode, String str, String str2, String str3) {
        notification(str2, str3);
    }

    public void subscribed(RedisClusterNode redisClusterNode, String str, long j) {
    }

    public void psubscribed(RedisClusterNode redisClusterNode, String str, long j) {
    }

    public void unsubscribed(RedisClusterNode redisClusterNode, String str, long j) {
    }

    public void punsubscribed(RedisClusterNode redisClusterNode, String str, long j) {
    }
}
