package hu.akarnokd.rxjava3.fibers;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.internal.queue.SpscArrayQueue;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import java.lang.Thread;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:hu/akarnokd/rxjava3/fibers/FlowableTransformFiberScheduler.class */
final class FlowableTransformFiberScheduler<T, R> extends Flowable<R> implements FlowableTransformer<T, R> {
    final Flowable<T> source;
    final FiberTransformer<T, R> transformer;
    final Scheduler scheduler;
    final int prefetch;

    /* loaded from: input_file:hu/akarnokd/rxjava3/fibers/FlowableTransformFiberScheduler$TransformFiberSubscriber.class */
    static abstract class TransformFiberSubscriber<T, R> extends AtomicLong implements FlowableSubscriber<T>, Subscription, FiberEmitter<R>, Callable<Void> {
        private static final long serialVersionUID = -4702456711290258571L;
        Subscriber<? super R> downstream;
        final FiberTransformer<T, R> transformer;
        final int prefetch;
        final AtomicLong requested = new AtomicLong();
        final ResumableFiber producerReady = new ResumableFiber();
        final ResumableFiber consumerReady = new ResumableFiber();
        final SpscArrayQueue<T> queue;
        Subscription upstream;
        volatile boolean done;
        Throwable error;
        volatile boolean cancelled;
        static final Throwable STOP = new Throwable("Downstream cancelled");
        long produced;

        /* JADX INFO: Access modifiers changed from: package-private */
        public TransformFiberSubscriber(Subscriber<? super R> subscriber, FiberTransformer<T, R> fiberTransformer, int i) {
            this.downstream = subscriber;
            this.transformer = fiberTransformer;
            this.prefetch = i;
            this.queue = new SpscArrayQueue<>(i);
        }

        public void onSubscribe(Subscription subscription) {
            this.upstream = subscription;
            this.downstream.onSubscribe(this);
            subscription.request(this.prefetch);
        }

        public void onNext(T t) {
            this.queue.offer(t);
            if (getAndIncrement() == 0) {
                this.producerReady.resume();
            }
        }

        public void onError(Throwable th) {
            this.error = th;
            onComplete();
        }

        public void onComplete() {
            this.done = true;
            if (getAndIncrement() == 0) {
                this.producerReady.resume();
            }
        }

        @Override // hu.akarnokd.rxjava3.fibers.FiberEmitter
        public void emit(R r) throws Throwable {
            Objects.requireNonNull(r, "item is null");
            long j = this.produced;
            while (this.requested.get() == j && !this.cancelled) {
                this.consumerReady.await();
            }
            if (this.cancelled) {
                throw STOP;
            }
            this.downstream.onNext(r);
            this.produced = j + 1;
        }

        public void request(long j) {
            BackpressureHelper.add(this.requested, j);
            this.consumerReady.resume();
        }

        public void cancel() {
            this.cancelled = true;
            this.producerReady.resume();
            this.consumerReady.resume();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.util.concurrent.Callable
        public Void call() {
            try {
                try {
                    long j = 0;
                    int i = this.prefetch - (this.prefetch >> 2);
                    long j2 = 0;
                    while (true) {
                        if (this.cancelled) {
                            break;
                        }
                        boolean z = this.done;
                        Object poll = this.queue.poll();
                        boolean z2 = poll == null;
                        if (z && z2) {
                            Throwable th = this.error;
                            if (th != null) {
                                this.downstream.onError(th);
                            } else {
                                this.downstream.onComplete();
                            }
                        } else if (z2) {
                            j2 = addAndGet(-j2);
                            if (j2 == 0) {
                                this.producerReady.await();
                            }
                        } else {
                            long j3 = j + 1;
                            j = j3;
                            if (j3 == i) {
                                j = 0;
                                this.upstream.request(i);
                            }
                            this.transformer.transform(poll, this);
                        }
                    }
                    this.queue.clear();
                    this.downstream = null;
                    cleanup();
                    return null;
                } catch (Throwable th2) {
                    if (th2 != STOP && !this.cancelled) {
                        this.upstream.cancel();
                        this.downstream.onError(th2);
                    }
                    this.queue.clear();
                    this.downstream = null;
                    cleanup();
                    return null;
                }
            } catch (Throwable th3) {
                this.queue.clear();
                this.downstream = null;
                cleanup();
                throw th3;
            }
        }

        protected abstract void cleanup();
    }

    /* loaded from: input_file:hu/akarnokd/rxjava3/fibers/FlowableTransformFiberScheduler$WorkerTransformFiberSubscriber.class */
    static final class WorkerTransformFiberSubscriber<T, R> extends TransformFiberSubscriber<T, R> {
        private static final long serialVersionUID = 6360560993564811498L;
        final Scheduler.Worker worker;
        ExecutorService executor;

        WorkerTransformFiberSubscriber(Subscriber<? super R> subscriber, FiberTransformer<T, R> fiberTransformer, Scheduler.Worker worker, int i, ExecutorService executorService) {
            super(subscriber, fiberTransformer, i);
            this.worker = worker;
            this.executor = executorService;
        }

        @Override // hu.akarnokd.rxjava3.fibers.FlowableTransformFiberScheduler.TransformFiberSubscriber
        protected void cleanup() {
            this.worker.dispose();
            ExecutorService executorService = this.executor;
            this.executor = null;
            if (executorService != null) {
                executorService.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlowableTransformFiberScheduler(Flowable<T> flowable, FiberTransformer<T, R> fiberTransformer, Scheduler scheduler, int i) {
        this.source = flowable;
        this.transformer = fiberTransformer;
        this.scheduler = scheduler;
        this.prefetch = i;
    }

    public Publisher<R> apply(Flowable<T> flowable) {
        return new FlowableTransformFiberScheduler(flowable, this.transformer, this.scheduler, this.prefetch);
    }

    protected void subscribeActual(Subscriber<? super R> subscriber) {
        Scheduler.Worker createWorker = this.scheduler.createWorker();
        Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual();
        Objects.requireNonNull(createWorker);
        ExecutorService newThreadExecutor = Executors.newThreadExecutor(ofVirtual.scheduler(createWorker::schedule).factory());
        WorkerTransformFiberSubscriber workerTransformFiberSubscriber = new WorkerTransformFiberSubscriber(subscriber, this.transformer, createWorker, this.prefetch, newThreadExecutor);
        this.source.subscribe(workerTransformFiberSubscriber);
        newThreadExecutor.submit(workerTransformFiberSubscriber);
    }
}
