package org.springframework.batch.item.redis.support;

import com.hybhub.util.concurrent.ConcurrentSetBlockingQueue;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/item/redis/support/AbstractKeyspaceNotificationItemReader.class */
public abstract class AbstractKeyspaceNotificationItemReader<K, V> extends AbstractPollableItemReader<K> {
    private static final Logger log = LoggerFactory.getLogger(AbstractKeyspaceNotificationItemReader.class);
    private final K pubSubPattern;
    private final Converter<K, K> keyExtractor;
    private final BlockingQueue<K> queue;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractKeyspaceNotificationItemReader(Duration duration, K k, Converter<K, K> converter, int i) {
        super(duration);
        Assert.notNull(k, "A pub/sub subscription pattern is required.");
        Assert.notNull(converter, "A key extractor is required.");
        Assert.isTrue(i > 0, "Queue capacity must be greater than zero.");
        this.pubSubPattern = k;
        this.keyExtractor = converter;
        this.queue = new ConcurrentSetBlockingQueue(i);
    }

    @Override // org.springframework.batch.item.redis.support.PollableItemReader
    public K poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    @Override // org.springframework.batch.item.redis.support.AbstractPollableItemReader
    public void open(ExecutionContext executionContext) {
        super.open(executionContext);
        MetricsUtils.createGaugeCollectionSize("reader.notification.queue.size", this.queue, new Tag[0]);
        log.debug("Subscribing to pub/sub pattern {}, queue capacity: {}", this.pubSubPattern, Integer.valueOf(this.queue.remainingCapacity()));
        subscribe(this.pubSubPattern);
    }

    protected abstract void subscribe(K k);

    @Override // org.springframework.batch.item.redis.support.AbstractPollableItemReader
    public void close() {
        super.close();
        log.debug("Unsubscribing from pub/sub pattern {}", this.pubSubPattern);
        unsubscribe(this.pubSubPattern);
        this.queue.clear();
    }

    protected abstract void unsubscribe(K k);

    /* JADX INFO: Access modifiers changed from: protected */
    public void notification(K k, V v) {
        Object convert;
        if (k == null || (convert = this.keyExtractor.convert(k)) == null || this.queue.offer(convert)) {
            return;
        }
        log.debug("Notification queue full");
    }
}
