package org.enodeframework.eventing.impl;

import com.google.common.base.Strings;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.enodeframework.common.exception.ENodeRuntimeException;
import org.enodeframework.common.io.IOHelper;
import org.enodeframework.common.io.Task;
import org.enodeframework.common.scheduling.IScheduleService;
import org.enodeframework.eventing.DomainEventStreamMessage;
import org.enodeframework.eventing.IProcessingEventProcessor;
import org.enodeframework.eventing.IPublishedVersionStore;
import org.enodeframework.eventing.ProcessingEvent;
import org.enodeframework.eventing.ProcessingEventMailBox;
import org.enodeframework.messaging.IMessageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/enodeframework/eventing/impl/DefaultProcessingEventProcessor.class */
public class DefaultProcessingEventProcessor implements IProcessingEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultProcessingEventProcessor.class);

    @Autowired
    private IScheduleService scheduleService;

    @Autowired
    private IMessageDispatcher dispatcher;

    @Autowired
    private IPublishedVersionStore publishedVersionStore;
    private final Object lockObj = new Object();
    private int timeoutSeconds = 259200;
    private int scanExpiredAggregateIntervalMilliseconds = 5000;
    private String processorName = "DefaultEventProcessor";
    private ConcurrentHashMap<String, ProcessingEventMailBox> mailboxDict = new ConcurrentHashMap<>();
    private String taskName = "CleanInactiveProcessingEventMailBoxes_" + System.nanoTime() + new Random().nextInt(10000);

    @Override // org.enodeframework.eventing.IProcessingEventProcessor
    public void process(ProcessingEvent processingEvent) {
        String aggregateRootId = processingEvent.getMessage().getAggregateRootId();
        if (Strings.isNullOrEmpty(aggregateRootId)) {
            throw new IllegalArgumentException("aggregateRootId of domain event stream cannot be null or empty, domainEventStreamId:" + processingEvent.getMessage().getId());
        }
        synchronized (this.lockObj) {
            this.mailboxDict.computeIfAbsent(aggregateRootId, str -> {
                return new ProcessingEventMailBox(aggregateRootId, getAggregateRootLatestHandledEventVersion(processingEvent.getMessage().getAggregateRootTypeName(), aggregateRootId), processingEvent2 -> {
                    dispatchProcessingMessageAsync(processingEvent2, 0);
                });
            }).enqueueMessage(processingEvent);
        }
    }

    @Override // org.enodeframework.eventing.IProcessingEventProcessor
    public void start() {
        this.scheduleService.startTask(this.taskName, this::cleanInactiveMailbox, this.scanExpiredAggregateIntervalMilliseconds, this.scanExpiredAggregateIntervalMilliseconds);
    }

    @Override // org.enodeframework.eventing.IProcessingEventProcessor
    public void stop() {
        this.scheduleService.stopTask(this.taskName);
    }

    private void dispatchProcessingMessageAsync(ProcessingEvent processingEvent, int i) {
        IOHelper.tryAsyncActionRecursivelyWithoutResult("DispatchProcessingMessageAsync", () -> {
            return this.dispatcher.dispatchMessagesAsync(processingEvent.getMessage().getEvents());
        }, r6 -> {
            updatePublishedVersionAsync(processingEvent, 0);
        }, () -> {
            return String.format("sequence message [messageId:%s, messageType:%s, aggregateRootId:%s, aggregateRootVersion:%s]", processingEvent.getMessage().getId(), processingEvent.getMessage().getClass().getName(), processingEvent.getMessage().getAggregateRootId(), Integer.valueOf(processingEvent.getMessage().getVersion()));
        }, null, i, true);
    }

    private int getAggregateRootLatestHandledEventVersion(String str, String str2) {
        try {
            return ((Integer) Task.await(this.publishedVersionStore.getPublishedVersionAsync(this.processorName, str, str2))).intValue();
        } catch (Exception e) {
            throw new ENodeRuntimeException("_publishedVersionStore.GetPublishedVersionAsync has unknown exception.", e);
        }
    }

    private void updatePublishedVersionAsync(ProcessingEvent processingEvent, int i) {
        DomainEventStreamMessage message = processingEvent.getMessage();
        IOHelper.tryAsyncActionRecursivelyWithoutResult("UpdatePublishedVersionAsync", () -> {
            return this.publishedVersionStore.updatePublishedVersionAsync(this.processorName, message.getAggregateRootTypeName(), message.getAggregateRootId(), message.getVersion());
        }, r3 -> {
            processingEvent.complete();
        }, () -> {
            return String.format("DomainEventStreamMessage [messageId:%s, messageType:%s, aggregateRootId:%s, aggregateRootVersion:%s]", message.getId(), message.getClass().getName(), message.getAggregateRootId(), Integer.valueOf(message.getVersion()));
        }, null, i, true);
    }

    private void cleanInactiveMailbox() {
        ((List) this.mailboxDict.entrySet().stream().filter(entry -> {
            return ((ProcessingEventMailBox) entry.getValue()).isInactive(this.timeoutSeconds) && !((ProcessingEventMailBox) entry.getValue()).isRunning() && ((ProcessingEventMailBox) entry.getValue()).getTotalUnHandledMessageCount() == 0;
        }).collect(Collectors.toList())).forEach(entry2 -> {
            synchronized (this.lockObj) {
                if (((ProcessingEventMailBox) entry2.getValue()).isInactive(this.timeoutSeconds) && !((ProcessingEventMailBox) entry2.getValue()).isRunning() && ((ProcessingEventMailBox) entry2.getValue()).getTotalUnHandledMessageCount() == 0 && this.mailboxDict.remove(entry2.getKey()) != null) {
                    logger.info("Removed inactive domain event stream mailbox, aggregateRootId: {}", entry2.getKey());
                }
            }
        });
    }
}
