package com.facebook.presto.kafka.util;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import io.airlift.testing.FileUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

/* loaded from: input_file:com/facebook/presto/kafka/util/EmbeddedKafka.class */
public class EmbeddedKafka implements Closeable {
    private final EmbeddedZookeeper zookeeper;
    private final int port;
    private final File kafkaDataDir;
    private final KafkaServerStartable kafka;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean stopped = new AtomicBoolean();

    /* loaded from: input_file:com/facebook/presto/kafka/util/EmbeddedKafka$CloseableProducer.class */
    public static class CloseableProducer<K, V> extends Producer<K, V> implements AutoCloseable {
        public CloseableProducer(ProducerConfig producerConfig) {
            super(producerConfig);
        }
    }

    public static EmbeddedKafka createEmbeddedKafka() throws IOException {
        return new EmbeddedKafka(new EmbeddedZookeeper(), new Properties());
    }

    public static EmbeddedKafka createEmbeddedKafka(Properties properties) throws IOException {
        return new EmbeddedKafka(new EmbeddedZookeeper(), properties);
    }

    EmbeddedKafka(EmbeddedZookeeper embeddedZookeeper, Properties properties) throws IOException {
        this.zookeeper = (EmbeddedZookeeper) Objects.requireNonNull(embeddedZookeeper, "zookeeper is null");
        Objects.requireNonNull(properties, "overrideProperties is null");
        this.port = TestUtils.findUnusedPort();
        this.kafkaDataDir = Files.createTempDir();
        this.kafka = new KafkaServerStartable(new KafkaConfig(TestUtils.toProperties(ImmutableMap.builder().put("broker.id", "0").put("host.name", "localhost").put("num.partitions", "2").put("log.flush.interval.messages", "10000").put("log.flush.interval.ms", "1000").put("log.retention.minutes", "60").put("log.segment.bytes", "1048576").put("auto.create.topics.enable", "false").put("zookeeper.connection.timeout.ms", "1000000").put("port", Integer.toString(this.port)).put("log.dirs", this.kafkaDataDir.getAbsolutePath()).put("zookeeper.connect", embeddedZookeeper.getConnectString()).putAll(Maps.fromProperties(properties)).build())));
    }

    public void start() throws InterruptedException, IOException {
        if (this.started.getAndSet(true)) {
            return;
        }
        this.zookeeper.start();
        this.kafka.startup();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.started.get() || this.stopped.getAndSet(true)) {
            return;
        }
        this.kafka.shutdown();
        this.kafka.awaitShutdown();
        this.zookeeper.close();
        FileUtils.deleteRecursively(this.kafkaDataDir);
    }

    public void createTopics(String... strArr) {
        createTopics(2, 1, new Properties(), strArr);
    }

    public void createTopics(int i, int i2, Properties properties, String... strArr) {
        Preconditions.checkState(this.started.get() && !this.stopped.get(), "not started!");
        ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
        try {
            for (String str : strArr) {
                AdminUtils.createTopic(zkClient, str, i, i2, properties);
            }
        } finally {
            zkClient.close();
        }
    }

    public CloseableProducer<Long, Object> createProducer() {
        return new CloseableProducer<>(new ProducerConfig(TestUtils.toProperties(ImmutableMap.builder().put("metadata.broker.list", getConnectString()).put("serializer.class", JsonEncoder.class.getName()).put("key.serializer.class", NumberEncoder.class.getName()).put("partitioner.class", NumberPartitioner.class.getName()).put("request.required.acks", "1").build())));
    }

    public int getZookeeperPort() {
        return this.zookeeper.getPort();
    }

    public int getPort() {
        return this.port;
    }

    public String getConnectString() {
        return "localhost:" + Integer.toString(this.port);
    }

    public String getZookeeperConnectString() {
        return this.zookeeper.getConnectString();
    }
}
