package reactor.rx.action.control;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.support.NonBlocking;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;

/* loaded from: input_file:reactor/rx/action/control/ThrottleRequestWhenAction.class */
public class ThrottleRequestWhenAction<T> extends Action<T, T> {
    private final Broadcaster<Long> throttleStream;

    /* loaded from: input_file:reactor/rx/action/control/ThrottleRequestWhenAction$ThrottleSubscriber.class */
    private class ThrottleSubscriber implements Subscriber<Long>, NonBlocking {
        Subscription s;

        private ThrottleSubscriber() {
        }

        @Override // reactor.core.support.NonBlocking
        public boolean isReactivePull(Dispatcher dispatcher, long j) {
            return false;
        }

        @Override // reactor.core.support.NonBlocking
        public long getCapacity() {
            return ThrottleRequestWhenAction.this.capacity;
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.s = subscription;
            subscription.request(1L);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(Long l) {
            if (l.longValue() > 0) {
                ThrottleRequestWhenAction.this.doRequest(l.longValue());
            }
            this.s.request(1L);
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.s.cancel();
            ThrottleRequestWhenAction.this.doError(th);
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            this.s.cancel();
            ThrottleRequestWhenAction.this.doComplete();
        }
    }

    public ThrottleRequestWhenAction(Dispatcher dispatcher, Function<? super Stream<? extends Long>, ? extends Publisher<? extends Long>> function) {
        this.throttleStream = Broadcaster.create(null, dispatcher);
        function.apply(this.throttleStream).subscribe(new ThrottleSubscriber());
    }

    @Override // reactor.rx.action.Action, reactor.rx.action.Control
    public void requestMore(long j) {
        this.throttleStream.onNext(Long.valueOf(j));
    }

    @Override // reactor.rx.action.Action
    protected void doNext(T t) {
        broadcastNext(t);
    }

    @Override // reactor.rx.action.Action, org.reactivestreams.Subscriber
    public void onComplete() {
        try {
            this.throttleStream.onComplete();
            doShutdown();
        } catch (Exception e) {
            doError(e);
        }
    }

    @Override // reactor.rx.Stream, reactor.core.support.NonBlocking
    public boolean isReactivePull(Dispatcher dispatcher, long j) {
        return true;
    }

    protected void doRequest(long j) {
        this.throttleStream.getDispatcher().dispatch(Long.valueOf(j), new Consumer<Long>() { // from class: reactor.rx.action.control.ThrottleRequestWhenAction.1
            @Override // reactor.fn.Consumer
            public void accept(Long l) {
                if (ThrottleRequestWhenAction.this.upstreamSubscription != null) {
                    ThrottleRequestWhenAction.this.upstreamSubscription.request(l.longValue());
                }
            }
        }, null);
    }
}
