package com.github.combinedmq.kafka;

import com.github.combinedmq.configuration.Configuration;
import com.github.combinedmq.exception.MqException;
import com.github.combinedmq.message.Message;
import com.github.combinedmq.message.Queue;
import com.github.combinedmq.pool.kafka.KafkaPoolConfig;
import com.github.combinedmq.pool.kafka.KafkaProducerPool;
import com.github.combinedmq.producer.AbstractProducer;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/combinedmq/kafka/KafkaProducer.class */
/**
 * {@link AbstractProducer} implementation that sends messages to Kafka using
 * producers obtained from a {@link KafkaProducerPool}.
 *
 * <p>The pool is built once, in the constructor, from the producer-pool section
 * of the supplied {@link KafkaConfiguration}.
 */
public class KafkaProducer extends AbstractProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private KafkaConfiguration configuration;
    private KafkaProducerPool pool;

    /**
     * Creates a producer and initialises its underlying producer pool.
     *
     * @param configuration the configuration to use; it is cast to
     *                      {@link KafkaConfiguration}, so any other type fails with a
     *                      {@link ClassCastException}
     * @throws NullPointerException if the configuration has no producer-pool section
     */
    public KafkaProducer(Configuration configuration) {
        this.configuration = (KafkaConfiguration) configuration;
        initPool(configuration);
    }

    /**
     * Builds {@link #pool} from the producer-pool settings of the configuration.
     *
     * <p>Every pool setting is optional: a setting whose getter returns {@code null}
     * is not applied, leaving whatever value a fresh {@link KafkaPoolConfig} carries.
     *
     * @param configuration the configuration, cast to {@link KafkaConfiguration}
     * @throws NullPointerException if {@code getProducerPool()} returns {@code null}
     */
    private void initPool(Configuration configuration) {
        KafkaConfiguration kafkaConfiguration = (KafkaConfiguration) configuration;
        if (null == kafkaConfiguration.getProducerPool()) {
            throw new NullPointerException("The pool configuration does not exist");
        }

        // --- object-pool sizing and eviction settings ---
        KafkaPoolConfig kafkaPoolConfig = new KafkaPoolConfig();
        Integer maxTotal = kafkaConfiguration.getProducerPool().getMaxTotal();
        if (maxTotal != null) {
            kafkaPoolConfig.setMaxTotal(maxTotal.intValue());
        }
        Integer maxIdle = kafkaConfiguration.getProducerPool().getMaxIdle();
        if (maxIdle != null) {
            kafkaPoolConfig.setMaxIdle(maxIdle.intValue());
        }
        Integer minIdle = kafkaConfiguration.getProducerPool().getMinIdle();
        if (minIdle != null) {
            kafkaPoolConfig.setMinIdle(minIdle.intValue());
        }
        // The three timing values below were previously read through an undeclared
        // variable; each one is now captured in its own local before being applied.
        // NOTE(review): the getters are assumed to return Integer, like the sizing
        // getters above — confirm against the producer-pool configuration class.
        Integer maxWaitMillis = kafkaConfiguration.getProducerPool().getMaxWaitMillis();
        if (maxWaitMillis != null) {
            kafkaPoolConfig.setMaxWaitMillis(maxWaitMillis.intValue());
        }
        Integer minEvictableIdleTimeMillis =
                kafkaConfiguration.getProducerPool().getMinEvictableIdleTimeMillis();
        if (minEvictableIdleTimeMillis != null) {
            kafkaPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis.intValue());
        }
        Integer timeBetweenEvictionRunsMillis =
                kafkaConfiguration.getProducerPool().getTimeBetweenEvictionRunsMillis();
        if (timeBetweenEvictionRunsMillis != null) {
            kafkaPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis.intValue());
        }
        Boolean testOnBorrow = kafkaConfiguration.getProducerPool().getTestOnBorrow();
        if (testOnBorrow != null) {
            kafkaPoolConfig.setTestOnBorrow(testOnBorrow.booleanValue());
        }
        Boolean testOnReturn = kafkaConfiguration.getProducerPool().getTestOnReturn();
        if (testOnReturn != null) {
            kafkaPoolConfig.setTestOnReturn(testOnReturn.booleanValue());
        }
        Boolean testWhileIdle = kafkaConfiguration.getProducerPool().getTestWhileIdle();
        if (testWhileIdle != null) {
            kafkaPoolConfig.setTestWhileIdle(testWhileIdle.booleanValue());
        }

        // --- Kafka client properties: keys and values are sent as raw byte arrays ---
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaConfiguration.getBootstrapServers());
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        Integer batchSize = kafkaConfiguration.getProducerPool().getBatchSize();
        if (batchSize != null) {
            properties.put("batch.size", batchSize);
        }
        Integer lingerMillis = kafkaConfiguration.getProducerPool().getLingerMillis();
        if (lingerMillis != null) {
            properties.put("linger.ms", lingerMillis);
        }
        this.pool = new KafkaProducerPool(kafkaPoolConfig, properties);
    }

    /**
     * Sends the message bytes to the topic named by the queue, using a producer
     * obtained from the pool.
     *
     * <p>The producer is closed exactly once when the send completes or fails.
     * NOTE(review): closing a pooled producer presumably returns it to the pool
     * rather than shutting it down — confirm against {@link KafkaProducerPool}.
     *
     * @param queue   the destination; cast to {@link KafkaQueue} to read its queue name
     * @param message the message whose bytes form the record value
     * @throws MqException wrapping any exception raised while obtaining the producer,
     *                     sending the record, or closing the producer
     */
    @Override // com.github.combinedmq.producer.AbstractProducer
    // Raw Producer/ProducerRecord are kept on purpose: the generic signature of
    // KafkaProducerPool.getResource() is not visible from this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected void doSend(Queue queue, Message message) throws MqException {
        // try-with-resources closes the producer once on every path; a close failure
        // that follows a send failure is attached as suppressed instead of masking it.
        try (Producer producer = this.pool.getResource()) {
            producer.send(new ProducerRecord(((KafkaQueue) queue).getQueueName(), message.getBytes()));
        } catch (Exception e) {
            throw new MqException(e);
        }
    }
}
