package me.ahoo.wow.kafka;

import java.util.Set;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import me.ahoo.wow.api.messaging.Message;
import me.ahoo.wow.api.modeling.AggregateIdCapable;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.messaging.DistributedMessageBus;
import me.ahoo.wow.messaging.handler.MessageExchange;
import me.ahoo.wow.serialization.JsonSerializerKt;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

/* compiled from: AbstractKafkaBus.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��\u0088\u0001\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010\"\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\b&\u0018�� 1*\u0018\b��\u0010\u0001*\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0002*\u00020\u0003*\u00020\u0004*\u0012\b\u0001\u0010\u0005*\f\u0012\u0002\b\u0003\u0012\u0004\u0012\u0002H\u00010\u00062\u000e\u0012\u0004\u0012\u0002H\u0001\u0012\u0004\u0012\u0002H\u00050\u0007:\u00011B?\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0012\u0010\n\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\f0\u000b\u0012\u0012\u0010\r\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\f0\u000e\u0012\b\b\u0002\u0010\u000f\u001a\u00020\u0010¢\u0006\u0002\u0010\u0011J\b\u0010\u001a\u001a\u00020\u001bH\u0016J#\u0010\u001c\u001a\u0004\u0018\u00018��2\u0012\u0010\u001d\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\f0\u001eH\u0004¢\u0006\u0002\u0010\u001fJ-\u0010 \u001a\u001a\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\f\u0012\n\u0012\b\u0012\u0004\u0012\u00020#0\"0!2\u0006\u0010$\u001a\u00028��H\u0004¢\u0006\u0002\u0010%J\u001c\u0010&\u001a\b\u0012\u0004\u0012\u00028\u00010'2\f\u0010(\u001a\b\u0012\u0004\u0012\u00020\u00040)H\u0016J\u001b\u0010*\u001a\b\u0012\u0004\u0012\u00020#0+2\u0006\u0010$\u001a\u00028��H\u0016¢\u0006\u0002\u0010,J\u0019\u0010-\u001a\u00028\u0001*\u00028��2\u0006\u0010.\u001a\u00020/H&¢\u0006\u0002\u00100R\u0018\u0010\u0012\u001a\b\u0012\u0004\u0012\u00028��0\u0013X¦\u0004¢\u0006\u0006\u001a\u0004\b\u0014\u0010\u0015R\u001a\u0010\r\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\f0\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R \u0010\u0016\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\f0\u0017X\u0084\u0004¢\u0006\b\n��\u001a\u0004\b\u0018\u0010\u0019R\u001a\u0010\n\u001a\u000e\u0012\u0004\u0012\u00020\f\u0012\u0004\u0012\u00020\f0\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��¨\u00062"}, d2 = {"Lme/ahoo/wow/kafka/AbstractKafkaBus;", "M", "Lme/ahoo/wow/api/messaging/Message;", "Lme/ahoo/wow/api/modeling/AggregateIdCapable;", "Lme/ahoo/wow/api/modeling/NamedAggregate;", "E", "Lme/ahoo/wow/messaging/handler/MessageExchange;", "Lme/ahoo/wow/messaging/DistributedMessageBus;", "topicConverter", "Lme/ahoo/wow/kafka/AggregateTopicConverter;", "senderOptions", "Lreactor/kafka/sender/SenderOptions;", "", "receiverOptions", "Lreactor/kafka/receiver/ReceiverOptions;", "receiverOptionsCustomizer", "Lme/ahoo/wow/kafka/ReceiverOptionsCustomizer;", "(Lme/ahoo/wow/kafka/AggregateTopicConverter;Lreactor/kafka/sender/SenderOptions;Lreactor/kafka/receiver/ReceiverOptions;Lme/ahoo/wow/kafka/ReceiverOptionsCustomizer;)V", "messageType", "Ljava/lang/Class;", "getMessageType", "()Ljava/lang/Class;", "sender", "Lreactor/kafka/sender/KafkaSender;", "getSender", "()Lreactor/kafka/sender/KafkaSender;", "close", "", "decode", "receiverRecord", "Lreactor/kafka/receiver/ReceiverRecord;", "(Lreactor/kafka/receiver/ReceiverRecord;)Lme/ahoo/wow/api/messaging/Message;", "encode", "Lreactor/kafka/sender/SenderRecord;", "Lreactor/core/publisher/Sinks$Empty;", "Ljava/lang/Void;", "message", "(Lme/ahoo/wow/api/messaging/Message;)Lreactor/kafka/sender/SenderRecord;", "receive", "Lreactor/core/publisher/Flux;", "namedAggregates", "", "send", "Lreactor/core/publisher/Mono;", "(Lme/ahoo/wow/api/messaging/Message;)Lreactor/core/publisher/Mono;", "toExchange", "receiverOffset", "Lreactor/kafka/receiver/ReceiverOffset;", "(Lme/ahoo/wow/api/messaging/Message;Lreactor/kafka/receiver/ReceiverOffset;)Lme/ahoo/wow/messaging/handler/MessageExchange;", "Companion", "wow-kafka"})
@SourceDebugExtension({"SMAP\nAbstractKafkaBus.kt\nKotlin\n*S Kotlin\n*F\n+ 1 AbstractKafkaBus.kt\nme/ahoo/wow/kafka/AbstractKafkaBus\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,132:1\n1549#2:133\n1620#2,3:134\n*S KotlinDebug\n*F\n+ 1 AbstractKafkaBus.kt\nme/ahoo/wow/kafka/AbstractKafkaBus\n*L\n85#1:133\n85#1:134,3\n*E\n"})
/* loaded from: input_file:me/ahoo/wow/kafka/AbstractKafkaBus.class */
public abstract class AbstractKafkaBus<M extends Message<?, ?> & AggregateIdCapable & NamedAggregate, E extends MessageExchange<?, ? extends M>> implements DistributedMessageBus<M, E> {

    @NotNull
    private final AggregateTopicConverter topicConverter;

    @NotNull
    private final SenderOptions<String, String> senderOptions;

    @NotNull
    private final ReceiverOptions<String, String> receiverOptions;

    @NotNull
    private final ReceiverOptionsCustomizer receiverOptionsCustomizer;

    @NotNull
    private final KafkaSender<String, String> sender;

    @NotNull
    public static final Companion Companion = new Companion(null);
    private static final Logger log = LoggerFactory.getLogger(AbstractKafkaBus.class);

    /* compiled from: AbstractKafkaBus.kt */
    @Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0006"}, d2 = {"Lme/ahoo/wow/kafka/AbstractKafkaBus$Companion;", "", "()V", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "wow-kafka"})
    /* loaded from: input_file:me/ahoo/wow/kafka/AbstractKafkaBus$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public AbstractKafkaBus(@NotNull AggregateTopicConverter aggregateTopicConverter, @NotNull SenderOptions<String, String> senderOptions, @NotNull ReceiverOptions<String, String> receiverOptions, @NotNull ReceiverOptionsCustomizer receiverOptionsCustomizer) {
        Intrinsics.checkNotNullParameter(aggregateTopicConverter, "topicConverter");
        Intrinsics.checkNotNullParameter(senderOptions, "senderOptions");
        Intrinsics.checkNotNullParameter(receiverOptions, "receiverOptions");
        Intrinsics.checkNotNullParameter(receiverOptionsCustomizer, "receiverOptionsCustomizer");
        this.topicConverter = aggregateTopicConverter;
        this.senderOptions = senderOptions;
        this.receiverOptions = receiverOptions;
        this.receiverOptionsCustomizer = receiverOptionsCustomizer;
        KafkaSender<String, String> create = KafkaSender.create(this.senderOptions);
        Intrinsics.checkNotNullExpressionValue(create, "create(...)");
        this.sender = create;
    }

    public /* synthetic */ AbstractKafkaBus(AggregateTopicConverter aggregateTopicConverter, SenderOptions senderOptions, ReceiverOptions receiverOptions, ReceiverOptionsCustomizer receiverOptionsCustomizer, int i, DefaultConstructorMarker defaultConstructorMarker) {
        this(aggregateTopicConverter, senderOptions, receiverOptions, (i & 8) != 0 ? NoOpReceiverOptionsCustomizer.INSTANCE : receiverOptionsCustomizer);
    }

    @NotNull
    protected final KafkaSender<String, String> getSender() {
        return this.sender;
    }

    @NotNull
    public abstract Class<M> getMessageType();

    @NotNull
    public Mono<Void> send(@NotNull M m) {
        Intrinsics.checkNotNullParameter(m, "message");
        Mono<Void> defer = Mono.defer(() -> {
            return send$lambda$2(r0, r1);
        });
        Intrinsics.checkNotNullExpressionValue(defer, "defer(...)");
        return defer;
    }

    @NotNull
    public abstract E toExchange(@NotNull M m, @NotNull ReceiverOffset receiverOffset);

    @NotNull
    public Flux<E> receive(@NotNull Set<? extends NamedAggregate> set) {
        Intrinsics.checkNotNullParameter(set, "namedAggregates");
        Flux<E> deferContextual = Flux.deferContextual((v2) -> {
            return receive$lambda$5(r0, r1, v2);
        });
        Intrinsics.checkNotNullExpressionValue(deferContextual, "deferContextual(...)");
        return deferContextual;
    }

    @NotNull
    protected final SenderRecord<String, String, Sinks.Empty<Void>> encode(@NotNull M m) {
        Intrinsics.checkNotNullParameter(m, "message");
        SenderRecord<String, String, Sinks.Empty<Void>> create = SenderRecord.create(new ProducerRecord(this.topicConverter.convert(m), (Integer) null, Long.valueOf(m.getCreateTime()), m.getAggregateId().getId(), JsonSerializerKt.toJsonString(m)), Sinks.empty());
        Intrinsics.checkNotNullExpressionValue(create, "create(...)");
        return create;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    public final M decode(@NotNull ReceiverRecord<String, String> receiverRecord) {
        Message message;
        Intrinsics.checkNotNullParameter(receiverRecord, "receiverRecord");
        try {
            Object value = receiverRecord.value();
            Intrinsics.checkNotNullExpressionValue(value, "value(...)");
            message = (Message) JsonSerializerKt.toObject((String) value, getMessageType());
        } catch (Throwable th) {
            if (log.isErrorEnabled()) {
                log.error("Failed to decode ReceiverRecord[" + receiverRecord + "].", th);
            }
            message = null;
        }
        return message;
    }

    public void close() {
        if (log.isInfoEnabled()) {
            log.info("[" + getClass().getSimpleName() + "] Close KafkaSender.");
        }
        this.sender.close();
    }

    private static final void send$lambda$2$lambda$0(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        function1.invoke(obj);
    }

    private static final Publisher send$lambda$2$lambda$1(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Mono send$lambda$2(Message message, AbstractKafkaBus abstractKafkaBus) {
        Intrinsics.checkNotNullParameter(message, "$message");
        Intrinsics.checkNotNullParameter(abstractKafkaBus, "this$0");
        if (log.isDebugEnabled()) {
            log.debug("Send {}.", message);
        }
        message.withReadOnly();
        Flux send = abstractKafkaBus.sender.send(Mono.just(abstractKafkaBus.encode(message)));
        AbstractKafkaBus$send$1$1 abstractKafkaBus$send$1$1 = new Function1<SenderResult<Sinks.Empty<Void>>, Unit>() { // from class: me.ahoo.wow.kafka.AbstractKafkaBus$send$1$1
            public final void invoke(SenderResult<Sinks.Empty<Void>> senderResult) {
                Exception exception = senderResult.exception();
                if (exception != null) {
                    ((Sinks.Empty) senderResult.correlationMetadata()).tryEmitError(exception);
                } else {
                    ((Sinks.Empty) senderResult.correlationMetadata()).tryEmitEmpty();
                }
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((SenderResult<Sinks.Empty<Void>>) obj);
                return Unit.INSTANCE;
            }
        };
        Flux doOnNext = send.doOnNext((v1) -> {
            send$lambda$2$lambda$0(r1, v1);
        });
        AbstractKafkaBus$send$1$2 abstractKafkaBus$send$1$2 = new Function1<SenderResult<Sinks.Empty<Void>>, Publisher<? extends Void>>() { // from class: me.ahoo.wow.kafka.AbstractKafkaBus$send$1$2
            public final Publisher<? extends Void> invoke(SenderResult<Sinks.Empty<Void>> senderResult) {
                return ((Sinks.Empty) senderResult.correlationMetadata()).asMono();
            }
        };
        return doOnNext.flatMap((v1) -> {
            return send$lambda$2$lambda$1(r1, v1);
        }).next();
    }

    private static final MessageExchange receive$lambda$5$lambda$4(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (MessageExchange) function1.invoke(obj);
    }

    /* JADX WARN: Code restructure failed: missing block: B:9:0x00c4, code lost:
    
        if (r0 == null) goto L11;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private static final org.reactivestreams.Publisher receive$lambda$5(final me.ahoo.wow.kafka.AbstractKafkaBus r5, java.util.Set r6, reactor.util.context.ContextView r7) {
        /*
            r0 = r5
            java.lang.String r1 = "this$0"
            kotlin.jvm.internal.Intrinsics.checkNotNullParameter(r0, r1)
            r0 = r6
            java.lang.String r1 = "$namedAggregates"
            kotlin.jvm.internal.Intrinsics.checkNotNullParameter(r0, r1)
            r0 = r5
            me.ahoo.wow.kafka.ReceiverOptionsCustomizer r0 = r0.receiverOptionsCustomizer
            r1 = r5
            reactor.kafka.receiver.ReceiverOptions<java.lang.String, java.lang.String> r1 = r1.receiverOptions
            reactor.kafka.receiver.ReceiverOptions r0 = r0.customize(r1)
            java.lang.String r1 = "group.id"
            r2 = r7
            kotlin.jvm.internal.Intrinsics.checkNotNull(r2)
            r2 = r7
            java.lang.String r2 = me.ahoo.wow.messaging.ReceiverGroupKt.getReceiverGroup(r2)
            reactor.kafka.receiver.ReceiverOptions r0 = r0.consumerProperty(r1, r2)
            r1 = r6
            java.lang.Iterable r1 = (java.lang.Iterable) r1
            r10 = r1
            r19 = r0
            r0 = 0
            r11 = r0
            r0 = r10
            r12 = r0
            java.util.ArrayList r0 = new java.util.ArrayList
            r1 = r0
            r2 = r10
            r3 = 10
            int r2 = kotlin.collections.CollectionsKt.collectionSizeOrDefault(r2, r3)
            r1.<init>(r2)
            java.util.Collection r0 = (java.util.Collection) r0
            r13 = r0
            r0 = 0
            r14 = r0
            r0 = r12
            java.util.Iterator r0 = r0.iterator()
            r15 = r0
        L59:
            r0 = r15
            boolean r0 = r0.hasNext()
            if (r0 == 0) goto L91
            r0 = r15
            java.lang.Object r0 = r0.next()
            r16 = r0
            r0 = r13
            r1 = r16
            me.ahoo.wow.api.modeling.NamedAggregate r1 = (me.ahoo.wow.api.modeling.NamedAggregate) r1
            r17 = r1
            r20 = r0
            r0 = 0
            r18 = r0
            r0 = r5
            me.ahoo.wow.kafka.AggregateTopicConverter r0 = r0.topicConverter
            r1 = r17
            java.lang.String r0 = r0.convert(r1)
            r1 = r20
            r2 = r0; r0 = r1; r1 = r2; 
            boolean r0 = r0.add(r1)
            goto L59
        L91:
            r0 = r13
            java.util.List r0 = (java.util.List) r0
            r1 = r19
            r2 = r0; r0 = r1; r1 = r2; 
            java.lang.Iterable r1 = (java.lang.Iterable) r1
            java.util.Set r1 = kotlin.collections.CollectionsKt.toSet(r1)
            java.util.Collection r1 = (java.util.Collection) r1
            reactor.kafka.receiver.ReceiverOptions r0 = r0.subscription(r1)
            r9 = r0
            r0 = r9
            java.lang.String r1 = "subscription(...)"
            kotlin.jvm.internal.Intrinsics.checkNotNullExpressionValue(r0, r1)
            r0 = r9
            r8 = r0
            r0 = r7
            me.ahoo.wow.kafka.ReceiverOptionsCustomizer r0 = me.ahoo.wow.kafka.ReceiverOptionsCustomizerKt.getReceiverOptionsCustomizer(r0)
            r1 = r0
            if (r1 == 0) goto Lc7
            r1 = r8
            reactor.kafka.receiver.ReceiverOptions r0 = r0.customize(r1)
            r1 = r0
            if (r1 != 0) goto Lc9
        Lc7:
        Lc8:
            r0 = r8
        Lc9:
            r9 = r0
            r0 = r9
            reactor.kafka.receiver.KafkaReceiver r0 = reactor.kafka.receiver.KafkaReceiver.create(r0)
            int r1 = reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE
            java.lang.Integer r1 = java.lang.Integer.valueOf(r1)
            reactor.core.publisher.Flux r0 = r0.receive(r1)
            reactor.util.retry.RetryBackoffSpec r1 = me.ahoo.wow.kafka.KafkaCommandBusKt.getDEFAULT_RECEIVE_RETRY_SPEC()
            reactor.util.retry.Retry r1 = (reactor.util.retry.Retry) r1
            reactor.core.publisher.Flux r0 = r0.retryWhen(r1)
            me.ahoo.wow.kafka.AbstractKafkaBus$receive$1$1 r1 = new me.ahoo.wow.kafka.AbstractKafkaBus$receive$1$1
            r2 = r1
            r3 = r5
            r2.<init>(r3)
            kotlin.jvm.functions.Function1 r1 = (kotlin.jvm.functions.Function1) r1
            org.reactivestreams.Publisher r1 = (v1) -> { // java.util.function.Function.apply(java.lang.Object):java.lang.Object
                return receive$lambda$5$lambda$4(r1, v1);
            }
            reactor.core.publisher.Flux r0 = r0.mapNotNull(r1)
            org.reactivestreams.Publisher r0 = (org.reactivestreams.Publisher) r0
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: me.ahoo.wow.kafka.AbstractKafkaBus.receive$lambda$5(me.ahoo.wow.kafka.AbstractKafkaBus, java.util.Set, reactor.util.context.ContextView):org.reactivestreams.Publisher");
    }
}
