package software.amazon.awssdk.transfer.s3.internal;

import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
import software.amazon.awssdk.utils.Logger;

@SdkInternalApi
/* loaded from: input_file:software/amazon/awssdk/transfer/s3/internal/S3CrtRequestBodyStreamAdapter.class */
/**
 * Adapts a Reactive Streams {@link Publisher} of {@link ByteBuffer}s to the pull-based
 * {@link HttpRequestBodyStream} interface.
 *
 * <p>Signals emitted by the publisher are recorded as {@link Event}s in a blocking deque by a
 * {@link SubscriberImpl}. {@link #sendRequestBody(ByteBuffer)} drains that deque into the buffer supplied by
 * the caller and requests {@link #DEFAULT_REQUEST_SIZE} more items whenever the deque is empty and nothing is
 * outstanding.
 *
 * <p>NOTE(review): {@code pending} and {@code subscriber} are plain (non-volatile) fields, so this class
 * appears to assume that {@code sendRequestBody} and {@code resetPosition} are never invoked concurrently -
 * confirm against the caller.
 */
public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStream {
    /** Number of items requested from the publisher each time the adapter runs out of buffered events. */
    static final long DEFAULT_REQUEST_SIZE = 8;
    private static final Logger LOG = Logger.loggerFor(S3CrtRequestBodyStreamAdapter.class);

    private final Publisher<ByteBuffer> bodyPublisher;

    /** The active subscription, or {@code null} when not (or no longer) subscribed. */
    private volatile Subscription subscription;
    private final AtomicReference<SubscriptionStatus> subscriptionStatus =
            new AtomicReference<>(SubscriptionStatus.NOT_SUBSCRIBED);

    /** Hand-off point through which {@code onSubscribe} passes the subscription to the waiting reader. */
    private final BlockingQueue<Subscription> subscriptionQueue = new LinkedBlockingQueue<>(1);

    /** Signals received from the publisher that have not been fully consumed yet. */
    private final BlockingDeque<Event> eventBuffer = new LinkedBlockingDeque<>();

    /** Number of requested items whose data has not been fully consumed yet. */
    private long pending = 0;

    // Must be declared after eventBuffer: createSubscriber() reads that field during initialization.
    private Subscriber<? super ByteBuffer> subscriber = createSubscriber();

    /** Records that the publisher signalled {@code onComplete}. */
    private static final class CompleteEvent implements Event {
        private final Subscriber<? super ByteBuffer> subscriber;

        CompleteEvent(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public Subscriber<? super ByteBuffer> subscriber() {
            return this.subscriber;
        }

        @Override
        public EventType type() {
            return EventType.COMPLETE;
        }
    }

    /** Records a buffer delivered through {@code onNext}. */
    public static final class DataEvent implements Event {
        private final Subscriber<? super ByteBuffer> subscriber;
        private final ByteBuffer data;

        DataEvent(Subscriber<? super ByteBuffer> subscriber, ByteBuffer byteBuffer) {
            this.subscriber = subscriber;
            this.data = byteBuffer;
        }

        @Override
        public Subscriber<? super ByteBuffer> subscriber() {
            return this.subscriber;
        }

        @Override
        public EventType type() {
            return EventType.DATA;
        }

        /**
         * @return the buffer that was delivered; its position is advanced as the adapter consumes it
         */
        public ByteBuffer data() {
            return this.data;
        }
    }

    /** Records a failure delivered through {@code onError}. */
    private static final class ErrorEvent implements Event {
        private final Subscriber<? super ByteBuffer> subscriber;
        private final Throwable error;

        ErrorEvent(Subscriber<? super ByteBuffer> subscriber, Throwable th) {
            this.subscriber = subscriber;
            this.error = th;
        }

        @Override
        public Subscriber<? super ByteBuffer> subscriber() {
            return this.subscriber;
        }

        @Override
        public EventType type() {
            return EventType.ERROR;
        }

        /**
         * @return the throwable that was passed to {@code onError}
         */
        public Throwable error() {
            return this.error;
        }
    }

    /** A signal received from the publisher, tagged with the subscriber instance that received it. */
    public interface Event {
        /**
         * @return the subscriber that recorded this event; used to discard events of earlier subscriptions
         */
        Subscriber<? super ByteBuffer> subscriber();

        /**
         * @return the kind of signal this event represents
         */
        EventType type();
    }

    /** The kinds of signal that can be recorded in the event buffer. */
    private enum EventType {
        DATA,
        COMPLETE,
        ERROR
    }

    /**
     * Subscriber that converts every publisher signal into an {@link Event} appended to a shared deque, and
     * hands the first subscription it receives to the supplied consumer.
     */
    public static class SubscriberImpl implements Subscriber<ByteBuffer> {
        private final Consumer<Subscription> subscriptionSetter;
        private final Deque<Event> eventBuffer;
        private boolean subscribed = false;

        SubscriberImpl(Consumer<Subscription> consumer, Deque<Event> deque) {
            this.subscriptionSetter = consumer;
            this.eventBuffer = deque;
        }

        /**
         * Accepts the first subscription and cancels any further one.
         *
         * @throws NullPointerException if {@code subscription} is null
         */
        public void onSubscribe(Subscription subscription) {
            if (subscription == null) {
                throw new NullPointerException("Subscription must not be null");
            }
            if (this.subscribed) {
                subscription.cancel();
                return;
            }
            this.subscriptionSetter.accept(subscription);
            this.subscribed = true;
        }

        /**
         * Appends the delivered buffer to the event deque as a {@link DataEvent}.
         *
         * @throws NullPointerException if {@code byteBuffer} is null
         */
        public void onNext(ByteBuffer byteBuffer) {
            if (byteBuffer == null) {
                throw new NullPointerException("byteBuffer must not be null");
            }
            LOG.trace(() -> "Received new data of size: " + byteBuffer.remaining());
            this.eventBuffer.add(new DataEvent(this, byteBuffer));
        }

        /** Appends an {@link ErrorEvent} carrying {@code th} to the event deque. */
        public void onError(Throwable th) {
            this.eventBuffer.add(new ErrorEvent(this, th));
        }

        /** Appends a {@link CompleteEvent} to the event deque. */
        public void onComplete() {
            this.eventBuffer.add(new CompleteEvent(this));
        }
    }

    /** Life-cycle of the adapter's subscription to the body publisher. */
    public enum SubscriptionStatus {
        NOT_SUBSCRIBED,
        SUBSCRIBING,
        SUBSCRIBED,
        TIMED_OUT
    }

    /**
     * @param publisher the source of the request body; it is subscribed to on the first call to
     *                  {@link #sendRequestBody(ByteBuffer)} and again after each {@link #resetPosition()}
     */
    public S3CrtRequestBodyStreamAdapter(Publisher<ByteBuffer> publisher) {
        this.bodyPublisher = publisher;
    }

    /**
     * Copies buffered body data into {@code byteBuffer} until it is full or the publisher terminates.
     * Blocks while waiting for the subscription and for further events.
     *
     * @param byteBuffer the destination buffer; filled starting at its current position
     * @return {@code true} if the publisher has completed, {@code false} if the buffer was filled first
     * @throws IllegalStateException if more data has to be requested but there is no active subscription
     * @throws RuntimeException if the publisher signalled an error (rethrown as-is when it already is a
     *                          {@code RuntimeException}, wrapped otherwise), if the subscription did not
     *                          arrive in time, or if the calling thread was interrupted while waiting
     */
    public boolean sendRequestBody(ByteBuffer byteBuffer) {
        LOG.trace(() -> "Getting data to fill buffer of size " + byteBuffer.remaining());
        waitForSubscription();
        while (byteBuffer.hasRemaining()) {
            if (this.eventBuffer.isEmpty() && this.pending == 0) {
                Subscription current = this.subscription;
                if (current == null) {
                    // Fail before touching 'pending': a non-zero 'pending' without a real request would
                    // make every later call block forever waiting for events that can never arrive.
                    throw new IllegalStateException("No active subscription to request data from; "
                            + "resetPosition() must be called after a failed subscription attempt");
                }
                this.pending = DEFAULT_REQUEST_SIZE;
                current.request(this.pending);
            }

            Event event = takeFirstEvent();
            if (!event.subscriber().equals(this.subscriber)) {
                LOG.debug(() -> "Received an event for a previous publisher. Discarding. Event was: " + event);
                continue;
            }

            switch (event.type()) {
                case DATA:
                    consumeData(event, byteBuffer);
                    break;
                case COMPLETE:
                    // Put the terminal event back so that later calls observe completion again.
                    this.eventBuffer.push(event);
                    this.pending = 0L;
                    return true;
                case ERROR:
                    // Put the terminal event back so that later calls fail the same way.
                    this.eventBuffer.push(event);
                    Throwable error = ((ErrorEvent) event).error();
                    if (error instanceof RuntimeException) {
                        throw (RuntimeException) error;
                    }
                    throw new RuntimeException(error);
                default:
                    throw new IllegalStateException("Unknown event type: " + event.type());
            }
        }
        return false;
    }

    /**
     * Copies as much of the event's data as fits into {@code destination}. A partially consumed event is
     * pushed back to the front of the deque; a fully consumed one decrements {@code pending}.
     */
    private void consumeData(Event event, ByteBuffer destination) {
        ByteBuffer data = ((DataEvent) event).data();
        // Work on a duplicate so the limit can be narrowed without disturbing the original's limit.
        ByteBuffer view = data.duplicate();
        int bytesToCopy = Math.min(destination.remaining(), data.remaining());
        if (view.remaining() > bytesToCopy) {
            view.limit(view.position() + bytesToCopy);
        }
        destination.put(view);
        // After put(), the duplicate's position equals its limit: that is how far 'data' was consumed.
        data.position(view.limit());
        if (data.hasRemaining()) {
            this.eventBuffer.push(event);
        } else {
            this.pending--;
        }
    }

    /**
     * Cancels the current subscription (if any), discards all buffered events and returns the adapter to
     * its initial state so that the next {@link #sendRequestBody(ByteBuffer)} re-subscribes to the publisher.
     *
     * <p>Safe to call when no subscription is active, e.g. before the first read or after a subscription
     * attempt timed out.
     *
     * @return always {@code true}
     */
    public boolean resetPosition() {
        Subscription current = this.subscription;
        if (current != null) {
            current.cancel();
        }
        this.subscription = null;
        // A fresh subscriber instance lets sendRequestBody recognise and drop events of the old subscription.
        this.subscriber = createSubscriber();
        this.subscriptionStatus.set(SubscriptionStatus.NOT_SUBSCRIBED);
        this.eventBuffer.clear();
        this.pending = 0L;
        return true;
    }

    /**
     * Blocks until an event is available and removes it from the front of the deque.
     *
     * @throws RuntimeException if interrupted while waiting; the interrupt flag is restored first
     */
    private Event takeFirstEvent() {
        try {
            return this.eventBuffer.takeFirst();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for next event", e);
        }
    }

    /**
     * @return a new subscriber that writes into this adapter's event deque and reports its subscription
     *         through {@code setSubscription}
     */
    public SubscriberImpl createSubscriber() {
        return new SubscriberImpl(this::setSubscription, this.eventBuffer);
    }

    /**
     * Receives the subscription from the subscriber. It is handed to the waiting reader only if the adapter
     * is still in {@code SUBSCRIBING} state; otherwise the subscription is cancelled.
     */
    private void setSubscription(Subscription subscription) {
        if (this.subscriptionStatus.compareAndSet(SubscriptionStatus.SUBSCRIBING, SubscriptionStatus.SUBSCRIBED)) {
            this.subscriptionQueue.add(subscription);
            return;
        }
        LOG.error(() -> "The supplier stopped waiting for the subscription. This is likely because it took "
                + "longer than the timeout to arrive. Cancelling the subscription");
        subscription.cancel();
    }

    /**
     * Subscribes to the publisher if the adapter is in {@code NOT_SUBSCRIBED} state and waits up to five
     * seconds for the subscription to arrive. Does nothing in any other state.
     *
     * @throws RuntimeException if the subscription does not arrive in time or the thread is interrupted
     */
    private void waitForSubscription() {
        if (!this.subscriptionStatus.compareAndSet(SubscriptionStatus.NOT_SUBSCRIBED,
                                                   SubscriptionStatus.SUBSCRIBING)) {
            return;
        }
        this.bodyPublisher.subscribe(this.subscriber);
        try {
            this.subscription = this.subscriptionQueue.poll(5L, TimeUnit.SECONDS);
            if (this.subscription == null) {
                if (!this.subscriptionStatus.compareAndSet(SubscriptionStatus.SUBSCRIBING,
                                                           SubscriptionStatus.TIMED_OUT)) {
                    // setSubscription won the race just after the poll expired, so a subscription is (about
                    // to be) in the queue; take it and cancel it because this attempt is reported as failed.
                    this.subscriptionQueue.take().cancel();
                }
                throw new RuntimeException("Publisher did not respond with a subscription within 5 seconds");
            }
        } catch (InterruptedException e) {
            LOG.error(() -> "Interrupted while waiting for subscription", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription", e);
        }
    }
}
