package discord4j.connect.rsocket.router;

import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RequestQueue;
import discord4j.rest.request.RequestQueueFactory;
import java.time.Duration;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/connect/rsocket/router/RequestBridgeStream.class */
public class RequestBridgeStream {
    private static final Logger log = Loggers.getLogger(RequestBridgeStream.class);
    private final String id;
    private final RequestQueue<RequestBridge<Void>> requestQueue;
    private final GlobalRateLimiter globalRateLimiter;
    private final Scheduler rateLimitScheduler;
    private volatile Duration sleepTime = Duration.ZERO;

    /* loaded from: input_file:discord4j/connect/rsocket/router/RequestBridgeStream$RequestSubscriber.class */
    private class RequestSubscriber extends BaseSubscriber<RequestBridge<Void>> {
        private RequestSubscriber() {
        }

        protected void hookOnSubscribe(Subscription subscription) {
            request(1L);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(RequestBridge<Void> requestBridge) {
            String request = requestBridge.getRequest();
            MonoProcessor<Void> acquire = requestBridge.getAcquire();
            MonoProcessor<Void> release = requestBridge.getRelease();
            if (RequestBridgeStream.log.isDebugEnabled()) {
                RequestBridgeStream.log.debug("Accepting request in bucket {}: {}", new Object[]{RequestBridgeStream.this.id, request});
            }
            RequestBridgeStream.this.globalRateLimiter.withLimiter(Mono.defer(() -> {
                return release;
            }).doOnSubscribe(subscription -> {
                acquire.onComplete();
            }).doFinally(this::next)).subscribe((Consumer) null, th -> {
                RequestBridgeStream.log.error("Error while processing {}", new Object[]{request, th});
            });
        }

        private void next(SignalType signalType) {
            Mono.delay(RequestBridgeStream.this.sleepTime, RequestBridgeStream.this.rateLimitScheduler).subscribe(l -> {
                if (RequestBridgeStream.log.isDebugEnabled()) {
                    RequestBridgeStream.log.debug("Ready to consume next request in bucket {} after {}", new Object[]{RequestBridgeStream.this.id, signalType});
                }
                RequestBridgeStream.this.sleepTime = Duration.ZERO;
                request(1L);
            }, th -> {
                RequestBridgeStream.log.error("Error while scheduling next request in bucket {}", new Object[]{RequestBridgeStream.this.id, th});
            });
        }
    }

    public RequestBridgeStream(String str, GlobalRateLimiter globalRateLimiter, Scheduler scheduler, RequestQueueFactory requestQueueFactory) {
        this.id = str;
        this.requestQueue = requestQueueFactory.create();
        this.globalRateLimiter = globalRateLimiter;
        this.rateLimitScheduler = scheduler;
    }

    public void setSleepTime(Duration duration) {
        this.sleepTime = duration;
    }

    public void push(RequestBridge<Void> requestBridge) {
        this.requestQueue.push(requestBridge);
    }

    public void start() {
        this.requestQueue.requests().subscribe(new RequestSubscriber());
    }
}
