package org.enodeframework.eventing;

import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.enodeframework.common.function.Action1;
import org.enodeframework.common.io.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/eventing/ProcessingEventMailBox.class */
public class ProcessingEventMailBox {
    private static final Logger logger = LoggerFactory.getLogger(ProcessingEventMailBox.class);
    private String aggregateRootId;
    private String aggregateRootTypeName;
    private int nextExpectingEventVersion;
    private Action1<ProcessingEvent> handleProcessingEventAction;
    private final Object lockObj = new Object();
    private AtomicInteger isUsing = new AtomicInteger(0);
    private AtomicInteger isRemoved = new AtomicInteger(0);
    private AtomicInteger isRunning = new AtomicInteger(0);
    private ConcurrentHashMap<Integer, ProcessingEvent> waitingProcessingEventDict = new ConcurrentHashMap<>();
    private ConcurrentLinkedQueue<ProcessingEvent> processingEventQueue = new ConcurrentLinkedQueue<>();
    private Date lastActiveTime = new Date();

    public ProcessingEventMailBox(String str, String str2, int i, Action1<ProcessingEvent> action1) {
        this.aggregateRootId = str2;
        this.aggregateRootTypeName = str;
        this.nextExpectingEventVersion = i;
        this.handleProcessingEventAction = action1;
    }

    private void tryEnqueueWaitingMessage() {
        while (this.waitingProcessingEventDict.containsKey(Integer.valueOf(this.nextExpectingEventVersion))) {
            ProcessingEvent remove = this.waitingProcessingEventDict.remove(Integer.valueOf(this.nextExpectingEventVersion));
            if (remove != null) {
                enqueueEventStream(remove);
                logger.info("{} enqueued waiting processingEvent, aggregateRootId: {}, aggregateRootTypeName: {}, eventVersion: {}", new Object[]{getClass().getName(), this.aggregateRootId, this.aggregateRootTypeName, Integer.valueOf(remove.getMessage().getVersion())});
            }
        }
    }

    public long getTotalUnHandledMessageCount() {
        return this.processingEventQueue.size();
    }

    public void setNextExpectingEventVersion(int i) {
        synchronized (this.lockObj) {
            if (i > this.nextExpectingEventVersion) {
                this.nextExpectingEventVersion = i;
                logger.info("{} refreshed nextExpectingEventVersion, aggregateRootId: {}, aggregateRootTypeName: {}, version: {}", new Object[]{getClass().getName(), this.aggregateRootId, this.aggregateRootTypeName, Integer.valueOf(this.nextExpectingEventVersion)});
                tryEnqueueWaitingMessage();
                this.lastActiveTime = new Date();
                tryRun();
            }
        }
    }

    private void enqueueEventStream(ProcessingEvent processingEvent) {
        synchronized (this.lockObj) {
            processingEvent.setMailbox(this);
            this.processingEventQueue.add(processingEvent);
            this.nextExpectingEventVersion = processingEvent.getMessage().getVersion() + 1;
            if (logger.isDebugEnabled()) {
                logger.debug("{} enqueued new message, aggregateRootType: {}, aggregateRootId: {}, commandId: {}, eventVersion: {}, eventStreamId: {}, eventTypes: {}, eventIds: {}", new Object[]{getClass().getName(), processingEvent.getMessage().getAggregateRootTypeName(), processingEvent.getMessage().getAggregateRootId(), processingEvent.getMessage().getCommandId(), Integer.valueOf(processingEvent.getMessage().getVersion()), processingEvent.getMessage().getId(), String.join("|", (Iterable<? extends CharSequence>) processingEvent.getMessage().getEvents().stream().map(iDomainEvent -> {
                    return iDomainEvent.getClass().getName();
                }).collect(Collectors.toList())), String.join("|", (Iterable<? extends CharSequence>) processingEvent.getMessage().getEvents().stream().map(iDomainEvent2 -> {
                    return iDomainEvent2.getId();
                }).collect(Collectors.toList()))});
            }
        }
    }

    public EnqueueMessageResult enqueueMessage(ProcessingEvent processingEvent) {
        synchronized (this.lockObj) {
            DomainEventStreamMessage message = processingEvent.getMessage();
            if (message.getVersion() == this.nextExpectingEventVersion) {
                enqueueEventStream(processingEvent);
                tryEnqueueWaitingMessage();
                this.lastActiveTime = new Date();
                tryRun();
                return EnqueueMessageResult.Success;
            }
            if (message.getVersion() <= this.nextExpectingEventVersion) {
                return EnqueueMessageResult.Ignored;
            }
            if (this.waitingProcessingEventDict.putIfAbsent(Integer.valueOf(message.getVersion()), processingEvent) == null) {
                logger.warn("{} later version of message arrived, added it to the waiting list, aggregateRootType: {}, aggregateRootId: {}, commandId: {}, eventVersion: {}, eventStreamId: {}, eventTypes: {}, eventIds: {}, _nextExpectingEventVersion: {}", new Object[]{getClass().getName(), message.getAggregateRootTypeName(), message.getAggregateRootId(), message.getCommandId(), Integer.valueOf(message.getVersion()), message.getId(), message.getEvents().stream().map(iDomainEvent -> {
                    return iDomainEvent.getClass().getName();
                }).collect(Collectors.joining("|")), message.getEvents().stream().map((v0) -> {
                    return v0.getId();
                }).collect(Collectors.joining("|")), Integer.valueOf(this.nextExpectingEventVersion)});
            }
            return EnqueueMessageResult.AddToWaitingList;
        }
    }

    public void tryRun() {
        synchronized (this.lockObj) {
            if (isRunning()) {
                return;
            }
            setAsRunning();
            if (logger.isDebugEnabled()) {
                logger.debug("{} start run, aggregateRootId: {}", getClass().getName(), this.aggregateRootId);
            }
            CompletableFuture.runAsync(this::processMessages);
        }
    }

    public void completeRun() {
        this.lastActiveTime = new Date();
        if (logger.isDebugEnabled()) {
            logger.debug("{} complete run, aggregateRootId: {}", getClass().getName(), this.aggregateRootId);
        }
        setAsNotRunning();
        if (getTotalUnHandledMessageCount() > 0) {
            tryRun();
        }
    }

    public boolean isInactive(int i) {
        return System.currentTimeMillis() - this.lastActiveTime.getTime() >= ((long) i);
    }

    private void processMessages() {
        ProcessingEvent poll = this.processingEventQueue.poll();
        if (poll == null) {
            completeRun();
            return;
        }
        this.lastActiveTime = new Date();
        try {
            this.handleProcessingEventAction.apply(poll);
        } catch (Exception e) {
            logger.error("{} run has unknown exception, aggregateRootId: {}", new Object[]{getClass().getName(), this.aggregateRootId, e});
            Task.sleep(1L);
            completeRun();
        }
    }

    public String getAggregateRootTypeName() {
        return this.aggregateRootTypeName;
    }

    public String getAggregateRootId() {
        return this.aggregateRootId;
    }

    public boolean tryUsing() {
        return this.isUsing.compareAndSet(0, 1);
    }

    public void exitUsing() {
        this.isUsing.set(0);
    }

    public void markAsRemoved() {
        this.isRemoved.set(1);
    }

    private void setAsRunning() {
        this.isRunning.set(1);
    }

    public boolean isRunning() {
        return this.isRunning.get() == 1;
    }

    public boolean isRemoved() {
        return this.isRemoved.get() == 1;
    }

    private void setAsNotRunning() {
        this.isRunning.set(0);
    }

    public int getWaitingMessageCount() {
        return this.waitingProcessingEventDict.size();
    }
}
