package com.linkedin.r2.transport.http.client.stream.http;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.http.client.AbstractJmxManager;
import com.linkedin.r2.transport.http.client.AsyncPool;
import com.linkedin.r2.transport.http.client.PoolStats;
import com.linkedin.r2.transport.http.client.TimeoutCallback;
import com.linkedin.r2.transport.http.client.TimeoutTransportCallback;
import com.linkedin.r2.transport.http.client.common.ChannelPoolFactory;
import com.linkedin.r2.transport.http.client.common.ChannelPoolManager;
import com.linkedin.r2.transport.http.client.stream.AbstractNettyStreamClient;
import com.linkedin.r2.transport.http.common.HttpProtocolVersion;
import com.linkedin.r2.util.Cancellable;
import com.linkedin.r2.util.Timeout;
import com.linkedin.r2.util.TimeoutRunnable;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.ChannelGroupFutureListener;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/r2/transport/http/client/stream/http/HttpNettyStreamClient.class */
public class HttpNettyStreamClient extends AbstractNettyStreamClient {
    static final Logger LOG = LoggerFactory.getLogger(HttpNettyStreamClient.class);
    private final ChannelPoolManager _channelPoolManager;
    private final ChannelGroup _allChannels;

    /* loaded from: input_file:com/linkedin/r2/transport/http/client/stream/http/HttpNettyStreamClient$ChannelPoolGetCallback.class */
    /**
     * Receives the result of checking a {@link Channel} out of an {@link AsyncPool} and, on
     * success, prepares the channel for one request and writes that request to it.
     *
     * <p>Preparation consists of stashing three per-request objects in channel attributes: the
     * owning pool, the transport callback, and a streaming {@link Timeout}. Two timeout tasks are
     * also registered on the callback so that, if the request times out, the channel is disposed
     * and the streaming timeout is cancelled.
     *
     * <p>If the client is already in {@code REQUESTS_STOPPING} or {@code SHUTDOWN} when the
     * channel arrives, the request is not written; the callback is failed and the channel and
     * streaming timeout are released immediately (see {@link #onSuccess(Channel)}).
     */
    private class ChannelPoolGetCallback implements Callback<Channel> {
        private final AsyncPool<Channel> _pool;
        private final Request _request;
        private final TimeoutTransportCallback<StreamResponse> _callback;

        /**
         * @param asyncPool                pool the channel is being checked out of
         * @param request                  request to write once a channel is available
         * @param timeoutTransportCallback callback that receives the response or error
         */
        ChannelPoolGetCallback(AsyncPool<Channel> asyncPool, Request request, TimeoutTransportCallback<StreamResponse> timeoutTransportCallback) {
            this._pool = asyncPool;
            this._request = request;
            this._callback = timeoutTransportCallback;
        }

        /**
         * Wires the freshly acquired channel to this request and writes the request, unless the
         * client is already stopping, in which case the callback is failed with a
         * {@link TimeoutException} and the channel is handed back to its pool.
         *
         * @param channel channel obtained from the pool
         */
        public void onSuccess(Channel channel) {
            // Remember the owning pool on the channel so whoever finishes with it can release it.
            channel.attr(ChannelPoolStreamHandler.CHANNEL_POOL_ATTR_KEY).set(this._pool);
            // On request timeout the channel is disposed rather than reused.
            this._callback.addTimeoutTask(() -> {
                AsyncPool<Channel> owner = detachPool(channel);
                if (owner != null) {
                    owner.dispose(channel);
                }
            });
            Timeout<None> streamingTimeout = new Timeout<>(HttpNettyStreamClient.this._scheduler, HttpNettyStreamClient.this._requestTimeout, TimeUnit.MILLISECONDS, None.none());
            // On request timeout no response will be streamed, so stop the streaming timeout too.
            this._callback.addTimeoutTask(() -> cancelStreamingTimeout(channel));
            channel.attr(RAPStreamResponseHandler.CALLBACK_ATTR_KEY).set(this._callback);
            channel.attr(RAPStreamResponseDecoder.TIMEOUT_ATTR_KEY).set(streamingTimeout);

            AbstractNettyStreamClient.State state = (AbstractNettyStreamClient.State) HttpNettyStreamClient.this._state.get();
            if (state == AbstractNettyStreamClient.State.REQUESTS_STOPPING || state == AbstractNettyStreamClient.State.SHUTDOWN) {
                this._callback.onResponse(TransportResponseImpl.error(new TimeoutException("Operation did not complete before shutdown")));
                // The request is never written, so nothing in the pipeline will release this
                // channel, and (NOTE(review): presumably - confirm against
                // TimeoutTransportCallback) completing the callback above prevents its timeout
                // tasks from running. Release the per-request resources here instead of leaving
                // the channel checked out until shutdown force-closes it. getAndSet(null) in the
                // helpers keeps this safe if a timeout task already released them.
                cancelStreamingTimeout(channel);
                AsyncPool<Channel> owner = detachPool(channel);
                if (owner != null) {
                    // The channel was never used, so it is returned rather than disposed.
                    owner.put(channel);
                }
                return;
            }
            // Outbound failures are fired back through the pipeline as exceptions.
            channel.writeAndFlush(this._request).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        }

        /**
         * Fails the request when no channel could be obtained from the pool.
         *
         * @param th reason the checkout failed
         */
        public void onError(Throwable th) {
            this._callback.onResponse(TransportResponseImpl.error(th));
        }

        /** Atomically clears and returns the pool recorded on the channel, or null if already cleared. */
        private AsyncPool<Channel> detachPool(Channel channel) {
            return (AsyncPool<Channel>) channel.attr(ChannelPoolStreamHandler.CHANNEL_POOL_ATTR_KEY).getAndSet(null);
        }

        /** Atomically clears the streaming timeout recorded on the channel and, if present, takes its item. */
        private void cancelStreamingTimeout(Channel channel) {
            Timeout<?> pending = (Timeout<?>) channel.attr(RAPStreamResponseDecoder.TIMEOUT_ATTR_KEY).getAndSet(null);
            if (pending != null) {
                // NOTE(review): taking the item is how the original timeout task stops the timeout.
                pending.getItem();
            }
        }
    }

    /* loaded from: input_file:com/linkedin/r2/transport/http/client/stream/http/HttpNettyStreamClient$ChannelPoolShutdownCallback.class */
    private class ChannelPoolShutdownCallback extends TimeoutCallback<None> {
        public ChannelPoolShutdownCallback(final ScheduledExecutorService scheduledExecutorService, long j, TimeUnit timeUnit, final long j2, final Callback<None> callback) {
            super(scheduledExecutorService, j, timeUnit, new Callback<None>() { // from class: com.linkedin.r2.transport.http.client.stream.http.HttpNettyStreamClient.ChannelPoolShutdownCallback.1
                private void finishShutdown() {
                    HttpNettyStreamClient.this._state.set(AbstractNettyStreamClient.State.REQUESTS_STOPPING);
                    Iterator<Callback<Channel>> it = HttpNettyStreamClient.this._channelPoolManager.cancelWaiters().iterator();
                    while (it.hasNext()) {
                        it.next().onError(new TimeoutException("Operation did not complete before shutdown"));
                    }
                    Iterator it2 = HttpNettyStreamClient.this._allChannels.iterator();
                    while (it2.hasNext()) {
                        TransportCallback transportCallback = (TransportCallback) ((Channel) it2.next()).attr(RAPStreamResponseHandler.CALLBACK_ATTR_KEY).getAndSet((Object) null);
                        if (transportCallback != null) {
                            AbstractNettyStreamClient.errorResponse(transportCallback, new TimeoutException("Operation did not complete before shutdown"));
                        }
                    }
                    ScheduledExecutorService scheduledExecutorService2 = scheduledExecutorService;
                    long currentTimeMillis = j2 - System.currentTimeMillis();
                    TimeUnit timeUnit2 = TimeUnit.MILLISECONDS;
                    Callback callback2 = callback;
                    final TimeoutRunnable timeoutRunnable = new TimeoutRunnable(scheduledExecutorService2, currentTimeMillis, timeUnit2, () -> {
                        HttpNettyStreamClient.this._state.set(AbstractNettyStreamClient.State.SHUTDOWN);
                        HttpNettyStreamClient.LOG.info("Shutdown complete");
                        callback2.onSuccess(None.none());
                    }, "Timed out waiting for channels to close, continuing shutdown");
                    HttpNettyStreamClient.this._allChannels.close().addListener(new ChannelGroupFutureListener() { // from class: com.linkedin.r2.transport.http.client.stream.http.HttpNettyStreamClient.ChannelPoolShutdownCallback.1.1
                        public void operationComplete(ChannelGroupFuture channelGroupFuture) throws Exception {
                            if (!channelGroupFuture.isSuccess()) {
                                HttpNettyStreamClient.LOG.warn("Failed to close some connections, ignoring");
                            }
                            timeoutRunnable.run();
                        }
                    });
                }

                public void onSuccess(None none) {
                    HttpNettyStreamClient.LOG.info("All connection pools shut down, closing all channels");
                    finishShutdown();
                }

                public void onError(Throwable th) {
                    HttpNettyStreamClient.LOG.warn("Error shutting down HTTP connection pools, ignoring and continuing shutdown", th);
                    finishShutdown();
                }
            }, "Connection pool shutdown timeout exceeded (" + HttpNettyStreamClient.this._shutdownTimeout + "ms)");
        }
    }

    /**
     * Creates a client that uses an externally supplied {@link ChannelPoolManager}.
     *
     * <p>The manager is registered with the JMX manager, and the client's channel group is taken
     * from the manager's own set of channels.
     *
     * @param nioEventLoopGroup        forwarded to the superclass constructor
     * @param scheduledExecutorService forwarded to the superclass constructor
     * @param j                        forwarded to the superclass constructor (NOTE(review): presumably a timeout in ms - confirm)
     * @param j2                       forwarded to the superclass constructor (NOTE(review): presumably a timeout in ms - confirm)
     * @param executorService          forwarded to the superclass constructor
     * @param abstractJmxManager       forwarded to the superclass constructor
     * @param channelPoolManager       source of per-address channel pools and of the channel group
     */
    public HttpNettyStreamClient(
            NioEventLoopGroup nioEventLoopGroup,
            ScheduledExecutorService scheduledExecutorService,
            long j,
            long j2,
            ExecutorService executorService,
            AbstractJmxManager abstractJmxManager,
            ChannelPoolManager channelPoolManager) {
        super(nioEventLoopGroup, scheduledExecutorService, j, j2, executorService, abstractJmxManager);
        _channelPoolManager = channelPoolManager;
        _jmxManager.onProviderCreate(_channelPoolManager);
        _allChannels = channelPoolManager.getAllChannels();
    }

    /**
     * Creates a client that builds its own {@link ChannelPoolManager} from a pool factory.
     *
     * <p>A new channel group named "R2 client channels" is created, shared with the new pool
     * manager, and kept as this client's channel group. The manager is then registered with the
     * JMX manager.
     *
     * @param channelPoolFactory       factory handed to the superclass and to the new pool manager
     * @param scheduledExecutorService forwarded to the superclass constructor
     * @param i                        forwarded to the superclass constructor (NOTE(review): presumably a timeout in ms - confirm)
     * @param i2                       forwarded to the superclass constructor (NOTE(review): presumably a timeout in ms - confirm)
     */
    public HttpNettyStreamClient(
            ChannelPoolFactory channelPoolFactory,
            ScheduledExecutorService scheduledExecutorService,
            int i,
            int i2) {
        super(channelPoolFactory, scheduledExecutorService, i, i2);
        final DefaultChannelGroup clientChannels = new DefaultChannelGroup("R2 client channels", GlobalEventExecutor.INSTANCE);
        _allChannels = clientChannels;
        _channelPoolManager = new ChannelPoolManager(channelPoolFactory, clientChannels);
        _jmxManager.onProviderCreate(_channelPoolManager);
    }

    /**
     * Returns the pool statistics reported by the channel pool manager.
     *
     * @return the map produced by {@code ChannelPoolManager.getPoolStats()}
     */
    @Override // com.linkedin.r2.transport.http.client.stream.AbstractNettyStreamClient
    public Map<String, PoolStats> getPoolStats() {
        final Map<String, PoolStats> statsByPool = _channelPoolManager.getPoolStats();
        return statsByPool;
    }

    /**
     * Shuts down the channel pools and then unregisters the pool manager from JMX.
     *
     * <p>The pool shutdown is given {@code _shutdownTimeout} milliseconds; the same duration,
     * added to the current time, forms the absolute deadline passed to
     * {@link ChannelPoolShutdownCallback} for the remainder of the shutdown sequence.
     *
     * @param callback completed by the shutdown callback once shutdown has finished
     */
    @Override // com.linkedin.r2.transport.http.client.stream.AbstractNettyStreamClient
    protected void doShutdown(Callback<None> callback) {
        final long deadline = System.currentTimeMillis() + _shutdownTimeout;
        final ChannelPoolShutdownCallback closeChannels =
            new ChannelPoolShutdownCallback(_scheduler, _shutdownTimeout, TimeUnit.MILLISECONDS, deadline, callback);
        _channelPoolManager.shutdown(closeChannels);
        _jmxManager.onProviderShutdown(_channelPoolManager);
    }

    /**
     * Requests a channel for {@code socketAddress} from the matching pool and arranges for the
     * request to be written once the channel is available.
     *
     * <p>The request context is tagged with the HTTP/1.1 protocol version. If the pool returns a
     * handle for the pending checkout, a timeout task is registered that cancels it. An
     * {@link IllegalStateException} raised anywhere in this sequence is reported to the callback
     * as an error response instead of being propagated.
     *
     * @param request                  request to send
     * @param requestContext           context that receives the {@code HTTP_PROTOCOL_VERSION} local attribute
     * @param socketAddress            remote address used to select the channel pool
     * @param timeoutTransportCallback callback that receives the response or error
     */
    @Override // com.linkedin.r2.transport.http.client.stream.AbstractNettyStreamClient
    protected void doWriteRequest(Request request, RequestContext requestContext, SocketAddress socketAddress, TimeoutTransportCallback<StreamResponse> timeoutTransportCallback) {
        try {
            final AsyncPool<Channel> pool = _channelPoolManager.getPoolForAddress(socketAddress);
            requestContext.putLocalAttr("HTTP_PROTOCOL_VERSION", HttpProtocolVersion.HTTP_1_1);

            final Callback<Channel> onChannel = new ChannelPoolGetCallback(pool, request, timeoutTransportCallback);
            final Cancellable pendingGet = pool.get(onChannel);
            if (pendingGet == null) {
                // No handle was returned, so there is nothing to cancel on timeout.
                return;
            }
            timeoutTransportCallback.addTimeoutTask(pendingGet::cancel);
        } catch (IllegalStateException e) {
            errorResponse(timeoutTransportCallback, e);
        }
    }
}
