package cn.thinkingdata.kafka.consumer;

import cn.thinkingdata.kafka.cache.KafkaCache;
import cn.thinkingdata.kafka.close.DaemonCloseThread;
import cn.thinkingdata.kafka.close.TermMethod;
import cn.thinkingdata.kafka.constant.KafkaMysqlOffsetParameter;
import cn.thinkingdata.kafka.consumer.offset.MysqlOffsetManager;
import cn.thinkingdata.kafka.consumer.persist.MysqlOffsetPersist;
import cn.thinkingdata.kafka.consumer.persist.StorePersist;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/thinkingdata/kafka/consumer/KafkaSubscribeConsumer.class */
public class KafkaSubscribeConsumer {
    protected NewIDataLineProcessor dataProcessor;
    protected volatile ExecutorService executorService;
    private final TermMethod closeMethod;
    private volatile DaemonCloseThread closeSignal;
    private static final Logger logger = LoggerFactory.getLogger(KafkaSubscribeConsumer.class);
    private static volatile Integer startCount = 0;

    public KafkaSubscribeConsumer(Map<String, String> map, NewIDataLineProcessor newIDataLineProcessor, TermMethod termMethod) {
        KafkaMysqlOffsetParameter.createKafkaConfProp(map);
        this.dataProcessor = newIDataLineProcessor;
        this.closeMethod = termMethod;
    }

    public KafkaSubscribeConsumer(Map<String, String> map, NewIDataLineProcessor newIDataLineProcessor, TermMethod termMethod, StorePersist storePersist) {
        this(map, newIDataLineProcessor, termMethod);
        MysqlOffsetManager.getInstance().setExternalStorePersist(storePersist);
    }

    public void run() {
        if (!MysqlOffsetPersist.getInstance().mysqlStateCheckWithRetry().booleanValue()) {
            logger.info("mysql is not connected!");
            System.exit(-1);
        }
        if (!MysqlOffsetPersist.getInstance().backupStoreStateCheckWithRetry().booleanValue()) {
            logger.info("backup store is not connected!");
            System.exit(-1);
        }
        KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.set(false);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(Arrays.asList(KafkaMysqlOffsetParameter.topic.split(",")));
        this.executorService = Executors.newFixedThreadPool(KafkaMysqlOffsetParameter.processThreadNum.intValue(), new ThreadFactoryBuilder().setNameFormat("kafka-consume-thread-%d").build());
        CyclicBarrier cyclicBarrier = new CyclicBarrier(KafkaMysqlOffsetParameter.processThreadNum.intValue());
        for (int i = 0; i < KafkaMysqlOffsetParameter.processThreadNum.intValue(); i++) {
            KafkaSubscribeConsumeThread kafkaSubscribeConsumeThread = new KafkaSubscribeConsumeThread(KafkaSubscribeConsumerManager.getInstance().createKafkaConsumer(arrayList, KafkaMysqlOffsetParameter.kafkaConf), this.dataProcessor, cyclicBarrier);
            KafkaCache.consumeThreadList.add(kafkaSubscribeConsumeThread);
            this.executorService.submit(kafkaSubscribeConsumeThread);
        }
        if (startCount.equals(0)) {
            MysqlOffsetPersist.getInstance().start();
            startCount = Integer.valueOf(startCount.intValue() + 1);
        }
        this.closeSignal = new DaemonCloseThread(this, this.closeMethod);
        this.closeSignal.setDaemon(true);
        this.closeSignal.start();
    }

    public void stop() {
        stop(120000L);
    }

    public void stop(long j) {
        long currentTimeMillis = System.currentTimeMillis();
        logger.info("consumers start shutdown");
        for (KafkaSubscribeConsumeThread kafkaSubscribeConsumeThread : KafkaCache.consumeThreadList) {
            if (kafkaSubscribeConsumeThread != null) {
                kafkaSubscribeConsumeThread.shutdown();
            }
        }
        Boolean bool = false;
        while (!bool.booleanValue()) {
            Boolean bool2 = false;
            for (KafkaSubscribeConsumeThread kafkaSubscribeConsumeThread2 : KafkaCache.consumeThreadList) {
                if (kafkaSubscribeConsumeThread2 != null && kafkaSubscribeConsumeThread2.kafkaPollFlag.booleanValue()) {
                    bool2 = true;
                }
            }
            if (!bool2.booleanValue()) {
                break;
            } else if (System.currentTimeMillis() - currentTimeMillis > j) {
                stopWithTimeOUt();
                bool = true;
            }
        }
        logger.info("kafka polling closed");
        while (!bool.booleanValue()) {
            Boolean bool3 = false;
            for (KafkaSubscribeConsumeThread kafkaSubscribeConsumeThread3 : KafkaCache.consumeThreadList) {
                if (kafkaSubscribeConsumeThread3 != null && kafkaSubscribeConsumeThread3.kafkaConsumerFlag.booleanValue()) {
                    bool3 = true;
                }
            }
            if (!bool3.booleanValue()) {
                break;
            } else if (System.currentTimeMillis() - currentTimeMillis > j) {
                stopWithTimeOUt();
                bool = true;
            }
        }
        logger.info("kafka consumer closed");
        while (!bool.booleanValue()) {
            Boolean bool4 = false;
            for (KafkaSubscribeConsumeThread kafkaSubscribeConsumeThread4 : KafkaCache.consumeThreadList) {
                if (kafkaSubscribeConsumeThread4 != null && kafkaSubscribeConsumeThread4.processDataWorker.workingFlag.booleanValue()) {
                    bool4 = true;
                }
            }
            if (!bool4.booleanValue()) {
                break;
            } else if (System.currentTimeMillis() - currentTimeMillis > j) {
                stopWithTimeOUt();
                bool = true;
            }
        }
        logger.info("process data worker closed");
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        try {
            if (!this.executorService.awaitTermination(120000L, TimeUnit.MILLISECONDS)) {
                logger.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            logger.error("Interrupted during shutdown, exiting uncleanly");
        }
        logger.info("dataProcessor start to shutdown");
        this.dataProcessor.finishProcess();
        KafkaCache.kafkaConsumerOffsetMaps.clear();
        KafkaCache.consumeThreadList.clear();
        KafkaCache.rebalancerListenerList.clear();
        this.closeSignal.shutdown();
    }

    private void stopWithTimeOUt() {
        logger.info("kafka polling/kafka consumer/process data worker closed with timeout");
        for (KafkaSubscribeConsumeThread kafkaSubscribeConsumeThread : KafkaCache.consumeThreadList) {
            if (kafkaSubscribeConsumeThread != null && kafkaSubscribeConsumeThread.processDataWorker != null) {
                kafkaSubscribeConsumeThread.processDataWorker.stopWithException();
            }
        }
        try {
            Thread.sleep(5000L);
            Thread.sleep(5000L);
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            logger.error("------- thread can not sleep ---------------------" + e.toString());
        }
    }

    public void destroy() {
        MysqlOffsetPersist.destoryFlag = true;
        stop();
        this.closeSignal.afterDestroyConsumer();
        logger.info("mysql start to shutdown");
        MysqlOffsetPersist.getInstance().shutdown();
    }

    public void destroy(long j) {
        MysqlOffsetPersist.destoryFlag = true;
        stop(j);
        this.closeSignal.afterDestroyConsumer();
        logger.info("mysql start to shutdown");
        MysqlOffsetPersist.getInstance().shutdown();
    }
}
