package me.ahoo.eventbus.core.compensate.db;

import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import me.ahoo.eventbus.core.compensate.AbstractSubscribeCompensate;
import me.ahoo.eventbus.core.compensate.ScheduleConfig;
import me.ahoo.eventbus.core.compensate.db.config.SubscribeConfig;
import me.ahoo.eventbus.core.repository.SubscribeEventRepository;
import me.ahoo.eventbus.core.serialize.Deserializer;
import me.ahoo.eventbus.core.subscriber.SubscriberRegistry;
import me.ahoo.eventbus.core.utils.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/ahoo/eventbus/core/compensate/db/DbSubscribeCompensate.class */
/**
 * Subscribe-side compensation that runs on a single-threaded scheduler and only
 * performs work while this instance wins the leadership contest.
 *
 * <p>Each instance obtains its own leader id from
 * {@link CompensateLeaderService#generateLeaderId()} at construction time. On every
 * scheduled tick it calls
 * {@code CompensateLeaderService.fightLeadership(...)} and, when that returns
 * {@code true}, invokes the inherited {@code schedule()} method.
 */
public class DbSubscribeCompensate extends AbstractSubscribeCompensate {
    private static final Logger log = LoggerFactory.getLogger(DbSubscribeCompensate.class);
    private final SubscribeConfig subscribeConfig;
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private final String leaderId;

    /**
     * Creates the compensator and generates the leader id used for this instance.
     *
     * @param deserializer             forwarded to the superclass constructor
     * @param subscribeConfig          supplies the schedule and leader settings; also forwarded to the superclass
     * @param subscriberRegistry       forwarded to the superclass constructor
     * @param subscribeEventRepository forwarded to the superclass constructor; used here for leadership calls
     */
    public DbSubscribeCompensate(Deserializer deserializer, SubscribeConfig subscribeConfig, SubscriberRegistry subscriberRegistry, SubscribeEventRepository subscribeEventRepository) {
        super(subscribeConfig, deserializer, subscriberRegistry, subscribeEventRepository);
        this.subscribeConfig = subscribeConfig;
        this.leaderId = CompensateLeaderService.generateLeaderId();
    }

    /**
     * Schedules the periodic compensation task, creating the executor when needed.
     *
     * <p>The initial delay and period come from {@code subscribeConfig.getSchedule()} and
     * are interpreted as seconds.
     */
    @Override
    protected void start0() {
        // An executor that has been shut down by stop0() rejects new tasks, so it must be
        // replaced rather than reused when the compensator is started again.
        if (Objects.isNull(this.scheduledThreadPoolExecutor) || this.scheduledThreadPoolExecutor.isShutdown()) {
            this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, Threads.defaultFactory("ShardingSubscribeCompensate"));
        }
        ScheduleConfig schedule = this.subscribeConfig.getSchedule();
        this.scheduledThreadPoolExecutor.scheduleAtFixedRate(this::doWorkSafely, schedule.getInitialDelay(), schedule.getPeriod(), TimeUnit.SECONDS);
    }

    /**
     * Runs one tick and keeps the periodic schedule alive if the tick fails.
     *
     * <p>{@code scheduleAtFixedRate} suppresses all subsequent executions once a run throws,
     * so a failure is logged here instead of being propagated to the executor.
     */
    private void doWorkSafely() {
        try {
            doWork();
        } catch (RuntimeException e) {
            log.error("doWork - Compensation tick failed, leaderId:[{}].", this.leaderId, e);
        }
    }

    /** Contends for leadership and runs the inherited {@code schedule()} only when it is won. */
    private void doWork() {
        if (CompensateLeaderService.fightLeadership(this.subscribeEventRepository, this.leaderId, this.subscribeConfig.getLeader())) {
            schedule();
        }
    }

    /**
     * Shuts down the scheduler (if one was created) and releases leadership for this
     * instance's leader id, logging whether the release succeeded.
     */
    @Override
    protected void stop0() {
        if (Objects.nonNull(this.scheduledThreadPoolExecutor)) {
            // NOTE(review): shutdown() does not wait for a tick that is already running;
            // confirm whether an in-flight tick re-acquiring leadership after the release
            // below is acceptable.
            this.scheduledThreadPoolExecutor.shutdown();
        }
        boolean releaseLeadership = this.subscribeEventRepository.releaseLeadership(this.leaderId);
        if (log.isInfoEnabled()) {
            if (releaseLeadership) {
                log.info("stop0 - Release leadership successfully, leaderId:[{}].", this.leaderId);
            } else {
                log.info("stop0 - Failed to release leadership, because I'm not a leader.leaderId:[{}].", this.leaderId);
            }
        }
    }
}
