package com.acgist.snail.net;

import com.acgist.snail.context.exception.NetException;
import com.acgist.snail.net.codec.IMessageCodec;
import com.acgist.snail.utils.IoUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/acgist/snail/net/TcpMessageHandler.class */
/**
 * Base handler for TCP messages exchanged over an {@link AsynchronousSocketChannel}.
 *
 * <p>Reading is asynchronous: the handler registers itself as the
 * {@link CompletionHandler} of each read and starts the next read once the previous
 * one has completed. Received buffers are passed to {@link #messageCodec}. Sending
 * blocks the calling thread until the buffer has been written or the timeout elapses.
 */
public abstract class TcpMessageHandler implements CompletionHandler<Integer, ByteBuffer>, IMessageSender, IMessageReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(TcpMessageHandler.class);

    /** Capacity in bytes of the buffer allocated for every read. */
    private static final int BUFFER_SIZE = 16384;

    /** Set once {@link #close()} has been called; makes {@link #available()} return false. */
    private volatile boolean close = false;
    /** Channel being served; {@code null} until {@link #handle(AsynchronousSocketChannel)} is called. */
    protected AsynchronousSocketChannel socket;
    /** Codec that decodes received buffers; must be set before any data is received. */
    protected IMessageCodec<ByteBuffer> messageCodec;

    /**
     * Passes a received buffer to the configured codec.
     *
     * @param byteBuffer buffer filled by the read operation
     * @throws NetException if no codec is configured, or if the codec reports a failure
     */
    @Override
    public void onReceive(ByteBuffer byteBuffer) throws NetException {
        if (this.messageCodec == null) {
            throw new NetException("请实现消息处理器");
        }
        this.messageCodec.decode(byteBuffer);
    }

    /**
     * Binds this handler to a channel and starts the read loop.
     *
     * @param asynchronousSocketChannel channel to read from and write to
     */
    public void handle(AsynchronousSocketChannel asynchronousSocketChannel) {
        this.socket = asynchronousSocketChannel;
        loopMessage();
    }

    /**
     * @return {@code true} when the handler has not been closed and its channel is set and open
     */
    @Override
    public boolean available() {
        return !this.close && this.socket != null && this.socket.isOpen();
    }

    /**
     * Writes the whole buffer to the channel, blocking until it is drained.
     *
     * <p>A single channel write may transfer fewer bytes than remain in the buffer, so
     * the write is repeated until nothing is left. Writes are serialized by
     * synchronizing on the channel.
     *
     * @param byteBuffer data to send
     * @param i overall timeout in seconds for the complete send; a value {@code <= 0}
     *          waits without a time limit
     * @throws NetException if the thread is interrupted, the write fails, or the
     *                      timeout elapses before the buffer is drained
     */
    @Override
    public void send(ByteBuffer byteBuffer, int i) throws NetException {
        check(byteBuffer);
        final boolean bounded = i > 0;
        synchronized (this.socket) {
            // The deadline starts after the lock is acquired, so time spent waiting
            // for other senders does not count against this message.
            final long deadline = bounded ? System.nanoTime() + TimeUnit.SECONDS.toNanos(i) : 0L;
            try {
                // do/while: at least one write is always attempted, so a buffer with
                // nothing remaining still produces the warning below.
                do {
                    final int written;
                    if (bounded) {
                        final long remainingNanos = deadline - System.nanoTime();
                        if (remainingNanos <= 0L) {
                            // Fail before initiating a write that could not be awaited.
                            throw new TimeoutException("TCP send timed out with " + byteBuffer.remaining() + " bytes unsent");
                        }
                        written = this.socket.write(byteBuffer).get(remainingNanos, TimeUnit.NANOSECONDS);
                    } else {
                        final Future<Integer> pending = this.socket.write(byteBuffer);
                        written = pending.get();
                    }
                    if (written <= 0) {
                        LOGGER.warn("TCP消息发送失败：{}", written);
                        // No progress was made: stop instead of spinning.
                        break;
                    }
                } while (byteBuffer.hasRemaining());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new NetException(e);
            } catch (ExecutionException | TimeoutException e2) {
                // NOTE(review): after a timeout the write that was being awaited may
                // still be outstanding on the channel - confirm how callers recover.
                throw new NetException(e2);
            }
        }
    }

    /**
     * @return the remote address of the channel, or {@code null} when no channel is
     *         set or the address cannot be obtained
     */
    @Override
    public InetSocketAddress remoteSocketAddress() {
        final AsynchronousSocketChannel channel = this.socket;
        if (channel == null) {
            return null;
        }
        try {
            return (InetSocketAddress) channel.getRemoteAddress();
        } catch (IOException e) {
            LOGGER.error("TCP获取远程服务地址异常", e);
            return null;
        }
    }

    /**
     * Marks the handler as closed and closes the channel.
     */
    @Override
    public void close() {
        this.close = true;
        IoUtils.close(this.socket);
    }

    /**
     * Handles the result of a read and then schedules the next one.
     *
     * <p>NOTE(review): only {@link NetException} is caught here; any other exception
     * thrown by {@link #onReceive(ByteBuffer)} prevents the next read from being
     * scheduled - confirm that this is intended.
     *
     * @param num number of bytes read; {@code null} or {@code -1} closes the handler
     * @param byteBuffer buffer that was filled by the read
     */
    @Override
    public void completed(Integer num, ByteBuffer byteBuffer) {
        if (num == null || num == -1) {
            close();
        } else if (num == 0) {
            LOGGER.debug("TCP消息接收失败（长度）：{}", num);
        } else {
            try {
                onReceive(byteBuffer);
            } catch (NetException e) {
                LOGGER.error("TCP消息接收异常", e);
            }
        }
        // After close() this only logs the exit, because available() is false.
        loopMessage();
    }

    /**
     * Logs a failed read. No further read is scheduled and the channel is left as it is.
     *
     * @param th cause of the failure
     * @param byteBuffer buffer attached to the failed read
     */
    @Override
    public void failed(Throwable th, ByteBuffer byteBuffer) {
        LOGGER.error("TCP消息处理异常", th);
    }

    /**
     * Starts the next asynchronous read with a fresh buffer, or logs that the loop
     * is ending when the handler is no longer available.
     */
    private void loopMessage() {
        if (!available()) {
            LOGGER.debug("TCP消息代理退出消息轮询");
            return;
        }
        final ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        // The buffer doubles as the attachment so completed() receives the data.
        this.socket.read(buffer, buffer, this);
    }
}
