package cn.js.icode.common.queue.kafka;

import cn.js.icode.common.config.Config;
import cn.js.icode.common.data.StatusCode;
import cn.js.icode.common.log.Logger;
import cn.js.icode.common.queue.IQueueManager;
import cn.js.icode.common.queue.Publisher;
import cn.js.icode.common.queue.Subscriber;
import com.alibaba.fastjson.JSONObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

/* loaded from: input_file:cn/js/icode/common/queue/kafka/KafkaManager.class */
public class KafkaManager implements IQueueManager {
    private Logger log = Logger.getInstance(KafkaManager.class);
    private static Map<String, Object> props_p = new HashMap();
    private static Map<String, Object> props_s = new HashMap();
    private static AdminClient adminClient;

    private static String getGroupId(Subscriber subscriber) {
        return Config.getActiveProfile() + "_" + subscriber.getTopic() + "_group_" + subscriber.getIndex();
    }

    @Override // cn.js.icode.common.queue.IQueueManager
    public Publisher createPublisher(String str) {
        return new KafkaPublisher(new KafkaProducer(props_p), str);
    }

    @Override // cn.js.icode.common.queue.IQueueManager
    public boolean subscribe(final Subscriber subscriber) {
        String topic = subscriber.getTopic();
        String groupId = getGroupId(subscriber);
        props_s.put("group.id", groupId);
        new HashSet().add(groupId);
        try {
            ArrayList arrayList = new ArrayList();
            arrayList.add(Config.getActiveProfile() + "_" + topic);
            final KafkaConsumer kafkaConsumer = new KafkaConsumer(props_s);
            kafkaConsumer.subscribe(arrayList);
            final Duration ofMillis = Duration.ofMillis(1000L);
            new Thread(new Runnable() { // from class: cn.js.icode.common.queue.kafka.KafkaManager.1
                @Override // java.lang.Runnable
                public void run() {
                    while (true) {
                        Iterator it = kafkaConsumer.poll(ofMillis).iterator();
                        while (it.hasNext()) {
                            try {
                                subscriber.consume(JSONObject.parseObject((String) ((ConsumerRecord) it.next()).value()));
                            } catch (Exception e) {
                                e.printStackTrace();
                                KafkaManager.this.log.log(StatusCode.LOG_FATAL, (Object) e.getMessage());
                                KafkaManager.this.log.log(StatusCode.LOG_FATAL, (Object) ("队列消费者（" + subscriber.getTopic() + ":" + subscriber.getIndex() + "）因前述异常暂停"));
                            }
                        }
                        try {
                            Thread.sleep(10L);
                        } catch (InterruptedException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
            }).start();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    @Override // cn.js.icode.common.queue.IQueueManager
    public boolean remove(Subscriber subscriber) {
        if (subscriber == null || adminClient == null) {
            return false;
        }
        String groupId = getGroupId(subscriber);
        ArrayList arrayList = new ArrayList();
        arrayList.add(groupId);
        adminClient.deleteConsumerGroups(arrayList);
        return true;
    }

    static {
        adminClient = null;
        props_p.put("bootstrap.servers", Config.getProperty("spring.kafka.bootstrap-servers"));
        props_p.put("acks", "all");
        props_p.put("retries", 3);
        props_p.put("batch.size", 16384);
        props_p.put("linger.ms", 1);
        props_p.put("buffer.memory", 33554432);
        props_p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props_s.put("bootstrap.servers", Config.getProperty("spring.kafka.bootstrap-servers"));
        props_s.put("enable.auto.commit", "true");
        props_s.put("auto.commit.interval.ms", "1000");
        props_s.put("auto.offset.reset", "latest");
        props_s.put("session.timeout.ms", "30000");
        props_s.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props_s.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        adminClient = AdminClient.create(props_s);
    }
}
