package me.ahoo.eventbus.core.compensate;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import me.ahoo.eventbus.core.consistency.ConsistencyPublisher;
import me.ahoo.eventbus.core.repository.PublishEventRepository;
import me.ahoo.eventbus.core.repository.PublishIdentity;
import me.ahoo.eventbus.core.repository.entity.PublishEventCompensateEntity;
import me.ahoo.eventbus.core.repository.entity.PublishEventEntity;
import me.ahoo.simba.core.MutexContendServiceFactory;
import me.ahoo.simba.schedule.ScheduleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/ahoo/eventbus/core/compensate/PublishCompensateScheduler.class */
public class PublishCompensateScheduler extends AbstractCompensateScheduler {
    private static final Logger log = LoggerFactory.getLogger(PublishCompensateScheduler.class);
    private final CompensateConfig compensateConfig;
    private final ConsistencyPublisher consistencyPublisher;
    protected final PublishEventRepository publishEventRepository;

    public PublishCompensateScheduler(CompensateConfig compensateConfig, ScheduleConfig scheduleConfig, ConsistencyPublisher consistencyPublisher, PublishEventRepository publishEventRepository, MutexContendServiceFactory mutexContendServiceFactory) {
        super("eventbus_publish_leader", scheduleConfig, mutexContendServiceFactory);
        this.compensateConfig = compensateConfig;
        this.consistencyPublisher = consistencyPublisher;
        this.publishEventRepository = publishEventRepository;
    }

    @Override // me.ahoo.eventbus.core.compensate.AbstractCompensateScheduler
    protected String getWorker() {
        return "PublishCompensateScheduler";
    }

    @Override // me.ahoo.eventbus.core.compensate.AbstractCompensateScheduler
    protected void work() {
        try {
            List<PublishEventEntity> queryFailed = this.publishEventRepository.queryFailed(this.compensateConfig.getBatch().intValue(), this.compensateConfig.getMaxVersion().intValue(), this.compensateConfig.getBefore(), this.compensateConfig.getRange());
            if (queryFailed.isEmpty()) {
                if (log.isInfoEnabled()) {
                    log.info("work - can not find any failed publish event!");
                }
            } else {
                Iterator<PublishEventEntity> it = queryFailed.iterator();
                while (it.hasNext()) {
                    compensate(it.next());
                }
            }
        } catch (Throwable th) {
            if (log.isErrorEnabled()) {
                log.error(th.getMessage(), th);
            }
        }
    }

    protected void compensate(PublishEventEntity publishEventEntity) {
        PublishEventCompensateEntity build = PublishEventCompensateEntity.builder().publishEventId(publishEventEntity.getId()).startTime(Long.valueOf(System.currentTimeMillis())).build();
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            if (log.isInfoEnabled()) {
                log.info("compensate - PublishEvent -> id:[{}] ,version:[{}].", publishEventEntity.getId(), publishEventEntity.getVersion());
            }
            PublishIdentity publishIdentity = new PublishIdentity();
            publishIdentity.setId(publishEventEntity.getId());
            publishIdentity.setEventName(publishEventEntity.getEventName());
            publishIdentity.setStatus(publishEventEntity.getStatus());
            publishIdentity.setVersion(publishEventEntity.getVersion());
            CompensatePublishEvent compensatePublishEvent = new CompensatePublishEvent();
            compensatePublishEvent.setId(publishEventEntity.getId());
            compensatePublishEvent.setEventName(publishEventEntity.getEventName());
            compensatePublishEvent.setEventData(publishEventEntity.getEventData());
            compensatePublishEvent.setCreateTime(publishEventEntity.getCreateTime());
            this.consistencyPublisher.publish(publishIdentity, compensatePublishEvent).get();
        } catch (Throwable th) {
            if (log.isErrorEnabled()) {
                log.error(th.getMessage(), th);
            }
            build.setFailedMsg(Throwables.getStackTraceAsString(th));
        }
        try {
            build.setTaken(Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
            this.publishEventRepository.compensate(build);
        } catch (Throwable th2) {
            if (log.isErrorEnabled()) {
                log.error(th2.getMessage(), th2);
            }
        }
    }
}
