package com.baidu.hugegraph.computer.core.network.session;

import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.network.TransportState;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.network.netty.AbstractNetworkTest;
import com.baidu.hugegraph.computer.core.util.StringEncoding;
import com.baidu.hugegraph.testutil.Assert;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/network/session/TransportSessionTest.class */
public class TransportSessionTest extends AbstractNetworkTest {
    private static final Logger LOG = Log.logger(TransportSessionTest.class);
    public static final String TASK_SCHEDULER = "task-scheduler-%d";

    @Override // com.baidu.hugegraph.computer.core.network.netty.AbstractNetworkTest
    protected void initOption() {
        super.updateOption(ComputerOptions.TRANSPORT_SYNC_REQUEST_TIMEOUT, 5000L);
    }

    @Test
    public void testConstruct() {
        ServerSession serverSession = new ServerSession(conf);
        Assert.assertEquals(TransportState.READY, serverSession.state());
        Assert.assertEquals(-1L, serverSession.maxRequestId);
        Assert.assertEquals(-1L, serverSession.maxAckId);
        Assert.assertEquals(-1L, serverSession.finishId);
        Assert.assertEquals(-1, Whitebox.getInternalState(serverSession, "maxHandledId"));
        ClientSession clientSession = new ClientSession(conf, message -> {
            return null;
        });
        Assert.assertEquals(TransportState.READY, clientSession.state());
        Assert.assertEquals(-1L, clientSession.maxRequestId);
        Assert.assertEquals(-1L, clientSession.maxAckId);
        Assert.assertEquals(-1L, clientSession.finishId);
        Assert.assertFalse(clientSession.flowBlocking());
    }

    @Test
    public void testStart() throws TransportException, InterruptedException {
        ScheduledExecutorService newScheduledThreadPool = ExecutorUtil.newScheduledThreadPool(1, TASK_SCHEDULER);
        ClientSession clientSession = new ClientSession(conf, message -> {
            return null;
        });
        Assert.assertEquals(TransportState.READY, clientSession.state());
        syncStartWithAutoComplete(newScheduledThreadPool, clientSession);
        newScheduledThreadPool.shutdown();
    }

    @Test
    public void testFinish() throws TransportException, InterruptedException {
        ScheduledExecutorService newScheduledThreadPool = ExecutorUtil.newScheduledThreadPool(1, TASK_SCHEDULER);
        ClientSession clientSession = new ClientSession(conf, message -> {
            return null;
        });
        Assert.assertEquals(TransportState.READY, clientSession.state());
        syncStartWithAutoComplete(newScheduledThreadPool, clientSession);
        syncFinishWithAutoComplete(newScheduledThreadPool, clientSession, 1);
        newScheduledThreadPool.shutdown();
    }

    @Test
    public void testStartWithException() throws InterruptedException, TransportException {
        ScheduledExecutorService newScheduledThreadPool = ExecutorUtil.newScheduledThreadPool(1, TASK_SCHEDULER);
        ClientSession clientSession = new ClientSession(conf, message -> {
            return null;
        });
        Assert.assertEquals(TransportState.READY, clientSession.state());
        syncStartWithAutoComplete(newScheduledThreadPool, clientSession);
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            clientSession.start(conf.timeoutSyncRequest());
        }, th -> {
            Assert.assertContains("The state must be READY instead of ESTABLISHED at startAsync()", th.getMessage());
        });
    }

    @Test
    public void testSendAsyncWithException() {
        ClientSession clientSession = new ClientSession(conf, message -> {
            return null;
        });
        Assert.assertEquals(TransportState.READY, clientSession.state());
        ByteBuffer wrap = ByteBuffer.wrap(StringEncoding.encode("test data"));
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            clientSession.sendAsync(MessageType.MSG, 1, wrap);
        }, th -> {
            Assert.assertContains("The state must be ESTABLISHED instead of READY at sendAsync()", th.getMessage());
        });
    }

    @Test
    public void testFinishWithException() throws InterruptedException, TransportException {
        ScheduledExecutorService newScheduledThreadPool = ExecutorUtil.newScheduledThreadPool(1, TASK_SCHEDULER);
        ClientSession clientSession = new ClientSession(conf, message -> {
            return null;
        });
        Assert.assertEquals(TransportState.READY, clientSession.state());
        syncStartWithAutoComplete(newScheduledThreadPool, clientSession);
        syncFinishWithAutoComplete(newScheduledThreadPool, clientSession, 1);
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            clientSession.finish(conf.timeoutFinishSession());
        }, th -> {
            Assert.assertContains("The state must be ESTABLISHED instead of READY at finishAsync()", th.getMessage());
        });
    }

    @Test
    public void testServerSession() {
        ServerSession serverSession = new ServerSession(conf);
        Assert.assertEquals(TransportState.READY, serverSession.state());
        serverSession.onRecvStateStart();
        Assert.assertEquals(TransportState.START_RECV, serverSession.state());
        serverSession.completeStateStart();
        Assert.assertEquals(TransportState.ESTABLISHED, serverSession.state());
        Assert.assertEquals(0L, serverSession.maxAckId);
        Assert.assertEquals(0, Whitebox.getInternalState(serverSession, "maxHandledId"));
        for (int i = 1; i < 100; i++) {
            serverSession.onRecvData(i);
            Assert.assertEquals(i, serverSession.maxRequestId);
            serverSession.onHandledData(i);
            Assert.assertEquals(i, Whitebox.getInternalState(serverSession, "maxHandledId"));
            serverSession.onDataAckSent(i);
            Assert.assertEquals(i, serverSession.maxAckId);
        }
        serverSession.onRecvStateFinish(100);
        Assert.assertEquals(TransportState.FINISH_RECV, serverSession.state());
        serverSession.completeStateFinish();
        Assert.assertEquals(TransportState.READY, serverSession.state());
    }

    @Test
    public void testServerSessionWithException() {
        ServerSession serverSession = new ServerSession(conf);
        Assert.assertEquals(TransportState.READY, serverSession.state());
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            serverSession.onRecvData(1);
        }, th -> {
            Assert.assertContains("The state must be ESTABLISHED instead of READY", th.getMessage());
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            serverSession.onRecvStateFinish(1);
        }, th2 -> {
            Assert.assertContains("The state must be ESTABLISHED instead of READY", th2.getMessage());
        });
        serverSession.onRecvStateStart();
        serverSession.completeStateStart();
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            serverSession.onRecvData(2);
        }, th3 -> {
            Assert.assertContains("The requestId must be increasing", th3.getMessage());
        });
        serverSession.onRecvData(1);
        serverSession.onHandledData(1);
        serverSession.onDataAckSent(1);
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            serverSession.onDataAckSent(1);
        }, th4 -> {
            Assert.assertContains("The ackId must be increasing", th4.getMessage());
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            serverSession.onRecvStateFinish(1);
        }, th5 -> {
            Assert.assertContains("The finishId must be maxRequestId + 1", th5.getMessage());
        });
    }

    @Test
    public void testCheckFinishReady() {
        ServerSession serverSession = new ServerSession(conf);
        Assert.assertEquals(TransportState.READY, serverSession.state());
        serverSession.onRecvStateStart();
        serverSession.completeStateStart();
        Assert.assertFalse(((Boolean) Whitebox.invoke(serverSession.getClass(), "needAckFinish", serverSession, new Object[0])).booleanValue());
        serverSession.onRecvStateFinish(1);
        serverSession.completeStateFinish();
    }

    private void syncStartWithAutoComplete(ScheduledExecutorService scheduledExecutorService, ClientSession clientSession) throws TransportException, InterruptedException {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        scheduledExecutorService.schedule(() -> {
            Assert.assertEquals(TransportState.START_SENT, clientSession.state());
            try {
                clientSession.onRecvAck(0);
            } catch (Throwable th) {
                LOG.error("Failed to call receiveAck", th);
                copyOnWriteArrayList.add(th);
            }
        }, 2L, TimeUnit.SECONDS);
        clientSession.start(conf.timeoutSyncRequest());
        Assert.assertEquals(TransportState.ESTABLISHED, clientSession.state());
        Assert.assertEquals(0L, clientSession.maxRequestId);
        Assert.assertEquals(0L, clientSession.maxAckId);
        Assert.assertEquals(-1L, clientSession.finishId);
        Assert.assertFalse(existError(copyOnWriteArrayList));
    }

    private void syncFinishWithAutoComplete(ScheduledExecutorService scheduledExecutorService, ClientSession clientSession, int i) throws InterruptedException, TransportException {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        scheduledExecutorService.schedule(() -> {
            Assert.assertEquals(TransportState.FINISH_SENT, clientSession.state());
            try {
                clientSession.onRecvAck(i);
            } catch (Throwable th) {
                LOG.error("Failed to call receiveAck", th);
                copyOnWriteArrayList.add(th);
            }
        }, 2L, TimeUnit.SECONDS);
        clientSession.finish(conf.timeoutFinishSession());
        Assert.assertEquals(TransportState.READY, clientSession.state());
        Assert.assertEquals(-1L, clientSession.finishId);
        Assert.assertEquals(-1L, clientSession.maxAckId);
        Assert.assertEquals(-1L, clientSession.maxRequestId);
        Assert.assertFalse(clientSession.flowBlocking());
        Assert.assertFalse(existError(copyOnWriteArrayList));
    }

    private boolean existError(List<Throwable> list) {
        boolean z = false;
        Iterator<Throwable> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next() != null) {
                z = true;
                break;
            }
        }
        return z;
    }
}
