package io.kareldb.server.leader;

import io.kareldb.KarelDbConfig;
import io.kareldb.KarelDbEngine;
import io.kareldb.server.handler.UrlProvider;
import io.kareldb.server.leader.KarelDbProtocol;
import java.io.Closeable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kareldb/server/leader/KarelDbLeaderElector.class */
public class KarelDbLeaderElector implements KarelDbRebalanceListener, UrlProvider, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KarelDbLeaderElector.class);
    private static final AtomicInteger KDB_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kareldb";
    private final KarelDbEngine engine;
    private final int initTimeout;
    private final String clientId;
    private final ConsumerNetworkClient client;
    private final Metrics metrics;
    private final Metadata metadata;
    private final long retryBackoffMs;
    private final KarelDbCoordinator coordinator;
    private KarelDbIdentity myIdentity;
    private ExecutorService executor;
    private final AtomicReference<KarelDbIdentity> leader = new AtomicReference<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final CountDownLatch joinedLatch = new CountDownLatch(1);

    public KarelDbLeaderElector(KarelDbConfig karelDbConfig, KarelDbEngine karelDbEngine) throws KarelDbElectionException {
        try {
            this.engine = karelDbEngine;
            this.clientId = "kdb-" + KDB_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.myIdentity = findIdentity(karelDbConfig.getList("listeners"), karelDbConfig.getBoolean("leader.eligibility").booleanValue());
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            linkedHashMap.put("client-id", this.clientId);
            MetricConfig tags = new MetricConfig().tags(linkedHashMap);
            List singletonList = Collections.singletonList(new JmxReporter());
            KafkaMetricsContext kafkaMetricsContext = new KafkaMetricsContext(JMX_PREFIX, karelDbConfig.originals());
            Time time = Time.SYSTEM;
            ClientConfig clientConfig = new ClientConfig(karelDbConfig.originalsWithPrefix("kafkacache."), false);
            this.metrics = new Metrics(tags, singletonList, time, kafkaMetricsContext);
            this.retryBackoffMs = clientConfig.getLong("retry.backoff.ms").longValue();
            String string = karelDbConfig.getString("cluster.group.id");
            LogContext logContext = new LogContext("[KarelDB clientId=" + this.clientId + ", groupId=" + string + "] ");
            this.metadata = new Metadata(this.retryBackoffMs, clientConfig.getLong("metadata.max.age.ms").longValue(), logContext, new ClusterResourceListeners());
            this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(karelDbConfig.getList("kafkacache.bootstrap.servers"), clientConfig.getString("client.dns.lookup")));
            this.client = new ConsumerNetworkClient(logContext, new NetworkClient(new Selector(clientConfig.getLong("connections.max.idle.ms").longValue(), this.metrics, time, JMX_PREFIX, ClientUtils.createChannelBuilder(clientConfig, time, logContext), logContext), this.metadata, this.clientId, 100, clientConfig.getLong("reconnect.backoff.ms").longValue(), clientConfig.getLong("reconnect.backoff.max.ms").longValue(), clientConfig.getInt("send.buffer.bytes").intValue(), clientConfig.getInt("receive.buffer.bytes").intValue(), clientConfig.getInt("request.timeout.ms").intValue(), ClientDnsLookup.forConfig(clientConfig.getString("client.dns.lookup")), time, true, new ApiVersions(), logContext), this.metadata, time, this.retryBackoffMs, clientConfig.getInt("request.timeout.ms").intValue(), Integer.MAX_VALUE);
            this.coordinator = new KarelDbCoordinator(logContext, this.client, string, 300000, 10000, 3000, this.metrics, JMX_PREFIX, time, this.retryBackoffMs, this.myIdentity, this);
            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.initTimeout = karelDbConfig.getInt("kafkacache.init.timeout.ms").intValue();
            LOG.debug("Group member created");
        } catch (Throwable th) {
            stop(true);
            throw new KarelDbElectionException("Failed to construct kafka consumer", th);
        }
    }

    static KarelDbIdentity findIdentity(List<String> list, boolean z) {
        Iterator<URI> it = parseListeners(list).iterator();
        if (!it.hasNext()) {
            throw new ConfigException("No listeners are configured. Must have at least one listener.");
        }
        URI next = it.next();
        return new KarelDbIdentity(next.getScheme(), next.getHost(), Integer.valueOf(next.getPort()), Boolean.valueOf(z));
    }

    static List<URI> parseListeners(List<String> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (String str : list) {
            try {
                URI uri = new URI(str);
                if (uri.getScheme() == null) {
                    throw new ConfigException("Found a listener without a scheme. All listeners must have a scheme. The listener without a scheme is: " + str);
                }
                if (uri.getPort() == -1) {
                    throw new ConfigException("Found a listener without a port. All listeners must have a port. The listener without a port is: " + str);
                }
                arrayList.add(uri);
            } catch (URISyntaxException e) {
                throw new ConfigException("Could not parse a listener URI from the `listener` configuration option.");
            }
        }
        if (arrayList.isEmpty()) {
            throw new ConfigException("No listeners are configured. Must have at least one listener.");
        }
        return arrayList;
    }

    public void init() throws KarelDbElectionException {
        LOG.debug("Initializing group member");
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(() -> {
            while (!this.stopped.get()) {
                try {
                    this.coordinator.poll(2147483647L);
                } catch (Throwable th) {
                    LOG.error("Unexpected exception in group processing thread", th);
                    return;
                }
            }
        });
        try {
            if (!this.joinedLatch.await(this.initTimeout, TimeUnit.MILLISECONDS)) {
                throw new KarelDbElectionException("Timed out waiting for join group to complete");
            }
            LOG.debug("Group member initialized and joined group");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KarelDbElectionException("Interrupted while waiting for join group to complete", e);
        }
    }

    @Override // io.kareldb.server.handler.UrlProvider
    public Optional<String> url() {
        KarelDbIdentity karelDbIdentity = this.leader.get();
        if (karelDbIdentity == null) {
            throw new KarelDbElectionException("Leader is unknown");
        }
        return karelDbIdentity.equals(this.myIdentity) ? Optional.empty() : Optional.of(karelDbIdentity.getUrl());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.stopped.get()) {
            return;
        }
        stop(false);
    }

    @Override // io.kareldb.server.leader.KarelDbRebalanceListener
    public void onAssigned(KarelDbProtocol.Assignment assignment, int i) {
        LOG.info("Finished rebalance with leader election result: {}", assignment);
        try {
            switch (assignment.error()) {
                case KarelDbProtocol.Assignment.NO_ERROR /* 0 */:
                    if (assignment.leaderIdentity() == null) {
                        LOG.error("No leader eligible instances joined the group. Rebalancing was successful and this instance can serve reads, but no writes can be processed.");
                    }
                    setLeader(assignment.leaderIdentity());
                    LOG.info(isLeader() ? "Registered as leader" : "Registered as replica");
                    this.joinedLatch.countDown();
                    return;
                case 1:
                    throw new IllegalStateException("The group contained multiple members advertising the same URL. Verify that each instance has a unique, routable listener by setting the 'listeners' configuration. This error may happen if executing in containers where the default hostname is 'localhost'.");
                default:
                    throw new IllegalStateException("Unknown error returned from the coordination protocol");
            }
        } catch (KarelDbElectionException e) {
            LOG.error("Error when updating leader, we will not be able to forward requests to the leader", e);
        }
    }

    @Override // io.kareldb.server.leader.KarelDbRebalanceListener
    public void onRevoked() {
        LOG.info("Rebalance started");
        try {
            setLeader(null);
        } catch (KarelDbElectionException e) {
            LOG.error("Error when updating leader, we will not be able to forward requests to the leader", e);
        }
    }

    public KarelDbIdentity getIdentity() {
        return this.myIdentity;
    }

    public boolean isLeader() {
        KarelDbIdentity karelDbIdentity = this.leader.get();
        return karelDbIdentity != null && karelDbIdentity.equals(this.myIdentity);
    }

    private void setLeader(KarelDbIdentity karelDbIdentity) {
        KarelDbIdentity andSet = this.leader.getAndSet(karelDbIdentity);
        if (karelDbIdentity == null || karelDbIdentity.equals(andSet) || !karelDbIdentity.equals(this.myIdentity)) {
            return;
        }
        LOG.info("Syncing caches...");
        this.engine.sync();
    }

    private void stop(boolean z) {
        LOG.trace("Stopping the group member.");
        if (this.client != null) {
            this.client.wakeup();
        }
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted waiting for group processing thread to exit", e);
            }
        }
        AtomicReference atomicReference = new AtomicReference();
        this.stopped.set(true);
        closeQuietly(this.coordinator, "coordinator", atomicReference);
        closeQuietly(this.metrics, "consumer metrics", atomicReference);
        closeQuietly(this.client, "consumer network client", atomicReference);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);
        if (atomicReference.get() != null && !z) {
            throw new KafkaException("Failed to stop the group member", (Throwable) atomicReference.get());
        }
        LOG.debug("The group member has stopped.");
    }

    private static void closeQuietly(AutoCloseable autoCloseable, String str, AtomicReference<Throwable> atomicReference) {
        if (autoCloseable != null) {
            try {
                autoCloseable.close();
            } catch (Throwable th) {
                atomicReference.compareAndSet(null, th);
                LOG.error("Failed to close {} with type {}", new Object[]{str, autoCloseable.getClass().getName(), th});
            }
        }
    }
}
