package com.baidu.hugegraph.computer.core.network.netty;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.network.ClientHandler;
import com.baidu.hugegraph.computer.core.network.ConnectionId;
import com.baidu.hugegraph.computer.core.network.MessageHandler;
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.util.StringEncoding;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.concurrent.BarrierEvent;
import com.baidu.hugegraph.testutil.Assert;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.util.Log;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/network/netty/NettyTransportClientTest.class */
public class NettyTransportClientTest extends AbstractNetworkTest {
    private static final Logger LOG = Log.logger(NettyTransportClientTest.class);
    public static final BarrierEvent BARRIER_EVENT = new BarrierEvent();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.baidu.hugegraph.computer.core.network.netty.NettyTransportClientTest$1, reason: invalid class name */
    /* loaded from: input_file:com/baidu/hugegraph/computer/core/network/netty/NettyTransportClientTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$baidu$hugegraph$computer$core$network$message$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$com$baidu$hugegraph$computer$core$network$message$MessageType[MessageType.MSG.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$baidu$hugegraph$computer$core$network$message$MessageType[MessageType.EDGE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$baidu$hugegraph$computer$core$network$message$MessageType[MessageType.VERTEX.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Override // com.baidu.hugegraph.computer.core.network.netty.AbstractNetworkTest
    protected void initOption() {
        super.updateOption(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS, 8000);
        super.updateOption(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS, 6000);
        super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK, 67108864);
        super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK, 33554432);
        super.updateOption(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL, 200L);
        super.updateOption(ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT, 30000L);
    }

    @Test
    public void testChannel() throws IOException {
        Assert.assertTrue(oneClient().channel().isActive());
    }

    @Test
    public void testConnectID() throws IOException {
        Assert.assertEquals(ConnectionId.parseConnectionId(host, port), oneClient().connectionId());
    }

    @Test
    public void testRemoteAddress() throws IOException {
        NettyTransportClient oneClient = oneClient();
        ConnectionId parseConnectionId = ConnectionId.parseConnectionId(host, port);
        Assert.assertEquals(parseConnectionId.socketAddress(), oneClient.remoteAddress());
    }

    @Test
    public void testStartSession() throws IOException {
        oneClient().startSession();
    }

    @Test
    public void testStartAsync() throws Exception {
        oneClient().startSessionAsync().get(conf.timeoutSyncRequest(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFinishSession() throws IOException {
        NettyTransportClient oneClient = oneClient();
        oneClient.startSession();
        oneClient.finishSession();
    }

    @Test
    public void testFinishAsync() throws Exception {
        NettyTransportClient oneClient = oneClient();
        oneClient.startSessionAsync().get(conf.timeoutSyncRequest(), TimeUnit.MILLISECONDS);
        oneClient.finishSessionAsync().get(conf.timeoutFinishSession(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void testSend() throws IOException {
        NettyTransportClient oneClient = oneClient();
        for (int i = 0; i < 3; i++) {
            oneClient.startSession();
            oneClient.send(MessageType.MSG, 1, ByteBuffer.wrap(StringEncoding.encode("test1")));
            oneClient.send(MessageType.VERTEX, 2, ByteBuffer.wrap(StringEncoding.encode("test2")));
            oneClient.send(MessageType.EDGE, 3, ByteBuffer.wrap(StringEncoding.encode("test3")));
            oneClient.finishSession();
        }
    }

    @Test
    public void testDataUniformity() throws IOException {
        NettyTransportClient oneClient = oneClient();
        byte[] encode = StringEncoding.encode("test data message");
        byte[] encode2 = StringEncoding.encode("test data edge");
        byte[] encode3 = StringEncoding.encode("test data vertex");
        ((MessageHandler) Mockito.doAnswer(invocationOnMock -> {
            MessageType messageType = (MessageType) invocationOnMock.getArgument(0);
            ManagedBuffer managedBuffer = (ManagedBuffer) invocationOnMock.getArgument(2);
            byte[] bArr = null;
            switch (AnonymousClass1.$SwitchMap$com$baidu$hugegraph$computer$core$network$message$MessageType[messageType.ordinal()]) {
                case 1:
                    bArr = encode;
                    break;
                case 2:
                    bArr = encode2;
                    break;
                case 3:
                    bArr = encode3;
                    break;
            }
            byte[] copyToByteArray = managedBuffer.copyToByteArray();
            Assert.assertArrayEquals(bArr, copyToByteArray);
            Assert.assertNotSame(bArr, copyToByteArray);
            byte[] copyToByteArray2 = managedBuffer.copyToByteArray();
            Assert.assertArrayEquals(bArr, copyToByteArray2);
            Assert.assertNotSame(bArr, copyToByteArray2);
            return null;
        }).when(serverHandler)).handle((MessageType) Mockito.any(), Mockito.eq(1), (ManagedBuffer) Mockito.any());
        oneClient.startSession();
        oneClient.send(MessageType.MSG, 1, ByteBuffer.wrap(encode));
        oneClient.send(MessageType.EDGE, 1, ByteBuffer.wrap(encode2));
        oneClient.send(MessageType.VERTEX, 1, ByteBuffer.wrap(encode3));
        oneClient.finishSession();
    }

    @Test
    public void testStartSessionWithTimeout() throws IOException {
        NettyTransportClient oneClient = oneClient();
        Whitebox.setInternalState(oneClient.clientSession(), "sendFunction", message -> {
            return null;
        });
        Assert.assertThrows(TransportException.class, () -> {
            oneClient.startSession();
        }, th -> {
            Assert.assertContains("to wait start-response", th.getMessage());
        });
    }

    @Test
    public void testFinishSessionWithTimeout() throws IOException {
        NettyTransportClient oneClient = oneClient();
        oneClient.startSession();
        Whitebox.setInternalState(oneClient.clientSession(), "sendFunction", message -> {
            return null;
        });
        Whitebox.setInternalState(oneClient, "timeoutFinishSession", 1000L);
        Assert.assertThrows(TransportException.class, () -> {
            oneClient.finishSession();
        }, th -> {
            Assert.assertContains("to wait finish-response", th.getMessage());
        });
    }

    @Test
    public void testStartSessionWithSendException() throws IOException {
        NettyTransportClient oneClient = oneClient();
        Function function = (Function) Mockito.mock(Function.class);
        Whitebox.setInternalState(oneClient.clientSession(), "sendFunction", function);
        ((Function) Mockito.doThrow(new Throwable[]{new RuntimeException("test exception")}).when(function)).apply(Mockito.any());
        Assert.assertThrows(RuntimeException.class, () -> {
            oneClient.startSession();
        }, th -> {
            Assert.assertContains("test exception", th.getMessage());
        });
    }

    @Test
    public void testFinishSessionWithSendException() throws IOException {
        NettyTransportClient oneClient = oneClient();
        oneClient.startSession();
        Function function = (Function) Mockito.mock(Function.class);
        Whitebox.setInternalState(oneClient.clientSession(), "sendFunction", function);
        ((Function) Mockito.doThrow(new Throwable[]{new RuntimeException("test exception")}).when(function)).apply(Mockito.any());
        Assert.assertThrows(RuntimeException.class, () -> {
            oneClient.finishSession();
        }, th -> {
            Assert.assertContains("test exception", th.getMessage());
        });
    }

    @Test
    public void testFlowControl() throws IOException {
        ByteBuffer wrap = ByteBuffer.wrap(StringEncoding.encode("test data"));
        NettyTransportClient oneClient = oneClient();
        oneClient.startSession();
        Object internalState = Whitebox.getInternalState(oneClient.clientSession(), "sendFunction");
        Whitebox.setInternalState(oneClient.clientSession(), "sendFunction", message -> {
            return null;
        });
        for (int i = 1; i <= conf.maxPendingRequests() * 2; i++) {
            boolean send = oneClient.send(MessageType.MSG, 1, wrap);
            if (i <= conf.maxPendingRequests()) {
                Assert.assertTrue(send);
            } else {
                Assert.assertFalse(send);
            }
        }
        int intValue = ((Integer) Whitebox.getInternalState(oneClient.clientSession(), "maxRequestId")).intValue();
        int intValue2 = ((Integer) Whitebox.getInternalState(oneClient.clientSession(), "maxAckId")).intValue();
        Assert.assertEquals(conf.maxPendingRequests(), intValue);
        Assert.assertEquals(0L, intValue2);
        int maxPendingRequests = conf.maxPendingRequests() - conf.minPendingRequests();
        for (int i2 = 1; i2 <= maxPendingRequests + 1; i2++) {
            Assert.assertFalse(oneClient.checkSendAvailable());
            oneClient.clientSession().onRecvAck(i2);
        }
        Assert.assertTrue(oneClient.checkSendAvailable());
        Assert.assertEquals(maxPendingRequests + 1, ((Integer) Whitebox.getInternalState(oneClient.clientSession(), "maxAckId")).intValue());
        Whitebox.setInternalState(oneClient.clientSession(), "sendFunction", internalState);
    }

    @Test
    public void testHandlerException() throws IOException {
        NettyTransportClient oneClient = oneClient();
        oneClient.startSession();
        ((MessageHandler) Mockito.doThrow(new Throwable[]{new RuntimeException("test exception")}).when(serverHandler)).handle((MessageType) Mockito.any(), Mockito.anyInt(), (ManagedBuffer) Mockito.any());
        Assert.assertTrue(oneClient.send(MessageType.MSG, 1, ByteBuffer.wrap(StringEncoding.encode("test data"))));
        Whitebox.setInternalState(oneClient, "timeoutFinishSession", 1000L);
        Assert.assertThrows(TransportException.class, () -> {
            oneClient.finishSession();
        }, th -> {
            Assert.assertContains("to wait finish-response", th.getMessage());
        });
        ((MessageHandler) Mockito.verify(serverHandler, Mockito.timeout(10000L).times(1))).exceptionCaught((TransportException) Mockito.any(), (ConnectionId) Mockito.any());
        ((ClientHandler) Mockito.verify(clientHandler, Mockito.timeout(10000L).times(1))).exceptionCaught((TransportException) Mockito.any(), (ConnectionId) Mockito.any());
    }

    @Test
    public void testTransportPerformance() throws IOException, InterruptedException {
        Configurator.setAllLevels("com.baidu.hugegraph", Level.INFO);
        Configurator.setAllLevels("com.baidu.hugegraph.computer.core.network", Level.WARN);
        NettyTransportClient oneClient = oneClient();
        ByteBuffer allocateDirect = ByteBuffer.allocateDirect(51200);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ((ClientHandler) Mockito.doAnswer(invocationOnMock -> {
            invocationOnMock.callRealMethod();
            BARRIER_EVENT.signalAll();
            return null;
        }).when(clientHandler)).sendAvailable((ConnectionId) Mockito.any());
        ((MessageHandler) Mockito.doAnswer(invocationOnMock2 -> {
            invocationOnMock2.callRealMethod();
            atomicInteger.getAndIncrement();
            return null;
        }).when(serverHandler)).handle((MessageType) Mockito.any(), Mockito.anyInt(), (ManagedBuffer) Mockito.any());
        long nanoTime = System.nanoTime();
        oneClient.startSession();
        int i = 0;
        while (i < 209716) {
            if (!oneClient.send(MessageType.MSG, 1, allocateDirect)) {
                LOG.info("Current send unavailable");
                i--;
                if (!BARRIER_EVENT.await(10000L)) {
                    throw new ComputerException("Timeout(%sms) to wait sendable", new Object[]{10000L});
                }
                BARRIER_EVENT.reset();
            }
            i++;
        }
        oneClient.finishSession();
        LOG.info("Transport {} data packets total 10GB, cost {}ms", 209716, Long.valueOf((System.nanoTime() - nanoTime) / 1000000));
        Assert.assertEquals(209716, atomicInteger.get());
        Configurator.setAllLevels("com.baidu.hugegraph.computer.core.network", Level.INFO);
    }

    @Test
    public void testCheckMinPendingRequests() {
        UnitTestBase.updateWithRequiredOptions(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS, "100", ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS, "101");
        config = ComputerContext.instance().config();
        TransportConf wrapConfig = TransportConf.wrapConfig(config);
        wrapConfig.getClass();
        Assert.assertThrows(IllegalArgumentException.class, wrapConfig::minPendingRequests);
    }

    @Test
    public void testSessionActive() throws IOException, InterruptedException, ExecutionException, TimeoutException {
        NettyTransportClient oneClient = oneClient();
        Assert.assertFalse(oneClient.sessionActive());
        CompletableFuture startSessionAsync = oneClient.startSessionAsync();
        Assert.assertFalse(oneClient.sessionActive());
        startSessionAsync.get(5L, TimeUnit.SECONDS);
        Assert.assertTrue(oneClient.sessionActive());
        CompletableFuture finishSessionAsync = oneClient.finishSessionAsync();
        Assert.assertTrue(oneClient.sessionActive());
        finishSessionAsync.get(5L, TimeUnit.SECONDS);
        Assert.assertFalse(oneClient.sessionActive());
        oneClient.close();
        Assert.assertFalse(oneClient.sessionActive());
    }
}
