package me.ahoo.wow.event;

import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import kotlin.Metadata;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import me.ahoo.wow.api.event.DomainEvent;
import me.ahoo.wow.api.messaging.Message;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.exception.ErrorCodes;
import me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher;
import me.ahoo.wow.messaging.dispatcher.MessageParallelism;
import me.ahoo.wow.messaging.function.MessageFunction;
import me.ahoo.wow.metrics.Metrics;
import me.ahoo.wow.modeling.MaterializedNamedAggregateKt;
import me.ahoo.wow.naming.annotation.PascalToSnakeConverterKt;
import me.ahoo.wow.serialization.MessageRecords;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;

/* compiled from: AggregateEventDispatcher.kt */
@Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��P\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\f\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\u0018�� %*\f\b��\u0010\u0001*\u0006\u0012\u0002\b\u00030\u00022\b\u0012\u0004\u0012\u00020\u00040\u0003:\u0001%BM\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\b\b\u0002\u0010\u0007\u001a\u00020\b\u0012\b\b\u0002\u0010\t\u001a\u00020\n\u0012\f\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\u00040\f\u0012\f\u0010\r\u001a\b\u0012\u0004\u0012\u00028��0\u000e\u0012\u0006\u0010\u000f\u001a\u00020\u0010\u0012\u0006\u0010\u0011\u001a\u00020\u0012¢\u0006\u0002\u0010\u0013J\"\u0010\u001e\u001a\b\u0012\u0004\u0012\u00020\u001f0\u00022\u0006\u0010 \u001a\u00020\u00042\n\u0010!\u001a\u0006\u0012\u0002\b\u00030\"H\u0002J\u0016\u0010#\u001a\b\u0012\u0004\u0012\u00020\u001f0\u00022\u0006\u0010 \u001a\u00020\u0004H\u0016J\f\u0010$\u001a\u00020\n*\u00020\u0004H\u0016R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00028��0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\u00040\fX\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0014\u0010\u0015R\u0014\u0010\u0007\u001a\u00020\bX\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0016\u0010\u0017R\u0014\u0010\u0005\u001a\u00020\u0006X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0018\u0010\u0019R\u0014\u0010\t\u001a\u00020\nX\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u001a\u0010\u001bR\u0014\u0010\u0011\u001a\u00020\u0012X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u001c\u0010\u001d¨\u0006&"}, d2 = {"Lme/ahoo/wow/event/AggregateEventDispatcher;", "R", "Lreactor/core/publisher/Mono;", "Lme/ahoo/wow/messaging/dispatcher/AggregateMessageDispatcher;", "Lme/ahoo/wow/event/EventStreamExchange;", "namedAggregate", "Lme/ahoo/wow/api/modeling/NamedAggregate;", MessageRecords.NAME, ErrorCodes.SUCCEEDED_MESSAGE, "parallelism", ErrorCodes.SUCCEEDED_MESSAGE, "messageFlux", "Lreactor/core/publisher/Flux;", "functionRegistrar", "Lme/ahoo/wow/event/AbstractEventFunctionRegistrar;", "eventHandler", "Lme/ahoo/wow/event/EventHandler;", "scheduler", "Lreactor/core/scheduler/Scheduler;", "(Lme/ahoo/wow/api/modeling/NamedAggregate;Ljava/lang/String;ILreactor/core/publisher/Flux;Lme/ahoo/wow/event/AbstractEventFunctionRegistrar;Lme/ahoo/wow/event/EventHandler;Lreactor/core/scheduler/Scheduler;)V", "getMessageFlux", "()Lreactor/core/publisher/Flux;", "getName", "()Ljava/lang/String;", "getNamedAggregate", "()Lme/ahoo/wow/api/modeling/NamedAggregate;", "getParallelism", "()I", "getScheduler", "()Lreactor/core/scheduler/Scheduler;", "handleEvent", "Ljava/lang/Void;", "exchange", Metrics.EVENT_KEY, "Lme/ahoo/wow/api/event/DomainEvent;", "handleExchange", "asGroupKey", "Companion", "wow-core"})
@SourceDebugExtension({"SMAP\nAggregateEventDispatcher.kt\nKotlin\n*S Kotlin\n*F\n+ 1 AggregateEventDispatcher.kt\nme/ahoo/wow/event/AggregateEventDispatcher\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,92:1\n766#2:93\n857#2,2:94\n*S KotlinDebug\n*F\n+ 1 AggregateEventDispatcher.kt\nme/ahoo/wow/event/AggregateEventDispatcher\n*L\n62#1:93\n62#1:94,2\n*E\n"})
/* loaded from: input_file:me/ahoo/wow/event/AggregateEventDispatcher.class */
public final class AggregateEventDispatcher<R extends Mono<?>> extends AggregateMessageDispatcher<EventStreamExchange> {

    @NotNull
    public static final Companion Companion = new Companion(null);

    @NotNull
    private final NamedAggregate namedAggregate;

    @NotNull
    private final String name;
    private final int parallelism;

    @NotNull
    private final Flux<EventStreamExchange> messageFlux;

    @NotNull
    private final AbstractEventFunctionRegistrar<R> functionRegistrar;

    @NotNull
    private final EventHandler eventHandler;

    @NotNull
    private final Scheduler scheduler;

    @NotNull
    private static final Logger log;

    /* compiled from: AggregateEventDispatcher.kt */
    @Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lme/ahoo/wow/event/AggregateEventDispatcher$Companion;", ErrorCodes.SUCCEEDED_MESSAGE, "()V", "log", "Lorg/slf4j/Logger;", "wow-core"})
    /* loaded from: input_file:me/ahoo/wow/event/AggregateEventDispatcher$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public AggregateEventDispatcher(@NotNull NamedAggregate namedAggregate, @NotNull String str, int i, @NotNull Flux<EventStreamExchange> flux, @NotNull AbstractEventFunctionRegistrar<R> abstractEventFunctionRegistrar, @NotNull EventHandler eventHandler, @NotNull Scheduler scheduler) {
        Intrinsics.checkNotNullParameter(namedAggregate, "namedAggregate");
        Intrinsics.checkNotNullParameter(str, MessageRecords.NAME);
        Intrinsics.checkNotNullParameter(flux, "messageFlux");
        Intrinsics.checkNotNullParameter(abstractEventFunctionRegistrar, "functionRegistrar");
        Intrinsics.checkNotNullParameter(eventHandler, "eventHandler");
        Intrinsics.checkNotNullParameter(scheduler, "scheduler");
        this.namedAggregate = namedAggregate;
        this.name = str;
        this.parallelism = i;
        this.messageFlux = flux;
        this.functionRegistrar = abstractEventFunctionRegistrar;
        this.eventHandler = eventHandler;
        this.scheduler = scheduler;
    }

    /* JADX WARN: Illegal instructions before constructor call */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public /* synthetic */ AggregateEventDispatcher(me.ahoo.wow.api.modeling.NamedAggregate r10, java.lang.String r11, int r12, reactor.core.publisher.Flux r13, me.ahoo.wow.event.AbstractEventFunctionRegistrar r14, me.ahoo.wow.event.EventHandler r15, reactor.core.scheduler.Scheduler r16, int r17, kotlin.jvm.internal.DefaultConstructorMarker r18) {
        /*
            r9 = this;
            r0 = r17
            r1 = 2
            r0 = r0 & r1
            if (r0 == 0) goto L31
            java.lang.StringBuilder r0 = new java.lang.StringBuilder
            r1 = r0
            r1.<init>()
            r1 = r10
            java.lang.String r1 = r1.getAggregateName()
            java.lang.StringBuilder r0 = r0.append(r1)
            r1 = 45
            java.lang.StringBuilder r0 = r0.append(r1)
            java.lang.Class<me.ahoo.wow.event.AggregateEventDispatcher> r1 = me.ahoo.wow.event.AggregateEventDispatcher.class
            kotlin.reflect.KClass r1 = kotlin.jvm.internal.Reflection.getOrCreateKotlinClass(r1)
            java.lang.String r1 = r1.getSimpleName()
            r2 = r1
            kotlin.jvm.internal.Intrinsics.checkNotNull(r2)
            java.lang.StringBuilder r0 = r0.append(r1)
            java.lang.String r0 = r0.toString()
            r11 = r0
        L31:
            r0 = r17
            r1 = 4
            r0 = r0 & r1
            if (r0 == 0) goto L3f
            me.ahoo.wow.messaging.dispatcher.MessageParallelism r0 = me.ahoo.wow.messaging.dispatcher.MessageParallelism.INSTANCE
            int r0 = r0.getDEFAULT_PARALLELISM()
            r12 = r0
        L3f:
            r0 = r9
            r1 = r10
            r2 = r11
            r3 = r12
            r4 = r13
            r5 = r14
            r6 = r15
            r7 = r16
            r0.<init>(r1, r2, r3, r4, r5, r6, r7)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: me.ahoo.wow.event.AggregateEventDispatcher.<init>(me.ahoo.wow.api.modeling.NamedAggregate, java.lang.String, int, reactor.core.publisher.Flux, me.ahoo.wow.event.AbstractEventFunctionRegistrar, me.ahoo.wow.event.EventHandler, reactor.core.scheduler.Scheduler, int, kotlin.jvm.internal.DefaultConstructorMarker):void");
    }

    @NotNull
    public NamedAggregate getNamedAggregate() {
        return this.namedAggregate;
    }

    @NotNull
    public String getName() {
        return this.name;
    }

    @Override // me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher
    public int getParallelism() {
        return this.parallelism;
    }

    @Override // me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher
    @NotNull
    public Flux<EventStreamExchange> getMessageFlux() {
        return this.messageFlux;
    }

    @Override // me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher
    @NotNull
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    @Override // me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher
    public int asGroupKey(@NotNull EventStreamExchange eventStreamExchange) {
        Intrinsics.checkNotNullParameter(eventStreamExchange, "<this>");
        return MessageParallelism.INSTANCE.asGroupKey(eventStreamExchange.mo8getMessage(), getParallelism());
    }

    @Override // me.ahoo.wow.messaging.dispatcher.AggregateMessageDispatcher
    @NotNull
    public Mono<Void> handleExchange(@NotNull final EventStreamExchange eventStreamExchange) {
        Intrinsics.checkNotNullParameter(eventStreamExchange, "exchange");
        Flux fromIterable = Flux.fromIterable(eventStreamExchange.mo8getMessage());
        Function1<DomainEvent<?>, Publisher<? extends Void>> function1 = new Function1<DomainEvent<?>, Publisher<? extends Void>>(this) { // from class: me.ahoo.wow.event.AggregateEventDispatcher$handleExchange$1
            final /* synthetic */ AggregateEventDispatcher<R> this$0;

            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
                this.this$0 = this;
            }

            public final Publisher<? extends Void> invoke(DomainEvent<?> domainEvent) {
                Publisher<? extends Void> handleEvent;
                AggregateEventDispatcher<R> aggregateEventDispatcher = this.this$0;
                EventStreamExchange eventStreamExchange2 = eventStreamExchange;
                Intrinsics.checkNotNullExpressionValue(domainEvent, "it");
                handleEvent = aggregateEventDispatcher.handleEvent(eventStreamExchange2, domainEvent);
                return handleEvent;
            }
        };
        Mono<Void> then = fromIterable.concatMap((v1) -> {
            return handleExchange$lambda$0(r1, v1);
        }).doFinally((v1) -> {
            handleExchange$lambda$1(r1, v1);
        }).then();
        Intrinsics.checkNotNullExpressionValue(then, "override fun handleExcha…            .then()\n    }");
        return then;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Mono<Void> handleEvent(final EventStreamExchange eventStreamExchange, final DomainEvent<?> domainEvent) {
        Class<?> cls = domainEvent.getBody().getClass();
        Set<MessageFunction<Object, DomainEventExchange<?>, R>> functions = this.functionRegistrar.getFunctions(cls);
        ArrayList arrayList = new ArrayList();
        for (Object obj : functions) {
            MessageFunction messageFunction = (MessageFunction) obj;
            if (!messageFunction.getSupportedTopics().contains(MaterializedNamedAggregateKt.materialize(domainEvent.getAggregateId())) ? false : EventCompensatorKt.shouldHandle$default((Message) domainEvent, PascalToSnakeConverterKt.asName(messageFunction.getProcessor().getClass()), false, 2, null)) {
                arrayList.add(obj);
            }
        }
        ArrayList arrayList2 = arrayList;
        if (arrayList2.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("{} eventType[{}] not find any functions.Ignore this event:[{}].", new Object[]{domainEvent.getAggregateId(), cls, domainEvent});
            }
            Mono<Void> empty = Mono.empty();
            Intrinsics.checkNotNullExpressionValue(empty, "empty()");
            return empty;
        }
        Flux fromIterable = Flux.fromIterable(arrayList2);
        Function1 function1 = new Function1<MessageFunction<Object, ? super DomainEventExchange<?>, ? extends R>, Publisher<? extends Void>>() { // from class: me.ahoo.wow.event.AggregateEventDispatcher$handleEvent$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            public final Publisher<? extends Void> invoke(MessageFunction<Object, ? super DomainEventExchange<?>, ? extends R> messageFunction2) {
                EventHandler eventHandler;
                SimpleDomainEventExchange simpleDomainEventExchange = new SimpleDomainEventExchange(domainEvent, messageFunction2, new ConcurrentHashMap(eventStreamExchange.getAttributes()));
                eventHandler = ((AggregateEventDispatcher) this).eventHandler;
                return eventHandler.handle(simpleDomainEventExchange);
            }
        };
        Mono<Void> then = fromIterable.flatMap((v1) -> {
            return handleEvent$lambda$3(r1, v1);
        }).then();
        Intrinsics.checkNotNullExpressionValue(then, "private fun handleEvent(…           }.then()\n    }");
        return then;
    }

    private static final Publisher handleExchange$lambda$0(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final void handleExchange$lambda$1(EventStreamExchange eventStreamExchange, SignalType signalType) {
        Intrinsics.checkNotNullParameter(eventStreamExchange, "$exchange");
        eventStreamExchange.acknowledge();
    }

    private static final Publisher handleEvent$lambda$3(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    static {
        Logger logger = LoggerFactory.getLogger(AggregateEventDispatcher.class);
        Intrinsics.checkNotNullExpressionValue(logger, "getLogger(AggregateEventDispatcher::class.java)");
        log = logger;
    }
}
