package com.redislabs.riot.redis.replicate;

import com.redislabs.riot.redis.KeyValue;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/redislabs/riot/redis/replicate/KeyValueProducer.class */
public class KeyValueProducer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KeyValueProducer.class);
    private KeyIterator keyIterator;
    private int batchSize;
    private GenericObjectPool<StatefulConnection<String, String>> pool;
    private Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> asyncApi;
    private long timeout;
    private BlockingQueue<KeyValue> queue;
    private BlockingQueue<String> keys;
    private boolean stopped;

    /* loaded from: input_file:com/redislabs/riot/redis/replicate/KeyValueProducer$KeyValueProducerBuilder.class */
    public static class KeyValueProducerBuilder {
        private KeyIterator keyIterator;
        private int batchSize;
        private GenericObjectPool<StatefulConnection<String, String>> pool;
        private Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> asyncApi;
        private long timeout;
        private BlockingQueue<KeyValue> queue;
        private BlockingQueue<String> keys;
        private boolean stopped;

        KeyValueProducerBuilder() {
        }

        public KeyValueProducerBuilder keyIterator(KeyIterator keyIterator) {
            this.keyIterator = keyIterator;
            return this;
        }

        public KeyValueProducerBuilder batchSize(int i) {
            this.batchSize = i;
            return this;
        }

        public KeyValueProducerBuilder pool(GenericObjectPool<StatefulConnection<String, String>> genericObjectPool) {
            this.pool = genericObjectPool;
            return this;
        }

        public KeyValueProducerBuilder asyncApi(Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> function) {
            this.asyncApi = function;
            return this;
        }

        public KeyValueProducerBuilder timeout(long j) {
            this.timeout = j;
            return this;
        }

        public KeyValueProducerBuilder queue(BlockingQueue<KeyValue> blockingQueue) {
            this.queue = blockingQueue;
            return this;
        }

        public KeyValueProducerBuilder keys(BlockingQueue<String> blockingQueue) {
            this.keys = blockingQueue;
            return this;
        }

        public KeyValueProducerBuilder stopped(boolean z) {
            this.stopped = z;
            return this;
        }

        public KeyValueProducer build() {
            return new KeyValueProducer(this.keyIterator, this.batchSize, this.pool, this.asyncApi, this.timeout, this.queue, this.keys, this.stopped);
        }

        public String toString() {
            return "KeyValueProducer.KeyValueProducerBuilder(keyIterator=" + this.keyIterator + ", batchSize=" + this.batchSize + ", pool=" + this.pool + ", asyncApi=" + this.asyncApi + ", timeout=" + this.timeout + ", queue=" + this.queue + ", keys=" + this.keys + ", stopped=" + this.stopped + ")";
        }
    }

    public void stop() {
        log.debug("Producer stopped");
        this.stopped = true;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.keys = new LinkedBlockingDeque(this.batchSize);
        while (this.keyIterator.hasNext() && !this.stopped) {
            try {
                String next = this.keyIterator.next();
                if (next != null) {
                    this.keys.put(next);
                    if (this.keys.size() >= this.batchSize) {
                        flush();
                    }
                }
            } catch (Throwable th) {
                log.error("Key/value producer encountered an error", th);
                return;
            }
        }
    }

    public void flush() {
        ArrayList<String> arrayList = new ArrayList(this.keys.size());
        this.keys.drainTo(arrayList);
        try {
            StatefulConnection<String, String> statefulConnection = (StatefulConnection) this.pool.borrowObject();
            try {
                RedisKeyAsyncCommands redisKeyAsyncCommands = (BaseRedisAsyncCommands) this.asyncApi.apply(statefulConnection);
                redisKeyAsyncCommands.setAutoFlushCommands(false);
                ArrayList arrayList2 = new ArrayList(arrayList.size());
                ArrayList arrayList3 = new ArrayList(arrayList.size());
                for (String str : arrayList) {
                    arrayList2.add(redisKeyAsyncCommands.pttl(str));
                    arrayList3.add(redisKeyAsyncCommands.dump(str));
                }
                redisKeyAsyncCommands.flushCommands();
                for (int i = 0; i < arrayList.size(); i++) {
                    String str2 = (String) arrayList.get(i);
                    try {
                        this.queue.put(new KeyValue().key(str2).ttl(((Long) ((RedisFuture) arrayList2.get(i)).get(this.timeout, TimeUnit.SECONDS)).longValue()).value((byte[]) ((RedisFuture) arrayList3.get(i)).get(this.timeout, TimeUnit.SECONDS)));
                    } catch (Exception e) {
                        log.error("Could not read value for key {}", str2, e);
                    }
                }
                if (statefulConnection != null) {
                    statefulConnection.close();
                }
            } finally {
            }
        } catch (Exception e2) {
            log.error("Could not get connection from pool for keys {}", arrayList, e2);
        }
    }

    KeyValueProducer(KeyIterator keyIterator, int i, GenericObjectPool<StatefulConnection<String, String>> genericObjectPool, Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> function, long j, BlockingQueue<KeyValue> blockingQueue, BlockingQueue<String> blockingQueue2, boolean z) {
        this.keyIterator = keyIterator;
        this.batchSize = i;
        this.pool = genericObjectPool;
        this.asyncApi = function;
        this.timeout = j;
        this.queue = blockingQueue;
        this.keys = blockingQueue2;
        this.stopped = z;
    }

    public static KeyValueProducerBuilder builder() {
        return new KeyValueProducerBuilder();
    }

    public KeyValueProducer keyIterator(KeyIterator keyIterator) {
        this.keyIterator = keyIterator;
        return this;
    }

    public KeyValueProducer batchSize(int i) {
        this.batchSize = i;
        return this;
    }

    public KeyValueProducer pool(GenericObjectPool<StatefulConnection<String, String>> genericObjectPool) {
        this.pool = genericObjectPool;
        return this;
    }

    public KeyValueProducer asyncApi(Function<StatefulConnection<String, String>, RedisAsyncCommands<String, String>> function) {
        this.asyncApi = function;
        return this;
    }

    public KeyValueProducer timeout(long j) {
        this.timeout = j;
        return this;
    }

    public KeyValueProducer queue(BlockingQueue<KeyValue> blockingQueue) {
        this.queue = blockingQueue;
        return this;
    }
}
