package me.ahoo.eventbus.core.compensation.impl;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import me.ahoo.eventbus.core.compensation.CompensationPublishEventWrapper;
import me.ahoo.eventbus.core.compensation.PublishCompensation;
import me.ahoo.eventbus.core.compensation.impl.config.PublishConfig;
import me.ahoo.eventbus.core.compensation.impl.config.ScheduleConfig;
import me.ahoo.eventbus.core.consistency.ConsistencyPublisher;
import me.ahoo.eventbus.core.repository.PublishEventRepository;
import me.ahoo.eventbus.core.repository.PublishIdentity;
import me.ahoo.eventbus.core.repository.entity.PublishEventCompensationEntity;
import me.ahoo.eventbus.core.repository.entity.PublishEventEntity;
import me.ahoo.eventbus.core.utils.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/ahoo/eventbus/core/compensation/impl/PublishCompensationImpl.class */
/**
 * Scheduled compensation for publish events that previously failed.
 *
 * <p>On a fixed rate, a single worker thread queries the failed publish events from the
 * {@link PublishEventRepository}, republishes each of them through the
 * {@link ConsistencyPublisher} and records the outcome of every attempt with
 * {@link PublishEventRepository#compensate}.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized; the {@code running} flag is volatile
 * so {@link #isRunning()} can be read without locking. Because {@link #stop()} shuts down the
 * private executor, an instance cannot be started again once it has been stopped.
 */
public class PublishCompensationImpl implements PublishCompensation {
    private static final Logger log = LoggerFactory.getLogger(PublishCompensationImpl.class);
    private final PublishConfig publishConfig;
    private final ConsistencyPublisher consistencyPublisher;
    private final PublishEventRepository publishEventRepository;
    // Core pool size of 1: compensation runs never overlap each other.
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, Threads.defaultFactory("PublishCompensation"));
    private volatile boolean running;

    /**
     * Creates a compensation job; nothing is scheduled until {@link #start()} is called.
     *
     * @param publishConfig          schedule and query settings for the job
     * @param consistencyPublisher   publisher used to republish failed events
     * @param publishEventRepository repository used to find failed events and to record attempts
     */
    public PublishCompensationImpl(PublishConfig publishConfig, ConsistencyPublisher consistencyPublisher, PublishEventRepository publishEventRepository) {
        this.publishConfig = publishConfig;
        this.consistencyPublisher = consistencyPublisher;
        this.publishEventRepository = publishEventRepository;
    }

    /**
     * Starts the periodic compensation task. Calling it while already running is a no-op.
     *
     * <p>The initial delay and the period of the configured schedule are interpreted as seconds.
     *
     * @throws java.util.concurrent.RejectedExecutionException if the executor has already been
     *         shut down by {@link #stop()}; the instance then stays in the not-running state
     */
    public synchronized void start() {
        if (this.running) {
            return;
        }
        ScheduleConfig schedule = this.publishConfig.getSchedule();
        this.scheduledThreadPoolExecutor.scheduleAtFixedRate(this::schedule, schedule.getInitialDelay(), schedule.getPeriod(), TimeUnit.SECONDS);
        // Flip the flag only after scheduling succeeded, so a rejected or otherwise failed
        // start never leaves the instance claiming to run while nothing is scheduled.
        this.running = true;
    }

    /**
     * One scheduled run: loads the failed publish events and compensates them one by one.
     *
     * <p>Every {@link Throwable} is caught and logged, because an exception escaping a task
     * scheduled with {@code scheduleAtFixedRate} suppresses all subsequent executions.
     */
    private void schedule() {
        log.debug("schedule.");
        try {
            List<PublishEventEntity> failedEvents = queryFailed();
            if (failedEvents.isEmpty()) {
                log.info("can not find any failed publish event!");
                return;
            }
            for (PublishEventEntity failedEvent : failedEvents) {
                compensate(failedEvent);
                if (Thread.currentThread().isInterrupted()) {
                    // compensate() restored the interrupt flag: stop this batch instead of
                    // letting every remaining event fail immediately on its blocking wait.
                    log.warn("schedule interrupted, remaining failed publish events are left for the next run.");
                    break;
                }
            }
        } catch (Throwable th) {
            log.error("schedule error", th);
        }
    }

    /**
     * Republishes a single failed event and records the attempt.
     *
     * <p>A compensation record is written whether or not the republish succeeds. On failure the
     * stack trace of the error is stored as the failure message. The elapsed time is recorded in
     * milliseconds. Errors raised while writing the record are logged and not propagated.
     *
     * <p>If the thread is interrupted during the attempt, the interrupt status is restored after
     * the record has been written.
     *
     * @param publishEventEntity the failed publish event to republish
     */
    private void compensate(PublishEventEntity publishEventEntity) {
        PublishEventCompensationEntity compensationRecord = PublishEventCompensationEntity.builder()
                .publishEventId(publishEventEntity.getId())
                .startTime(LocalDateTime.now())
                .build();
        Stopwatch stopwatch = Stopwatch.createStarted();
        boolean interrupted = false;
        try {
            log.info("Compensate PublishEvent -> id:[{}] ,version:[{}].", publishEventEntity.getId(), publishEventEntity.getVersion());
            PublishIdentity publishIdentity = PublishIdentity.builder()
                    .id(publishEventEntity.getId())
                    .eventName(publishEventEntity.getEventName())
                    .status(publishEventEntity.getStatus())
                    .version(publishEventEntity.getVersion())
                    .build();
            CompensationPublishEventWrapper eventWrapper = new CompensationPublishEventWrapper();
            eventWrapper.setId(publishEventEntity.getId());
            eventWrapper.setEventName(publishEventEntity.getEventName());
            eventWrapper.setEventData(publishEventEntity.getEventData());
            eventWrapper.setCreateTime(publishEventEntity.getCreateTime());
            // get() is called on the publish result so that a publish failure surfaces here
            // and ends up in the compensation record.
            this.consistencyPublisher.publish(publishIdentity, eventWrapper).get();
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                interrupted = true;
            }
            compensationRecord.setFailedMsg(Throwables.getStackTraceAsString(th));
        }
        try {
            compensationRecord.setTaken(Long.valueOf(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
            this.publishEventRepository.compensate(compensationRecord);
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                interrupted = true;
            }
            log.error(th.getMessage(), th);
        } finally {
            if (interrupted) {
                // Catching InterruptedException cleared the flag; restore it only after the
                // record was written so the repository call itself is not disturbed.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Queries the repository for failed publish events using the configured batch, before and
     * max-version values.
     *
     * <p>NOTE(review): presumably these are the batch size, an age threshold and the maximum
     * retry version — confirm against {@code PublishEventRepository#queryFailed}.
     *
     * @return the failed publish events returned by the repository
     */
    private List<PublishEventEntity> queryFailed() {
        return this.publishEventRepository.queryFailed(this.publishConfig.getBatch().intValue(), this.publishConfig.getBefore().intValue(), this.publishConfig.getMaxVersion().intValue());
    }

    /**
     * Stops the compensation job by shutting down the executor. Calling it while not running is
     * a no-op.
     *
     * <p>{@code shutdown()} does not interrupt a run that is already in progress.
     */
    public synchronized void stop() {
        if (this.running) {
            this.running = false;
            this.scheduledThreadPoolExecutor.shutdown();
        }
    }

    /**
     * @return {@code true} once {@link #start()} has scheduled the task and until {@link #stop()}
     *         is called
     */
    public boolean isRunning() {
        return this.running;
    }
}
