package me.phoboslabs.illuminati.processor.infra.kafka.impl;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import me.phoboslabs.illuminati.common.util.NetworkUtil;
import me.phoboslabs.illuminati.common.util.StringObjectUtils;
import me.phoboslabs.illuminati.processor.exception.PublishMessageException;
import me.phoboslabs.illuminati.processor.exception.ValidationException;
import me.phoboslabs.illuminati.processor.infra.IlluminatiInfraTemplate;
import me.phoboslabs.illuminati.processor.infra.common.BasicTemplate;
import me.phoboslabs.illuminati.processor.infra.kafka.constants.KafkaConstant;
import me.phoboslabs.illuminati.processor.infra.kafka.enums.CommunicationType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/phoboslabs/illuminati/processor/infra/kafka/impl/KafkaInfraTemplateImpl.class */
public class KafkaInfraTemplateImpl extends BasicTemplate implements IlluminatiInfraTemplate<String> {
    private static final Logger KAFKA_TEMPLATE_IMPL_LOGGER = LoggerFactory.getLogger(KafkaInfraTemplateImpl.class);
    private String topic;
    private final Properties kafkaProperties;
    private Producer<String, byte[]> kafkaProducer;
    private static final String KAFKA_BROKER_CLASS_NAME = "org.apache.kafka.clients.producer.KafkaProducer";

    public KafkaInfraTemplateImpl(String str) {
        super(str);
        this.kafkaProperties = new Properties();
        validateBasicTemplateClass();
        checkRequiredValuesForInit();
        this.topic = this.illuminatiProperties.getTopic();
        initProperties();
        initPublisher();
    }

    @Override // me.phoboslabs.illuminati.processor.infra.common.BasicTemplate
    protected void checkRequiredValuesForInit() {
        if (!StringObjectUtils.isValid(this.illuminatiProperties.getTopic())) {
            KAFKA_TEMPLATE_IMPL_LOGGER.error("error : topic variable is empty.");
            throw new ValidationException("error : topic variable is empty.");
        }
        if (StringObjectUtils.isValid(this.illuminatiProperties.getClusterList())) {
            return;
        }
        KAFKA_TEMPLATE_IMPL_LOGGER.error("error : brokerList variable is empty.");
        throw new ValidationException("error : brokerList variable is empty.");
    }

    private void setBasicProperties() {
        setKafkaProperties("bootstrap.servers", this.illuminatiProperties.getClusterList());
        setKafkaProperties("key.serializer", KafkaConstant.VALUE_SERIALIZER_TYPE_BYTE);
        setKafkaProperties("value.serializer", KafkaConstant.VALUE_SERIALIZER_TYPE_BYTE);
        setKafkaProperties("partitioner.class", KafkaConstant.VALUE_PARTITIONER);
        setKafkaProperties("retries", 1);
        setKafkaProperties("metadata.max.age.ms", Integer.valueOf(KafkaConstant.VALUE_METADATA_MAX_AGE_CONFIG));
        setKafkaProperties("max.request.size", Integer.valueOf(KafkaConstant.VALUE_MAX_REQUEST_SIZE_CONFIG));
        setKafkaProperties("reconnect.backoff.ms", 100);
        setKafkaProperties("max.in.flight.requests.per.connection", Integer.valueOf(KafkaConstant.VALUE_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
        setKafkaProperties("buffer.memory", 10485760);
        setKafkaProperties("send.buffer.bytes", 10485760);
        setKafkaProperties("receive.buffer.bytes", 10485760);
        setKafkaProperties("transaction.timeout.ms", 10000);
        setKafkaProperties("request.timeout.ms", 10000);
    }

    private synchronized void initPublisher() {
        if (this.kafkaProducer == null) {
            this.kafkaProducer = new KafkaProducer(this.kafkaProperties);
        }
    }

    private void setKafkaProperties(String str, Object obj) {
        this.kafkaProperties.put(str, obj);
    }

    @Override // me.phoboslabs.illuminati.processor.infra.common.BasicTemplate
    protected void initProperties() {
        setBasicProperties();
        setPerformance();
        setIsAsync();
        setIsCompression();
    }

    @Override // me.phoboslabs.illuminati.processor.infra.IlluminatiInfraTemplate
    public void sendToIlluminati(String str) throws Exception, PublishMessageException {
        try {
            if (this.kafkaProducer == null) {
                KAFKA_TEMPLATE_IMPL_LOGGER.error("kafka publisher not initialized.");
                throw new PublishMessageException("kafka publisher not initialized.");
            }
            try {
                this.sending = true;
                Future send = this.kafkaProducer.send(new ProducerRecord(this.topic, str.getBytes()));
                KAFKA_TEMPLATE_IMPL_LOGGER.debug("Message produced, offset: " + ((RecordMetadata) send.get()).offset());
                KAFKA_TEMPLATE_IMPL_LOGGER.debug("Message produced, partition : " + ((RecordMetadata) send.get()).partition());
                KAFKA_TEMPLATE_IMPL_LOGGER.debug("Message produced, topic: " + ((RecordMetadata) send.get()).topic());
                KAFKA_TEMPLATE_IMPL_LOGGER.info("successfully transferred dto to Illuminati broker(kafka).");
                this.sending = false;
            } catch (Exception e) {
                throw new PublishMessageException("failed to publish message : (" + e.toString() + ")");
            }
        } catch (Throwable th) {
            this.sending = false;
            throw th;
        }
    }

    @Override // me.phoboslabs.illuminati.processor.infra.IlluminatiInfraTemplate
    public boolean canIConnect() {
        if (this.kafkaProducer == null) {
            return false;
        }
        int i = 0;
        try {
            List<String> clusterArrayList = this.illuminatiProperties.getClusterArrayList();
            if (CollectionUtils.isNotEmpty(clusterArrayList)) {
                for (String str : clusterArrayList) {
                    String[] split = str.split(":");
                    if (split.length != 2) {
                        KAFKA_TEMPLATE_IMPL_LOGGER.error("check kafka cluster({}). maybe typo in cluster address string.", str);
                    } else if (NetworkUtil.canIConnect(split[0], Integer.parseInt(split[1]))) {
                        i++;
                    } else {
                        KAFKA_TEMPLATE_IMPL_LOGGER.error("check kafka cluster. connection error ({})", str);
                    }
                }
            } else {
                KAFKA_TEMPLATE_IMPL_LOGGER.error("cluster address string is required value.");
            }
        } catch (Exception e) {
            KAFKA_TEMPLATE_IMPL_LOGGER.error("check kafka cluster.", e);
        }
        if (i == 0) {
            this.kafkaProducer.close();
        }
        return i > 0;
    }

    @Override // me.phoboslabs.illuminati.processor.infra.IlluminatiInfraTemplate
    public void connectionClose() {
        waitBeforeClosing();
        this.kafkaProducer.close();
    }

    @Override // me.phoboslabs.illuminati.processor.infra.IlluminatiInfraTemplate
    public void validateBasicTemplateClass() throws ValidationException {
        try {
            Class.forName(KAFKA_BROKER_CLASS_NAME);
        } catch (ClassNotFoundException e) {
            throw new ValidationException(e.toString());
        }
    }

    private void setPerformance() {
        super.performanceType();
        setKafkaProperties("acks", this.performanceType.getType());
    }

    private void setIsAsync() {
        super.isAsync();
        if (CommunicationType.ASYNC == this.communicationType) {
            setKafkaProperties("batch.size", Integer.valueOf(KafkaConstant.VALUE_ASYNC_MAX_MESSAGE_AT_ONCE));
            setKafkaProperties("linger.ms", 10000);
        }
    }

    private void setIsCompression() {
        super.isCompression();
        setKafkaProperties("compression.type", this.compressionCodecType.getType());
    }
}
