package com.redislabs.riot.redis.replicate;

import com.redislabs.riot.redis.KeyValue;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redislabs/riot/redis/replicate/KeyValueReader.class */
/**
 * Item reader that hands out {@link KeyValue} items taken from a bounded in-memory queue.
 *
 * <p>{@link #open(ExecutionContext)} starts the {@link KeyIterator}, creates the queue and
 * starts {@code threads} {@link KeyValueProducer} tasks that are each given the key iterator,
 * the connection pool and the queue. {@link #read()} then polls that queue. {@link #close()}
 * stops the producers, shuts the executors down and stops the key iterator.
 *
 * <p>{@code open} and {@code close} are serialized on an internal lock; {@code read} does not
 * take that lock.
 */
public class KeyValueReader extends AbstractItemStreamItemReader<KeyValue> {
    private static final Logger log = LoggerFactory.getLogger(KeyValueReader.class);

    /** How long a single queue poll waits before the key iterator is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    // Immutable configuration, set once by the builder.
    private final KeyIterator keyIterator;
    private final GenericObjectPool<StatefulConnection<String, String>> pool;
    private final Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> asyncApi;
    private final int timeout;
    private final int valueQueueCapacity;
    private final int threads;
    private final int batchSize;
    /** Period in milliseconds between scheduled producer flushes; {@code null} disables them. */
    private final Long flushRate;

    /** Guards the open/close life cycle. */
    private final Object lock = new Object();

    // Runtime state: non-null only between open() and close().
    private BlockingQueue<KeyValue> queue;
    private ExecutorService executor;
    private ScheduledExecutorService scheduler;
    private List<KeyValueProducer> producers;

    /**
     * Fluent builder for {@link KeyValueReader}. Unset numeric options default to {@code 0} and
     * unset references to {@code null}; no validation is performed here.
     */
    public static class KeyValueReaderBuilder {
        private KeyIterator keyIterator;
        private GenericObjectPool<StatefulConnection<String, String>> pool;
        private Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> asyncApi;
        private int timeout;
        private int valueQueueCapacity;
        private int threads;
        private int batchSize;
        private Long flushRate;

        KeyValueReaderBuilder() {
        }

        /** Sets the key source that is started on open and handed to every producer. */
        public KeyValueReaderBuilder keyIterator(KeyIterator keyIterator) {
            this.keyIterator = keyIterator;
            return this;
        }

        /** Sets the connection pool handed to every producer. */
        public KeyValueReaderBuilder pool(GenericObjectPool<StatefulConnection<String, String>> genericObjectPool) {
            this.pool = genericObjectPool;
            return this;
        }

        /** Sets the function that maps a pooled connection to its async command API. */
        public KeyValueReaderBuilder asyncApi(Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> function) {
            this.asyncApi = function;
            return this;
        }

        /** Sets the timeout passed through to every producer (unit is defined by the producer). */
        public KeyValueReaderBuilder timeout(int i) {
            this.timeout = i;
            return this;
        }

        /** Sets the capacity of the queue created on open. */
        public KeyValueReaderBuilder valueQueueCapacity(int i) {
            this.valueQueueCapacity = i;
            return this;
        }

        /** Sets both the thread-pool size and the number of producers created on open. */
        public KeyValueReaderBuilder threads(int i) {
            this.threads = i;
            return this;
        }

        /** Sets the batch size passed through to every producer. */
        public KeyValueReaderBuilder batchSize(int i) {
            this.batchSize = i;
            return this;
        }

        /** Sets the flush period in milliseconds; {@code null} disables scheduled flushing. */
        public KeyValueReaderBuilder flushRate(Long l) {
            this.flushRate = l;
            return this;
        }

        /** Creates the reader from the values collected so far. */
        public KeyValueReader build() {
            return new KeyValueReader(this.keyIterator, this.pool, this.asyncApi, this.timeout, this.valueQueueCapacity, this.threads, this.batchSize, this.flushRate);
        }

        public String toString() {
            return "KeyValueReader.KeyValueReaderBuilder(keyIterator=" + this.keyIterator + ", pool=" + this.pool + ", asyncApi=" + this.asyncApi + ", timeout=" + this.timeout + ", valueQueueCapacity=" + this.valueQueueCapacity + ", threads=" + this.threads + ", batchSize=" + this.batchSize + ", flushRate=" + this.flushRate + ")";
        }
    }

    private KeyValueReader(KeyIterator keyIterator, GenericObjectPool<StatefulConnection<String, String>> pool, Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> asyncApi, int timeout, int valueQueueCapacity, int threads, int batchSize, Long flushRate) {
        setName(ClassUtils.getShortName(KeyValueReader.class));
        this.keyIterator = keyIterator;
        this.pool = pool;
        this.asyncApi = asyncApi;
        this.timeout = timeout;
        this.valueQueueCapacity = valueQueueCapacity;
        this.threads = threads;
        this.batchSize = batchSize;
        this.flushRate = flushRate;
    }

    /**
     * Starts the key iterator, creates the queue and launches the producers. Calling this on a
     * reader that is already open is a no-op.
     *
     * @param executionContext not used by this reader
     */
    public void open(ExecutionContext executionContext) {
        synchronized (this.lock) {
            if (this.queue != null) {
                // Already open: keep the running producers and the existing queue.
                return;
            }
            log.debug("Starting key iterator");
            this.keyIterator.start();
            log.debug("Creating queue with capacity {}", this.valueQueueCapacity);
            this.queue = new LinkedBlockingDeque<>(this.valueQueueCapacity);
            log.debug("Creating thread pool of size {}", this.threads);
            this.executor = Executors.newFixedThreadPool(this.threads);
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
            this.producers = new ArrayList<>(this.threads);
            for (int i = 0; i < this.threads; i++) {
                log.debug("Adding KeyValue producer");
                this.producers.add(KeyValueProducer.builder().keyIterator(this.keyIterator).batchSize(this.batchSize).pool(this.pool).asyncApi(this.asyncApi).timeout(this.timeout).queue(this.queue).build());
            }
            for (KeyValueProducer producer : this.producers) {
                log.debug("Starting producer");
                this.executor.submit(producer);
                if (this.flushRate != null) {
                    long periodMillis = this.flushRate.longValue();
                    this.scheduler.scheduleAtFixedRate(producer::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    /**
     * Stops the producers, shuts both executors down and stops the key iterator. Does nothing if
     * the reader is not open, so it is safe to call without a prior {@code open} or more than once.
     */
    public void close() {
        synchronized (this.lock) {
            if (this.queue == null) {
                // Never opened, or already closed: nothing was started, so nothing to release.
                return;
            }
            try {
                for (KeyValueProducer producer : this.producers) {
                    producer.stop();
                }
            } finally {
                // Release the threads even if a producer failed to stop cleanly.
                this.scheduler.shutdown();
                this.scheduler = null;
                this.executor.shutdown();
                this.executor = null;
                this.producers = null;
                this.queue = null;
                this.keyIterator.stop();
            }
        }
    }

    /**
     * Returns the next item from the queue, polling in {@value #POLL_TIMEOUT_MILLIS} ms slices for
     * as long as the queue is empty and the key iterator still reports more keys.
     *
     * <p>NOTE(review): once the key iterator is exhausted a single empty poll ends the stream;
     * confirm that producers cannot still be holding unqueued values at that point.
     *
     * @return the next item, or {@code null} (end of input for Spring Batch) when a poll timed out
     *         and the key iterator has no more keys, or when the calling thread was interrupted
     * @throws IllegalStateException if the reader has not been opened or has been closed
     */
    public KeyValue read() {
        BlockingQueue<KeyValue> values = this.queue;
        if (values == null) {
            throw new IllegalStateException("Reader is not open: call open() before read()");
        }
        try {
            KeyValue item;
            do {
                item = values.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } while (item == null && this.keyIterator.hasNext());
            return item;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Decompiler-generated name for {@link #read()}, kept so existing callers still compile.
     *
     * @return whatever {@link #read()} returns
     * @deprecated use {@link #read()}
     */
    @Deprecated
    public KeyValue m27read() {
        return read();
    }

    /** Creates a new, empty builder. */
    public static KeyValueReaderBuilder builder() {
        return new KeyValueReaderBuilder();
    }
}
