package com.googlecode.protobuf.pro.stream.client;

import com.google.protobuf.Message;
import com.googlecode.protobuf.pro.stream.PeerInfo;
import com.googlecode.protobuf.pro.stream.RpcSSLContext;
import com.googlecode.protobuf.pro.stream.StreamingClient;
import com.googlecode.protobuf.pro.stream.TransferIn;
import com.googlecode.protobuf.pro.stream.TransferOut;
import com.googlecode.protobuf.pro.stream.handler.Handler;
import com.googlecode.protobuf.pro.stream.handler.StreamingClientHandler;
import com.googlecode.protobuf.pro.stream.logging.CategoryPerMessageTypeLogger;
import com.googlecode.protobuf.pro.stream.logging.StreamLogger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;

/* loaded from: input_file:com/googlecode/protobuf/pro/stream/client/StreamingTcpClientBootstrap.class */
/**
 * Client-side bootstrap that opens (or reuses) TCP channels to streaming peers and starts
 * {@code pull} / {@code push} transfers over them.
 *
 * <p>Callers are expected to use {@link #pull(PeerInfo, Message)} and
 * {@link #push(PeerInfo, Message)}; the inherited {@code connect(...)} overloads are disabled and
 * always throw {@link IllegalStateException}.
 *
 * <p>Every channel opened by this bootstrap is tracked in an internal {@link ChannelGroup} so that
 * it can be found again for reuse (when channel sharing is enabled) and closed in
 * {@link #releaseExternalResources()}.
 *
 * @param <E> message type passed to {@link #pull(PeerInfo, Message)}
 * @param <F> message type passed to {@link #push(PeerInfo, Message)}
 */
public class StreamingTcpClientBootstrap<E extends Message, F extends Message> extends ClientBootstrap {
    private static final Log log = LogFactory.getLog(StreamingTcpClientBootstrap.class);

    /** Chunk size handed to each new {@link StreamingClient} unless overridden via {@link #setChunkSize(int)}. */
    private static final int DEFAULT_CHUNK_SIZE = 89600;

    private final PeerInfo clientInfo;
    private StreamLogger streamLogger = new CategoryPerMessageTypeLogger();
    private boolean shareChannels = true;
    private int chunkSize = DEFAULT_CHUNK_SIZE;
    private ChannelGroup allChannels = new DefaultChannelGroup();

    /**
     * Creates a bootstrap that identifies itself to peers as {@code peerInfo}.
     *
     * <p>Installs a {@link StreamingTcpClientPipelineFactory} as the pipeline factory; the SSL and
     * compression accessors of this class delegate to that factory.
     *
     * @param peerInfo       identity of this client, passed to every {@link StreamingClient} created
     * @param channelFactory factory used by the underlying {@link ClientBootstrap} to create channels
     */
    public StreamingTcpClientBootstrap(PeerInfo peerInfo, ChannelFactory channelFactory) {
        super(channelFactory);
        this.clientInfo = peerInfo;
        setPipelineFactory(new StreamingTcpClientPipelineFactory());
    }

    /**
     * Starts a pull transfer from the given peer, reusing an open channel to it when channel
     * sharing is enabled and one exists, otherwise connecting first.
     *
     * @param peerInfo host name and port of the peer to pull from
     * @param e        request message forwarded to {@link StreamingClient#pull}
     * @return the transfer handle returned by {@link StreamingClient#pull}
     * @throws IOException if a new connection is required and cannot be established
     */
    public TransferIn pull(PeerInfo peerInfo, E e) throws IOException {
        return clientFor(peerInfo, "pull").pull(e);
    }

    /**
     * Starts a push transfer to the given peer, reusing an open channel to it when channel
     * sharing is enabled and one exists, otherwise connecting first.
     *
     * @param peerInfo host name and port of the peer to push to
     * @param f        request message forwarded to {@link StreamingClient#push}
     * @return the transfer handle returned by {@link StreamingClient#push}
     * @throws IOException if a new connection is required and cannot be established
     */
    public TransferOut push(PeerInfo peerInfo, F f) throws IOException {
        return clientFor(peerInfo, "push").push(f);
    }

    /**
     * Resolves the {@link StreamingClient} to use for one operation against {@code peerInfo}.
     *
     * @param peerInfo  peer to talk to
     * @param operation operation name ("pull" or "push"), used only in the debug log line
     * @return an existing client when sharing is enabled and a channel is open, else a new one
     * @throws IOException if a new connection cannot be established
     */
    private StreamingClient<E, F> clientFor(PeerInfo peerInfo, String operation) throws IOException {
        InetSocketAddress remoteAddress = new InetSocketAddress(peerInfo.getHostName(), peerInfo.getPort());
        // Only scan the channel group when its result can actually be used.
        if (this.shareChannels) {
            StreamingClient<E, F> existing = findExistingChannelTo(remoteAddress);
            if (existing != null) {
                if (log.isDebugEnabled()) {
                    log.debug("Reusing open connection to " + peerInfo + " for " + operation + ".");
                }
                return existing;
            }
        }
        return connectWith(remoteAddress);
    }

    /**
     * Looks through the tracked channels for one whose remote address equals
     * {@code inetSocketAddress} and whose pipeline contains a streaming client handler.
     *
     * @param inetSocketAddress remote address to look for
     * @return the {@link StreamingClient} bound to a matching channel, or {@code null} if none
     */
    @SuppressWarnings("unchecked") // handlers under STREAMING_CLIENT are installed by completePipeline with <E, F>
    StreamingClient<E, F> findExistingChannelTo(InetSocketAddress inetSocketAddress) {
        for (Channel channel : this.allChannels) {
            // Netty's Channel contract allows getRemoteAddress() to be null for a channel that is
            // not connected, so compare from the argument side to avoid a NullPointerException.
            if (inetSocketAddress == null || !inetSocketAddress.equals(channel.getRemoteAddress())) {
                continue;
            }
            StreamingClientHandler<E, F> handler =
                    (StreamingClientHandler<E, F>) channel.getPipeline().get(Handler.STREAMING_CLIENT);
            if (handler != null) {
                return handler.getStreamingClient();
            }
        }
        return null;
    }

    /**
     * Opens a new channel to {@code inetSocketAddress}, blocking until the connect attempt
     * finishes, then wires a {@link StreamingClient} into its pipeline and tracks the channel.
     *
     * @param inetSocketAddress remote address to connect to; must not be {@code null}
     * @return the streaming client bound to the new channel
     * @throws NullPointerException if {@code inetSocketAddress} is {@code null}
     * @throws IOException          if the connect attempt does not succeed; the cause is attached
     */
    private StreamingClient<E, F> connectWith(InetSocketAddress inetSocketAddress) throws IOException {
        if (inetSocketAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        // Call the superclass directly: this class's own connect(...) overloads are disabled.
        SocketAddress localAddress = (SocketAddress) getOption("localAddress");
        ChannelFuture connectFuture = super.connect(inetSocketAddress, localAddress).awaitUninterruptibly();
        if (!connectFuture.isSuccess()) {
            throw new IOException("Failed to connect to " + inetSocketAddress, connectFuture.getCause());
        }
        Channel channel = connectFuture.getChannel();
        PeerInfo serverInfo = new PeerInfo(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), "<N/A>");
        StreamingClient<E, F> streamingClient =
                new StreamingClient<>(channel, this.clientInfo, serverInfo, this.chunkSize);
        streamingClient.setStreamLogger(getStreamLogger());
        completePipeline(streamingClient);
        // Track the channel only once its pipeline is complete, so lookups never see a half-built one.
        this.allChannels.add(channel);
        return streamingClient;
    }

    /**
     * Appends a {@link StreamingClientHandler} for {@code streamingClient} to the end of its
     * channel's pipeline under the name {@link Handler#STREAMING_CLIENT}.
     *
     * @param streamingClient client whose channel pipeline is to be completed
     * @return the handler that was added
     */
    protected StreamingClientHandler<E, F> completePipeline(StreamingClient<E, F> streamingClient) {
        StreamingClientHandler<E, F> streamingClientHandler = new StreamingClientHandler<>(streamingClient);
        streamingClient.getChannel().getPipeline().addLast(Handler.STREAMING_CLIENT, streamingClientHandler);
        return streamingClientHandler;
    }

    /**
     * Closes every tracked channel (waiting for the close to finish) and then releases the
     * resources held by the underlying bootstrap.
     */
    public void releaseExternalResources() {
        log.debug("Closing all channels.");
        this.allChannels.close().awaitUninterruptibly();
        log.debug("Releasing IO-Layer external resources.");
        super.releaseExternalResources();
    }

    /** Returns {@code "ClientBootstrap:"} followed by this client's {@link PeerInfo}. */
    public String toString() {
        return "ClientBootstrap:" + this.clientInfo;
    }

    /**
     * Disabled; use {@link #pull(PeerInfo, Message)} or {@link #push(PeerInfo, Message)}.
     *
     * @throws IllegalStateException always
     */
    public ChannelFuture connect(SocketAddress socketAddress) {
        throw new IllegalStateException("use push or pull method.");
    }

    /**
     * Disabled; use {@link #pull(PeerInfo, Message)} or {@link #push(PeerInfo, Message)}.
     *
     * @throws IllegalStateException always
     */
    public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress2) {
        throw new IllegalStateException("use push or pull method.");
    }

    /**
     * Disabled; use {@link #pull(PeerInfo, Message)} or {@link #push(PeerInfo, Message)}.
     *
     * @throws IllegalStateException always
     */
    public ChannelFuture connect() {
        throw new IllegalStateException("use push or pull method.");
    }

    /** Returns the identity of this client as supplied to the constructor. */
    public PeerInfo getClientInfo() {
        return this.clientInfo;
    }

    /** Returns the SSL context held by the installed {@link StreamingTcpClientPipelineFactory}. */
    public RpcSSLContext getSslContext() {
        return ((StreamingTcpClientPipelineFactory) getPipelineFactory()).getSslContext();
    }

    /** Sets the SSL context on the installed {@link StreamingTcpClientPipelineFactory}. */
    public void setSslContext(RpcSSLContext rpcSSLContext) {
        ((StreamingTcpClientPipelineFactory) getPipelineFactory()).setSslContext(rpcSSLContext);
    }

    /** Returns the logger handed to each newly created {@link StreamingClient}. */
    public StreamLogger getStreamLogger() {
        return this.streamLogger;
    }

    /** Sets the logger handed to each {@link StreamingClient} created from now on. */
    public void setStreamLogger(StreamLogger streamLogger) {
        this.streamLogger = streamLogger;
    }

    /** Returns the chunk size handed to each newly created {@link StreamingClient}. */
    public int getChunkSize() {
        return this.chunkSize;
    }

    /** Sets the chunk size handed to each {@link StreamingClient} created from now on. */
    public void setChunkSize(int i) {
        this.chunkSize = i;
    }

    /** Returns whether {@code pull}/{@code push} may reuse an already open channel to the same peer. */
    public boolean isShareChannels() {
        return this.shareChannels;
    }

    /** Enables or disables reuse of open channels; when disabled every operation connects anew. */
    public void setShareChannels(boolean z) {
        this.shareChannels = z;
    }

    /** Returns the compression flag of the installed {@link StreamingTcpClientPipelineFactory}. */
    public boolean isCompress() {
        return ((StreamingTcpClientPipelineFactory) getPipelineFactory()).isCompress();
    }

    /** Sets the compression flag on the installed {@link StreamingTcpClientPipelineFactory}. */
    public void setCompress(boolean z) {
        ((StreamingTcpClientPipelineFactory) getPipelineFactory()).setCompress(z);
    }
}
