package org.enodeframework.queue.command;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import org.enodeframework.commanding.CommandMessage;
import org.enodeframework.commanding.CommandOptions;
import org.enodeframework.commanding.CommandResult;
import org.enodeframework.commanding.CommandReturnType;
import org.enodeframework.commanding.CommandStatus;
import org.enodeframework.common.exception.DuplicateCommandRegisterException;
import org.enodeframework.common.extensions.SystemClock;
import org.enodeframework.common.scheduling.ScheduleService;
import org.enodeframework.common.scheduling.Worker;
import org.enodeframework.common.serializing.SerializeService;
import org.enodeframework.queue.reply.GenericReplyMessage;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: DefaultCommandResultProcessor.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��n\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\b\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0004\u0018��2\u00020\u0001B%\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t¢\u0006\u0002\u0010\nJ\u0010\u0010\u0018\u001a\u00020\u00192\u0006\u0010\u001a\u001a\u00020\rH\u0002J\u0010\u0010\u001b\u001a\u00020\u00192\u0006\u0010\u001a\u001a\u00020\rH\u0002J\u0010\u0010\u001c\u001a\u00020\u00192\u0006\u0010\u001d\u001a\u00020\u001eH\u0002J\u000e\u0010\u001f\u001a\u00020\u00192\u0006\u0010 \u001a\u00020\rJ\u0010\u0010!\u001a\u00020\u00192\u0006\u0010\"\u001a\u00020\rH\u0016J\u001a\u0010#\u001a\u00020\u00192\u0006\u0010$\u001a\u00020\u00122\b\u0010%\u001a\u0004\u0018\u00010\u0013H\u0002J&\u0010&\u001a\u00020\u00192\u0006\u0010 \u001a\u00020'2\u0006\u0010(\u001a\u00020)2\f\u0010*\u001a\b\u0012\u0004\u0012\u00020\u001e0+H\u0016J\b\u0010,\u001a\u00020\u0012H\u0016J\u0006\u0010-\u001a\u00020\u0019J\u0006\u0010.\u001a\u00020\u0019R\u0014\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\r0\fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0010\u001a\u000e\u0012\u0004\u0012\u00020\u0012\u0012\u0004\u0012\u00020\u00130\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u0014\u001a\n \u0016*\u0004\u0018\u00010\u00150\u0015X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0017\u001a\u00020\u0012X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��¨\u0006/"}, d2 = {"Lorg/enodeframework/queue/command/DefaultCommandResultProcessor;", "Lorg/enodeframework/queue/command/CommandResultProcessor;", "scheduleService", "Lorg/enodeframework/common/scheduling/ScheduleService;", "serializeService", "Lorg/enodeframework/common/serializing/SerializeService;", "commandOptions", "Lorg/enodeframework/commanding/CommandOptions;", "completionSourceTimeout", "", "(Lorg/enodeframework/common/scheduling/ScheduleService;Lorg/enodeframework/common/serializing/SerializeService;Lorg/enodeframework/commanding/CommandOptions;I)V", "commandExecutedMessageLocalQueue", "Ljava/util/concurrent/BlockingQueue;", "Lorg/enodeframework/queue/reply/GenericReplyMessage;", "commandExecutedMessageWorker", "Lorg/enodeframework/common/scheduling/Worker;", "commandTaskDict", "Lcom/google/common/cache/Cache;", "", "Lorg/enodeframework/queue/command/CommandTaskCompletionSource;", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "scanExpireCommandTaskName", "processDomainEventHandledMessage", "", "message", "processExecutedCommandMessage", "processExecutedCommandMessageInternal", "commandResult", "Lorg/enodeframework/commanding/CommandResult;", "processFailedSendingCommand", "command", "processReplyMessage", "replyMessage", "processTimeoutCommand", "commandId", "commandTaskCompletionSource", "registerProcessingCommand", "Lorg/enodeframework/commanding/CommandMessage;", "commandReturnType", "Lorg/enodeframework/commanding/CommandReturnType;", "taskCompletionSource", "Ljava/util/concurrent/CompletableFuture;", "replyAddress", "start", "stop", "enode"})
/* loaded from: input_file:org/enodeframework/queue/command/DefaultCommandResultProcessor.class */
public final class DefaultCommandResultProcessor implements CommandResultProcessor {

    @NotNull
    private final ScheduleService scheduleService;

    @NotNull
    private final SerializeService serializeService;

    @NotNull
    private final CommandOptions commandOptions;
    private final int completionSourceTimeout;

    @NotNull
    private final String scanExpireCommandTaskName;

    @NotNull
    private final Cache<String, CommandTaskCompletionSource> commandTaskDict;

    @NotNull
    private final BlockingQueue<GenericReplyMessage> commandExecutedMessageLocalQueue;

    @NotNull
    private final Worker commandExecutedMessageWorker;
    private final Logger logger;

    public DefaultCommandResultProcessor(@NotNull ScheduleService scheduleService, @NotNull SerializeService serializeService, @NotNull CommandOptions commandOptions, int i) {
        Intrinsics.checkNotNullParameter(scheduleService, "scheduleService");
        Intrinsics.checkNotNullParameter(serializeService, "serializeService");
        Intrinsics.checkNotNullParameter(commandOptions, "commandOptions");
        this.scheduleService = scheduleService;
        this.serializeService = serializeService;
        this.commandOptions = commandOptions;
        this.completionSourceTimeout = i;
        long now = SystemClock.now();
        new Random().nextInt(5000);
        this.scanExpireCommandTaskName = "CleanTimeoutCommandTask_" + now + this;
        this.logger = LoggerFactory.getLogger(DefaultCommandResultProcessor.class);
        CacheBuilder newBuilder = CacheBuilder.newBuilder();
        Function1<RemovalNotification<String, CommandTaskCompletionSource>, Unit> function1 = new Function1<RemovalNotification<String, CommandTaskCompletionSource>, Unit>() { // from class: org.enodeframework.queue.command.DefaultCommandResultProcessor.1
            {
                super(1);
            }

            public final void invoke(RemovalNotification<String, CommandTaskCompletionSource> removalNotification) {
                if (removalNotification.getCause() == RemovalCause.EXPIRED) {
                    DefaultCommandResultProcessor defaultCommandResultProcessor = DefaultCommandResultProcessor.this;
                    Object key = removalNotification.getKey();
                    Intrinsics.checkNotNull(key);
                    defaultCommandResultProcessor.processTimeoutCommand((String) key, (CommandTaskCompletionSource) removalNotification.getValue());
                }
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((RemovalNotification<String, CommandTaskCompletionSource>) obj);
                return Unit.INSTANCE;
            }
        };
        Cache<String, CommandTaskCompletionSource> build = newBuilder.removalListener((v1) -> {
            _init_$lambda$1(r2, v1);
        }).expireAfterWrite(this.completionSourceTimeout, TimeUnit.MILLISECONDS).build();
        Intrinsics.checkNotNullExpressionValue(build, "build(...)");
        this.commandTaskDict = build;
        this.commandExecutedMessageLocalQueue = new LinkedBlockingQueue();
        this.commandExecutedMessageWorker = new Worker("ProcessExecutedCommandMessage", () -> {
            _init_$lambda$2(r4);
        });
    }

    @Override // org.enodeframework.queue.command.CommandResultProcessor
    public void registerProcessingCommand(@NotNull CommandMessage commandMessage, @NotNull CommandReturnType commandReturnType, @NotNull CompletableFuture<CommandResult> completableFuture) {
        Intrinsics.checkNotNullParameter(commandMessage, "command");
        Intrinsics.checkNotNullParameter(commandReturnType, "commandReturnType");
        Intrinsics.checkNotNullParameter(completableFuture, "taskCompletionSource");
        if (this.commandTaskDict.asMap().putIfAbsent(commandMessage.getId(), new CommandTaskCompletionSource(commandMessage.getAggregateRootId(), commandReturnType, completableFuture)) != null) {
            throw new DuplicateCommandRegisterException("Duplicate processing command registration, type:" + commandMessage.getClass().getName() + ", id:" + commandMessage.getId());
        }
    }

    @Override // org.enodeframework.queue.command.CommandResultProcessor
    public void processReplyMessage(@NotNull GenericReplyMessage genericReplyMessage) {
        Intrinsics.checkNotNullParameter(genericReplyMessage, "replyMessage");
        int returnType = genericReplyMessage.getReturnType();
        if (Intrinsics.areEqual(genericReplyMessage.getStatus(), CommandStatus.SendFailed.getValue())) {
            processFailedSendingCommand(genericReplyMessage);
        } else if (returnType == CommandReturnType.CommandExecuted.getValue() || returnType == CommandReturnType.EventHandled.getValue()) {
            this.commandExecutedMessageLocalQueue.add(genericReplyMessage);
        }
    }

    @Override // org.enodeframework.queue.command.CommandResultProcessor
    @NotNull
    public String replyAddress() {
        return this.commandOptions.address();
    }

    public final void start() {
        this.commandExecutedMessageWorker.start();
        this.scheduleService.startTask(this.scanExpireCommandTaskName, () -> {
            start$lambda$0(r2);
        }, this.completionSourceTimeout, this.completionSourceTimeout);
    }

    public final void stop() {
        this.scheduleService.stopTask(this.scanExpireCommandTaskName);
        this.commandExecutedMessageWorker.stop();
    }

    private final void processExecutedCommandMessage(GenericReplyMessage genericReplyMessage) {
        int returnType = genericReplyMessage.getReturnType();
        if (returnType == CommandReturnType.CommandExecuted.getValue()) {
            processExecutedCommandMessageInternal(genericReplyMessage.asCommandResult());
        } else if (returnType == CommandReturnType.EventHandled.getValue()) {
            processDomainEventHandledMessage(genericReplyMessage);
        }
    }

    private final void processExecutedCommandMessageInternal(CommandResult commandResult) {
        CommandTaskCompletionSource commandTaskCompletionSource = (CommandTaskCompletionSource) this.commandTaskDict.asMap().get(commandResult.getCommandId());
        if (commandTaskCompletionSource == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Command result return, {}, but commandTaskCompletionSource maybe timeout expired.", this.serializeService.serialize(commandResult));
                return;
            }
            return;
        }
        if (commandTaskCompletionSource.getCommandReturnType() == CommandReturnType.CommandExecuted) {
            this.commandTaskDict.asMap().remove(commandResult.getCommandId());
            if (commandTaskCompletionSource.getTaskCompletionSource().complete(commandResult) && this.logger.isDebugEnabled()) {
                this.logger.debug("Command result return CommandExecuted, {}", this.serializeService.serialize(commandResult));
                return;
            }
            return;
        }
        if (commandTaskCompletionSource.getCommandReturnType() == CommandReturnType.EventHandled) {
            if (CommandStatus.Failed == commandResult.getStatus() || CommandStatus.NoChange == commandResult.getStatus()) {
                this.commandTaskDict.asMap().remove(commandResult.getCommandId());
                if (commandTaskCompletionSource.getTaskCompletionSource().complete(commandResult) && this.logger.isDebugEnabled()) {
                    this.logger.debug("Command result return EventHandled, {}", this.serializeService.serialize(commandResult));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void processTimeoutCommand(String str, CommandTaskCompletionSource commandTaskCompletionSource) {
        if (commandTaskCompletionSource != null) {
            this.logger.error("Wait command notify timeout, commandId: {}", str);
            commandTaskCompletionSource.getTaskCompletionSource().complete(new CommandResult(CommandStatus.Failed, str, commandTaskCompletionSource.getAggregateRootId(), "Wait command notify timeout."));
        }
    }

    public final void processFailedSendingCommand(@NotNull GenericReplyMessage genericReplyMessage) {
        Intrinsics.checkNotNullParameter(genericReplyMessage, "command");
        CommandTaskCompletionSource commandTaskCompletionSource = (CommandTaskCompletionSource) this.commandTaskDict.asMap().remove(genericReplyMessage.getCommandId());
        if (commandTaskCompletionSource != null) {
            commandTaskCompletionSource.getTaskCompletionSource().complete(new CommandResult(CommandStatus.Failed, genericReplyMessage.getCommandId(), genericReplyMessage.getAggregateRootId(), "Failed to send the command."));
        }
    }

    private final void processDomainEventHandledMessage(GenericReplyMessage genericReplyMessage) {
        CommandTaskCompletionSource commandTaskCompletionSource = (CommandTaskCompletionSource) this.commandTaskDict.asMap().get(genericReplyMessage.getCommandId());
        if (commandTaskCompletionSource != null) {
            if (CommandReturnType.EventHandled != commandTaskCompletionSource.getCommandReturnType()) {
                this.logger.warn("event arrived early than command: {}", this.serializeService.serialize(genericReplyMessage));
                return;
            }
            this.commandTaskDict.asMap().remove(genericReplyMessage.getCommandId());
            commandTaskCompletionSource.getTaskCompletionSource().complete(new CommandResult(CommandStatus.Success, genericReplyMessage.getCommandId(), genericReplyMessage.getAggregateRootId(), genericReplyMessage.getResult()));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("DomainEvent result return, {}", this.serializeService.serialize(genericReplyMessage));
            }
        }
    }

    private static final void start$lambda$0(DefaultCommandResultProcessor defaultCommandResultProcessor) {
        Intrinsics.checkNotNullParameter(defaultCommandResultProcessor, "this$0");
        defaultCommandResultProcessor.commandTaskDict.cleanUp();
    }

    private static final void _init_$lambda$1(Function1 function1, RemovalNotification removalNotification) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        function1.invoke(removalNotification);
    }

    private static final void _init_$lambda$2(DefaultCommandResultProcessor defaultCommandResultProcessor) {
        Intrinsics.checkNotNullParameter(defaultCommandResultProcessor, "this$0");
        GenericReplyMessage take = defaultCommandResultProcessor.commandExecutedMessageLocalQueue.take();
        Intrinsics.checkNotNullExpressionValue(take, "take(...)");
        defaultCommandResultProcessor.processExecutedCommandMessage(take);
    }
}
