package com.github.combinedmq.kafka;

import com.github.combinedmq.configuration.Configuration;
import com.github.combinedmq.consumer.AbstractConsumer;
import com.github.combinedmq.exception.MqException;
import com.github.combinedmq.message.MessageExecutor;
import com.github.combinedmq.message.MessageListener;
import com.github.combinedmq.message.Queue;
import com.github.combinedmq.message.QueueType;
import java.net.InetAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/combinedmq/kafka/KafkaConsumer.class */
public class KafkaConsumer extends AbstractConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    private Properties props;
    private Configuration configuration;

    public KafkaConsumer(Configuration configuration) {
        this.configuration = configuration;
        init(configuration);
    }

    @Override // com.github.combinedmq.consumer.AbstractConsumer
    public Configuration getConfiguration() {
        return this.configuration;
    }

    private void init(Configuration configuration) {
        KafkaConfiguration kafkaConfiguration = (KafkaConfiguration) configuration;
        if (null == kafkaConfiguration.getConsumerListener()) {
            throw new NullPointerException("The consumer configuration does not exist");
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaConfiguration.getBootstrapServers());
        properties.put("enable.auto.commit", "false");
        properties.put("max.poll.records", kafkaConfiguration.getConsumerListener().getMaxPollRecord());
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
        properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        this.props = properties;
    }

    @Override // com.github.combinedmq.consumer.Consumer
    public void listen() throws MqException {
        try {
            for (Map.Entry<Queue, List<MessageListener>> entry : getQueueListener().entrySet()) {
                KafkaQueue kafkaQueue = (KafkaQueue) entry.getKey();
                List<MessageListener> value = entry.getValue();
                MessageExecutor messageExecutor = new MessageExecutor("kafka", value.size());
                for (int i = 0; i < value.size(); i++) {
                    MessageListener messageListener = value.get(i);
                    if (null == messageExecutor.getMessageListener()) {
                        messageExecutor.setMessageListener(messageListener);
                    }
                    QueueType type = kafkaQueue.getType();
                    if (QueueType.POINT_TO_POINT == type) {
                        this.props.put("auto.offset.reset", "earliest");
                        this.props.put("group.id", kafkaQueue.getQueueName() + "_ptp");
                    } else if (QueueType.PUBLISH_SUBSCRIBE == type) {
                        this.props.put("auto.offset.reset", "latest");
                        this.props.put("group.id", kafkaQueue.getQueueName() + "_pts_" + InetAddress.getLocalHost().getHostAddress() + "_" + i);
                    }
                    org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer(this.props);
                    kafkaConsumer.subscribe(Collections.singletonList(kafkaQueue.getQueueName()));
                    new Thread(() -> {
                        while (true) {
                            Iterator it = kafkaConsumer.poll(30000L).iterator();
                            while (it.hasNext()) {
                                try {
                                    try {
                                        messageExecutor.onMessage(new KafkaMessage((byte[]) ((ConsumerRecord) it.next()).value())).get();
                                        kafkaConsumer.commitSync();
                                    } catch (Throwable th) {
                                        th.printStackTrace();
                                        kafkaConsumer.commitSync();
                                    }
                                } catch (Throwable th2) {
                                    kafkaConsumer.commitSync();
                                    throw th2;
                                }
                            }
                        }
                    }).start();
                }
            }
        } catch (Exception e) {
            throw new MqException(e);
        }
    }
}
