package me.ahoo.wow.event;

import java.util.ArrayList;
import java.util.Set;
import kotlin.Metadata;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import me.ahoo.wow.api.messaging.Message;
import me.ahoo.wow.api.modeling.AggregateIdCapable;
import me.ahoo.wow.exception.ErrorCodes;
import me.ahoo.wow.messaging.compensation.CompensationMatcher;
import me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher;
import me.ahoo.wow.messaging.dispatcher.MessageParallelism;
import me.ahoo.wow.messaging.function.MessageFunction;
import me.ahoo.wow.messaging.function.MultipleMessageFunctionRegistrar;
import me.ahoo.wow.messaging.handler.ExchangeAck;
import me.ahoo.wow.messaging.handler.MessageExchange;
import me.ahoo.wow.metrics.Metrics;
import me.ahoo.wow.modeling.MaterializedNamedAggregateKt;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* compiled from: AbstractAggregateEventDispatcher.kt */
@Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��N\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\b\n\u0002\b\u0005\b&\u0018�� \u001f*\u0012\b��\u0010\u0001*\f\u0012\u0002\b\u0003\u0012\u0004\u0012\u00020\u00030\u00022\b\u0012\u0004\u0012\u0002H\u00010\u0004:\u0001\u001fB\u0005¢\u0006\u0002\u0010\u0005J'\u0010\u0012\u001a\b\u0012\u0004\u0012\u00020\u00130\u000f2\u0006\u0010\u0014\u001a\u00028��2\n\u0010\u0015\u001a\u0006\u0012\u0002\b\u00030\u0016H\u0002¢\u0006\u0002\u0010\u0017J\u001b\u0010\u0018\u001a\b\u0012\u0004\u0012\u00020\u00130\u000f2\u0006\u0010\u0014\u001a\u00028��H\u0016¢\u0006\u0002\u0010\u0019J\u0011\u0010\u001a\u001a\u00020\u001b*\u00028��H\u0016¢\u0006\u0002\u0010\u001cJ!\u0010\u001d\u001a\u0006\u0012\u0002\b\u00030\u000e*\u00028��2\n\u0010\u0015\u001a\u0006\u0012\u0002\b\u00030\u0016H&¢\u0006\u0002\u0010\u001eR\u0012\u0010\u0006\u001a\u00020\u0007X¦\u0004¢\u0006\u0006\u001a\u0004\b\b\u0010\tR2\u0010\n\u001a\"\u0012\u001e\u0012\u001c\u0012\u0004\u0012\u00020\r\u0012\b\u0012\u0006\u0012\u0002\b\u00030\u000e\u0012\b\u0012\u0006\u0012\u0002\b\u00030\u000f0\f0\u000bX¦\u0004¢\u0006\u0006\u001a\u0004\b\u0010\u0010\u0011¨\u0006 "}, d2 = {"Lme/ahoo/wow/event/AbstractAggregateEventDispatcher;", "E", "Lme/ahoo/wow/messaging/handler/MessageExchange;", "Lme/ahoo/wow/event/DomainEventStream;", "Lme/ahoo/wow/messaging/dispatcher/AggregateMessageDispatcher;", "()V", "eventHandler", "Lme/ahoo/wow/event/EventHandler;", "getEventHandler", "()Lme/ahoo/wow/event/EventHandler;", "functionRegistrar", "Lme/ahoo/wow/messaging/function/MultipleMessageFunctionRegistrar;", "Lme/ahoo/wow/messaging/function/MessageFunction;", ErrorCodes.SUCCEEDED_MESSAGE, "Lme/ahoo/wow/event/DomainEventExchange;", "Lreactor/core/publisher/Mono;", "getFunctionRegistrar", "()Lme/ahoo/wow/messaging/function/MultipleMessageFunctionRegistrar;", "handleEvent", "Ljava/lang/Void;", "exchange", Metrics.EVENT_KEY, "Lme/ahoo/wow/event/DomainEvent;", "(Lme/ahoo/wow/messaging/handler/MessageExchange;Lme/ahoo/wow/event/DomainEvent;)Lreactor/core/publisher/Mono;", "handleExchange", "(Lme/ahoo/wow/messaging/handler/MessageExchange;)Lreactor/core/publisher/Mono;", "asGroupKey", ErrorCodes.SUCCEEDED_MESSAGE, "(Lme/ahoo/wow/messaging/handler/MessageExchange;)I", "createEventExchange", "(Lme/ahoo/wow/messaging/handler/MessageExchange;Lme/ahoo/wow/event/DomainEvent;)Lme/ahoo/wow/event/DomainEventExchange;", "Companion", "wow-core"})
@SourceDebugExtension({"SMAP\nAbstractAggregateEventDispatcher.kt\nKotlin\n*S Kotlin\n*F\n+ 1 AbstractAggregateEventDispatcher.kt\nme/ahoo/wow/event/AbstractAggregateEventDispatcher\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,81:1\n766#2:82\n857#2,2:83\n*S KotlinDebug\n*F\n+ 1 AbstractAggregateEventDispatcher.kt\nme/ahoo/wow/event/AbstractAggregateEventDispatcher\n*L\n54#1:82\n54#1:83,2\n*E\n"})
/* loaded from: input_file:me/ahoo/wow/event/AbstractAggregateEventDispatcher.class */
public abstract class AbstractAggregateEventDispatcher<E extends MessageExchange<?, ? extends DomainEventStream>> extends AggregateMessageDispatcher<E> {

    @NotNull
    public static final Companion Companion = new Companion(null);

    @NotNull
    private static final Logger log;

    /* compiled from: AbstractAggregateEventDispatcher.kt */
    @Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lme/ahoo/wow/event/AbstractAggregateEventDispatcher$Companion;", ErrorCodes.SUCCEEDED_MESSAGE, "()V", "log", "Lorg/slf4j/Logger;", "wow-core"})
    /* loaded from: input_file:me/ahoo/wow/event/AbstractAggregateEventDispatcher$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    @NotNull
    public abstract MultipleMessageFunctionRegistrar<MessageFunction<Object, DomainEventExchange<?>, Mono<?>>> getFunctionRegistrar();

    @NotNull
    public abstract EventHandler getEventHandler();

    @Override // me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher
    public int asGroupKey(@NotNull E e) {
        Intrinsics.checkNotNullParameter(e, "<this>");
        return MessageParallelism.INSTANCE.asGroupKey((AggregateIdCapable) e.mo5getMessage(), getParallelism());
    }

    @Override // me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher
    @NotNull
    public Mono<Void> handleExchange(@NotNull final E e) {
        Intrinsics.checkNotNullParameter(e, "exchange");
        ExchangeAck exchangeAck = ExchangeAck.INSTANCE;
        Flux fromIterable = Flux.fromIterable(e.mo5getMessage());
        Function1<DomainEvent<?>, Publisher<? extends Void>> function1 = new Function1<DomainEvent<?>, Publisher<? extends Void>>() { // from class: me.ahoo.wow.event.AbstractAggregateEventDispatcher$handleExchange$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            /* JADX WARN: Incorrect types in method signature: (Lme/ahoo/wow/event/AbstractAggregateEventDispatcher<TE;>;TE;)V */
            {
                super(1);
            }

            public final Publisher<? extends Void> invoke(DomainEvent<?> domainEvent) {
                Publisher<? extends Void> handleEvent;
                AbstractAggregateEventDispatcher<E> abstractAggregateEventDispatcher = AbstractAggregateEventDispatcher.this;
                MessageExchange messageExchange = e;
                Intrinsics.checkNotNullExpressionValue(domainEvent, "it");
                handleEvent = abstractAggregateEventDispatcher.handleEvent(messageExchange, domainEvent);
                return handleEvent;
            }
        };
        Flux<?> concatMap = fromIterable.concatMap((v1) -> {
            return handleExchange$lambda$0(r2, v1);
        });
        Intrinsics.checkNotNullExpressionValue(concatMap, "override fun handleExcha…inallyAck(exchange)\n    }");
        return exchangeAck.finallyAck(concatMap, e);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Mono<Void> handleEvent(final E e, final DomainEvent<?> domainEvent) {
        Class<?> cls = domainEvent.getBody().getClass();
        Set<MessageFunction<Object, DomainEventExchange<?>, Mono<?>>> functions = getFunctionRegistrar().getFunctions(cls);
        ArrayList arrayList = new ArrayList();
        for (Object obj : functions) {
            MessageFunction messageFunction = (MessageFunction) obj;
            if (!messageFunction.getSupportedTopics().contains(MaterializedNamedAggregateKt.materialize(domainEvent.getAggregateId())) ? false : CompensationMatcher.INSTANCE.match((Message<?, ?>) domainEvent, messageFunction.getProcessorName())) {
                arrayList.add(obj);
            }
        }
        ArrayList arrayList2 = arrayList;
        if (arrayList2.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("{} eventType[{}] not find any functions.Ignore this event:[{}].", new Object[]{domainEvent.getAggregateId(), cls, domainEvent});
            }
            Mono<Void> empty = Mono.empty();
            Intrinsics.checkNotNullExpressionValue(empty, "empty()");
            return empty;
        }
        Flux fromIterable = Flux.fromIterable(arrayList2);
        Function1<MessageFunction<Object, ? super DomainEventExchange<?>, ? extends Mono<?>>, Publisher<? extends Void>> function1 = new Function1<MessageFunction<Object, ? super DomainEventExchange<?>, ? extends Mono<?>>, Publisher<? extends Void>>() { // from class: me.ahoo.wow.event.AbstractAggregateEventDispatcher$handleEvent$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            /* JADX WARN: Incorrect types in method signature: (Lme/ahoo/wow/event/AbstractAggregateEventDispatcher<TE;>;TE;Lme/ahoo/wow/event/DomainEvent<*>;)V */
            {
                super(1);
            }

            public final Publisher<? extends Void> invoke(MessageFunction<Object, ? super DomainEventExchange<?>, ? extends Mono<?>> messageFunction2) {
                DomainEventExchange<?> createEventExchange = AbstractAggregateEventDispatcher.this.createEventExchange(e, domainEvent);
                Intrinsics.checkNotNullExpressionValue(messageFunction2, "function");
                return AbstractAggregateEventDispatcher.this.getEventHandler().handle(createEventExchange.setEventFunction(messageFunction2));
            }
        };
        Mono<Void> then = fromIterable.flatMap((v1) -> {
            return handleEvent$lambda$2(r1, v1);
        }).then();
        Intrinsics.checkNotNullExpressionValue(then, "private fun handleEvent(…           }.then()\n    }");
        return then;
    }

    @NotNull
    public abstract DomainEventExchange<?> createEventExchange(@NotNull E e, @NotNull DomainEvent<?> domainEvent);

    private static final Publisher handleExchange$lambda$0(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher handleEvent$lambda$2(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    static {
        Logger logger = LoggerFactory.getLogger(AbstractAggregateEventDispatcher.class);
        Intrinsics.checkNotNullExpressionValue(logger, "getLogger(AbstractAggreg…ntDispatcher::class.java)");
        log = logger;
    }
}
