package org.enodeframework.eventing;

import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.enodeframework.common.function.Action1;
import org.enodeframework.common.io.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/eventing/ProcessingEventMailBox.class */
/**
 * Per-aggregate mailbox that puts incoming event streams into version order and
 * hands them, one at a time, to a handler callback.
 *
 * <p>A stream whose version is exactly {@code latestHandledEventVersion + 1} is
 * appended to the processing queue; streams with a higher version are parked in a
 * waiting map until the gap is filled; streams with a lower or equal version are
 * ignored.
 *
 * <p>Thread-safety: enqueueing and the decision to start a run are serialized on an
 * internal lock. The {@code running} flag and the last-active timestamp are
 * {@code volatile} because they are also written and read outside that lock.
 */
public class ProcessingEventMailBox {
    private static final Logger logger = LoggerFactory.getLogger(ProcessingEventMailBox.class);
    private static final String ENQUEUED_LOG_FORMAT = "{} enqueued new message, aggregateRootType: {}, aggregateRootId: {}, commandId: {}, eventVersion: {}, eventStreamId: {}, eventTypes: {}, eventIds: {}";

    private final String aggregateRootId;
    private final Action1<ProcessingEvent> handleMessageAction;
    private final Object lockObj = new Object();
    /** Streams that arrived ahead of their turn, keyed by stream version. */
    private final ConcurrentHashMap<Integer, ProcessingEvent> waitingMessageDict = new ConcurrentHashMap<>();
    /** Streams that are in version order and ready to be handled. */
    private final ConcurrentLinkedQueue<ProcessingEvent> messageQueue = new ConcurrentLinkedQueue<>();
    /** Version of the last stream appended to the queue; guarded by {@code lockObj}. */
    private int latestHandledEventVersion;
    // volatile: completeRun() clears this flag without holding lockObj, and tryRun()
    // on another thread must observe that write or it would skip scheduling a run.
    private volatile boolean running;
    // volatile: written by producer and worker threads, read unsynchronized by isInactive().
    private volatile Date lastActiveTime = new Date();

    /**
     * Creates a mailbox for one aggregate root.
     *
     * @param aggregateRootId           id of the aggregate root this mailbox serves
     * @param latestHandledEventVersion version of the last event stream already handled;
     *                                  the next accepted stream must have this version plus one
     * @param handleMessageAction       callback invoked with each stream taken from the queue
     */
    public ProcessingEventMailBox(String aggregateRootId, int latestHandledEventVersion, Action1<ProcessingEvent> handleMessageAction) {
        this.aggregateRootId = aggregateRootId;
        this.latestHandledEventVersion = latestHandledEventVersion;
        this.handleMessageAction = handleMessageAction;
    }

    /** @return the id of the aggregate root this mailbox serves */
    public String getAggregateRootId() {
        return this.aggregateRootId;
    }

    /** @return {@code true} while a processing run is marked as in progress */
    public boolean isRunning() {
        return this.running;
    }

    /** @return the number of streams currently sitting in the processing queue */
    public long getTotalUnHandledMessageCount() {
        return this.messageQueue.size();
    }

    /**
     * Offers an event stream to the mailbox.
     *
     * <p>If the stream is the next expected version it is queued, any directly
     * following versions parked in the waiting map are queued after it, and a run is
     * started if none is in progress. If the stream is ahead of the expected version
     * it is parked in the waiting map. Otherwise it is ignored.
     *
     * @param processingEvent the event stream wrapper to enqueue
     */
    public void enqueueMessage(ProcessingEvent processingEvent) {
        synchronized (this.lockObj) {
            DomainEventStreamMessage message = processingEvent.getMessage();
            int expectedVersion = this.latestHandledEventVersion + 1;
            if (message.getVersion() == expectedVersion) {
                appendToQueue(processingEvent);
                // Drain every consecutive version that was waiting for this one.
                ProcessingEvent next;
                while ((next = this.waitingMessageDict.remove(this.latestHandledEventVersion + 1)) != null) {
                    appendToQueue(next);
                }
                this.lastActiveTime = new Date();
                tryRun();
            } else if (message.getVersion() > expectedVersion) {
                this.waitingMessageDict.put(message.getVersion(), processingEvent);
            }
        }
    }

    /**
     * Starts an asynchronous processing run unless one is already marked as running.
     */
    public void tryRun() {
        synchronized (this.lockObj) {
            if (isRunning()) {
                return;
            }
            setAsRunning();
            if (logger.isDebugEnabled()) {
                logger.debug("{} start run, aggregateRootId: {}", getClass().getName(), this.aggregateRootId);
            }
            CompletableFuture.runAsync(this::processMessages);
        }
    }

    /**
     * Marks the current run as finished and starts another one if the queue is not empty.
     *
     * <p>NOTE(review): after a successful handler call this class does not invoke
     * this method itself; presumably the handler (or the processed event) calls it
     * when done - confirm against the callers.
     */
    public void completeRun() {
        this.lastActiveTime = new Date();
        if (logger.isDebugEnabled()) {
            logger.debug("{} complete run, aggregateRootId: {}", getClass().getName(), this.aggregateRootId);
        }
        setAsNotRunning();
        // isEmpty() is constant-time, whereas ConcurrentLinkedQueue.size() walks the queue.
        if (!this.messageQueue.isEmpty()) {
            tryRun();
        }
    }

    /**
     * Tells whether the mailbox has been idle for at least the given time.
     *
     * @param timeoutMilliseconds idle threshold in milliseconds
     * @return {@code true} if the time since the last recorded activity is greater
     *         than or equal to {@code timeoutMilliseconds}
     */
    public boolean isInactive(int timeoutMilliseconds) {
        return System.currentTimeMillis() - this.lastActiveTime.getTime() >= ((long) timeoutMilliseconds);
    }

    /** @return the number of streams parked in the waiting map */
    public int getWaitingMessageCount() {
        return this.waitingMessageDict.size();
    }

    /**
     * Takes one stream from the queue and passes it to the handler. Completes the run
     * immediately when the queue is empty or when the handler throws.
     */
    private void processMessages() {
        ProcessingEvent current = this.messageQueue.poll();
        if (current == null) {
            completeRun();
            return;
        }
        this.lastActiveTime = new Date();
        try {
            this.handleMessageAction.apply(current);
        } catch (Exception e) {
            logger.error("{} run has unknown exception, aggregateRootId: {}", getClass().getName(), this.aggregateRootId, e);
            Task.sleep(1L);
            completeRun();
        }
    }

    /**
     * Binds the stream to this mailbox, appends it to the processing queue and records
     * its version as the latest one. Must be called while holding {@code lockObj}.
     */
    private void appendToQueue(ProcessingEvent processingEvent) {
        DomainEventStreamMessage message = processingEvent.getMessage();
        processingEvent.setMailbox(this);
        this.messageQueue.add(processingEvent);
        this.latestHandledEventVersion = message.getVersion();
        if (logger.isDebugEnabled()) {
            // Every field is taken from the stream actually being queued.
            logger.debug(ENQUEUED_LOG_FORMAT,
                    getClass().getName(),
                    message.getAggregateRootTypeName(),
                    message.getAggregateRootId(),
                    message.getCommandId(),
                    message.getVersion(),
                    message.getId(),
                    describeEvents(message, true),
                    describeEvents(message, false));
        }
    }

    /**
     * Joins either the class names or the ids of the stream's events with {@code "|"}.
     */
    private static String describeEvents(DomainEventStreamMessage message, boolean byTypeName) {
        if (byTypeName) {
            return message.getEvents().stream()
                    .map(event -> event.getClass().getName())
                    .collect(Collectors.joining("|"));
        }
        return message.getEvents().stream()
                .map(event -> event.getId())
                .collect(Collectors.joining("|"));
    }

    private void setAsRunning() {
        this.running = true;
    }

    private void setAsNotRunning() {
        this.running = false;
    }
}
