package cn.thinkingdata.kafka.consumer;

import cn.thinkingdata.kafka.cache.KafkaCache;
import cn.thinkingdata.kafka.constant.KafkaMysqlOffsetParameter;
import cn.thinkingdata.kafka.consumer.dao.KafkaConsumerOffset;
import cn.thinkingdata.kafka.consumer.offset.MysqlOffsetManager;
import cn.thinkingdata.kafka.consumer.offset.OffsetManager;
import cn.thinkingdata.kafka.consumer.persist.MysqlOffsetPersist;
import cn.thinkingdata.kafka.util.CommonUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/thinkingdata/kafka/consumer/KafkaSubscribeConsumeThread.class */
public class KafkaSubscribeConsumeThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSubscribeConsumeThread.class);
    private final NewIDataLineProcessor dataProcessor;
    public KafkaConsumer<String, String> consumer;
    private final CyclicBarrier offsetFlushBarrier;
    private volatile Thread consumerThread;
    private final OffsetManager offsetManager = MysqlOffsetManager.getInstance();
    public volatile Boolean kafkaPollFlag = false;
    public volatile Boolean kafkaConsumerFlag = false;
    public volatile Collection<TopicPartition> assignedPartitions = null;
    private final BlockingQueue<ConsumerRecord<String, String>> unsent = new LinkedBlockingQueue();
    public Set<KafkaConsumerOffset> kafkaConsumerOffsetSet = new HashSet();
    private final BlockingQueue<ConsumerRecord<String, String>> processDataQueue = new LinkedBlockingQueue(3000);
    public ProcessDataWorker processDataWorker = new ProcessDataWorker();
    private volatile Boolean paused = false;

    /* loaded from: input_file:cn/thinkingdata/kafka/consumer/KafkaSubscribeConsumeThread$ProcessDataWorker.class */
    public final class ProcessDataWorker implements Runnable {
        private volatile Thread executingThread;
        private volatile ConsumerRecord<String, String> consumerRecord;
        private static final long MAX_WAIT_MS = 1000;
        private final CountDownLatch exitLatch = new CountDownLatch(1);
        private volatile Boolean workerStopFlag = false;
        public volatile Boolean workingFlag = false;

        public ProcessDataWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            this.workingFlag = true;
            try {
                try {
                    this.executingThread = Thread.currentThread();
                    while (true) {
                        processOperationData();
                        if (KafkaSubscribeConsumeThread.this.processDataQueue.size() == 0 && this.workerStopFlag.booleanValue() && KafkaSubscribeConsumeThread.this.unsent.size() == 0) {
                            break;
                        }
                    }
                    KafkaSubscribeConsumeThread.logger.info("processDataWorker " + Thread.currentThread().getName() + " is safely closed...");
                    this.exitLatch.countDown();
                    this.workingFlag = false;
                } catch (Exception e) {
                    KafkaSubscribeConsumeThread.logger.error("processDataWorker thread is failed, the error is " + e.toString());
                    KafkaSubscribeConsumeThread.logger.info("processDataWorker " + Thread.currentThread().getName() + " is safely closed...");
                    this.exitLatch.countDown();
                    this.workingFlag = false;
                }
            } catch (Throwable th) {
                KafkaSubscribeConsumeThread.logger.info("processDataWorker " + Thread.currentThread().getName() + " is safely closed...");
                this.exitLatch.countDown();
                this.workingFlag = false;
                throw th;
            }
        }

        private void processOperationData() throws InterruptedException {
            try {
                this.consumerRecord = (ConsumerRecord) KafkaSubscribeConsumeThread.this.processDataQueue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
                if (this.consumerRecord != null) {
                    KafkaSubscribeConsumeThread.this.dataProcessor.processData(this.consumerRecord);
                }
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                KafkaSubscribeConsumeThread.logger.error("processOperationData error, the error is " + CommonUtils.getStackTraceAsString(e2));
            }
        }

        public void stopWithException() {
            try {
                Thread.sleep(MAX_WAIT_MS);
            } catch (InterruptedException e) {
                KafkaSubscribeConsumeThread.logger.error("------- thread can not sleep ---------------------" + e.toString());
            }
            this.workerStopFlag = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stop() {
            while (KafkaSubscribeConsumeThread.this.kafkaPollFlag.booleanValue()) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    KafkaSubscribeConsumeThread.logger.error("------- thread can not sleep ---------------------" + e.toString());
                }
            }
            this.workerStopFlag = true;
        }
    }

    public KafkaSubscribeConsumeThread(KafkaConsumer<String, String> kafkaConsumer, NewIDataLineProcessor newIDataLineProcessor, CyclicBarrier cyclicBarrier) {
        this.consumer = kafkaConsumer;
        this.dataProcessor = newIDataLineProcessor;
        this.offsetFlushBarrier = cyclicBarrier;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.consumerThread = Thread.currentThread();
        this.kafkaConsumerFlag = true;
        new Thread(this.processDataWorker, this.consumerThread.getName() + "-working thread").start();
        HashSet hashSet = new HashSet();
        Long l = 0L;
        DateTime plusSeconds = new DateTime().plusSeconds(Integer.parseInt(KafkaMysqlOffsetParameter.sessionTimeout));
        while (true) {
            try {
                try {
                    try {
                        if (KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.get()) {
                            break;
                        }
                        if (KafkaMysqlOffsetParameter.mysqlAndBackupStoreConnState.get()) {
                            this.kafkaPollFlag = true;
                            l = 0L;
                            if (!plusSeconds.isAfterNow()) {
                                logger.info("kafka session time out, the consumer is " + this.consumer.toString());
                                MysqlOffsetManager.getInstance().getExternalStorePersist().executeWhenExecuteDataSessionTimeout(this);
                                break;
                            }
                            ConsumerRecords<String, String> consumerRecords = null;
                            try {
                                consumerRecords = this.consumer.poll(KafkaMysqlOffsetParameter.pollInterval.intValue());
                            } catch (OffsetOutOfRangeException e) {
                                logger.error("consumer poll out of range, the error is " + CommonUtils.getStackTraceAsString(e));
                                for (TopicPartition topicPartition : this.consumer.assignment()) {
                                    logger.error("the topicPartition is " + topicPartition.toString() + ",the offset is " + this.consumer.position(topicPartition));
                                }
                                synchronized (OffsetManager.class) {
                                    MysqlOffsetManager.getInstance().getExternalStorePersist().executeWhenOffsetReset(this);
                                }
                            }
                            plusSeconds = new DateTime().plusSeconds(Integer.parseInt(KafkaMysqlOffsetParameter.sessionTimeout) / 1000);
                            logger.debug("sessionTimeoutDataTime is " + plusSeconds.toString());
                            if (consumerRecords != null) {
                                if (consumerRecords.count() > 0) {
                                    logger.debug("poll records size: " + consumerRecords.count() + ", partition is " + consumerRecords.partitions() + ", thread is " + Thread.currentThread().getName());
                                }
                                pause();
                                sendToQueue(consumerRecords);
                                if (isResume().booleanValue()) {
                                    resume();
                                } else {
                                    Thread.sleep(30L);
                                }
                                if (CollectionUtils.isNotEmpty(this.assignedPartitions)) {
                                    Iterator<TopicPartition> it = this.assignedPartitions.iterator();
                                    while (it.hasNext()) {
                                        List records = consumerRecords.records(it.next());
                                        if (CollectionUtils.isNotEmpty(records)) {
                                            hashSet.add((ConsumerRecord) records.get(records.size() - 1));
                                        }
                                    }
                                }
                                saveLastConsumerRecordSet(this, hashSet, null, false);
                                hashSet.clear();
                            }
                        } else {
                            logger.info("mysql and backup store connect error, the mysqlAndBackupStoreConnState is " + KafkaMysqlOffsetParameter.mysqlAndBackupStoreConnState.get());
                            this.kafkaPollFlag = false;
                            try {
                                Thread.sleep(50L);
                            } catch (InterruptedException e2) {
                                logger.error("------- thread can not sleep ---------------------" + e2.toString());
                            }
                        }
                    } finally {
                        try {
                            closeKafkaSubscribeConsumeThread();
                        } catch (Exception e3) {
                            logger.error("closeKafkaSubscribeConsumeThread error, the thread is " + Thread.currentThread().getName() + ", the Exception is " + CommonUtils.getStackTraceAsString(e3));
                        }
                    }
                } catch (WakeupException | InterruptedException e4) {
                    logger.info("stop consumer with wakeup or interupted, the kafkaSubscribeConsumerClosed is " + KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.get() + ", the thread is " + Thread.currentThread().getName() + ", the consumeThreadList is " + StringUtils.join((Iterable) KafkaCache.consumeThreadList.stream().map(kafkaSubscribeConsumeThread -> {
                        return kafkaSubscribeConsumeThread.consumerThread.getName();
                    }).collect(Collectors.toList()), ",") + " the kafka cluster name is " + KafkaMysqlOffsetParameter.kafkaClusterName + ", the Exception is " + CommonUtils.getStackTraceAsString(e4));
                    saveLastConsumerRecordSet(this, hashSet, l, true);
                    this.kafkaPollFlag = false;
                    logger.info("stop consumer with wakeup finished");
                    try {
                        closeKafkaSubscribeConsumeThread();
                        return;
                    } catch (Exception e5) {
                        logger.error("closeKafkaSubscribeConsumeThread error, the thread is " + Thread.currentThread().getName() + ", the Exception is " + CommonUtils.getStackTraceAsString(e5));
                        return;
                    }
                }
            } catch (Exception e6) {
                saveLastConsumerRecordSet(this, hashSet, l, false);
                logger.error("stop consumer with exception, the kafkaSubscribeConsumerClosed is " + KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.get() + ", the thread is " + Thread.currentThread().getName() + ", the consumeThreadList is " + StringUtils.join((Iterable) KafkaCache.consumeThreadList.stream().map(kafkaSubscribeConsumeThread2 -> {
                    return kafkaSubscribeConsumeThread2.consumerThread.getName();
                }).collect(Collectors.toList()), ",") + " the kafka cluster name is " + KafkaMysqlOffsetParameter.kafkaClusterName + ", the kafkaConsumerOffsetMaps In cache is" + Arrays.toString(KafkaCache.kafkaConsumerOffsetMaps.entrySet().toArray()) + ", the Exception is " + CommonUtils.getStackTraceAsString(e6));
                this.kafkaPollFlag = false;
                logger.info("stop consumer finished");
                synchronized (OffsetManager.class) {
                    MysqlOffsetManager.getInstance().getExternalStorePersist().executeWhenException();
                    try {
                        closeKafkaSubscribeConsumeThread();
                        return;
                    } catch (Exception e7) {
                        logger.error("closeKafkaSubscribeConsumeThread error, the thread is " + Thread.currentThread().getName() + ", the Exception is " + CommonUtils.getStackTraceAsString(e7));
                        return;
                    }
                }
            }
        }
        this.kafkaPollFlag = false;
        logger.info("kafka consumer close, the kafkaSubscribeConsumerClosed is " + KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.get());
    }

    private void saveLastConsumerRecordSet(KafkaSubscribeConsumeThread kafkaSubscribeConsumeThread, Set<ConsumerRecord<String, String>> set, Long l, Boolean bool) {
        for (ConsumerRecord<String, String> consumerRecord : set) {
            Date date = new Date();
            KafkaConsumerOffset kafkaConsumerOffset = KafkaCache.kafkaConsumerOffsetMaps.get(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()));
            if (kafkaConsumerOffset == null) {
                logger.error("kafkaConsumerOffset is null in cache, the lastConsumerRecord is " + consumerRecord + ", the kafkaConsumerOffsetMaps is " + Arrays.toString(KafkaCache.kafkaConsumerOffsetMaps.entrySet().toArray()));
                kafkaConsumerOffset = this.offsetManager.readOffsetFromCache(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()));
                kafkaConsumerOffset.setOwner(KafkaMysqlOffsetParameter.kafkaClusterName + "-" + consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + KafkaMysqlOffsetParameter.consumerGroup + "-" + date.getTime() + "-" + KafkaMysqlOffsetParameter.hostname + "-" + this.consumer.toString().substring(this.consumer.toString().lastIndexOf("@") + 1));
            }
            kafkaConsumerOffset.setTopic(consumerRecord.topic());
            kafkaConsumerOffset.setPartition(Integer.valueOf(consumerRecord.partition()));
            kafkaConsumerOffset.setConsumer_group(KafkaMysqlOffsetParameter.consumerGroup);
            kafkaConsumerOffset.setOffset(Long.valueOf(consumerRecord.offset() + 1));
            kafkaConsumerOffset.setKafka_cluster_name(KafkaMysqlOffsetParameter.kafkaClusterName);
            kafkaConsumerOffset.setCount(l);
            if (bool.booleanValue()) {
                kafkaConsumerOffset.setOwner("");
                logger.info("clean owner, the thread is " + Thread.currentThread().getName() + ", the kafkaConsumerOffset is " + kafkaConsumerOffset.toString());
            }
            this.kafkaConsumerOffsetSet.add(kafkaConsumerOffset);
            this.offsetManager.saveOffsetInCache(kafkaSubscribeConsumeThread, kafkaConsumerOffset);
        }
    }

    public void closeKafkaSubscribeConsumeThread() throws InterruptedException {
        logger.info("start to stop processDataWorker " + this.processDataWorker.executingThread.getName());
        this.processDataWorker.stop();
        logger.info("wait for the mysql persist finish");
        while (true) {
            if (!MysqlOffsetPersist.runFlag.booleanValue() && KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.get()) {
                break;
            }
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                logger.error("------- thread can not sleep ---------------------" + e.toString());
            }
        }
        logger.info("flush before kafka consumer close");
        logger.debug("kafkaConsumerOffsetMaps is " + Arrays.toString(KafkaCache.kafkaConsumerOffsetMaps.entrySet().toArray()));
        logger.debug("kafkaConsumerOffsetSet is " + this.kafkaConsumerOffsetSet + " ,the thread is " + Thread.currentThread().getName());
        try {
            for (KafkaConsumerOffset kafkaConsumerOffset : this.kafkaConsumerOffsetSet) {
                TopicPartition topicPartition = new TopicPartition(kafkaConsumerOffset.getTopic(), kafkaConsumerOffset.getPartition().intValue());
                KafkaConsumerOffset kafkaConsumerOffset2 = KafkaCache.kafkaConsumerOffsetMaps.get(topicPartition);
                if (kafkaConsumerOffset2 != null) {
                    Long l = null;
                    try {
                        l = Long.valueOf(this.consumer.position(topicPartition));
                    } catch (Exception e2) {
                        logger.info("the consumer get position error, the error is " + e2.toString() + ", the topicPartition is " + topicPartition);
                    }
                    logger.debug("consumer position is " + l);
                    if (l != null && l.longValue() != 0 && kafkaConsumerOffset2 != null && l.longValue() > kafkaConsumerOffset2.getOffset().longValue()) {
                        logger.info("consumer position " + l + "is bigger than the offset in kafkaConsumerOffsetInCache " + kafkaConsumerOffset2);
                        kafkaConsumerOffset2.setOffset(l);
                    }
                    MysqlOffsetPersist.getInstance().flush(kafkaConsumerOffset2);
                } else {
                    logger.error("kafkaConsumerOffsetInCache is null, kafkaConsumerOffset is " + kafkaConsumerOffset + ", kafkaConsumerOffsetSet is " + this.kafkaConsumerOffsetSet + ", kafkaConsumerOffsetMaps is " + Arrays.toString(KafkaCache.kafkaConsumerOffsetMaps.entrySet().toArray()));
                }
            }
            this.offsetFlushBarrier.await();
            logger.info("start to flush the rest KafkaCache.kafkaConsumerOffsetMaps " + Arrays.toString(KafkaCache.kafkaConsumerOffsetMaps.entrySet().toArray()) + ", the thread is " + Thread.currentThread().getName());
            flushKafkaConsumerOffsetsInKafkaCache();
            this.consumer.close();
        } catch (Exception e3) {
            logger.error("close consumer error, the exception is " + CommonUtils.getStackTraceAsString(e3));
            this.consumer.close();
        }
        this.kafkaConsumerFlag = false;
        sendUnsentToProcessDataQueue(true);
        logger.info("kafka consumer finally close");
    }

    private synchronized void flushKafkaConsumerOffsetsInKafkaCache() {
        for (KafkaConsumerOffset kafkaConsumerOffset : KafkaCache.kafkaConsumerOffsetMaps.values()) {
            logger.info("kafkaConsumerOffset in cache is not be consumed, kafkaConsumerOffset is " + kafkaConsumerOffset);
            TopicPartition topicPartition = new TopicPartition(kafkaConsumerOffset.getTopic(), kafkaConsumerOffset.getPartition().intValue());
            Long l = null;
            try {
                l = Long.valueOf(this.consumer.position(topicPartition));
            } catch (IllegalArgumentException | IllegalStateException e) {
                logger.info("flushKafkaConsumerOffsetsInKafkaCache, the consumer get position error, the error is " + e.toString() + ", the topicPartition is " + topicPartition);
            }
            if (kafkaConsumerOffset != null) {
                if (l != null && l.longValue() != 0 && l.longValue() > kafkaConsumerOffset.getOffset().longValue()) {
                    logger.debug("consumer position is " + l);
                    logger.info("consumer position " + l + " is bigger than the offset in kafkaConsumerOffset " + kafkaConsumerOffset);
                    kafkaConsumerOffset.setOffset(l);
                }
                MysqlOffsetPersist.getInstance().flush(kafkaConsumerOffset);
            }
        }
    }

    public void shutdown() {
        KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.set(true);
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    public void pause() {
        if (this.assignedPartitions != null) {
            this.consumer.pause(this.assignedPartitions);
            this.paused = true;
        }
    }

    public Boolean isResume() {
        return Boolean.valueOf(CollectionUtils.isEmpty(this.unsent));
    }

    public void resume() {
        if (this.assignedPartitions != null) {
            this.consumer.resume(this.assignedPartitions);
            this.paused = false;
        }
    }

    private void sendToQueue(ConsumerRecords<String, String> consumerRecords) throws InterruptedException {
        Boolean bool = true;
        if (CollectionUtils.isEmpty(this.unsent)) {
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord<String, String> consumerRecord = (ConsumerRecord) it.next();
                if (bool.booleanValue()) {
                    bool = Boolean.valueOf(this.processDataQueue.offer(consumerRecord, 200L, TimeUnit.MILLISECONDS));
                    if (!bool.booleanValue()) {
                        logger.info("the processDataQueue is full...");
                        this.unsent.put(consumerRecord);
                    }
                } else {
                    this.unsent.put(consumerRecord);
                }
            }
            return;
        }
        if (consumerRecords.count() > 0) {
            logger.info("the unsent is not empty but the consummer still polling records, it can be only happed after rebalanced");
        }
        Iterator it2 = consumerRecords.iterator();
        while (it2.hasNext()) {
            this.unsent.put((ConsumerRecord) it2.next());
        }
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            logger.error("------- thread can not sleep ---------------------" + e.toString());
        }
        sendUnsentToProcessDataQueue(false);
    }

    private void sendUnsentToProcessDataQueue(Boolean bool) throws InterruptedException {
        while (CollectionUtils.isNotEmpty(this.unsent)) {
            ConsumerRecord<String, String> peek = this.unsent.peek();
            if (peek == null) {
                logger.error("the unsent is not empty, but the recordInUnsent is null!!");
            } else if (Boolean.valueOf(this.processDataQueue.offer(peek, 200L, TimeUnit.MILLISECONDS)).booleanValue()) {
                this.unsent.poll();
            } else {
                logger.info("the processDataQueue is full... and the unsent is not empty");
                if (!bool.booleanValue()) {
                    return;
                }
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    logger.error("------- thread can not sleep ---------------------" + e.toString());
                }
            }
        }
    }
}
