package ratpack.reactor;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecController;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.func.Action;
import ratpack.reactor.internal.BlockingExecutorBackedScheduler;
import ratpack.reactor.internal.DefaultSchedulers;
import ratpack.reactor.internal.ErrorHandler;
import ratpack.reactor.internal.ExecControllerBackedScheduler;
import ratpack.registry.RegistrySpec;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:ratpack/reactor/ReactorRatpack.class */
public abstract class ReactorRatpack {
    private ReactorRatpack() {
    }

    public static void initialize() {
        Hooks.onOperatorError(new ErrorHandler());
    }

    public static Flux<Void> flux(Operation operation) {
        return Flux.create(fluxSink -> {
            fluxSink.getClass();
            Operation onError = operation.onError(fluxSink::error);
            fluxSink.getClass();
            onError.then(fluxSink::complete);
        });
    }

    public static <T> Flux<T> flux(Promise<T> promise) {
        return Flux.create(fluxSink -> {
            fluxSink.getClass();
            promise.onError(fluxSink::error).then(obj -> {
                fluxSink.next(obj);
                fluxSink.complete();
            });
        });
    }

    public static <T, I extends Iterable<T>> Flux<T> fluxEach(Promise<I> promise) {
        return Flux.merge(flux(promise).map(Flux::fromIterable));
    }

    public static <T> Mono<T> mono(Promise<T> promise) {
        return Mono.create(monoSink -> {
            monoSink.getClass();
            Promise onError = promise.onError(monoSink::error);
            monoSink.getClass();
            onError.then(monoSink::success);
        });
    }

    public static <T> Promise<List<T>> promise(Flux<T> flux) throws UnmanagedThreadException {
        return Promise.async(downstream -> {
            Mono collectList = flux.collectList();
            downstream.getClass();
            Consumer consumer = (v1) -> {
                r1.success(v1);
            };
            downstream.getClass();
            Consumer consumer2 = downstream::error;
            downstream.getClass();
            collectList.subscribe(consumer, consumer2, downstream::complete);
        });
    }

    public static <T> Promise<T> promiseSingle(Mono<T> mono) throws UnmanagedThreadException {
        return Promise.async(downstream -> {
            downstream.getClass();
            Consumer consumer = downstream::success;
            downstream.getClass();
            Consumer consumer2 = downstream::error;
            downstream.getClass();
            mono.subscribe(consumer, consumer2, downstream::complete);
        });
    }

    public static <T> TransformablePublisher<T> publisher(Flux<T> flux) {
        return Streams.transformable(flux);
    }

    public static <T> Flux<T> bindExec(Flux<T> flux) {
        return (Flux) Exceptions.uncheck(() -> {
            return (Flux) promise(flux).to(ReactorRatpack::fluxEach);
        });
    }

    public static <T> Flux<T> fork(Flux<T> flux) {
        return fluxEach(promise(flux).fork());
    }

    public static <T> Flux<T> fork(Flux<T> flux, Action<? super RegistrySpec> action) throws Exception {
        return fluxEach(promise(flux).fork(execSpec -> {
            execSpec.register(action);
        }));
    }

    public static <T> Flux<T> forkEach(Flux<T> flux) {
        return forkEach(flux, Action.noop());
    }

    public static <T> Flux<T> forkEach(Flux<T> flux, Action<? super RegistrySpec> action) {
        return flux.transform(Operators.lift((scannable, coreSubscriber) -> {
            return new CoreSubscriber<T>() { // from class: ratpack.reactor.ReactorRatpack.1
                private final AtomicInteger wip = new AtomicInteger(1);
                private final AtomicBoolean closed = new AtomicBoolean();
                private Subscription subscription;

                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1L);
                    coreSubscriber.onSubscribe(subscription);
                }

                public void onComplete() {
                    maybeDone();
                }

                public void onError(Throwable th) {
                    CoreSubscriber coreSubscriber = coreSubscriber;
                    terminate(() -> {
                        coreSubscriber.onError(th);
                    });
                }

                private void maybeDone() {
                    if (this.wip.decrementAndGet() == 0) {
                        CoreSubscriber coreSubscriber = coreSubscriber;
                        coreSubscriber.getClass();
                        terminate(coreSubscriber::onComplete);
                    }
                }

                private void terminate(Runnable runnable) {
                    if (this.closed.compareAndSet(false, true)) {
                        this.subscription.cancel();
                        runnable.run();
                    }
                }

                public void onNext(T t) {
                    if (this.closed.get()) {
                        return;
                    }
                    this.wip.incrementAndGet();
                    ExecStarter onError = Execution.fork().register(action).onComplete(execution -> {
                        maybeDone();
                    }).onError(this::onError);
                    CoreSubscriber coreSubscriber = coreSubscriber;
                    onError.start(execution2 -> {
                        if (this.closed.get()) {
                            return;
                        }
                        this.subscription.request(1L);
                        coreSubscriber.onNext(t);
                    });
                }
            };
        }));
    }

    public static Scheduler computationScheduler(ExecController execController) {
        return new ExecControllerBackedScheduler(execController);
    }

    public static Scheduler computationScheduler() {
        return DefaultSchedulers.getComputationScheduler();
    }

    public static Scheduler ioScheduler(ExecController execController) {
        return new BlockingExecutorBackedScheduler(execController);
    }

    public static Scheduler ioScheduler() {
        return DefaultSchedulers.getIoScheduler();
    }
}
