package cn.atcoder.air.transport;

import cn.atcoder.air.client.ClientChannelHandler;
import cn.atcoder.air.client.MessageFuture;
import cn.atcoder.air.exception.ClientTimeoutException;
import cn.atcoder.air.exception.MessageException;
import cn.atcoder.air.msg.BaseMessage;
import cn.atcoder.air.msg.CallbackRequestMessage;
import cn.atcoder.air.msg.MessageType;
import cn.atcoder.air.msg.RequestMessage;
import cn.atcoder.air.msg.ResponseMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:cn/atcoder/air/transport/TcpClientTransport.class */
public class TcpClientTransport extends AbstractClientTransport {
    private final ConcurrentHashMap<Long, MessageFuture> futureMap;
    private AtomicLong requestId;

    public TcpClientTransport(String str, int i) {
        super(str, i);
        this.futureMap = new ConcurrentHashMap<>();
        this.requestId = new AtomicLong(0L);
    }

    @Override // cn.atcoder.air.transport.AbstractClientTransport
    protected ResponseMessage send(RequestMessage requestMessage, int i) {
        long generateRequestId = generateRequestId();
        try {
            try {
                this.currentRequests.incrementAndGet();
                requestMessage.setMessageId(generateRequestId);
                ResponseMessage responseMessage = doSendFuture(requestMessage, i).get(i, TimeUnit.MILLISECONDS);
                this.currentRequests.decrementAndGet();
                return responseMessage;
            } catch (ClientTimeoutException e) {
                try {
                    this.futureMap.remove(Long.valueOf(generateRequestId));
                } catch (Exception e2) {
                    LOGGER.error(e2.getMessage(), e2);
                }
                throw e;
            } catch (InterruptedException e3) {
                throw new MessageException("Client request thread interrupted");
            }
        } catch (Throwable th) {
            this.currentRequests.decrementAndGet();
            throw th;
        }
    }

    public void receiveResponse(ResponseMessage responseMessage) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("receiveResponse..{}", responseMessage);
        }
        Long valueOf = Long.valueOf(responseMessage.getMessageId());
        MessageFuture messageFuture = this.futureMap.get(valueOf);
        if (messageFuture == null) {
            LOGGER.warn("Not found future which msgId is {} when receive response. May be this future have been removed because of timeout", valueOf);
        } else {
            messageFuture.setSuccess(responseMessage);
            this.futureMap.remove(valueOf);
        }
    }

    @Override // cn.atcoder.air.transport.AbstractClientTransport
    void start0() {
        this.bootstrap = new Bootstrap();
        this.bootstrap.channel(NioSocketChannel.class);
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        this.bootstrap.group(this.workGroup);
        this.bootstrap.remoteAddress(this.host, this.port);
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { // from class: cn.atcoder.air.transport.TcpClientTransport.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(20, 10, 0)});
                socketChannel.pipeline().addLast(new ChannelHandler[]{new ObjectEncoder()});
                socketChannel.pipeline().addLast(new ChannelHandler[]{new ObjectDecoder(ClassResolvers.cacheDisabled((ClassLoader) null))});
                socketChannel.pipeline().addLast(new ChannelHandler[]{new ClientChannelHandler()});
                ClientTransportFactory.build(TcpClientTransport.this);
            }
        });
        doConnect();
    }

    @Override // cn.atcoder.air.transport.BaseTransport
    public void doConnect() {
        if (Objects.nonNull(this.channel) && this.channel.isActive()) {
            return;
        }
        ChannelFuture connect = this.bootstrap.connect(this.host, this.port);
        connect.addListener(channelFuture -> {
            if (channelFuture.isSuccess()) {
                this.channel = connect.channel();
                LOGGER.info("Connect to provider:{} success! The connection is {} -> {}", new Object[]{getRemoteAddress(), this.channel.localAddress(), getRemoteAddress()});
            } else {
                LOGGER.info("连接到服务器失败，10秒后尝试重连");
                channelFuture.channel().eventLoop().schedule(this::doConnect, 10L, TimeUnit.SECONDS);
            }
        });
    }

    @Override // cn.atcoder.air.transport.BaseTransport
    public void shutdown() {
    }

    @Override // cn.atcoder.air.transport.ClientTransport
    public String getRemoteAddress() {
        return this.host + ":" + this.port;
    }

    @Override // cn.atcoder.air.transport.BaseTransport
    public boolean isOpen() {
        return Objects.nonNull(this.channel) && this.channel.isActive();
    }

    private void addFuture(BaseMessage baseMessage, MessageFuture messageFuture) {
        MessageType messageType = baseMessage.getMessageType();
        Long valueOf = Long.valueOf(baseMessage.getMessageId());
        if (messageType == MessageType.CALLBACK_REQUEST_MSG || messageType == MessageType.REGISTER_REQUEST_MSG || messageType == MessageType.HEARTBEAT_REQUEST_MSG) {
            this.futureMap.put(valueOf, messageFuture);
        } else {
            LOGGER.error("cannot handle Future for this Msg:{}", baseMessage);
        }
    }

    private MessageFuture<ResponseMessage> doSendFuture(RequestMessage requestMessage, int i) {
        MessageFuture<ResponseMessage> messageFuture = new MessageFuture<>(requestMessage.getHeader(), i, this.channel);
        if (requestMessage instanceof CallbackRequestMessage) {
            messageFuture.setInvocationBody(((CallbackRequestMessage) requestMessage).getInvocationBody());
        }
        this.channel.writeAndFlush(requestMessage);
        messageFuture.setSentTime(System.currentTimeMillis());
        addFuture(requestMessage, messageFuture);
        return messageFuture;
    }

    private long generateRequestId() {
        return this.requestId.getAndIncrement();
    }
}
