package org.aktivecortex.core.commandhandling;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.aktivecortex.api.command.Command;
import org.aktivecortex.api.commandhandling.AsyncCommandBus;
import org.aktivecortex.api.message.Message;
import org.aktivecortex.api.message.MessageFactory;
import org.aktivecortex.core.commandhandling.interceptors.AfterReceiveMessageHandlerInterceptor;
import org.aktivecortex.core.commandhandling.interceptors.BeforeSendMessageHandlerInterceptor;
import org.aktivecortex.core.commandhandling.interceptors.DefaultAfterReceiveInterceptorChain;
import org.aktivecortex.core.commandhandling.interceptors.DefaultBeforeSendInterceptorChain;
import org.aktivecortex.core.commandhandling.interceptors.afterreceive.TaskProgressInterceptor;
import org.aktivecortex.core.message.CommandBatch;
import org.aktivecortex.core.message.CommandBatchMessage;
import org.aktivecortex.core.message.DefaultCommandBatch;
import org.aktivecortex.core.message.channel.MessageChannel;
import org.aktivecortex.core.message.channel.MessageHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.RollbackConfiguration;
import org.axonframework.commandhandling.RollbackOnAllExceptionsConfiguration;
import org.axonframework.unitofwork.DefaultUnitOfWorkFactory;
import org.axonframework.unitofwork.UnitOfWork;
import org.axonframework.unitofwork.UnitOfWorkFactory;
import org.axonframework.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional;

/* loaded from: input_file:org/aktivecortex/core/commandhandling/DistributedCommandBus.class */
public class DistributedCommandBus implements AsyncCommandBus, CommandBus, MessageHandler<Command>, InitializingBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DistributedCommandBus.class);
    private MessageChannel<Command> channel;
    private MessageFactory<Command> messageFactory;
    private volatile Iterable<? extends AfterReceiveMessageHandlerInterceptor<Command>> afterReceiveInterceptors = Collections.emptyList();
    private volatile Iterable<? extends AfterReceiveMessageHandlerInterceptor<Command>> strippedAfterReceiveInterceptors = Collections.emptyList();
    private volatile Iterable<? extends BeforeSendMessageHandlerInterceptor<Command>> beforeSendInterceptors = Collections.emptyList();
    private final ConcurrentMap<Class<?>, CommandHandler<?>> subscriptions = new ConcurrentHashMap();
    private UnitOfWorkFactory unitOfWorkFactory = new DefaultUnitOfWorkFactory();
    private RollbackConfiguration rollbackConfiguration = new RollbackOnAllExceptionsConfiguration();

    public void setChannel(MessageChannel<Command> messageChannel) {
        this.channel = messageChannel;
    }

    public void setMessageFactory(MessageFactory<Command> messageFactory) {
        this.messageFactory = messageFactory;
    }

    public void setAfterReceiveInterceptors(List<? extends AfterReceiveMessageHandlerInterceptor<Command>> list) {
        this.afterReceiveInterceptors = list;
        this.strippedAfterReceiveInterceptors = Iterables.filter(list, Predicates.not(new Predicate<AfterReceiveMessageHandlerInterceptor<Command>>() { // from class: org.aktivecortex.core.commandhandling.DistributedCommandBus.1
            public boolean apply(AfterReceiveMessageHandlerInterceptor<Command> afterReceiveMessageHandlerInterceptor) {
                return afterReceiveMessageHandlerInterceptor.getClass().isAssignableFrom(TaskProgressInterceptor.class);
            }
        }));
    }

    public void setBeforeSendInterceptors(Iterable<? extends BeforeSendMessageHandlerInterceptor<Command>> iterable) {
        this.beforeSendInterceptors = iterable;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.channel, "channel property not set");
        Assert.notNull(this.messageFactory, "messageFactory property not set");
        this.channel.subscribe(this);
    }

    public void dispatch(Object obj) {
        Message<Command> message = (Message) obj;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("processing command {}", message.getPayload());
        }
        inProcDispatch(message, this.afterReceiveInterceptors);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("processed");
        }
    }

    public <R> void dispatch(Object obj, CommandCallback<R> commandCallback) {
        inProcDispatch((Message) obj, commandCallback, this.afterReceiveInterceptors);
    }

    protected void inProcDispatch(Message<Command> message, Iterable<? extends AfterReceiveMessageHandlerInterceptor<Command>> iterable) {
        try {
            doInProcDispatch(message, findCommandHandlerFor(message.getPayload()), iterable);
        } catch (Exception e) {
            LOGGER.error(String.format("Processing of a [%s] resulted in an exception: ", message.getPayload().getClass().getSimpleName()), e);
            throw new RuntimeException(e);
        }
    }

    protected <R> void inProcDispatch(Message<Command> message, CommandCallback<R> commandCallback, Iterable<? extends AfterReceiveMessageHandlerInterceptor<Command>> iterable) {
        try {
            commandCallback.onSuccess(doInProcDispatch(message, findCommandHandlerFor(message.getPayload()), iterable));
        } catch (Exception e) {
            commandCallback.onFailure(e);
            throw new RuntimeException(e);
        }
    }

    private CommandHandler findCommandHandlerFor(Command command) {
        CommandHandler<?> commandHandler = this.subscriptions.get(command.getClass());
        if (commandHandler == null) {
            throw new NoHandlerForCommandException(String.format("No handler was subscribed to commands of type [%s]", command.getClass().getSimpleName()));
        }
        return commandHandler;
    }

    private Object doInProcDispatch(Message<Command> message, CommandHandler commandHandler, Iterable<? extends AfterReceiveMessageHandlerInterceptor<Command>> iterable) throws Exception {
        UnitOfWork createUnitOfWork = this.unitOfWorkFactory.createUnitOfWork();
        try {
            Object proceed = new DefaultAfterReceiveInterceptorChain(message, createUnitOfWork, commandHandler, iterable).proceed();
            createUnitOfWork.commit();
            return proceed;
        } catch (Exception e) {
            if (this.rollbackConfiguration.rollBackOn(e)) {
                createUnitOfWork.rollback(e);
            } else {
                createUnitOfWork.commit();
            }
            throw e;
        }
    }

    public <T> void subscribe(Class<T> cls, CommandHandler<? super T> commandHandler) {
        this.subscriptions.put(cls, commandHandler);
    }

    public <T> void unsubscribe(Class<T> cls, CommandHandler<? super T> commandHandler) {
        this.subscriptions.remove(cls, commandHandler);
    }

    public void setSubscriptions(Map<?, ?> map) {
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            subscribe((Class) entry.getKey(), (CommandHandler) entry.getValue());
        }
    }

    public void setUnitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory) {
        this.unitOfWorkFactory = unitOfWorkFactory;
    }

    public void setRollbackConfiguration(RollbackConfiguration rollbackConfiguration) {
        this.rollbackConfiguration = rollbackConfiguration;
    }

    @Override // org.aktivecortex.api.commandhandling.AsyncCommandBus
    @Transactional
    public void dispatchAsync(Command... commandArr) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("sending command(s): {}", Arrays.asList(commandArr));
        }
        new DefaultBeforeSendInterceptorChain(this.beforeSendInterceptors, this.channel, this.messageFactory, null, commandArr).proceed();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("command(s) sent");
        }
    }

    @Override // org.aktivecortex.api.commandhandling.AsyncCommandBus
    @Transactional
    public void dispatchAsync(Map<String, String> map, Command... commandArr) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("sending command(s): {}", Arrays.asList(commandArr));
        }
        new DefaultBeforeSendInterceptorChain(this.beforeSendInterceptors, this.channel, this.messageFactory, map, commandArr).proceed();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("command(s) sent");
        }
    }

    @Override // org.aktivecortex.core.message.channel.MessageHandler
    public void handleMessage(Message<Command> message) {
        if (message.getPayload() == null) {
            LOGGER.warn("Wrong type received: " + message.getClass().getName());
            throw new IllegalArgumentException(String.format("The payload of incoming messages must be of type %s or type %s", Command.class.getName(), DefaultCommandBatch.class.getName()));
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("received command: {}", message.getPayload());
        }
        routePayload(message);
    }

    protected void routePayload(Message<Command> message) {
        if (!(message.getPayload() instanceof CommandBatch)) {
            dispatch(message);
            return;
        }
        UnitOfWork createUnitOfWork = this.unitOfWorkFactory.createUnitOfWork();
        try {
            List<Message<Command>> nestedMessages = ((CommandBatchMessage) message).getNestedMessages();
            int size = nestedMessages.size();
            int i = size - 1;
            for (int i2 = 0; i2 < size; i2++) {
                Message<Command> message2 = nestedMessages.get(i2);
                if (i2 == i) {
                    inProcDispatch(message2, this.afterReceiveInterceptors);
                } else {
                    inProcDispatch(message2, this.strippedAfterReceiveInterceptors);
                }
            }
            createUnitOfWork.commit();
        } catch (Exception e) {
            if (this.rollbackConfiguration.rollBackOn(e)) {
                createUnitOfWork.rollback(e);
            } else {
                createUnitOfWork.commit();
            }
            throw new RuntimeException(e);
        }
    }
}
