package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.grpc.query.QueryUpdate;
import org.axonframework.axonserver.connector.event.util.GrpcExceptionParser;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/axonframework/axonserver/connector/query/subscription/AxonServerSubscriptionQueryResult.class */
public class AxonServerSubscriptionQueryResult<I, U> implements SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> {
    private final Logger logger = LoggerFactory.getLogger(AxonServerSubscriptionQueryResult.class);
    private final Mono<QueryResponseMessage<I>> initialResult;
    private final io.axoniq.axonserver.connector.query.SubscriptionQueryResult result;
    private final Flux<SubscriptionQueryUpdateMessage<U>> updates;

    public AxonServerSubscriptionQueryResult(io.axoniq.axonserver.connector.query.SubscriptionQueryResult subscriptionQueryResult, SubscriptionMessageSerializer subscriptionMessageSerializer) {
        Flux doOnError = Flux.create(fluxSink -> {
            fluxSink.onRequest(j -> {
                for (int i = 0; i < j; i++) {
                    QueryUpdate queryUpdate = (QueryUpdate) subscriptionQueryResult.updates().nextIfAvailable();
                    if (queryUpdate == null) {
                        if (subscriptionQueryResult.updates().isClosed()) {
                            completeFlux(fluxSink, (Throwable) subscriptionQueryResult.updates().getError().orElse(null));
                            return;
                        }
                        return;
                    }
                    fluxSink.next(queryUpdate);
                }
            });
            fluxSink.onDispose(() -> {
                this.logger.debug("Flux was disposed. Will close subscription query");
                subscriptionQueryResult.updates().close();
            });
            subscriptionQueryResult.updates().onAvailable(() -> {
                if (fluxSink.requestedFromDownstream() > 0) {
                    QueryUpdate queryUpdate = (QueryUpdate) subscriptionQueryResult.updates().nextIfAvailable();
                    if (queryUpdate != null) {
                        fluxSink.next(queryUpdate);
                    }
                } else {
                    this.logger.trace("Not sending update to Flux Sink. Not enough info requested");
                }
                if (subscriptionQueryResult.updates().isClosed()) {
                    completeFlux(fluxSink, (Throwable) subscriptionQueryResult.updates().getError().orElse(null));
                }
            });
        }).doOnError(th -> {
            subscriptionQueryResult.updates().close();
        });
        subscriptionMessageSerializer.getClass();
        this.updates = doOnError.map(subscriptionMessageSerializer::deserialize);
        subscriptionQueryResult.getClass();
        Mono onErrorMap = Mono.fromCompletionStage(subscriptionQueryResult::initialResult).onErrorMap(GrpcExceptionParser::parse);
        subscriptionMessageSerializer.getClass();
        this.initialResult = onErrorMap.map(subscriptionMessageSerializer::deserialize);
        this.result = subscriptionQueryResult;
    }

    private void completeFlux(FluxSink<QueryUpdate> fluxSink, Throwable th) {
        if (th != null) {
            fluxSink.error(th);
        } else {
            fluxSink.complete();
        }
    }

    public Mono<QueryResponseMessage<I>> initialResult() {
        return this.initialResult;
    }

    public Flux<SubscriptionQueryUpdateMessage<U>> updates() {
        return this.updates;
    }

    public boolean cancel() {
        this.result.updates().close();
        return true;
    }
}
