package kr.jm.utils.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaServerStartable;
import kr.jm.utils.JMString;
import kr.jm.utils.JMThread;
import kr.jm.utils.enums.OS;
import kr.jm.utils.exception.JMException;
import kr.jm.utils.helper.JMLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/utils/kafka/JMKafkaServer.class */
public class JMKafkaServer {
    public static final String DEFAULT_KAFKA_LOG = "output-log";
    private static final Logger log = LoggerFactory.getLogger(JMKafkaServer.class);
    private static final String LOG_DIR = "log.dir";
    private String kafkaServerConnect;
    private KafkaServerStartable kafkaServer;
    private Properties kafkaServerProperties;
    private ExecutorService kafkaBrokerThreadPool;

    /* loaded from: input_file:kr/jm/utils/kafka/JMKafkaServer$Builder.class */
    public static class Builder {
        private String zookeeperConnect;
        private String serverIp = OS.getIp();
        private int serverPort = 9092;
        private String logDir = JMKafkaServer.DEFAULT_KAFKA_LOG;
        private int offsetsTopicReplicationFactor = 1;
        private Properties kafkaServerProperties = new Properties();

        public Builder(String str) {
            this.zookeeperConnect = str;
        }

        public Builder serverIp(String str) {
            this.serverIp = str;
            return this;
        }

        public Builder logDir(String str) {
            this.logDir = str;
            return this;
        }

        public Builder offsetsTopicReplicationFactor(int i) {
            this.offsetsTopicReplicationFactor = i;
            return this;
        }

        public Builder serverPort(int i) {
            this.serverPort = i;
            return this;
        }

        public Builder kafkaServerProperties(Properties properties) {
            this.kafkaServerProperties = properties;
            return this;
        }

        public JMKafkaServer build() {
            return new JMKafkaServer(this.zookeeperConnect, this.serverIp, this.serverPort, this.logDir, this.offsetsTopicReplicationFactor, this.kafkaServerProperties);
        }
    }

    private JMKafkaServer(String str, String str2, int i, String str3, int i2, Properties properties) {
        this.kafkaServerConnect = JMString.buildIpOrHostnamePortPair(str2, i);
        this.kafkaServerProperties = properties;
        this.kafkaServerProperties.put("zookeeper.connect", str);
        this.kafkaServerProperties.put("offsets.topic.replication.factor", String.valueOf(i2));
        this.kafkaServerProperties.put(LOG_DIR, str3);
        this.kafkaServerProperties.put("port", Integer.valueOf(i));
        this.kafkaServerProperties.put("listeners", "PLAINTEXT://" + this.kafkaServerConnect);
    }

    public JMKafkaServer start() {
        this.kafkaServer = KafkaServerStartable.fromProps(this.kafkaServerProperties);
        this.kafkaBrokerThreadPool = JMThread.newSingleThreadPool();
        JMThread.runAsync(() -> {
            Thread.currentThread().setName("JMKafkaServer-" + OS.getHostname());
            JMLog.info(log, "startup", new Object[0]);
            this.kafkaServer.startup();
        }, this.kafkaBrokerThreadPool);
        return this;
    }

    public void stop() {
        log.info("shutdown starting {} ms !!!", Long.valueOf(System.currentTimeMillis()));
        try {
            try {
                if (this.kafkaBrokerThreadPool != null) {
                    this.kafkaBrokerThreadPool.shutdown();
                    this.kafkaBrokerThreadPool.awaitTermination(10L, TimeUnit.SECONDS);
                }
                if (this.kafkaServer != null) {
                    this.kafkaServer.shutdown();
                }
            } catch (Exception e) {
                JMException.handleException(log, e, "stop", new Object[]{this.kafkaBrokerThreadPool.shutdownNow()});
                if (this.kafkaServer != null) {
                    this.kafkaServer.shutdown();
                }
            }
            log.info("shutdown completely Over {} ms !!!", Long.valueOf(System.currentTimeMillis()));
        } catch (Throwable th) {
            if (this.kafkaServer != null) {
                this.kafkaServer.shutdown();
            }
            throw th;
        }
    }

    public int getPort() {
        return this.kafkaServer.staticServerConfig().port().intValue();
    }

    public String getKafkaServerConnect() {
        return this.kafkaServerConnect;
    }

    public Properties getKafkaServerProperties() {
        return this.kafkaServerProperties;
    }

    public String getKafkaLogDir() {
        return this.kafkaServerProperties.getProperty(LOG_DIR);
    }
}
