package io.kcache;

import io.kcache.exceptions.CacheException;
import io.kcache.exceptions.CacheInitializationException;
import io.kcache.exceptions.CacheTimeoutException;
import io.kcache.utils.InMemoryCache;
import io.kcache.utils.OffsetCheckpoint;
import io.kcache.utils.ShutdownableThread;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kcache/KafkaCache.class */
public class KafkaCache<K, V> implements Cache<K, V> {
    // Class-wide SLF4J logger; the nested WorkerThread logs through it as well.
    private static final Logger log = LoggerFactory.getLogger(KafkaCache.class);
    // Name of the backing Kafka topic (read from the config in setUp).
    private String topic;
    // Replication factor requested for a newly created topic; createTopic caps it at the live broker count.
    private int desiredReplicationFactor;
    // Partition count used when the topic has to be created.
    private int desiredNumPartitions;
    // Value passed as "group.id" to the consumer.
    private String groupId;
    // Value passed as "client.id" to the consumer; setUp falls back to "kafka-cache-reader-" + topic when unset.
    private String clientId;
    // Callback consulted for every consumed record; setUp installs a no-op handler when none is supplied.
    private CacheUpdateHandler<K, V> cacheUpdateHandler;
    // Serdes used to turn the raw byte[] records back into keys and values.
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    // Local store that consumed updates are applied to and that read calls such as comparator() delegate to.
    private Cache<K, V> localCache;
    // Read from the config in setUp; its use is outside the visible part of this file.
    private boolean requireCompact;
    // Timeout, in milliseconds, applied to the admin-client futures during topic creation/verification.
    private int initTimeout;
    // Read from the config in setUp; its use is outside the visible part of this file.
    private int timeout;
    // Directory handed to OffsetCheckpoint when the local cache is persistent.
    private String checkpointDir;
    // Value passed as "bootstrap.servers" to the producer, consumer and admin client.
    private String bootstrapBrokers;
    // Kafka clients; both are created in init().
    private Producer<byte[], byte[]> producer;
    private Consumer<byte[], byte[]> consumer;
    // Background reader thread created in init().
    private KafkaCache<K, V>.WorkerThread kafkaTopicReader;
    // Full configuration; entries prefixed with "kafkacache." are forwarded to the Kafka clients.
    private KafkaCacheConfig config;
    // Offset checkpoint file; only created in init() when the local cache is persistent.
    private OffsetCheckpoint checkpointFile;
    // Flipped to true by init() once the reader thread has been started.
    private AtomicBoolean initialized = new AtomicBoolean(false);
    // In-memory copy of the checkpointed offsets: loaded in init(), refreshed by WorkerThread.checkpointOffsets().
    private Map<TopicPartition, Long> checkpointFileCache = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/kcache/KafkaCache$WorkerThread.class */
    public class WorkerThread extends ShutdownableThread {
        // Held around every use of the consumer by doWork() and by waitUntilEndOffsets().
        private final ReentrantLock consumerLock;
        // Condition on consumerLock; signalled when isRunning is switched back to true.
        private final Condition runningCondition;
        // False while waitUntilEndOffsets() is reading the topic on the caller's thread.
        private final AtomicBoolean isRunning;
        // Guards offset updates together with the signalling of offsetReachedThreshold.
        private final ReentrantLock offsetUpdateLock;
        // Condition on offsetUpdateLock; signalled after every offset update.
        private final Condition offsetReachedThreshold;
        // Partition number -> offset of the last record handled; seeded with -1 for every partition.
        private final Map<Integer, Long> offsetsInTopic;

        /**
         * Creates the reader thread, discovers the partitions of the backing topic, assigns all of
         * them to the shared consumer and positions the consumer.
         *
         * <p>Partition discovery is attempted up to ten times, one second apart. When the local cache
         * is persistent the consumer resumes from the checkpointed offset of each partition (or from
         * the beginning when a partition has no checkpoint); otherwise every partition is read from
         * the beginning.
         *
         * @throws IllegalArgumentException if no partitions could be discovered for the topic
         */
        public WorkerThread() {
            super("kafka-cache-reader-thread-" + KafkaCache.this.topic);
            this.offsetsInTopic = new ConcurrentHashMap<>();
            this.consumerLock = new ReentrantLock();
            this.runningCondition = this.consumerLock.newCondition();
            this.isRunning = new AtomicBoolean(true);
            this.offsetUpdateLock = new ReentrantLock();
            this.offsetReachedThreshold = this.offsetUpdateLock.newCondition();

            // Fully qualified because PartitionInfo is not among this file's imports.
            List<org.apache.kafka.common.PartitionInfo> partitions = null;
            for (int attempt = 0; attempt < 10; attempt++) {
                partitions = KafkaCache.this.consumer.partitionsFor(KafkaCache.this.topic);
                if (partitions != null && !partitions.isEmpty()) {
                    break;
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: an interrupt only cuts the pause short, the retry goes on.
                }
            }
            if (partitions == null || partitions.isEmpty()) {
                throw new IllegalArgumentException("Unable to subscribe to the Kafka topic " + KafkaCache.this.topic + " backing this data cache. Topic may not exist.");
            }

            // -1 means "nothing consumed yet" for the partition.
            for (org.apache.kafka.common.PartitionInfo partition : partitions) {
                this.offsetsInTopic.put(partition.partition(), -1L);
            }
            List<TopicPartition> topicPartitions = partitions.stream()
                .map(partition -> new TopicPartition(KafkaCache.this.topic, partition.partition()))
                .collect(Collectors.toList());
            KafkaCache.this.consumer.assign(topicPartitions);

            if (KafkaCache.this.localCache.isPersistent()) {
                for (TopicPartition topicPartition : topicPartitions) {
                    Long checkpoint = KafkaCache.this.checkpointFileCache.get(topicPartition);
                    if (checkpoint != null) {
                        KafkaCache.log.info("Seeking to checkpoint {} for {}", checkpoint, topicPartition);
                        KafkaCache.this.consumer.seek(topicPartition, checkpoint.longValue());
                    } else {
                        KafkaCache.log.info("Seeking to beginning for {}", topicPartition);
                        KafkaCache.this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
                    }
                }
            } else {
                KafkaCache.log.info("Seeking to beginning for all partitions");
                KafkaCache.this.consumer.seekToBeginning(topicPartitions);
            }
            KafkaCache.log.info("Initialized last consumed offset to {}", this.offsetsInTopic);
            KafkaCache.log.info("KafkaTopicReader thread started for {}.", KafkaCache.this.clientId);
        }

        /**
         * Polls until the consumer position of every assigned partition has reached the end offset
         * that was captured when this method started.
         *
         * <p>If the consumer reports an invalid offset, a persistent local cache is closed, destroyed
         * and re-initialised, the consumer is rewound to the beginning of every assigned partition
         * and the processed-record counter starts over.
         *
         * @throws IOException declared for the local-cache maintenance calls made during recovery
         */
        public void readToEndOffsets() throws IOException {
            final Set<TopicPartition> assigned = KafkaCache.this.consumer.assignment();
            final Map<TopicPartition, Long> pendingEndOffsets = KafkaCache.this.consumer.endOffsets(assigned);
            KafkaCache.log.trace("Reading to end of offsets {}", pendingEndOffsets);

            int processed = 0;
            while (!hasReadToEndOffsets(pendingEndOffsets)) {
                try {
                    processed += poll();
                } catch (InvalidOffsetException invalidOffset) {
                    final Cache<K, V> store = KafkaCache.this.localCache;
                    if (store.isPersistent()) {
                        // The persisted contents can no longer be trusted; rebuild from scratch.
                        store.close();
                        store.destroy();
                        store.init();
                    }
                    KafkaCache.log.warn("Seeking to beginning due to invalid offset", invalidOffset);
                    KafkaCache.this.consumer.seekToBeginning(assigned);
                    processed = 0;
                }
            }
            KafkaCache.log.info("During init or sync, processed {} records from topic {}", processed, KafkaCache.this.topic);
        }

        /**
         * Drops every partition whose consumer position has reached its recorded end offset and
         * reports whether none are left. The supplied map is modified in place.
         *
         * @param map end offsets still to be reached, keyed by partition
         * @return {@code true} once the map has been emptied
         */
        private boolean hasReadToEndOffsets(Map<TopicPartition, Long> map) {
            Iterator<Map.Entry<TopicPartition, Long>> pending = map.entrySet().iterator();
            while (pending.hasNext()) {
                Map.Entry<TopicPartition, Long> target = pending.next();
                long position = KafkaCache.this.consumer.position(target.getKey());
                if (position >= target.getValue().longValue()) {
                    pending.remove();
                }
            }
            return map.isEmpty();
        }

        /**
         * Performs one unit of background work: waits while reading is paused (see
         * {@code waitUntilEndOffsets}), then polls the consumer once, all under the consumer lock.
         */
        @Override
        protected void doWork() {
            // Acquire before the try block so that the finally clause only ever releases a lock
            // that this thread actually holds.
            this.consumerLock.lock();
            try {
                while (!this.isRunning.get()) {
                    this.runningCondition.await();
                }
                poll();
            } catch (InterruptedException ignored) {
                // Deliberately ignored: this unit of work is abandoned. Presumably the
                // ShutdownableThread run loop decides whether doWork() is called again (confirm).
            } finally {
                this.consumerLock.unlock();
            }
        }

        /**
         * Polls the consumer once and applies every returned record to the local cache.
         *
         * <p>The offset of each record is recorded whether or not the record could be applied, so a
         * record that fails to deserialize does not stall callers waiting for that offset. After the
         * batch the update handler is given the record count and, for a persistent local cache, the
         * cache is flushed and the offsets are checkpointed.
         *
         * @return the number of records returned by the poll, or 0 if the poll was woken up
         * @throws IllegalStateException if the consumer reports a record that is too large
         * @throws RuntimeException any other consumer or handler failure, after logging it
         */
        private int poll() {
            int count = 0;
            try {
                ConsumerRecords<byte[], byte[]> records =
                    KafkaCache.this.consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    try {
                        applyRecord(record);
                    } finally {
                        updateOffset(record.partition(), record.offset());
                    }
                }
                count = records.count();
                KafkaCache.this.cacheUpdateHandler.checkpoint(count);
                if (KafkaCache.this.localCache.isPersistent()) {
                    try {
                        KafkaCache.this.localCache.flush();
                        checkpointOffsets();
                    } catch (CacheException e) {
                        KafkaCache.log.warn("Failed to flush", e);
                    }
                }
            } catch (WakeupException ignored) {
                // Expected: wakeup() is used to interrupt a blocking poll for sync and shutdown.
                // Must be caught before RuntimeException, of which it is a subclass.
            } catch (RecordTooLargeException e) {
                throw new IllegalStateException("Consumer threw RecordTooLargeException. Data has been written that exceeds the default maximum fetch size.", e);
            } catch (RuntimeException e) {
                KafkaCache.log.error("KafkaTopicReader thread for {} has died for an unknown reason.", KafkaCache.this.clientId, e);
                throw e;
            }
            return count;
        }

        /**
         * Deserializes one record and applies it to the local cache. Failures are logged and the
         * record is skipped; nothing is propagated to the caller.
         */
        private void applyRecord(ConsumerRecord<byte[], byte[]> record) {
            K key;
            try {
                key = KafkaCache.this.keySerde.deserializer().deserialize(KafkaCache.this.topic, record.key());
            } catch (Exception e) {
                KafkaCache.log.error("Failed to deserialize the key", e);
                return;
            }

            V value;
            try {
                // A null payload is a tombstone and is passed on as a null value.
                value = record.value() == null
                    ? null
                    : KafkaCache.this.valueSerde.deserializer().deserialize(KafkaCache.this.topic, record.value());
            } catch (Exception e) {
                KafkaCache.log.error("Failed to deserialize a value", e);
                return;
            }

            try {
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                long offset = record.offset();
                long timestamp = record.timestamp();
                if (KafkaCache.this.cacheUpdateHandler.validateUpdate(key, value, topicPartition, offset, timestamp)) {
                    KafkaCache.log.trace("Applying update ({}, {}) to the local cache", key, value);
                    V oldValue = value == null
                        ? KafkaCache.this.localCache.remove(key)
                        : KafkaCache.this.localCache.put(key, value);
                    KafkaCache.this.cacheUpdateHandler.handleUpdate(key, value, oldValue, topicPartition, offset, timestamp);
                } else if (!KafkaCache.this.localCache.containsKey(key)) {
                    // Rejected update for a key the cache does not hold: write a tombstone for it.
                    try {
                        KafkaCache.this.producer.send(
                            new ProducerRecord<byte[], byte[]>(KafkaCache.this.topic, record.key(), null));
                    } catch (KafkaException e) {
                        KafkaCache.log.warn("Failed to tombstone invalid update", e);
                    }
                }
            } catch (Exception e) {
                KafkaCache.log.error("Failed to add record from the Kafka topic " + KafkaCache.this.topic + " to the local cache", e);
            }
        }

        /**
         * Records the offset of the last handled record of a partition and wakes every thread
         * waiting in {@code waitUntilOffset}.
         *
         * @param i partition number
         * @param j offset of the record that was just handled
         */
        private void updateOffset(int i, long j) {
            // Lock first, release in finally: the lock is released exactly once on every path.
            this.offsetUpdateLock.lock();
            try {
                this.offsetsInTopic.put(Integer.valueOf(i), Long.valueOf(j));
                this.offsetReachedThreshold.signalAll();
            } finally {
                this.offsetUpdateLock.unlock();
            }
        }

        /**
         * Copies the tracked offsets into the checkpoint cache, each advanced by one so that it
         * names the next record to read, and writes the cache to the checkpoint file. A failed write
         * is logged and otherwise ignored.
         */
        private void checkpointOffsets() {
            for (Map.Entry<Integer, Long> consumed : this.offsetsInTopic.entrySet()) {
                TopicPartition partition = new TopicPartition(KafkaCache.this.topic, consumed.getKey().intValue());
                long nextOffset = consumed.getValue().longValue() + 1;
                KafkaCache.this.checkpointFileCache.put(partition, Long.valueOf(nextOffset));
            }
            try {
                KafkaCache.this.checkpointFile.write(KafkaCache.this.checkpointFileCache);
            } catch (IOException writeFailure) {
                KafkaCache.log.warn("Failed to write offset checkpoint file to {}: {}", KafkaCache.this.checkpointFile, writeFailure);
            }
        }

        /**
         * Blocks until the reader has handled the record at the given offset of the given partition,
         * or until the timeout elapses.
         *
         * @param i partition number
         * @param j offset to wait for; must not be negative
         * @param duration maximum time to wait
         * @throws CacheException if the offset is negative or the partition is not tracked by this reader
         * @throws CacheTimeoutException if the offset was not reached within the timeout
         */
        public void waitUntilOffset(int i, long j, Duration duration) throws CacheException {
            if (j < 0) {
                throw new CacheException("KafkaTopicReader thread can't wait for a negative offset.");
            }
            final Integer partition = Integer.valueOf(i);
            KafkaCache.log.trace("Waiting to read offset {}. Currently at offset {}", Long.valueOf(j), this.offsetsInTopic.get(partition));
            // Acquire before the try block so that the finally clause only releases a held lock.
            this.offsetUpdateLock.lock();
            try {
                // Only partitions discovered at construction time are tracked; fail with a clear
                // message instead of a NullPointerException from unboxing a missing entry.
                if (!this.offsetsInTopic.containsKey(partition)) {
                    throw new CacheException("KafkaTopicReader thread is not tracking partition " + i + " of topic " + KafkaCache.this.topic);
                }
                long nanos = duration.toNanos();
                while (this.offsetsInTopic.get(partition).longValue() < j && nanos > 0) {
                    try {
                        nanos = this.offsetReachedThreshold.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        // Waiting resumes with the remaining budget that was last recorded.
                        KafkaCache.log.debug("Interrupted while waiting for the background cache reader thread to reach the specified offset: " + j, e);
                    }
                }
                if (this.offsetsInTopic.get(partition).longValue() < j) {
                    throw new CacheTimeoutException("KafkaCacheTopic thread failed to reach target offset within the timeout interval. targetOffset: " + j + ", offsetReached: " + this.offsetsInTopic.get(partition) + ", timeout(ms): " + duration.toMillis());
                }
            } finally {
                this.offsetUpdateLock.unlock();
            }
        }

        /**
         * Pauses the background reader, reads the topic up to its current end offsets on the calling
         * thread, then lets the background reader continue.
         *
         * <p>A failure while reading is logged and not propagated.
         *
         * @throws CacheException declared for callers; not thrown by the code in this method itself
         */
        public void waitUntilEndOffsets() throws CacheException {
            // Ask the worker to pause, and break it out of a blocking poll so it releases the lock.
            this.isRunning.set(false);
            KafkaCache.this.consumer.wakeup();
            this.consumerLock.lock();
            try {
                readToEndOffsets();
            } catch (Exception e) {
                KafkaCache.log.warn("Could not read to end offsets", e);
            } finally {
                // Always resume the worker, even if reading failed with something other than an
                // Exception; otherwise it would stay parked on runningCondition.
                this.isRunning.set(true);
                this.runningCondition.signalAll();
                this.consumerLock.unlock();
            }
        }

        /**
         * Stops the reader thread and closes the consumer.
         *
         * <p>Shutdown is initiated first, the consumer is then woken so that a blocking poll
         * returns, and the consumer is closed only after the thread has finished.
         *
         * @throws InterruptedException if interrupted while awaiting the thread's termination
         */
        @Override
        public void shutdown() throws InterruptedException {
            final String owner = KafkaCache.this.clientId;
            KafkaCache.log.debug("Starting shutdown of KafkaTopicReader thread for {}.", owner);

            super.initiateShutdown();
            if (null != KafkaCache.this.consumer) {
                KafkaCache.this.consumer.wakeup();
            }

            super.awaitShutdown();
            if (null != KafkaCache.this.consumer) {
                KafkaCache.this.consumer.close();
            }

            KafkaCache.log.info("KafkaTopicReader thread shutdown complete for {}.", KafkaCache.this.clientId);
        }
    }

    public KafkaCache(String str, Serde<K> serde, Serde<V> serde2) {
        Properties properties = new Properties();
        properties.put(KafkaCacheConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG, str);
        setUp(new KafkaCacheConfig(properties), serde, serde2, null, new InMemoryCache());
    }

    public KafkaCache(KafkaCacheConfig kafkaCacheConfig, Serde<K> serde, Serde<V> serde2) {
        setUp(kafkaCacheConfig, serde, serde2, null, new InMemoryCache());
    }

    /**
     * Creates a cache with an explicit update handler and local cache.
     *
     * @param kafkaCacheConfig cache configuration
     * @param serde serde for keys
     * @param serde2 serde for values
     * @param cacheUpdateHandler handler consulted for every consumed record; {@code null} selects a no-op handler
     * @param cache local cache that consumed updates are applied to
     */
    public KafkaCache(
            KafkaCacheConfig kafkaCacheConfig,
            Serde<K> serde,
            Serde<V> serde2,
            CacheUpdateHandler<K, V> cacheUpdateHandler,
            Cache<K, V> cache) {
        this.setUp(kafkaCacheConfig, serde, serde2, cacheUpdateHandler, cache);
    }

    /**
     * Shared constructor body: copies the settings out of the configuration and stores the
     * collaborators. No Kafka client is created here; that happens in {@code init()}.
     *
     * @param kafkaCacheConfig cache configuration
     * @param serde serde for keys
     * @param serde2 serde for values
     * @param cacheUpdateHandler update handler, or {@code null} for a no-op handler
     * @param cache local cache that consumed updates are applied to
     */
    private void setUp(KafkaCacheConfig kafkaCacheConfig, Serde<K> serde, Serde<V> serde2, CacheUpdateHandler<K, V> cacheUpdateHandler, Cache<K, V> cache) {
        // Topic settings.
        this.topic = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG);
        this.desiredReplicationFactor = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_TOPIC_REPLICATION_FACTOR_CONFIG);
        this.desiredNumPartitions = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_TOPIC_NUM_PARTITIONS_CONFIG);

        // Client identity; the client id is derived from the topic when not configured.
        this.groupId = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG);
        final String configuredClientId = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_CLIENT_ID_CONFIG);
        this.clientId = configuredClientId;
        if (configuredClientId == null) {
            this.clientId = "kafka-cache-reader-" + this.topic;
        }

        // Remaining scalar settings.
        this.requireCompact = kafkaCacheConfig.getBoolean(KafkaCacheConfig.KAFKACACHE_TOPIC_REQUIRE_COMPACT_CONFIG);
        this.initTimeout = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_INIT_TIMEOUT_CONFIG);
        this.timeout = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_TIMEOUT_CONFIG);
        this.checkpointDir = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_CHECKPOINT_DIR_CONFIG);

        // Collaborators.
        if (cacheUpdateHandler != null) {
            this.cacheUpdateHandler = cacheUpdateHandler;
        } else {
            this.cacheUpdateHandler = (key, value, oldValue, topicPartition, offset, timestamp) -> {
            };
        }
        this.keySerde = serde;
        this.valueSerde = serde2;
        this.localCache = cache;
        this.config = kafkaCacheConfig;
        this.bootstrapBrokers = kafkaCacheConfig.bootstrapBrokers();

        log.info("Initializing Kafka cache {} with broker endpoints {} ", this.clientId, this.bootstrapBrokers);
    }

    /**
     * Returns the key comparator of the local cache.
     *
     * @return whatever the local cache's {@code comparator()} returns
     */
    @Override
    public Comparator<? super K> comparator() {
        final Cache<K, V> delegate = this.localCache;
        return delegate.comparator();
    }

    /**
     * Reports whether the local cache is persistent.
     *
     * @return the local cache's own answer
     */
    @Override
    public boolean isPersistent() {
        final Cache<K, V> delegate = this.localCache;
        return delegate.isPersistent();
    }

    /**
     * Initialises the cache: loads checkpoints for a persistent local cache, initialises the local
     * cache, creates or verifies the topic, creates the Kafka clients, reads the topic to its
     * current end and finally starts the background reader thread.
     *
     * @throws CacheInitializationException if the cache was already initialised, or if the
     *     checkpoints or the topic could not be read
     */
    @Override
    public void init() throws CacheInitializationException {
        if (this.initialized.get()) {
            throw new CacheInitializationException("Illegal state while initializing cache for " + this.clientId + ". Cache was already initialized");
        }

        if (this.localCache.isPersistent()) {
            try {
                this.checkpointFile = new OffsetCheckpoint(this.checkpointDir, 0, this.topic);
                this.checkpointFileCache.putAll(this.checkpointFile.read());
                log.info("Successfully read checkpoints");
            } catch (IOException checkpointFailure) {
                throw new CacheInitializationException("Failed to read checkpoints", checkpointFailure);
            }
        }

        this.localCache.init();
        createOrVerifyTopic();
        this.producer = createProducer();
        this.consumer = createConsumer();
        this.kafkaTopicReader = new WorkerThread();

        // Catch up with the topic on this thread before handing over to the background reader.
        try {
            this.kafkaTopicReader.readToEndOffsets();
        } catch (IOException readFailure) {
            throw new CacheInitializationException("Failed to read to end offsets", readFailure);
        }
        this.kafkaTopicReader.start();

        final boolean firstInitialization = this.initialized.compareAndSet(false, true);
        if (!firstInitialization) {
            throw new CacheInitializationException("Illegal state while initializing cache for " + this.clientId + ". Cache was already initialized");
        }
    }

    /**
     * Synchronises the local cache and then reads the backing topic up to its current end
     * offsets. Requires an initialised cache.
     */
    @Override
    public void sync() {
        assertInitialized();

        final Cache<K, V> store = this.localCache;
        store.sync();

        final WorkerThread reader = this.kafkaTopicReader;
        reader.waitUntilEndOffsets();
    }

    /**
     * Builds the byte-array consumer used to read the backing topic. Pass-through settings from
     * the cache configuration are applied first, so the fixed settings below override them.
     *
     * @return a new consumer; auto-commit is disabled and the reset policy is "earliest"
     */
    private Consumer<byte[], byte[]> createConsumer() {
        final Properties consumerProps = new Properties();
        addKafkaCacheConfigsToClientProperties(consumerProps);

        // Identity and connection.
        consumerProps.put("group.id", this.groupId);
        consumerProps.put("client.id", this.clientId);
        consumerProps.put("bootstrap.servers", this.bootstrapBrokers);

        // Offset handling.
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", "false");

        // Records are consumed as raw bytes; the serdes are applied by the reader thread.
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class);

        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * Builds the byte-array producer used to write to the backing topic. Pass-through settings
     * from the cache configuration are applied first, so the fixed settings below override them.
     *
     * @return a new producer configured with acks "-1" and zero retries
     */
    private Producer<byte[], byte[]> createProducer() {
        final Properties producerProps = new Properties();
        addKafkaCacheConfigsToClientProperties(producerProps);

        producerProps.put("bootstrap.servers", this.bootstrapBrokers);
        producerProps.put("acks", "-1");
        producerProps.put("key.serializer", ByteArraySerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);
        producerProps.put("retries", Integer.valueOf(0));

        return new KafkaProducer<>(producerProps);
    }

    /**
     * Copies every configuration entry carrying the "kafkacache." prefix into the given client
     * properties, as returned by the config's {@code originalsWithPrefix}.
     *
     * @param properties client properties to add the entries to
     */
    private void addKafkaCacheConfigsToClientProperties(Properties properties) {
        this.config.originalsWithPrefix("kafkacache.").forEach(properties::put);
    }

    /* JADX WARN: Failed to calculate best type for var: r7v2 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r7v2 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r8v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r8v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 7, insn: 0x007c: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:26:0x007c */
    /* JADX WARN: Not initialized variable reg: 8, insn: 0x0080: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r8 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:28:0x0080 */
    /* JADX WARN: Type inference failed for: r7v2, types: [org.apache.kafka.clients.admin.AdminClient] */
    /* JADX WARN: Type inference failed for: r8v0, types: [java.lang.Throwable] */
    private void createOrVerifyTopic() throws CacheInitializationException {
        ?? r7;
        ?? r8;
        Properties properties = new Properties();
        addKafkaCacheConfigsToClientProperties(properties);
        properties.put("bootstrap.servers", this.bootstrapBrokers);
        try {
            try {
                AdminClient create = AdminClient.create(properties);
                Throwable th = null;
                if (((Set) create.listTopics().names().get(this.initTimeout, TimeUnit.MILLISECONDS)).contains(this.topic)) {
                    verifyTopic(create);
                } else {
                    createTopic(create);
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } catch (Throwable th3) {
                if (r7 != 0) {
                    if (r8 != 0) {
                        try {
                            r7.close();
                        } catch (Throwable th4) {
                            r8.addSuppressed(th4);
                        }
                    } else {
                        r7.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new CacheInitializationException("Failed trying to create or validate topic " + this.topic, e);
        } catch (TimeoutException e2) {
            throw new CacheInitializationException("Timed out trying to create or validate topic " + this.topic, e2);
        }
    }

    /**
     * Creates the backing topic with the "compact" cleanup policy.
     *
     * <p>The replication factor is the desired one, capped at the number of live brokers; a
     * warning is logged when the cap applies. If the topic turns out to exist already, it is
     * verified instead.
     *
     * @param adminClient admin client to issue the requests with
     * @throws CacheInitializationException if the cluster reports no live brokers
     * @throws InterruptedException if interrupted while waiting for an admin result
     * @throws ExecutionException if an admin request fails for a reason other than the topic existing
     * @throws TimeoutException if an admin request does not complete within the init timeout
     */
    private void createTopic(AdminClient adminClient) throws CacheInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Creating topic {}", this.topic);

        final Collection<?> liveBrokers = adminClient.describeCluster().nodes().get(this.initTimeout, TimeUnit.MILLISECONDS);
        final int brokerCount = liveBrokers.size();
        if (brokerCount <= 0) {
            throw new CacheInitializationException("No live Kafka brokers");
        }

        final int replicationFactor = Math.min(brokerCount, this.desiredReplicationFactor);
        if (replicationFactor < this.desiredReplicationFactor) {
            log.warn("Creating the topic " + this.topic + " using a replication factor of " + replicationFactor + ", which is less than the desired one of " + this.desiredReplicationFactor + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }

        final NewTopic topicRequest = new NewTopic(this.topic, this.desiredNumPartitions, (short) replicationFactor);
        topicRequest.configs(Collections.singletonMap("cleanup.policy", "compact"));

        try {
            adminClient.createTopics(Collections.singleton(topicRequest)).all().get(this.initTimeout, TimeUnit.MILLISECONDS);
        } catch (ExecutionException creationFailure) {
            if (creationFailure.getCause() instanceof TopicExistsException) {
                // Someone else created the topic in the meantime; make sure it is suitable.
                verifyTopic(adminClient);
            } else {
                throw creationFailure;
            }
        }
    }

    /**
     * Validates an existing cache topic against the desired settings.
     *
     * <p>Logs a warning when the topic has fewer partitions than {@code desiredNumPartitions}
     * or when its first partition has fewer replicas than {@code desiredReplicationFactor}.
     * If {@code cleanup.policy} is not {@code compact}, either fails (when
     * {@code requireCompact} is set) or logs a warning. If the topic cannot be described
     * because the broker reports it as unknown, a warning is logged and validation is skipped.
     *
     * @param adminClient admin client used to describe the topic and its configuration
     * @throws CacheInitializationException if compaction is required but not configured
     * @throws InterruptedException if interrupted while waiting for an admin call
     * @throws ExecutionException if an admin call fails for a reason other than an unknown
     *     topic or partition
     * @throws TimeoutException if an admin call exceeds {@code initTimeout} milliseconds
     */
    private void verifyTopic(AdminClient adminClient) throws CacheInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Validating topic {}", this.topic);
        try {
            Map<String, TopicDescription> descriptions = adminClient
                    .describeTopics(Collections.singleton(this.topic))
                    .all()
                    .get(this.initTimeout, TimeUnit.MILLISECONDS);
            List<TopicPartitionInfo> partitions = descriptions.get(this.topic).partitions();

            if (partitions.size() < this.desiredNumPartitions) {
                // Report the desired partition count here, not the replication factor.
                log.warn("The number of partitions for the topic " + this.topic + " is less than the desired value of " + this.desiredNumPartitions + ".");
            }
            if (partitions.get(0).replicas().size() < this.desiredReplicationFactor) {
                log.warn("The replication factor of the topic " + this.topic + " is less than the desired one of " + this.desiredReplicationFactor + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
            }

            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, this.topic);
            Map<ConfigResource, Config> configs = adminClient
                    .describeConfigs(Collections.singleton(topicResource))
                    .all()
                    .get(this.initTimeout, TimeUnit.MILLISECONDS);
            String cleanupPolicy = configs.get(topicResource).get("cleanup.policy").value();
            if ("compact".equals(cleanupPolicy)) {
                return;
            }

            String message = "The retention policy of the topic " + this.topic + " is not 'compact'. You must configure the topic to 'compact' cleanup policy to avoid Kafka deleting your data after a week. Refer to Kafka documentation for more details on cleanup policies.";
            if (this.requireCompact) {
                log.error(message);
                throw new CacheInitializationException("The retention policy of the topic " + this.topic + " is incorrect. Expected cleanup.policy to be 'compact' but it is " + cleanupPolicy);
            }
            log.warn(message);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw e;
            }
            log.warn("Could not verify existing topic.");
        }
    }

    /**
     * Returns the number of entries reported by the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public int size() {
        assertInitialized();
        return localCache.size();
    }

    /**
     * Reports whether the local cache is empty.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public boolean isEmpty() {
        assertInitialized();
        return localCache.isEmpty();
    }

    /**
     * Reports whether the local cache contains the given key.
     *
     * @param obj key to look up
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public boolean containsKey(Object obj) {
        assertInitialized();
        return localCache.containsKey(obj);
    }

    /**
     * Reports whether the local cache contains the given value.
     *
     * @param obj value to look for
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public boolean containsValue(Object obj) {
        assertInitialized();
        return localCache.containsValue(obj);
    }

    /**
     * Looks up the value the local cache holds for the given key.
     *
     * @param obj key to look up
     * @return whatever the local cache returns for {@code obj}
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public V get(Object obj) {
        assertInitialized();
        V cached = (V) localCache.get(obj);
        return cached;
    }

    /**
     * Writes a key/value pair to the Kafka cache topic and blocks until the local cache has
     * consumed the produced record.
     *
     * <p>The key and value are serialized with the configured serdes, sent to {@code topic},
     * and the call then waits (up to {@code timeout} milliseconds each) for the broker ack and
     * for the reader thread to reach the record's offset. A {@code null} value produces a
     * record with a null payload.
     *
     * @param k key to write; must not be {@code null}
     * @param v value to write, or {@code null}
     * @return the value the local cache held for {@code k} before the write
     * @throws CacheException if {@code k} is null, the cache is not initialized, serialization
     *     fails, the send fails, or the wait is interrupted
     * @throws CacheTimeoutException if no ack arrives from Kafka within {@code timeout}
     */
    @Override
    public V put(K k, V v) {
        if (k == null) {
            throw new CacheException("Key should not be null");
        }
        assertInitialized();
        V previous = get(k);

        // Serialization has its own try block so that only serialization failures are
        // reported as such; failures from the send below must not be re-wrapped here.
        ProducerRecord<byte[], byte[]> record;
        try {
            byte[] keyBytes = this.keySerde.serializer().serialize(this.topic, k);
            byte[] valueBytes = v == null ? null : this.valueSerde.serializer().serialize(this.topic, v);
            record = new ProducerRecord<>(this.topic, keyBytes, valueBytes);
        } catch (Exception e) {
            throw new CacheException("Error serializing key while creating the Kafka produce record", e);
        }

        try {
            log.trace("Sending record to Kafka cache topic: {}", record);
            RecordMetadata metadata = this.producer.send(record).get(this.timeout, TimeUnit.MILLISECONDS);
            log.trace("Waiting for the local cache to catch up to offset {}", metadata.offset());
            this.kafkaTopicReader.waitUntilOffset(metadata.partition(), metadata.offset(), Duration.ofMillis(this.timeout));
            return previous;
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new CacheException("Put operation interrupted while waiting for an ack from Kafka", e);
        } catch (ExecutionException e) {
            throw new CacheException("Put operation failed while waiting for an ack from Kafka", e);
        } catch (TimeoutException e) {
            throw new CacheTimeoutException("Put operation timed out while waiting for an ack from Kafka", e);
        } catch (KafkaException e) {
            throw new CacheException("Put operation to Kafka failed", e);
        }
    }

    /**
     * Writes every entry of the given map through {@link #put(Object, Object)}, one at a time.
     *
     * @param map entries to write
     * @throws CacheException if the cache has not been initialized or any single put fails
     */
    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
        assertInitialized();
        map.forEach(this::put);
    }

    /**
     * Removes a key by writing a {@code null} value for it through {@link #put(Object, Object)}.
     *
     * @param obj key to remove; it is cast to {@code K} without a runtime check
     * @return the value the local cache held for the key before the write
     * @throws CacheException if the cache has not been initialized, the key is null, or the
     *     underlying put fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public V remove(Object obj) {
        assertInitialized();
        // Map.remove takes Object; put needs K. The cast is unchecked because K is erased.
        return put((K) obj, null);
    }

    /**
     * Not supported by this cache.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.Map
    public void clear() {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the key set of the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public Set<K> keySet() {
        assertInitialized();
        return localCache.keySet();
    }

    /**
     * Returns the value collection of the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public Collection<V> values() {
        assertInitialized();
        return localCache.values();
    }

    /**
     * Returns the entry set of the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        assertInitialized();
        return localCache.entrySet();
    }

    /**
     * Returns the first key as reported by the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public K firstKey() {
        assertInitialized();
        return localCache.firstKey();
    }

    /**
     * Returns the last key as reported by the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public K lastKey() {
        assertInitialized();
        return localCache.lastKey();
    }

    /**
     * Delegates to the local cache's {@code subCache} with the same four arguments.
     *
     * @param k first bound key, passed through unchanged
     * @param z flag paired with {@code k}, passed through unchanged
     * @param k2 second bound key, passed through unchanged
     * @param z2 flag paired with {@code k2}, passed through unchanged
     * @return the sub-cache produced by the local cache
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public Cache<K, V> subCache(K k, boolean z, K k2, boolean z2) {
        assertInitialized();
        return localCache.subCache(k, z, k2, z2);
    }

    /**
     * Delegates to the local cache's {@code range} with the same four arguments.
     *
     * @param k first bound key, passed through unchanged
     * @param z flag paired with {@code k}, passed through unchanged
     * @param k2 second bound key, passed through unchanged
     * @param z2 flag paired with {@code k2}, passed through unchanged
     * @return the iterator produced by the local cache
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public KeyValueIterator<K, V> range(K k, boolean z, K k2, boolean z2) {
        assertInitialized();
        return localCache.range(k, z, k2, z2);
    }

    /**
     * Returns the iterator produced by the local cache's {@code all()}.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public KeyValueIterator<K, V> all() {
        assertInitialized();
        return localCache.all();
    }

    /**
     * Returns the cache produced by the local cache's {@code descendingCache()}.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public Cache<K, V> descendingCache() {
        assertInitialized();
        return localCache.descendingCache();
    }

    /**
     * Flushes the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     */
    @Override
    public void flush() {
        assertInitialized();
        localCache.flush();
    }

    /**
     * Shuts the cache down: stops the reader thread, then closes the producer, the local cache,
     * the checkpoint file and the update handler, in that order. Components that are
     * {@code null} (except the local cache, which is always closed) are skipped.
     *
     * <p>If the calling thread is interrupted while the reader thread is shutting down, the
     * remaining resources are still closed and the thread's interrupt flag is restored before
     * this method returns.
     *
     * @throws IOException if closing one of the underlying resources fails
     */
    @Override
    public void close() throws IOException {
        boolean interrupted = false;
        try {
            if (this.kafkaTopicReader != null) {
                try {
                    this.kafkaTopicReader.shutdown();
                } catch (InterruptedException e) {
                    // Remember the interruption instead of swallowing it. The flag is restored
                    // in the finally block so the close() calls below do not see a pending
                    // interrupt.
                    interrupted = true;
                    log.warn("Interrupted while shutting down the Kafka cache reader for {}", this.clientId);
                }
            }
            if (this.producer != null) {
                this.producer.close();
                log.debug("Kafka cache producer shut down for {}", this.clientId);
            }
            this.localCache.close();
            if (this.checkpointFile != null) {
                this.checkpointFile.close();
            }
            if (this.cacheUpdateHandler != null) {
                this.cacheUpdateHandler.close();
            }
            log.info("Kafka cache shut down complete for {}", this.clientId);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Destroys the local cache.
     *
     * @throws CacheException if the cache has not been initialized
     * @throws IOException if the local cache fails to destroy itself
     */
    @Override
    public void destroy() throws IOException {
        assertInitialized();
        localCache.destroy();
    }

    /**
     * Guard used by the public operations: fails unless the {@code initialized} flag is set.
     *
     * @throws CacheException if the cache has not been initialized yet
     */
    private void assertInitialized() throws CacheException {
        if (initialized.get()) {
            return;
        }
        throw new CacheException("Illegal state. Cache for " + clientId + " not initialized yet");
    }

    /**
     * Package-private accessor for the reader thread held in {@code kafkaTopicReader}.
     *
     * @return the current value of {@code kafkaTopicReader}; no initialization check is made
     */
    KafkaCache<K, V>.WorkerThread getWorkerThread() {
        return kafkaTopicReader;
    }
}
