package ratpack.rx2;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecController;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.func.Action;
import ratpack.registry.RegistrySpec;
import ratpack.rx2.internal.DefaultSchedulers;
import ratpack.rx2.internal.ErrorHandler;
import ratpack.rx2.internal.ExecControllerBackedScheduler;
import ratpack.rx2.internal.ExecutionBackedObserver;
import ratpack.rx2.internal.ExecutionBackedSubscriber;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;

/* loaded from: input_file:ratpack/rx2/RxRatpack.class */
public abstract class RxRatpack {
    /** Private constructor: every member of this class is static, so no instance is created here. */
    private RxRatpack() {
        // intentionally empty
    }

    /**
     * Installs this integration's hooks into the global {@link RxJavaPlugins} registry.
     *
     * <p>In order, this sets:
     * <ul>
     *   <li>a new {@link ErrorHandler} as the RxJava error handler;</li>
     *   <li>the initial computation and IO scheduler handlers, which ignore the supplied
     *       default and return the schedulers from {@link DefaultSchedulers};</li>
     *   <li>subscribe hooks that wrap each observer in an {@link ExecutionBackedObserver}
     *       and each subscriber in an {@link ExecutionBackedSubscriber}.</li>
     * </ul>
     */
    public static void initialize() {
        RxJavaPlugins.setErrorHandler(new ErrorHandler());

        // The default scheduler suppliers passed in by RxJava are deliberately unused.
        RxJavaPlugins.setInitComputationSchedulerHandler(
            defaultComputation -> DefaultSchedulers.getComputationScheduler());
        RxJavaPlugins.setInitIoSchedulerHandler(
            defaultIo -> DefaultSchedulers.getIoScheduler());

        RxJavaPlugins.setOnObservableSubscribe(
            (source, target) -> new ExecutionBackedObserver(target));
        RxJavaPlugins.setOnFlowableSubscribe(
            (source, target) -> new ExecutionBackedSubscriber(target));
    }

    /**
     * Adapts an {@link Operation} to an {@link Observable} that emits no items.
     *
     * <p>On subscription the operation is run with the emitter attached: a failure of the
     * operation is forwarded to {@code onError}, and successful completion to {@code onComplete}.
     *
     * @param operation the operation to run for each subscriber
     * @return an observable that only signals completion or error
     */
    public static Observable<Void> observe(Operation operation) {
        return Observable.create(emitter ->
            operation
                .onError(emitter::onError)
                .then(emitter::onComplete)
        );
    }

    /**
     * Adapts a {@link Promise} to an {@link Observable} of at most one item.
     *
     * <p>On subscription the promise is consumed: its value is emitted via {@code onNext}
     * immediately followed by {@code onComplete}; a failure is forwarded to {@code onError}.
     *
     * @param promise the promise supplying the single value
     * @param <T> the type of the promised value
     * @return an observable backed by the promise
     */
    public static <T> Observable<T> observe(Promise<T> promise) {
        return Observable.create(emitter ->
            promise
                .onError(emitter::onError)
                .then(value -> {
                    emitter.onNext(value);
                    emitter.onComplete();
                })
        );
    }

    /**
     * Adapts a promise of an iterable to an {@link Observable} of the iterable's elements.
     *
     * <p>The promised iterable is observed via {@link #observe(Promise)}, mapped to an inner
     * observable with {@link Observable#fromIterable}, and flattened with {@link Observable#merge}.
     *
     * @param promise the promise supplying the iterable
     * @param <T> the element type
     * @param <I> the iterable type
     * @return an observable of the promised iterable's elements
     */
    public static <T, I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise) {
        Observable<I> promised = observe(promise);
        Observable<Observable<T>> nested = promised.map(Observable::fromIterable);
        return Observable.merge(nested);
    }

    /**
     * Converts an {@link Observable} to a {@link Promise} for the list of all items it emits.
     *
     * <p>The observable is collected with {@code toList()}; the resulting list fulfils the
     * promise, and an error from the observable fails it.
     *
     * @param observable the observable to collect
     * @param <T> the item type
     * @return a promise for the collected items
     * @throws UnmanagedThreadException as declared; presumably raised when invoked outside a
     *     Ratpack-managed thread (not visible in this file)
     */
    public static <T> Promise<List<T>> promise(Observable<T> observable) throws UnmanagedThreadException {
        // Bind both callbacks to the downstream itself; no raw Single/Consumer intermediates.
        return Promise.async(downstream ->
            observable.toList().subscribe(downstream::success, downstream::error)
        );
    }

    /**
     * Convenience overload: creates an {@link Observable} from the given source and converts it
     * with {@link #promise(Observable)}.
     *
     * @param observableOnSubscribe the source used to create the observable
     * @param <T> the item type
     * @return a promise for the list of all emitted items
     * @throws UnmanagedThreadException as declared by {@link #promise(Observable)}
     */
    public static <T> Promise<List<T>> promise(ObservableOnSubscribe<T> observableOnSubscribe) throws UnmanagedThreadException {
        Observable<T> created = Observable.create(observableOnSubscribe);
        return promise(created);
    }

    /**
     * Converts an {@link Observable} expected to emit exactly one item to a {@link Promise}
     * for that item.
     *
     * <p>The observable is reduced with {@code singleOrError()}, the same operator the
     * {@code Flowable} overload of this method uses, so that every outcome signals the promise:
     * the single item fulfils it, and any error reported by the resulting {@code Single}
     * (including the one it raises for an empty source) fails it. Subscribing to
     * {@code singleElement()} with only success and error callbacks would leave the promise
     * unsignalled when the source completes without emitting.
     *
     * @param observable the observable to convert
     * @param <T> the item type
     * @return a promise for the single emitted item
     * @throws UnmanagedThreadException as declared; presumably raised when invoked outside a
     *     Ratpack-managed thread (not visible in this file)
     */
    public static <T> Promise<T> promiseSingle(Observable<T> observable) throws UnmanagedThreadException {
        return Promise.async(downstream ->
            observable.singleOrError().subscribe(downstream::success, downstream::error)
        );
    }

    /**
     * Converts a {@link Single} to a {@link Promise} for its value.
     *
     * <p>The single's success value fulfils the promise; its error fails it.
     *
     * @param single the single to convert
     * @param <T> the value type
     * @return a promise for the single's value
     * @throws UnmanagedThreadException as declared; presumably raised when invoked outside a
     *     Ratpack-managed thread (not visible in this file)
     */
    public static <T> Promise<T> promiseSingle(Single<T> single) throws UnmanagedThreadException {
        // Method references are passed directly so they are typed against Single<T>'s
        // generic subscribe signature rather than a raw Consumer.
        return Promise.async(downstream -> single.subscribe(downstream::success, downstream::error));
    }

    /**
     * Convenience overload: creates an {@link Observable} from the given source and converts it
     * with {@link #promiseSingle(Observable)}.
     *
     * @param observableOnSubscribe the source used to create the observable
     * @param <T> the item type
     * @return a promise for the single emitted item
     * @throws UnmanagedThreadException as declared by {@link #promiseSingle(Observable)}
     */
    public static <T> Promise<T> promiseSingle(ObservableOnSubscribe<T> observableOnSubscribe) throws UnmanagedThreadException {
        Observable<T> created = Observable.create(observableOnSubscribe);
        return promiseSingle(created);
    }

    /**
     * Converts an {@link Observable} to a {@link TransformablePublisher}.
     *
     * <p>The observable is first turned into a {@link Flowable} using the given backpressure
     * strategy, then wrapped with {@link Streams#transformable}.
     *
     * @param observable the observable to publish
     * @param backpressureStrategy the strategy passed to {@code toFlowable}
     * @param <T> the item type
     * @return a transformable publisher over the observable's items
     */
    public static <T> TransformablePublisher<T> publisher(Observable<T> observable, BackpressureStrategy backpressureStrategy) {
        Flowable<T> asFlowable = observable.toFlowable(backpressureStrategy);
        return Streams.transformable(asFlowable);
    }

    /**
     * Convenience overload: creates an {@link Observable} from the given source and converts it
     * with {@link #publisher(Observable, BackpressureStrategy)}.
     *
     * @param observableOnSubscribe the source used to create the observable
     * @param backpressureStrategy the strategy used when converting to a flowable
     * @param <T> the item type
     * @return a transformable publisher over the created observable's items
     */
    public static <T> TransformablePublisher<T> publisher(ObservableOnSubscribe<T> observableOnSubscribe, BackpressureStrategy backpressureStrategy) {
        Observable<T> created = Observable.create(observableOnSubscribe);
        return publisher(created, backpressureStrategy);
    }

    /**
     * Round-trips an {@link Observable} through a {@link Promise}.
     *
     * <p>The source is collected into a promise of a list via {@link #promise(Observable)}, and
     * that promise is converted back with {@link #observeEach(Promise)}. Any checked exception
     * thrown while building the chain is rethrown via {@link Exceptions#uncheck}.
     *
     * @param observable the source observable
     * @param <T> the item type
     * @return an observable that re-emits the source's items from the collected list
     */
    public static <T> Observable<T> bindExec(Observable<T> observable) {
        // No raw (Observable) casts: the element type T is carried through by inference.
        return Exceptions.uncheck(() -> promise(observable).to(RxRatpack::observeEach));
    }

    /**
     * Collects the observable into a promise, forks that promise, and re-exposes the collected
     * items as an {@link Observable}.
     *
     * @param observable the source observable
     * @param <T> the item type
     * @return an observable over the items collected by the forked promise
     */
    public static <T> Observable<T> fork(Observable<T> observable) {
        Promise<List<T>> forked = promise(observable).fork();
        return observeEach(forked);
    }

    /**
     * Like {@link #fork(Observable)}, but registers the given action on the spec passed to
     * {@code Promise.fork} before the fork is made.
     *
     * @param observable the source observable
     * @param action registry configuration applied to the fork's spec via {@code register}
     * @param <T> the item type
     * @return an observable over the items collected by the forked promise
     * @throws Exception as declared; propagated from {@code Promise.fork}
     */
    public static <T> Observable<T> fork(Observable<T> observable, Action<? super RegistrySpec> action) throws Exception {
        Promise<List<T>> forked = promise(observable).fork(execSpec -> execSpec.register(action));
        return observeEach(forked);
    }

    /**
     * Equivalent to {@link #forkEach(Observable, Action)} with a no-op registry action.
     *
     * @param observable the source observable
     * @param <T> the item type
     * @return an observable whose items are delivered from forked executions
     */
    public static <T> Observable<T> forkEach(Observable<T> observable) {
        Action<? super RegistrySpec> noRegistrations = Action.noop();
        return forkEach(observable, noRegistrations);
    }

    /**
     * Lifts the observable so that each item is handed to the downstream observer from inside
     * a newly forked {@link Execution}.
     *
     * <p>Bookkeeping performed by the lifted observer:
     * <ul>
     *   <li>{@code wip} starts at 1 (the upstream itself), is incremented for every item and
     *       decremented when the upstream completes and when each forked execution completes;
     *       the downstream is completed when it reaches zero.</li>
     *   <li>{@code closed} guarantees that at most one terminal signal (complete or error)
     *       reaches the downstream; items arriving after that are dropped.</li>
     * </ul>
     *
     * @param observable the source observable
     * @param action registry configuration registered on every forked execution
     * @param <T> the item type
     * @return the lifted observable
     */
    public static <T> Observable<T> forkEach(Observable<T> observable, Action<? super RegistrySpec> action) {
        return observable.lift(downstream -> new Observer<T>() {
            private final AtomicInteger wip = new AtomicInteger(1);
            private final AtomicBoolean closed = new AtomicBoolean();
            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                this.disposable = disposable;
                downstream.onSubscribe(disposable);
            }

            @Override
            public void onComplete() {
                maybeDone();
            }

            @Override
            public void onError(Throwable th) {
                terminate(() -> downstream.onError(th));
            }

            /** Completes the downstream once the upstream and all forked executions are done. */
            private void maybeDone() {
                if (wip.decrementAndGet() == 0) {
                    terminate(downstream::onComplete);
                }
            }

            /** Runs the terminal signal only for the first caller. */
            private void terminate(Runnable signal) {
                if (closed.compareAndSet(false, true)) {
                    signal.run();
                }
            }

            @Override
            public void onNext(T item) {
                if (disposable.isDisposed() || closed.get()) {
                    return;
                }
                // Count the item before forking so completion cannot fire while it is in flight.
                wip.incrementAndGet();
                Execution.fork()
                    .register(action)
                    .onComplete(execution -> maybeDone())
                    .onError(this::onError)
                    .start(execution -> {
                        if (!closed.get()) {
                            downstream.onNext(item);
                        }
                    });
            }
        });
    }

    /**
     * Adapts an {@link Operation} to a {@link Flowable} that emits no items.
     *
     * <p>On subscription the operation is run with the emitter attached: a failure of the
     * operation is forwarded to {@code onError}, and successful completion to {@code onComplete}.
     *
     * @param operation the operation to run for each subscriber
     * @param backpressureStrategy the strategy passed to {@link Flowable#create}
     * @return a flowable that only signals completion or error
     */
    public static Flowable<Void> flow(Operation operation, BackpressureStrategy backpressureStrategy) {
        return Flowable.create(
            emitter -> operation
                .onError(emitter::onError)
                .then(emitter::onComplete),
            backpressureStrategy
        );
    }

    /**
     * Adapts a {@link Promise} to a {@link Flowable} of at most one item.
     *
     * <p>On subscription the promise is consumed: its value is emitted via {@code onNext}
     * immediately followed by {@code onComplete}; a failure is forwarded to {@code onError}.
     *
     * @param promise the promise supplying the single value
     * @param backpressureStrategy the strategy passed to {@link Flowable#create}
     * @param <T> the type of the promised value
     * @return a flowable backed by the promise
     */
    public static <T> Flowable<T> flow(Promise<T> promise, BackpressureStrategy backpressureStrategy) {
        return Flowable.create(
            emitter -> promise
                .onError(emitter::onError)
                .then(value -> {
                    emitter.onNext(value);
                    emitter.onComplete();
                }),
            backpressureStrategy
        );
    }

    /**
     * Adapts a promise of an iterable to a {@link Flowable} of the iterable's elements.
     *
     * <p>The promised iterable is wrapped via {@link #flow(Promise, BackpressureStrategy)},
     * mapped to an inner flowable with {@link Flowable#fromIterable}, and flattened with
     * {@link Flowable#merge}.
     *
     * @param promise the promise supplying the iterable
     * @param backpressureStrategy the strategy used when wrapping the promise
     * @param <T> the element type
     * @param <I> the iterable type
     * @return a flowable of the promised iterable's elements
     */
    public static <T, I extends Iterable<T>> Flowable<T> flowEach(Promise<I> promise, BackpressureStrategy backpressureStrategy) {
        Flowable<I> promised = flow(promise, backpressureStrategy);
        Flowable<Flowable<T>> nested = promised.map(Flowable::fromIterable);
        return Flowable.merge(nested);
    }

    /**
     * Converts a {@link Flowable} to a {@link Promise} for the list of all items it emits.
     *
     * <p>The flowable is collected with {@code toList()}; the resulting list fulfils the
     * promise, and an error from the flowable fails it.
     *
     * @param flowable the flowable to collect
     * @param <T> the item type
     * @return a promise for the collected items
     * @throws UnmanagedThreadException as declared; presumably raised when invoked outside a
     *     Ratpack-managed thread (not visible in this file)
     */
    public static <T> Promise<List<T>> promise(Flowable<T> flowable) throws UnmanagedThreadException {
        // Bind both callbacks to the downstream itself; no raw Single/Consumer intermediates.
        return Promise.async(downstream ->
            flowable.toList().subscribe(downstream::success, downstream::error)
        );
    }

    /**
     * Convenience overload: creates a {@link Flowable} from the given source and strategy and
     * converts it with {@link #promise(Flowable)}.
     *
     * @param flowableOnSubscribe the source used to create the flowable
     * @param backpressureStrategy the strategy passed to {@link Flowable#create}
     * @param <T> the item type
     * @return a promise for the list of all emitted items
     * @throws UnmanagedThreadException as declared by {@link #promise(Flowable)}
     */
    public static <T> Promise<List<T>> promise(FlowableOnSubscribe<T> flowableOnSubscribe, BackpressureStrategy backpressureStrategy) throws UnmanagedThreadException {
        Flowable<T> created = Flowable.create(flowableOnSubscribe, backpressureStrategy);
        return promise(created);
    }

    /**
     * Converts a {@link Flowable} expected to emit exactly one item to a {@link Promise} for
     * that item.
     *
     * <p>The flowable is reduced with {@code singleOrError()}; the resulting item fulfils the
     * promise, and any error reported by the resulting {@code Single} fails it.
     *
     * @param flowable the flowable to convert
     * @param <T> the item type
     * @return a promise for the single emitted item
     * @throws UnmanagedThreadException as declared; presumably raised when invoked outside a
     *     Ratpack-managed thread (not visible in this file)
     */
    public static <T> Promise<T> promiseSingle(Flowable<T> flowable) throws UnmanagedThreadException {
        // Method references are passed directly so they are typed against Single<T>'s
        // generic subscribe signature rather than a raw Consumer.
        return Promise.async(downstream ->
            flowable.singleOrError().subscribe(downstream::success, downstream::error)
        );
    }

    /**
     * Convenience overload: creates a {@link Flowable} from the given source and strategy and
     * converts it with {@link #promiseSingle(Flowable)}.
     *
     * @param flowableOnSubscribe the source used to create the flowable
     * @param backpressureStrategy the strategy passed to {@link Flowable#create}
     * @param <T> the item type
     * @return a promise for the single emitted item
     * @throws UnmanagedThreadException as declared by {@link #promiseSingle(Flowable)}
     */
    public static <T> Promise<T> promiseSingle(FlowableOnSubscribe<T> flowableOnSubscribe, BackpressureStrategy backpressureStrategy) throws UnmanagedThreadException {
        Flowable<T> created = Flowable.create(flowableOnSubscribe, backpressureStrategy);
        return promiseSingle(created);
    }

    /**
     * Wraps a {@link Flowable} as a {@link TransformablePublisher} using
     * {@link Streams#transformable}.
     *
     * @param flowable the flowable to wrap
     * @param <T> the item type
     * @return a transformable publisher over the flowable's items
     */
    public static <T> TransformablePublisher<T> publisher(Flowable<T> flowable) {
        TransformablePublisher<T> wrapped = Streams.transformable(flowable);
        return wrapped;
    }

    /**
     * Convenience overload: creates a {@link Flowable} from the given source and strategy and
     * wraps it with {@link #publisher(Flowable)}.
     *
     * @param flowableOnSubscribe the source used to create the flowable
     * @param backpressureStrategy the strategy passed to {@link Flowable#create}
     * @param <T> the item type
     * @return a transformable publisher over the created flowable's items
     */
    public static <T> TransformablePublisher<T> publisher(FlowableOnSubscribe<T> flowableOnSubscribe, BackpressureStrategy backpressureStrategy) {
        Flowable<T> created = Flowable.create(flowableOnSubscribe, backpressureStrategy);
        return publisher(created);
    }

    /**
     * Round-trips a {@link Flowable} through a {@link Promise}.
     *
     * <p>The source is collected into a promise of a list via {@link #promise(Flowable)}, and
     * that promise is converted back with {@link #flowEach(Promise, BackpressureStrategy)}.
     * Any checked exception thrown while building the chain is rethrown via
     * {@link Exceptions#uncheck}.
     *
     * @param flowable the source flowable
     * @param backpressureStrategy the strategy used when re-exposing the collected items
     * @param <T> the item type
     * @return a flowable that re-emits the source's items from the collected list
     */
    public static <T> Flowable<T> bindExec(Flowable<T> flowable, BackpressureStrategy backpressureStrategy) {
        // No raw (Flowable) casts: the element type T is carried through by inference.
        return Exceptions.uncheck(() ->
            promise(flowable).to(collected -> flowEach(collected, backpressureStrategy))
        );
    }

    /**
     * Collects the flowable into a promise, forks that promise, and re-exposes the collected
     * items as a {@link Flowable}.
     *
     * @param flowable the source flowable
     * @param backpressureStrategy the strategy used when re-exposing the collected items
     * @param <T> the item type
     * @return a flowable over the items collected by the forked promise
     */
    public static <T> Flowable<T> fork(Flowable<T> flowable, BackpressureStrategy backpressureStrategy) {
        Promise<List<T>> forked = promise(flowable).fork();
        return flowEach(forked, backpressureStrategy);
    }

    /**
     * Like {@link #fork(Flowable, BackpressureStrategy)}, but registers the given action on
     * the spec passed to {@code Promise.fork} before the fork is made.
     *
     * @param flowable the source flowable
     * @param backpressureStrategy the strategy used when re-exposing the collected items
     * @param action registry configuration applied to the fork's spec via {@code register}
     * @param <T> the item type
     * @return a flowable over the items collected by the forked promise
     * @throws Exception as declared; propagated from {@code Promise.fork}
     */
    public static <T> Flowable<T> fork(Flowable<T> flowable, BackpressureStrategy backpressureStrategy, Action<? super RegistrySpec> action) throws Exception {
        Promise<List<T>> forked = promise(flowable).fork(execSpec -> execSpec.register(action));
        return flowEach(forked, backpressureStrategy);
    }

    /**
     * Equivalent to {@link #forkEach(Flowable, Action)} with a no-op registry action.
     *
     * @param flowable the source flowable
     * @param <T> the item type
     * @return a flowable whose items are delivered from forked executions
     */
    public static <T> Flowable<T> forkEach(Flowable<T> flowable) {
        Action<? super RegistrySpec> noRegistrations = Action.noop();
        return forkEach(flowable, noRegistrations);
    }

    /**
     * Lifts the flowable so that each item is handed to the downstream subscriber from inside
     * a newly forked {@link Execution}.
     *
     * <p>Bookkeeping performed by the lifted subscriber:
     * <ul>
     *   <li>One item is requested from upstream on subscription, and one more each time a
     *       forked execution is about to deliver an item downstream.</li>
     *   <li>{@code wip} starts at 1 (the upstream itself), is incremented for every item and
     *       decremented when the upstream completes and when each forked execution completes;
     *       the downstream is completed when it reaches zero.</li>
     *   <li>{@code closed} guarantees that at most one terminal signal reaches the downstream;
     *       the upstream subscription is cancelled just before that signal is sent.</li>
     * </ul>
     *
     * @param flowable the source flowable
     * @param action registry configuration registered on every forked execution
     * @param <T> the item type
     * @return the lifted flowable
     */
    public static <T> Flowable<T> forkEach(Flowable<T> flowable, Action<? super RegistrySpec> action) {
        return flowable.lift(downstream -> new FlowableSubscriber<T>() {
            private final AtomicInteger wip = new AtomicInteger(1);
            private final AtomicBoolean closed = new AtomicBoolean();
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                // NOTE(review): upstream demand is requested before the downstream sees
                // onSubscribe, and the same subscription is shared with the downstream.
                // Order preserved from the original; confirm it is intentional.
                subscription.request(1L);
                downstream.onSubscribe(subscription);
            }

            @Override
            public void onComplete() {
                maybeDone();
            }

            @Override
            public void onError(Throwable th) {
                terminate(() -> downstream.onError(th));
            }

            /** Completes the downstream once the upstream and all forked executions are done. */
            private void maybeDone() {
                if (wip.decrementAndGet() == 0) {
                    terminate(downstream::onComplete);
                }
            }

            /** Cancels upstream and runs the terminal signal, only for the first caller. */
            private void terminate(Runnable signal) {
                if (closed.compareAndSet(false, true)) {
                    subscription.cancel();
                    signal.run();
                }
            }

            @Override
            public void onNext(T item) {
                if (closed.get()) {
                    return;
                }
                // Count the item before forking so completion cannot fire while it is in flight.
                wip.incrementAndGet();
                Execution.fork()
                    .register(action)
                    .onComplete(execution -> maybeDone())
                    .onError(this::onError)
                    .start(execution -> {
                        if (closed.get()) {
                            return;
                        }
                        subscription.request(1L);
                        downstream.onNext(item);
                    });
            }
        });
    }

    /**
     * Creates a {@link Scheduler} backed by the given {@link ExecController}.
     *
     * @param execController the controller handed to the new {@link ExecControllerBackedScheduler}
     * @return a new scheduler instance
     */
    public static Scheduler scheduler(ExecController execController) {
        Scheduler backed = new ExecControllerBackedScheduler(execController);
        return backed;
    }

    /**
     * Creates a {@link Scheduler} backed by the controller returned from
     * {@link ExecController#require()}.
     *
     * @return a new scheduler instance
     */
    public static Scheduler scheduler() {
        ExecController required = ExecController.require();
        return scheduler(required);
    }
}
