package me.ahoo.wow.r2dbc;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.R2dbcDataIntegrityViolationException;
import io.r2dbc.spi.Readable;
import io.r2dbc.spi.Result;
import java.util.function.Function;
import kotlin.Metadata;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import me.ahoo.wow.api.modeling.AggregateId;
import me.ahoo.wow.api.modeling.NamedAggregate;
import me.ahoo.wow.command.DuplicateRequestIdException;
import me.ahoo.wow.event.DomainEventStream;
import me.ahoo.wow.eventsourcing.AbstractEventStore;
import me.ahoo.wow.eventsourcing.EventVersionConflictException;
import me.ahoo.wow.serialization.JsonSerializer;
import me.ahoo.wow.serialization.JsonSerializerKt;
import me.ahoo.wow.serialization.event.EventStreamRecord;
import me.ahoo.wow.serialization.event.EventStreamRecordKt;
import me.ahoo.wow.serialization.event.FlatEventStreamRecord;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* compiled from: R2dbcEventStore.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��J\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n\u0002\b\u0002\u0018��2\u00020\u0001B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\u0016\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\t0\b2\u0006\u0010\n\u001a\u00020\u000bH\u0016J&\u0010\f\u001a\b\u0012\u0004\u0012\u00020\u000b0\r2\u0006\u0010\u000e\u001a\u00020\u000f2\u0006\u0010\u0010\u001a\u00020\u00112\u0006\u0010\u0012\u001a\u00020\u0011H\u0016J&\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u000f0\r2\u0006\u0010\u0014\u001a\u00020\u00152\u0006\u0010\u0016\u001a\u00020\u00172\u0006\u0010\u0018\u001a\u00020\u0011H\u0016R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0019"}, d2 = {"Lme/ahoo/wow/r2dbc/R2dbcEventStore;", "Lme/ahoo/wow/eventsourcing/AbstractEventStore;", "database", "Lme/ahoo/wow/r2dbc/Database;", "eventStreamSchema", "Lme/ahoo/wow/r2dbc/EventStreamSchema;", "(Lme/ahoo/wow/r2dbc/Database;Lme/ahoo/wow/r2dbc/EventStreamSchema;)V", "appendStream", "Lreactor/core/publisher/Mono;", "Ljava/lang/Void;", "eventStream", "Lme/ahoo/wow/event/DomainEventStream;", "loadStream", "Lreactor/core/publisher/Flux;", "aggregateId", "Lme/ahoo/wow/api/modeling/AggregateId;", "headVersion", "", "tailVersion", "scanAggregateId", "namedAggregate", "Lme/ahoo/wow/api/modeling/NamedAggregate;", "cursorId", "", "limit", "wow-r2dbc"})
/* loaded from: input_file:me/ahoo/wow/r2dbc/R2dbcEventStore.class */
public final class R2dbcEventStore extends AbstractEventStore {

    @NotNull
    private final Database database;

    @NotNull
    private final EventStreamSchema eventStreamSchema;

    public R2dbcEventStore(@NotNull Database database, @NotNull EventStreamSchema eventStreamSchema) {
        Intrinsics.checkNotNullParameter(database, "database");
        Intrinsics.checkNotNullParameter(eventStreamSchema, "eventStreamSchema");
        this.database = database;
        this.eventStreamSchema = eventStreamSchema;
    }

    @NotNull
    public Mono<Void> appendStream(@NotNull final DomainEventStream domainEventStream) {
        Intrinsics.checkNotNullParameter(domainEventStream, "eventStream");
        Publisher<? extends Connection> createConnection = this.database.createConnection(domainEventStream.getAggregateId());
        Function1 function1 = new Function1() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$appendStream$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            public final Publisher<? extends Result> invoke(Connection connection) {
                EventStreamSchema eventStreamSchema;
                ObjectNode valueToTree = JsonSerializer.INSTANCE.valueToTree(domainEventStream);
                Intrinsics.checkNotNullExpressionValue(valueToTree, "valueToTree(...)");
                EventStreamRecord eventStreamRecord = EventStreamRecordKt.toEventStreamRecord(valueToTree);
                eventStreamSchema = this.eventStreamSchema;
                return connection.createStatement(eventStreamSchema.append(domainEventStream.getAggregateId())).bind(0, eventStreamRecord.getId()).bind(1, domainEventStream.getAggregateId().getId()).bind(2, domainEventStream.getAggregateId().getTenantId()).bind(3, eventStreamRecord.getRequestId()).bind(4, eventStreamRecord.getCommandId()).bind(5, Integer.valueOf(eventStreamRecord.getVersion())).bind(6, JsonSerializerKt.toJsonString(eventStreamRecord.getHeader())).bind(7, JsonSerializerKt.toJsonString(eventStreamRecord.getBody())).bind(8, Integer.valueOf(domainEventStream.getSize())).bind(9, Long.valueOf(eventStreamRecord.getCreateTime())).execute();
            }
        };
        Function function = (v1) -> {
            return appendStream$lambda$0(r1, v1);
        };
        R2dbcEventStore$appendStream$2 r2dbcEventStore$appendStream$2 = R2dbcEventStore$appendStream$2.INSTANCE;
        Flux usingWhen = Flux.usingWhen(createConnection, function, (v1) -> {
            return appendStream$lambda$1(r2, v1);
        });
        R2dbcEventStore$appendStream$3 r2dbcEventStore$appendStream$3 = new Function1<Result, Publisher<? extends Long>>() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$appendStream$3
            public final Publisher<? extends Long> invoke(Result result) {
                return result.getRowsUpdated();
            }
        };
        Flux flatMap = usingWhen.flatMap((v1) -> {
            return appendStream$lambda$2(r1, v1);
        });
        Function1<R2dbcDataIntegrityViolationException, Throwable> function12 = new Function1<R2dbcDataIntegrityViolationException, Throwable>() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$appendStream$4
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            public final Throwable invoke(R2dbcDataIntegrityViolationException r2dbcDataIntegrityViolationException) {
                EventStreamSchema eventStreamSchema;
                EventStreamSchema eventStreamSchema2;
                String message = r2dbcDataIntegrityViolationException.getMessage();
                Intrinsics.checkNotNull(message);
                eventStreamSchema = R2dbcEventStore.this.eventStreamSchema;
                if (StringsKt.contains$default(message, eventStreamSchema.getAggregateIdVersionUniqueIndexName(), false, 2, (Object) null)) {
                    return new EventVersionConflictException(domainEventStream, (Throwable) r2dbcDataIntegrityViolationException);
                }
                String message2 = r2dbcDataIntegrityViolationException.getMessage();
                Intrinsics.checkNotNull(message2);
                eventStreamSchema2 = R2dbcEventStore.this.eventStreamSchema;
                return StringsKt.contains$default(message2, eventStreamSchema2.getRequestIdUniqueIndexName(), false, 2, (Object) null) ? new DuplicateRequestIdException(domainEventStream.getAggregateId(), domainEventStream.getRequestId(), (Throwable) r2dbcDataIntegrityViolationException) : (Throwable) r2dbcDataIntegrityViolationException;
            }
        };
        Mono<Void> then = flatMap.onErrorMap(R2dbcDataIntegrityViolationException.class, (v1) -> {
            return appendStream$lambda$3(r2, v1);
        }).then();
        Intrinsics.checkNotNullExpressionValue(then, "then(...)");
        return then;
    }

    @NotNull
    public Flux<DomainEventStream> loadStream(@NotNull final AggregateId aggregateId, final int i, final int i2) {
        Intrinsics.checkNotNullParameter(aggregateId, "aggregateId");
        Publisher<? extends Connection> createConnection = this.database.createConnection(aggregateId);
        Function1 function1 = new Function1() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$loadStream$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            public final Publisher<? extends Result> invoke(Connection connection) {
                EventStreamSchema eventStreamSchema;
                eventStreamSchema = R2dbcEventStore.this.eventStreamSchema;
                return connection.createStatement(eventStreamSchema.load(aggregateId)).bind(0, aggregateId.getId()).bind(1, Integer.valueOf(i)).bind(2, Integer.valueOf(i2)).execute();
            }
        };
        Function function = (v1) -> {
            return loadStream$lambda$4(r1, v1);
        };
        R2dbcEventStore$loadStream$2 r2dbcEventStore$loadStream$2 = R2dbcEventStore$loadStream$2.INSTANCE;
        Flux usingWhen = Flux.usingWhen(createConnection, function, (v1) -> {
            return loadStream$lambda$5(r2, v1);
        });
        Function1<Result, Publisher<? extends DomainEventStream>> function12 = new Function1<Result, Publisher<? extends DomainEventStream>>() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$loadStream$3
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(1);
            }

            public final Publisher<? extends DomainEventStream> invoke(Result result) {
                final AggregateId aggregateId2 = aggregateId;
                Function1<Readable, DomainEventStream> function13 = new Function1<Readable, DomainEventStream>() { // from class: me.ahoo.wow.r2dbc.R2dbcEventStore$loadStream$3.1
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super(1);
                    }

                    public final DomainEventStream invoke(Readable readable) {
                        if (!Intrinsics.areEqual(aggregateId2.getId(), (String) readable.get("aggregate_id", String.class))) {
                            throw new IllegalArgumentException("Failed requirement.".toString());
                        }
                        Object obj = readable.get("id", String.class);
                        if (obj == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj, "checkNotNull(...)");
                        String str = (String) obj;
                        Object obj2 = readable.get("request_id", String.class);
                        if (obj2 == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj2, "checkNotNull(...)");
                        String str2 = (String) obj2;
                        Object obj3 = readable.get("tenant_id", String.class);
                        if (obj3 == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj3, "checkNotNull(...)");
                        String str3 = (String) obj3;
                        boolean areEqual = Intrinsics.areEqual(str3, aggregateId2.getTenantId());
                        AggregateId aggregateId3 = aggregateId2;
                        if (!areEqual) {
                            throw new IllegalArgumentException(("The aggregated tenantId[" + aggregateId3.getTenantId() + "] does not match the tenantId:[" + str3 + "] stored in the eventStore").toString());
                        }
                        Object obj4 = readable.get("command_id", String.class);
                        if (obj4 == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj4, "checkNotNull(...)");
                        String str4 = (String) obj4;
                        Object obj5 = readable.get("version", Integer.TYPE);
                        if (obj5 == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj5, "checkNotNull(...)");
                        int intValue = ((Number) obj5).intValue();
                        Object obj6 = readable.get("header", String.class);
                        if (obj6 == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj6, "checkNotNull(...)");
                        String str5 = (String) obj6;
                        Object obj7 = readable.get("body", String.class);
                        if (obj7 == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj7, "checkNotNull(...)");
                        String str6 = (String) obj7;
                        Object obj8 = readable.get("create_time", Long.TYPE);
                        if (obj8 == null) {
                            throw new IllegalStateException("Required value was null.".toString());
                        }
                        Intrinsics.checkNotNullExpressionValue(obj8, "checkNotNull(...)");
                        long longValue = ((Number) obj8).longValue();
                        ObjectNode jsonNode = JsonSerializerKt.toJsonNode(str5);
                        Intrinsics.checkNotNull(jsonNode, "null cannot be cast to non-null type com.fasterxml.jackson.databind.node.ObjectNode");
                        return new FlatEventStreamRecord(str, aggregateId2, jsonNode, intValue, str4, str2, JsonSerializerKt.toJsonNode(str6), longValue).toDomainEventStream();
                    }
                };
                return result.map((v1) -> {
                    return invoke$lambda$0(r1, v1);
                });
            }

            private static final DomainEventStream invoke$lambda$0(Function1 function13, Object obj) {
                Intrinsics.checkNotNullParameter(function13, "$tmp0");
                return (DomainEventStream) function13.invoke(obj);
            }
        };
        Flux<DomainEventStream> flatMap = usingWhen.flatMap((v1) -> {
            return loadStream$lambda$6(r1, v1);
        });
        Intrinsics.checkNotNullExpressionValue(flatMap, "flatMap(...)");
        return flatMap;
    }

    @NotNull
    public Flux<AggregateId> scanAggregateId(@NotNull NamedAggregate namedAggregate, @NotNull String str, int i) {
        Intrinsics.checkNotNullParameter(namedAggregate, "namedAggregate");
        Intrinsics.checkNotNullParameter(str, "cursorId");
        throw new UnsupportedOperationException();
    }

    private static final Publisher appendStream$lambda$0(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher appendStream$lambda$1(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher appendStream$lambda$2(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Throwable appendStream$lambda$3(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Throwable) function1.invoke(obj);
    }

    private static final Publisher loadStream$lambda$4(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher loadStream$lambda$5(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher loadStream$lambda$6(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }
}
