package cn.thinkingdata.kafka.consumer.persist;

import cn.thinkingdata.kafka.cache.KafkaCache;
import cn.thinkingdata.kafka.constant.KafkaMysqlOffsetParameter;
import cn.thinkingdata.kafka.consumer.dao.KafkaConsumerOffset;
import cn.thinkingdata.kafka.consumer.offset.MysqlOffsetManager;
import cn.thinkingdata.kafka.util.CommonUtils;
import cn.thinkingdata.kafka.util.RetryerUtil;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.google.common.base.Predicates;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/thinkingdata/kafka/consumer/persist/MysqlOffsetPersist.class */
/**
 * Singleton background thread that writes the consumer offsets held in
 * {@link KafkaCache#kafkaConsumerOffsetMaps} to MySQL and to the backup external store.
 *
 * <p>Each store operation goes through {@link #retryerWithResultFails}; an operation counts as
 * successful when at least one of the two stores accepted it. The outcome of every save is
 * published through {@code KafkaMysqlOffsetParameter.mysqlAndBackupStoreConnState}, which the
 * run loop uses to decide whether it has to wait for the stores to come back.
 *
 * <p>The loop is controlled through the public static flags {@link #destoryFlag} (set by other
 * code to stop the loop) and {@link #runFlag} (true while a persist cycle is in progress).
 */
public class MysqlOffsetPersist extends Thread implements OffsetPersist {
    private static final Logger logger = LoggerFactory.getLogger(MysqlOffsetPersist.class);

    private static MysqlOffsetPersist instance;

    /** When set to true the run loop exits after the current iteration. */
    public static volatile Boolean destoryFlag = false;

    /** True while the run loop is checking store state or persisting offsets. */
    public static volatile Boolean runFlag = false;

    private final StorePersist externalStorePersist = MysqlOffsetManager.getInstance().getExternalStorePersist();

    // Retries while the callable returns false; presumably 3 attempts with a 300 ms wait
    // (see RetryerUtil.initRetryerByTimesWithIfResult to confirm the argument meaning).
    private final Retryer<Boolean> retryerWithResultFails = RetryerUtil.initRetryerByTimesWithIfResult(3, 300, Predicates.equalTo(false));

    /**
     * Returns the process-wide instance, creating it on first use.
     *
     * @return the shared {@code MysqlOffsetPersist}
     */
    public static synchronized MysqlOffsetPersist getInstance() {
        if (instance == null) {
            instance = new MysqlOffsetPersist();
        }
        return instance;
    }

    private MysqlOffsetPersist() {
    }

    /**
     * Saves one offset; if neither MySQL nor the backup store accepted it, the external store's
     * failure hook is invoked.
     *
     * @param kafkaConsumerOffset offset record to save
     */
    @Override // cn.thinkingdata.kafka.consumer.persist.OffsetPersist
    public void persist(KafkaConsumerOffset kafkaConsumerOffset) {
        if (saveOffset(kafkaConsumerOffset)) {
            return;
        }
        logger.error("can not persist in both mysql or backup store");
        this.externalStorePersist.executeWhenSaveOffsetFailInMysqlAndExternalStore(kafkaConsumerOffset);
    }

    /** Logs the shutdown and delegates to {@code MysqlOffsetManager.shutdown()}. */
    @Override // cn.thinkingdata.kafka.consumer.persist.OffsetPersist
    public void shutdown() {
        logger.info("------- Shutting mysql offset thread Down ---------------------");
        MysqlOffsetManager.getInstance().shutdown();
    }

    /**
     * Blocks while {@code mysqlAndBackupStoreConnState} is false, probing both stores once per
     * second. The state is set to true as soon as at least one store answers its check.
     *
     * <p>Consecutive rounds in which not both stores are healthy are counted; once the count
     * exceeds {@code sessionTimeout - 10} the external store's session-timeout hook is called on
     * every further failing round.
     */
    public void mysqlAndBackupStoreStateCheckJob() {
        boolean announced = false;
        int failedRounds = 0;
        while (!KafkaMysqlOffsetParameter.mysqlAndBackupStoreConnState.get()) {
            if (!announced) {
                logger.info("------- mysql or backup store down, check mysql and backup store status ---------------------");
                announced = true;
            }
            boolean mysqlUp = mysqlStateCheckWithRetry();
            boolean backupStoreUp = backupStoreStateCheckWithRetry();
            if (mysqlUp && backupStoreUp) {
                failedRounds = 0;
            } else {
                failedRounds++;
                if (failedRounds > Integer.parseInt(KafkaMysqlOffsetParameter.sessionTimeout) - 10) {
                    this.externalStorePersist.executeWhenSessionTimeout(Integer.valueOf(failedRounds));
                }
            }
            // One reachable store is enough to resume persisting.
            KafkaMysqlOffsetParameter.mysqlAndBackupStoreConnState.set(mysqlUp || backupStoreUp);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is only logged and the loop keeps going; the
                // interrupt status is not restored. Confirm whether interruption should stop
                // this thread before changing it.
                logger.error("------- thread can not sleep ---------------------" + e.toString());
            }
        }
    }

    /**
     * Runs the MySQL health check through the retryer.
     *
     * @return true if the check succeeded, false if it kept failing or threw
     */
    public Boolean mysqlStateCheckWithRetry() {
        return callWithRetry(
                () -> MysqlOffsetManager.getInstance().mysqlStateCheck(),
                "retry mysqlStateCheck error, the error is ");
    }

    /**
     * Runs the backup store health check through the retryer.
     *
     * @return true if the check succeeded, false if it kept failing or threw
     */
    public Boolean backupStoreStateCheckWithRetry() {
        return callWithRetry(
                () -> this.externalStorePersist.backupStoreStateCheck(),
                "retry backupStoreStateCheck error, the error is ");
    }

    /**
     * Main loop: until {@link #destoryFlag} is set, and unless the subscribing consumer is
     * closed, waits for a usable store and then persists the offsets that are due. Between
     * iterations the thread sleeps {@code flushInterval * 100} milliseconds.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!destoryFlag) {
            if (!KafkaMysqlOffsetParameter.kafkaSubscribeConsumerClosed.get()) {
                runFlag = true;
                mysqlAndBackupStoreStateCheckJob();
                if (KafkaMysqlOffsetParameter.mysqlAndBackupStoreConnState.get()) {
                    persisit();
                }
            }
            runFlag = false;
            try {
                Thread.sleep(KafkaMysqlOffsetParameter.flushInterval.intValue() * 100L);
            } catch (InterruptedException e) {
                // NOTE(review): interruption is logged and otherwise ignored; the loop only
                // ends through destoryFlag. Confirm this is intended.
                logger.error("------- thread can not sleep ---------------------" + e.toString());
            }
        }
        runFlag = false;
        logger.info("mysql persist stop, runFlag is " + runFlag);
    }

    /**
     * Persists every cached offset that has advanced by at least {@code flushOffsetSize} since
     * its last flush, or whose last update is at least {@code flushInterval * 1000} ms old.
     */
    private synchronized void persisit() {
        long now = new Date().getTime();
        long maxAgeMillis = KafkaMysqlOffsetParameter.flushInterval.intValue() * 1000L;
        for (KafkaConsumerOffset cached : KafkaCache.kafkaConsumerOffsetMaps.values()) {
            long unflushed = cached.getOffset().longValue() - cached.getLast_flush_offset().longValue();
            long age = now - cached.getUpdate_time().getTime();
            if (unflushed >= KafkaMysqlOffsetParameter.flushOffsetSize.intValue() || age >= maxAgeMillis) {
                persist(cached);
            }
        }
    }

    /**
     * Stamps the record with the current time and writes it to MySQL and to the backup store;
     * both writes are always attempted, each with its own retries.
     *
     * <p>On success (at least one store accepted the write) the record's last flushed offset is
     * set to the offset read before the writes and its count is reset to 0. In either case the
     * result is published to {@code mysqlAndBackupStoreConnState}.
     *
     * @param kafkaConsumerOffset offset record to save; mutated as described above
     * @return true if at least one store accepted the write
     */
    public Boolean saveOffset(KafkaConsumerOffset kafkaConsumerOffset) {
        Date now = new Date();
        kafkaConsumerOffset.setUpdate_time(now);
        if (kafkaConsumerOffset.getCreate_time() == null) {
            kafkaConsumerOffset.setCreate_time(now);
        }
        // Capture the offset before writing so that last_flush_offset matches what was
        // read here even if the record advances while the writes are in flight.
        Long offsetBeforeSave = kafkaConsumerOffset.getOffset();

        boolean savedInMysql = callWithRetry(
                () -> MysqlOffsetManager.getInstance().saveOffsetInCacheToMysql(kafkaConsumerOffset),
                "retry to save kafkaConsumerOffset to mysql and backup external store error, the error is ");
        boolean savedInBackupStore = callWithRetry(
                () -> MysqlOffsetManager.getInstance().getExternalStorePersist().saveOffsetInBackupExternalStore(kafkaConsumerOffset),
                "retry to save kafkaConsumerOffset to mysql and backup external store error, the error is ");

        boolean saved = savedInMysql || savedInBackupStore;
        if (saved) {
            kafkaConsumerOffset.setLast_flush_offset(offsetBeforeSave);
            kafkaConsumerOffset.setCount(0L);
        }
        KafkaMysqlOffsetParameter.mysqlAndBackupStoreConnState.set(saved);
        return saved;
    }

    /**
     * Saves the given offset immediately, removes it from the cache if the cached entry for its
     * topic-partition equals it, then clears its owner and pushes the owner change to the stores.
     *
     * @param kafkaConsumerOffset offset record to flush; its owner is set to the empty string
     * @return the result of the save (the owner update result is not reflected)
     */
    @Override // cn.thinkingdata.kafka.consumer.persist.OffsetPersist
    public synchronized Boolean flush(KafkaConsumerOffset kafkaConsumerOffset) {
        logger.info("------- flush offset in cache to mysql ---------------------");
        Boolean saved = saveOffset(kafkaConsumerOffset);
        if (!saved) {
            logger.error("can not flush in mysql or backup store");
            this.externalStorePersist.executeWhenSaveOffsetFailInMysqlAndExternalStore(kafkaConsumerOffset);
        }
        TopicPartition topicPartition = new TopicPartition(kafkaConsumerOffset.getTopic(), kafkaConsumerOffset.getPartition().intValue());
        KafkaConsumerOffset cached = KafkaCache.kafkaConsumerOffsetMaps.get(topicPartition);
        if (cached != null && cached.equals(kafkaConsumerOffset)) {
            KafkaCache.kafkaConsumerOffsetMaps.remove(topicPartition);
        }
        kafkaConsumerOffset.setOwner("");
        updateOwner(kafkaConsumerOffset);
        return saved;
    }

    /**
     * Writes the record's owner to MySQL and, only if that fails, to the backup store.
     *
     * <p>The two attempts are retried independently: a MySQL update that exhausts its retries
     * or throws no longer prevents the backup store from being tried.
     *
     * @param kafkaConsumerOffset record whose owner should be stored
     * @return true if either store accepted the update
     */
    public synchronized Boolean updateOwner(KafkaConsumerOffset kafkaConsumerOffset) {
        boolean updatedInMysql = callWithRetry(
                () -> MysqlOffsetManager.getInstance().updateOwner(kafkaConsumerOffset),
                "retry to updateOwner from mysql error, the error is ");
        if (updatedInMysql) {
            return true;
        }
        return callWithRetry(
                () -> MysqlOffsetManager.getInstance().getExternalStorePersist().updateOwner(kafkaConsumerOffset),
                "retry to updateOwner from backup store error, the error is ");
    }

    /**
     * Runs {@code task} through {@link #retryerWithResultFails} and converts every failure mode
     * into {@code false}: retries exhausted, an exception from the task, or a null result.
     *
     * @param task        operation to run
     * @param errorPrefix log message prefix; the stack trace of the failure is appended
     * @return true only if the task eventually returned {@code Boolean.TRUE}
     */
    private boolean callWithRetry(Callable<Boolean> task, String errorPrefix) {
        try {
            return Boolean.TRUE.equals(this.retryerWithResultFails.call(task));
        } catch (ExecutionException | RetryException e) {
            logger.error(errorPrefix + CommonUtils.getStackTraceAsString(e));
            return false;
        }
    }
}
