package com.azure.cosmos.implementation.http;

import com.azure.cosmos.implementation.Configs;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import io.netty.resolver.DefaultAddressResolverGroup;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.invoke.WrongMethodTypeException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.http.client.HttpClientState;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.transport.ProxyProvider;
import reactor.util.context.Context;

/* loaded from: input_file:com/azure/cosmos/implementation/http/ReactorNettyClient.class */
public class ReactorNettyClient implements HttpClient {
    private static final String REACTOR_NETTY_REQUEST_RECORD_KEY = "reactorNettyRequestRecordKey";
    private static final Logger logger = LoggerFactory.getLogger(ReactorNettyClient.class.getSimpleName());
    private static final MethodHandle HTTP_CLIENT_WARMUP;
    private HttpClientConfig httpClientConfig;
    private reactor.netty.http.client.HttpClient httpClient;
    private ConnectionProvider connectionProvider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/cosmos/implementation/http/ReactorNettyClient$ReactorNettyHttpResponse.class */
    public static class ReactorNettyHttpResponse extends HttpResponse {
        private final AtomicReference<ReactorNettyResponseState> state = new AtomicReference<>(ReactorNettyResponseState.NOT_SUBSCRIBED);
        private final HttpClientResponse reactorNettyResponse;
        private final Connection reactorNettyConnection;

        ReactorNettyHttpResponse(HttpClientResponse httpClientResponse, Connection connection) {
            this.reactorNettyResponse = httpClientResponse;
            this.reactorNettyConnection = connection;
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        public int statusCode() {
            return this.reactorNettyResponse.status().code();
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        public String headerValue(String str) {
            return this.reactorNettyResponse.responseHeaders().get(str);
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        public HttpHeaders headers() {
            HttpHeaders httpHeaders = new HttpHeaders(this.reactorNettyResponse.responseHeaders().size());
            this.reactorNettyResponse.responseHeaders().forEach(entry -> {
                httpHeaders.set((String) entry.getKey(), (String) entry.getValue());
            });
            return httpHeaders;
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        public Flux<ByteBuf> body() {
            return bodyIntern().doOnSubscribe(this::updateSubscriptionState);
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        public Mono<byte[]> bodyAsByteArray() {
            return bodyIntern().aggregate().asByteArray().doOnSubscribe(this::updateSubscriptionState);
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        public Mono<String> bodyAsString() {
            return bodyIntern().aggregate().asString().doOnSubscribe(this::updateSubscriptionState);
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        public Mono<String> bodyAsString(Charset charset) {
            return bodyIntern().aggregate().asString(charset).doOnSubscribe(this::updateSubscriptionState);
        }

        private ByteBufFlux bodyIntern() {
            return this.reactorNettyConnection.inbound().receive();
        }

        @Override // com.azure.cosmos.implementation.http.HttpResponse
        Connection internConnection() {
            return this.reactorNettyConnection;
        }

        private void updateSubscriptionState(Subscription subscription) {
            if (!this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, ReactorNettyResponseState.SUBSCRIBED) && this.state.get() == ReactorNettyResponseState.CANCELLED) {
                throw new IllegalStateException("The client response body has been released already due to cancellation.");
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void releaseOnNotSubscribedResponse(ReactorNettyResponseState reactorNettyResponseState) {
            if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, reactorNettyResponseState)) {
                if (ReactorNettyClient.logger.isDebugEnabled()) {
                    ReactorNettyClient.logger.debug("Releasing body, not yet subscribed");
                }
                bodyIntern().doOnNext(byteBuf -> {
                }).subscribe(byteBuf2 -> {
                }, th -> {
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/cosmos/implementation/http/ReactorNettyClient$ReactorNettyResponseState.class */
    public enum ReactorNettyResponseState {
        NOT_SUBSCRIBED,
        SUBSCRIBED,
        CANCELLED,
        ERROR
    }

    private ReactorNettyClient() {
    }

    public static ReactorNettyClient create(HttpClientConfig httpClientConfig) {
        ReactorNettyClient reactorNettyClient = new ReactorNettyClient();
        reactorNettyClient.httpClientConfig = httpClientConfig;
        reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient.newConnection().observe(getConnectionObserver()).resolver(DefaultAddressResolverGroup.INSTANCE);
        reactorNettyClient.configureChannelPipelineHandlers();
        attemptToWarmupHttpClient(reactorNettyClient);
        return reactorNettyClient;
    }

    public static ReactorNettyClient createWithConnectionProvider(ConnectionProvider connectionProvider, HttpClientConfig httpClientConfig) {
        ReactorNettyClient reactorNettyClient = new ReactorNettyClient();
        reactorNettyClient.connectionProvider = connectionProvider;
        reactorNettyClient.httpClientConfig = httpClientConfig;
        reactorNettyClient.httpClient = reactor.netty.http.client.HttpClient.create(connectionProvider).observe(getConnectionObserver()).resolver(DefaultAddressResolverGroup.INSTANCE);
        reactorNettyClient.configureChannelPipelineHandlers();
        attemptToWarmupHttpClient(reactorNettyClient);
        return reactorNettyClient;
    }

    private static void attemptToWarmupHttpClient(ReactorNettyClient reactorNettyClient) {
        if (HTTP_CLIENT_WARMUP == null) {
            return;
        }
        try {
            (Mono) HTTP_CLIENT_WARMUP.invoke(reactorNettyClient.httpClient).block();
        } catch (ClassCastException | WrongMethodTypeException e) {
            logger.debug("Invoking HttpClient.warmup failed.", e);
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    private void configureChannelPipelineHandlers() {
        Configs configs = this.httpClientConfig.getConfigs();
        if (this.httpClientConfig.getProxy() != null) {
            this.httpClient = this.httpClient.proxy(typeSpec -> {
                typeSpec.type(ProxyProvider.Proxy.HTTP).address(this.httpClientConfig.getProxy().getAddress());
            });
        }
        if (LoggerFactory.getLogger(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY).isTraceEnabled()) {
            this.httpClient = this.httpClient.wiretap(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO);
        }
        this.httpClient = this.httpClient.secure(sslContextSpec -> {
            sslContextSpec.sslContext(configs.getSslContext());
        }).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf((int) configs.getConnectionAcquireTimeout().toMillis())).httpResponseDecoder(httpResponseDecoderSpec -> {
            return httpResponseDecoderSpec.maxInitialLineLength(configs.getMaxHttpInitialLineLength()).maxHeaderSize(configs.getMaxHttpHeaderSize()).maxChunkSize(configs.getMaxHttpChunkSize()).validateHeaders(true);
        });
    }

    @Override // com.azure.cosmos.implementation.http.HttpClient
    public Mono<HttpResponse> send(HttpRequest httpRequest) {
        return send(httpRequest, Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds()));
    }

    @Override // com.azure.cosmos.implementation.http.HttpClient
    public Mono<HttpResponse> send(HttpRequest httpRequest, Duration duration) {
        Objects.requireNonNull(httpRequest.httpMethod());
        Objects.requireNonNull(httpRequest.uri());
        Objects.requireNonNull(this.httpClientConfig);
        if (httpRequest.reactorNettyRequestRecord() == null) {
            ReactorNettyRequestRecord reactorNettyRequestRecord = new ReactorNettyRequestRecord();
            reactorNettyRequestRecord.setTimeCreated(Instant.now());
            httpRequest.withReactorNettyRequestRecord(reactorNettyRequestRecord);
        }
        AtomicReference atomicReference = new AtomicReference();
        return this.httpClient.keepAlive(this.httpClientConfig.isConnectionKeepAlive()).port(httpRequest.port()).responseTimeout(duration).request(HttpMethod.valueOf(httpRequest.httpMethod().toString())).uri(httpRequest.uri().toString()).send(bodySendDelegate(httpRequest)).responseConnection((httpClientResponse, connection) -> {
            HttpResponse withRequest = new ReactorNettyHttpResponse(httpClientResponse, connection).withRequest(httpRequest);
            atomicReference.set((ReactorNettyHttpResponse) withRequest);
            return Mono.just(withRequest);
        }).contextWrite(Context.of(REACTOR_NETTY_REQUEST_RECORD_KEY, httpRequest.reactorNettyRequestRecord())).doOnCancel(() -> {
            ReactorNettyHttpResponse reactorNettyHttpResponse = (ReactorNettyHttpResponse) atomicReference.get();
            if (reactorNettyHttpResponse != null) {
                reactorNettyHttpResponse.releaseOnNotSubscribedResponse(ReactorNettyResponseState.CANCELLED);
            }
        }).onErrorMap(th -> {
            ReactorNettyHttpResponse reactorNettyHttpResponse = (ReactorNettyHttpResponse) atomicReference.get();
            if (reactorNettyHttpResponse != null) {
                reactorNettyHttpResponse.releaseOnNotSubscribedResponse(ReactorNettyResponseState.ERROR);
            }
            return th;
        }).single();
    }

    private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bodySendDelegate(HttpRequest httpRequest) {
        return (httpClientRequest, nettyOutbound) -> {
            Iterator<HttpHeader> it = httpRequest.headers().iterator();
            while (it.hasNext()) {
                HttpHeader next = it.next();
                httpClientRequest.header(next.name(), next.value());
            }
            return httpRequest.body() != null ? nettyOutbound.sendByteArray(httpRequest.body()) : nettyOutbound;
        };
    }

    @Override // com.azure.cosmos.implementation.http.HttpClient
    public void shutdown() {
        if (this.connectionProvider != null) {
            this.connectionProvider.dispose();
        }
    }

    private static ConnectionObserver getConnectionObserver() {
        return (connection, state) -> {
            Instant now = Instant.now();
            if (state.equals(HttpClientState.CONNECTED)) {
                if (connection instanceof ConnectionObserver) {
                    ReactorNettyRequestRecord reactorNettyRequestRecord = (ReactorNettyRequestRecord) ((ConnectionObserver) connection).currentContext().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, (Object) null);
                    if (reactorNettyRequestRecord == null) {
                        throw new IllegalStateException("ReactorNettyRequestRecord not found in context");
                    }
                    reactorNettyRequestRecord.setTimeConnected(now);
                    return;
                }
                return;
            }
            if (state.equals(HttpClientState.ACQUIRED)) {
                if (connection instanceof ConnectionObserver) {
                    ReactorNettyRequestRecord reactorNettyRequestRecord2 = (ReactorNettyRequestRecord) ((ConnectionObserver) connection).currentContext().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, (Object) null);
                    if (reactorNettyRequestRecord2 == null) {
                        throw new IllegalStateException("ReactorNettyRequestRecord not found in context");
                    }
                    reactorNettyRequestRecord2.setTimeAcquired(now);
                    return;
                }
                return;
            }
            if (state.equals(HttpClientState.CONFIGURED)) {
                if (connection instanceof HttpClientRequest) {
                    ReactorNettyRequestRecord reactorNettyRequestRecord3 = (ReactorNettyRequestRecord) ((HttpClientRequest) connection).currentContextView().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, (Object) null);
                    if (reactorNettyRequestRecord3 == null) {
                        throw new IllegalStateException("ReactorNettyRequestRecord not found in context");
                    }
                    reactorNettyRequestRecord3.setTimeConfigured(now);
                    return;
                }
                return;
            }
            if (state.equals(HttpClientState.REQUEST_SENT)) {
                if (connection instanceof HttpClientRequest) {
                    ReactorNettyRequestRecord reactorNettyRequestRecord4 = (ReactorNettyRequestRecord) ((HttpClientRequest) connection).currentContextView().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, (Object) null);
                    if (reactorNettyRequestRecord4 == null) {
                        throw new IllegalStateException("ReactorNettyRequestRecord not found in context");
                    }
                    reactorNettyRequestRecord4.setTimeSent(now);
                    return;
                }
                return;
            }
            if (state.equals(HttpClientState.RESPONSE_RECEIVED) && (connection instanceof HttpClientRequest)) {
                ReactorNettyRequestRecord reactorNettyRequestRecord5 = (ReactorNettyRequestRecord) ((HttpClientRequest) connection).currentContextView().getOrDefault(REACTOR_NETTY_REQUEST_RECORD_KEY, (Object) null);
                if (reactorNettyRequestRecord5 == null) {
                    throw new IllegalStateException("ReactorNettyRequestRecord not found in context");
                }
                reactorNettyRequestRecord5.setTimeReceived(now);
            }
        };
    }

    public void enableNetworkLogging() {
        Logger logger2 = LoggerFactory.getLogger(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY);
        if (logger2.isTraceEnabled()) {
            this.httpClient = this.httpClient.wiretap(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY, LogLevel.TRACE);
            return;
        }
        if (logger2.isDebugEnabled()) {
            this.httpClient = this.httpClient.wiretap(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY, LogLevel.DEBUG);
            return;
        }
        if (logger2.isInfoEnabled()) {
            this.httpClient = this.httpClient.wiretap(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY, LogLevel.INFO);
        } else if (logger2.isWarnEnabled()) {
            this.httpClient = this.httpClient.wiretap(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY, LogLevel.WARN);
        } else if (logger2.isErrorEnabled()) {
            this.httpClient = this.httpClient.wiretap(HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY, LogLevel.ERROR);
        }
    }

    static {
        MethodHandle methodHandle = null;
        try {
            methodHandle = MethodHandles.publicLookup().findVirtual(reactor.netty.http.client.HttpClient.class, "warmup", MethodType.methodType(Mono.class));
        } catch (IllegalAccessException | NoSuchMethodException e) {
        }
        HTTP_CLIENT_WARMUP = methodHandle;
    }
}
