package me.ahoo.eventbus.core.compensate;

import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import me.ahoo.eventbus.core.publisher.PublishEvent;
import me.ahoo.eventbus.core.repository.SubscribeEventRepository;
import me.ahoo.eventbus.core.repository.entity.SubscribeEventCompensateEntity;
import me.ahoo.eventbus.core.repository.entity.SubscribeEventEntity;
import me.ahoo.eventbus.core.serialize.Deserializer;
import me.ahoo.eventbus.core.subscriber.Subscriber;
import me.ahoo.eventbus.core.subscriber.SubscriberRegistry;
import me.ahoo.simba.core.MutexContendServiceFactory;
import me.ahoo.simba.schedule.ScheduleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:me/ahoo/eventbus/core/compensate/SubscribeCompensateScheduler.class */
/**
 * Scheduler that re-delivers subscribe events previously recorded as failed.
 *
 * <p>Each run of {@link #work()} queries a batch of failed events from the
 * {@link SubscribeEventRepository}, re-invokes the matching {@link Subscriber} for every
 * event and persists one {@link SubscribeEventCompensateEntity} per attempt describing
 * the outcome.
 *
 * <p>NOTE(review): the name {@code "eventbus_subscribe_leader"} handed to the super
 * constructor is presumably the mutex used to elect a single active scheduler instance;
 * confirm against {@link AbstractCompensateScheduler}.
 */
public class SubscribeCompensateScheduler extends AbstractCompensateScheduler {
    private static final Logger log = LoggerFactory.getLogger(SubscribeCompensateScheduler.class);
    private final CompensateConfig compensateConfig;
    private final Deserializer deserializer;
    private final SubscriberRegistry subscriberRegistry;
    protected final SubscribeEventRepository subscribeEventRepository;

    /**
     * Creates the scheduler.
     *
     * @param compensateConfig           supplies the batch, max-version, before and range
     *                                   arguments used when querying failed events
     * @param scheduleConfig             passed through to the super class
     * @param deserializer               turns stored event data back into event objects
     * @param subscriberRegistry         used to look up a subscriber by its name
     * @param subscribeEventRepository   source of failed events and sink for compensate records
     * @param mutexContendServiceFactory passed through to the super class
     */
    public SubscribeCompensateScheduler(CompensateConfig compensateConfig, ScheduleConfig scheduleConfig, Deserializer deserializer, SubscriberRegistry subscriberRegistry, SubscribeEventRepository subscribeEventRepository, MutexContendServiceFactory mutexContendServiceFactory) {
        super("eventbus_subscribe_leader", scheduleConfig, mutexContendServiceFactory);
        this.compensateConfig = compensateConfig;
        this.deserializer = deserializer;
        this.subscriberRegistry = subscriberRegistry;
        this.subscribeEventRepository = subscribeEventRepository;
    }

    /**
     * Returns the fixed worker name of this scheduler.
     *
     * @return the literal {@code "SubscribeCompensateScheduler"}
     */
    @Override
    protected String getWorker() {
        return "SubscribeCompensateScheduler";
    }

    /**
     * Runs one compensation pass: queries failed subscribe events and compensates each
     * of them in iteration order.
     *
     * <p>Any {@link Throwable} raised here is logged and swallowed, so this method never
     * propagates a failure to its caller.
     */
    @Override
    protected void work() {
        try {
            List<SubscribeEventEntity> failedEvents = this.subscribeEventRepository.queryFailed(
                this.compensateConfig.getBatch().intValue(),
                this.compensateConfig.getMaxVersion().intValue(),
                this.compensateConfig.getBefore(),
                this.compensateConfig.getRange());
            if (failedEvents.isEmpty()) {
                log.info("work - can not find any failed subscribe event!");
                return;
            }
            for (SubscribeEventEntity failedEvent : failedEvents) {
                compensate(failedEvent);
            }
        } catch (Throwable th) {
            // Deliberate best-effort: a failed pass is only logged.
            log.error(th.getMessage(), th);
        }
    }

    /**
     * Re-delivers a single failed subscribe event and records the attempt.
     *
     * <p>The subscriber lookup, the event conversion and the subscriber invocation all run
     * inside one guarded section, so a failure in any of them is captured as the failed
     * message of the compensate record instead of escaping and aborting the remaining
     * events of the batch. Persisting the record is guarded separately and only logged
     * on failure.
     *
     * @param subscribeEventEntity the failed event to compensate
     */
    protected void compensate(SubscribeEventEntity subscribeEventEntity) {
        SubscribeEventCompensateEntity compensateRecord = SubscribeEventCompensateEntity.builder()
            .subscribeEventId(subscribeEventEntity.getId())
            .startTime(Long.valueOf(System.currentTimeMillis()))
            .build();
        log.info("compensate - SubscribeEvent -> id:[{}] ,version:[{}].", subscribeEventEntity.getId(), subscribeEventEntity.getVersion());

        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            // Looked up inside the try so a registry failure is recorded for this event only.
            Subscriber subscriber = this.subscriberRegistry.getSubscriber(subscribeEventEntity.getSubscriberName());
            if (subscriber == null) {
                // Fail with an explicit reason rather than an anonymous NullPointerException.
                throw new IllegalStateException("compensate - can not find subscriber by name:[" + subscribeEventEntity.getSubscriberName() + "].");
            }
            subscriber.invoke(convert(subscribeEventEntity, subscriber));
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
            compensateRecord.setFailedMsg(Throwables.getStackTraceAsString(th));
        }

        try {
            compensateRecord.setTaken(Long.valueOf(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
            this.subscribeEventRepository.compensate(compensateRecord);
        } catch (Throwable th) {
            // The attempt itself is over; a failure to persist its record is only logged.
            log.error(th.getMessage(), th);
        }
    }

    /**
     * Rebuilds the {@link PublishEvent} to hand to the subscriber from a stored event.
     *
     * @param subscribeEventEntity stored event providing id, name, data and create time
     * @param subscriber           subscriber whose event class is the deserialization target
     * @return a new event carrying the deserialized data
     */
    private PublishEvent convert(SubscribeEventEntity subscribeEventEntity, Subscriber subscriber) {
        Object eventData = this.deserializer.deserialize(subscribeEventEntity.getEventData(), subscriber.getSubscribeEventClass());
        PublishEvent publishEvent = new PublishEvent();
        publishEvent.setId(subscribeEventEntity.getEventId());
        publishEvent.setEventName(subscribeEventEntity.getEventName());
        publishEvent.setEventData(eventData);
        publishEvent.setCreateTime(subscribeEventEntity.getEventCreateTime());
        return publishEvent;
    }
}
