package com.baidu.hugegraph.computer.suite.integrate;

import com.baidu.hugegraph.computer.algorithm.centrality.pagerank.PageRankParams;
import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.manager.Managers;
import com.baidu.hugegraph.computer.core.master.MasterService;
import com.baidu.hugegraph.computer.core.network.connection.ConnectionManager;
import com.baidu.hugegraph.computer.core.network.netty.NettyTransportClient;
import com.baidu.hugegraph.computer.core.network.session.ClientSession;
import com.baidu.hugegraph.computer.core.util.ComputerContextUtil;
import com.baidu.hugegraph.computer.core.worker.WorkerService;
import com.baidu.hugegraph.config.RpcOptions;
import com.baidu.hugegraph.testutil.Assert;
import com.baidu.hugegraph.testutil.Whitebox;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.class */
public class SenderIntegrateTest {
    private static final Class<?> COMPUTATION = MockComputation.class;

    /* loaded from: input_file:com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest$OptionsBuilder.class */
    /**
     * Fluent builder for the flat argument array handed to
     * {@code initMaster} / {@code initWorker}.
     *
     * <p>Every {@code withXxx} call appends two entries: the option name
     * followed by its value rendered as a string. {@link #build()} returns
     * the accumulated entries in insertion order.
     */
    private static class OptionsBuilder {

        /** Alternating option names and values, in the order they were added. */
        private final List<String> options = new ArrayList<>();

        /** Creates an empty builder. */
        public static OptionsBuilder newInstance() {
            return new OptionsBuilder();
        }

        /**
         * @return a fresh array holding every name/value entry added so far
         */
        public String[] build() {
            return this.options.toArray(new String[0]);
        }

        public OptionsBuilder withJobId(String str) {
            return this.with(ComputerOptions.JOB_ID.name(), str);
        }

        public OptionsBuilder withAlgorithm(Class<?> cls) {
            return this.with(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(),
                             cls.getName());
        }

        public OptionsBuilder withResultClass(Class<?> cls) {
            return this.with(ComputerOptions.ALGORITHM_RESULT_CLASS.name(),
                             cls.getName());
        }

        public OptionsBuilder withMessageClass(Class<?> cls) {
            return this.with(ComputerOptions.ALGORITHM_MESSAGE_CLASS.name(),
                             cls.getName());
        }

        public OptionsBuilder withResultName(String str) {
            return this.with(ComputerOptions.OUTPUT_RESULT_NAME.name(), str);
        }

        public OptionsBuilder withMaxSuperStep(int i) {
            return this.with(ComputerOptions.BSP_MAX_SUPER_STEP.name(),
                             String.valueOf(i));
        }

        public OptionsBuilder withComputationClass(Class<?> cls) {
            return this.with(ComputerOptions.WORKER_COMPUTATION_CLASS.name(),
                             cls.getName());
        }

        public OptionsBuilder withWorkerCount(int i) {
            return this.with(ComputerOptions.JOB_WORKERS_COUNT.name(),
                             String.valueOf(i));
        }

        public OptionsBuilder withPartitionCount(int i) {
            return this.with(ComputerOptions.JOB_PARTITIONS_COUNT.name(),
                             String.valueOf(i));
        }

        public OptionsBuilder withBufferThreshold(int i) {
            return this.with(
                    ComputerOptions.WORKER_WRITE_BUFFER_THRESHOLD.name(),
                    String.valueOf(i));
        }

        public OptionsBuilder withBufferCapacity(int i) {
            return this.with(
                    ComputerOptions.WORKER_WRITE_BUFFER_INIT_CAPACITY.name(),
                    String.valueOf(i));
        }

        // The misspelling ("Transoprt") is kept on purpose: the tests in this
        // class call the method by this exact name.
        public OptionsBuilder withTransoprtServerPort(int i) {
            return this.with(ComputerOptions.TRANSPORT_SERVER_PORT.name(),
                             String.valueOf(i));
        }

        public OptionsBuilder withWriteBufferHighMark(int i) {
            return this.with(
                    ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK.name(),
                    String.valueOf(i));
        }

        public OptionsBuilder withWriteBufferLowMark(int i) {
            return this.with(
                    ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK.name(),
                    String.valueOf(i));
        }

        public OptionsBuilder withRpcServerHost(String str) {
            return this.with(RpcOptions.RPC_SERVER_HOST.name(), str);
        }

        public OptionsBuilder withRpcServerPort(int i) {
            return this.with(RpcOptions.RPC_SERVER_PORT.name(),
                             String.valueOf(i));
        }

        public OptionsBuilder withRpcServerRemote(String str) {
            return this.with(RpcOptions.RPC_REMOTE_URL.name(), str);
        }

        public OptionsBuilder withDataDirs(String str) {
            // String.valueOf is kept so a null argument is still stored as
            // the text "null", exactly as before.
            return this.with(ComputerOptions.WORKER_DATA_DIRS.name(),
                             String.valueOf(str));
        }

        /** Appends one option name and its value, then returns this builder. */
        private OptionsBuilder with(String name, String value) {
            this.options.add(name);
            this.options.add(value);
            return this;
        }
    }

    /**
     * Class-level setup hook (JUnit {@code @BeforeClass}); it currently
     * performs no work.
     */
    @BeforeClass
    public static void init() {
    }

    /**
     * Class-level teardown hook (JUnit {@code @AfterClass}); it currently
     * performs no work.
     */
    @AfterClass
    public static void clear() {
    }

    /**
     * Runs one master and one worker for job {@code local_002}, each on its
     * own thread, and waits for both to finish.
     *
     * <p>Both services are closed via try-with-resources, so they are
     * released even when {@code execute()} throws. Anything thrown on a
     * service thread is recorded and rethrown on the test thread after the
     * joins; an assertion raised on a helper thread would otherwise never
     * reach JUnit and the test would pass regardless.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting for the service threads
     */
    @Test
    public void testOneWorker() throws InterruptedException {
        // Slot 0: master failure, slot 1: worker failure. Plain array writes
        // are sufficient because Thread.join() makes everything the joined
        // thread wrote visible to the joining thread.
        Throwable[] failures = new Throwable[2];

        Thread masterThread = new Thread(() -> {
            try (MasterService service = initMaster(
                    OptionsBuilder.newInstance()
                                  .withJobId("local_002")
                                  .withAlgorithm(PageRankParams.class)
                                  .withResultName("rank")
                                  .withResultClass(DoubleValue.class)
                                  .withMessageClass(DoubleValue.class)
                                  .withMaxSuperStep(3)
                                  .withComputationClass(COMPUTATION)
                                  .withWorkerCount(1)
                                  .withBufferThreshold(50)
                                  .withBufferCapacity(60)
                                  .withRpcServerHost("127.0.0.1")
                                  .withRpcServerPort(8090)
                                  .build())) {
                service.execute();
            } catch (Throwable e) {
                failures[0] = e;
            }
        });

        Thread workerThread = new Thread(() -> {
            try (WorkerService service = initWorker(
                    OptionsBuilder.newInstance()
                                  .withJobId("local_002")
                                  .withAlgorithm(PageRankParams.class)
                                  .withResultName("rank")
                                  .withResultClass(DoubleValue.class)
                                  .withMessageClass(DoubleValue.class)
                                  .withMaxSuperStep(3)
                                  .withComputationClass(COMPUTATION)
                                  .withWorkerCount(1)
                                  .withBufferThreshold(50)
                                  .withBufferCapacity(60)
                                  .withTransoprtServerPort(8091)
                                  .withRpcServerRemote("127.0.0.1:8090")
                                  .build())) {
                service.execute();
            } catch (Throwable e) {
                failures[1] = e;
            }
        });

        masterThread.start();
        workerThread.start();
        workerThread.join();
        masterThread.join();

        for (Throwable failure : failures) {
            if (failure != null) {
                throw new AssertionError("Service thread failed: " + failure,
                                         failure);
            }
        }
    }

    /**
     * Runs one master and three workers for job {@code local_003} with five
     * partitions, each service on its own thread, and waits for all of them.
     *
     * <p>Worker {@code i} (1-based) listens on transport port
     * {@code 8090 + i} and uses the data-dirs value {@code "[jobs-i]"}.
     *
     * <p>Services are closed via try-with-resources so they are released
     * even when {@code execute()} throws. Anything thrown on a service
     * thread is recorded and rethrown on the test thread after the joins; an
     * assertion raised on a helper thread would otherwise never reach JUnit.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting for the service threads
     */
    @Test
    public void testMultiWorkers() throws InterruptedException {
        int workerCount = 3;
        int partitionCount = 5;
        // Slot 0: master failure, slots 1..workerCount: worker failures.
        // Each thread writes only its own slot, and Thread.join() makes the
        // write visible to the test thread.
        Throwable[] failures = new Throwable[workerCount + 1];

        Thread masterThread = new Thread(() -> {
            try (MasterService service = initMaster(
                    OptionsBuilder.newInstance()
                                  .withJobId("local_003")
                                  .withAlgorithm(PageRankParams.class)
                                  .withResultName("rank")
                                  .withResultClass(DoubleValue.class)
                                  .withMessageClass(DoubleValue.class)
                                  .withMaxSuperStep(3)
                                  .withComputationClass(COMPUTATION)
                                  .withWorkerCount(workerCount)
                                  .withPartitionCount(partitionCount)
                                  .withRpcServerHost("127.0.0.1")
                                  .withRpcServerPort(8090)
                                  .build())) {
                service.execute();
            } catch (Throwable e) {
                failures[0] = e;
            }
        });

        List<Thread> workerThreads = new ArrayList<>(workerCount);
        for (int i = 1; i <= workerCount; i++) {
            // Copies of the loop variable so the lambda can capture them.
            int slot = i;
            int transportPort = 8090 + i;
            String dataDirs = "[jobs-" + i + "]";
            workerThreads.add(new Thread(() -> {
                try (WorkerService service = initWorker(
                        OptionsBuilder.newInstance()
                                      .withJobId("local_003")
                                      .withAlgorithm(PageRankParams.class)
                                      .withResultName("rank")
                                      .withResultClass(DoubleValue.class)
                                      .withMessageClass(DoubleValue.class)
                                      .withMaxSuperStep(3)
                                      .withComputationClass(COMPUTATION)
                                      .withWorkerCount(workerCount)
                                      .withPartitionCount(partitionCount)
                                      .withTransoprtServerPort(transportPort)
                                      .withRpcServerRemote("127.0.0.1:8090")
                                      .withDataDirs(dataDirs)
                                      .build())) {
                    service.execute();
                } catch (Throwable e) {
                    failures[slot] = e;
                }
            }));
        }

        masterThread.start();
        for (Thread workerThread : workerThreads) {
            workerThread.start();
        }
        for (Thread workerThread : workerThreads) {
            workerThread.join();
        }
        masterThread.join();

        for (Throwable failure : failures) {
            if (failure != null) {
                throw new AssertionError("Service thread failed: " + failure,
                                         failure);
            }
        }
    }

    /**
     * Runs one master and one worker for job {@code local_002} where the
     * worker's send function is artificially slowed down (see
     * {@code slowSendFunc}) and small write-buffer water marks are set:
     * high/low 10/5 on the master and 20/10 on the worker.
     *
     * <p>Both services are closed via try-with-resources, so they are
     * released even when {@code execute()} throws. Anything thrown on a
     * service thread is recorded and rethrown on the test thread after the
     * joins; an assertion raised on a helper thread would otherwise never
     * reach JUnit and the test would pass regardless.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting for the service threads
     */
    @Test
    public void testOneWorkerWithBusyClient() throws InterruptedException {
        // Slot 0: master failure, slot 1: worker failure. Plain array writes
        // are sufficient because Thread.join() makes everything the joined
        // thread wrote visible to the joining thread.
        Throwable[] failures = new Throwable[2];

        Thread masterThread = new Thread(() -> {
            try (MasterService service = initMaster(
                    OptionsBuilder.newInstance()
                                  .withJobId("local_002")
                                  .withAlgorithm(PageRankParams.class)
                                  .withResultName("rank")
                                  .withResultClass(DoubleValue.class)
                                  .withMessageClass(DoubleValue.class)
                                  .withMaxSuperStep(3)
                                  .withComputationClass(COMPUTATION)
                                  .withWorkerCount(1)
                                  .withWriteBufferHighMark(10)
                                  .withWriteBufferLowMark(5)
                                  .withRpcServerHost("127.0.0.1")
                                  .withRpcServerPort(8090)
                                  .build())) {
                service.execute();
            } catch (Throwable e) {
                failures[0] = e;
            }
        });

        Thread workerThread = new Thread(() -> {
            try (WorkerService service = initWorker(
                    OptionsBuilder.newInstance()
                                  .withJobId("local_002")
                                  .withAlgorithm(PageRankParams.class)
                                  .withResultName("rank")
                                  .withResultClass(DoubleValue.class)
                                  .withMessageClass(DoubleValue.class)
                                  .withMaxSuperStep(3)
                                  .withComputationClass(COMPUTATION)
                                  .withWorkerCount(1)
                                  .withTransoprtServerPort(8091)
                                  .withWriteBufferHighMark(20)
                                  .withWriteBufferLowMark(10)
                                  .withRpcServerRemote("127.0.0.1:8090")
                                  .build())) {
                // Install the delayed send function before the job starts.
                slowSendFunc(service);
                service.execute();
            } catch (Throwable e) {
                failures[1] = e;
            }
        });

        masterThread.start();
        workerThread.start();
        workerThread.join();
        masterThread.join();

        for (Throwable failure : failures) {
            if (failure != null) {
                throw new AssertionError("Service thread failed: " + failure,
                                         failure);
            }
        }
    }

    /**
     * Replaces the {@code sendFunction} of the worker's client session for
     * {@code 127.0.0.1:8091} with a wrapper that sleeps 100 ms before
     * delegating to the original function.
     *
     * <p>The session is reached reflectively: worker {@code managers} ->
     * manager named {@code "data_client"} -> its {@code connManager} ->
     * client for {@code 127.0.0.1:8091} -> {@code clientSession()}.
     *
     * @param workerService the initialised worker whose client is slowed down
     * @throws TransportException if the client cannot be obtained or created
     */
    private void slowSendFunc(WorkerService workerService) throws TransportException {
        Managers managers = (Managers) Whitebox.getInternalState(workerService, "managers");
        ConnectionManager connManager = (ConnectionManager) Whitebox.getInternalState(managers.get("data_client"), "connManager");
        NettyTransportClient client = connManager.getOrCreateClient("127.0.0.1", 8091);
        ClientSession clientSession = (ClientSession) Whitebox.invoke(client.getClass(), "clientSession", client, new Object[0]);

        // The field's real type parameters are not visible from here, so the
        // function is handled as Object -> Future<?>; erasure makes this
        // equivalent at runtime.
        @SuppressWarnings("unchecked")
        Function<Object, Future<?>> originalSend = (Function<Object, Future<?>>) Whitebox.getInternalState(clientSession, "sendFunction");

        // Assigned to a typed local so the lambda has an explicit
        // functional-interface target type.
        Function<Object, Future<?>> delayedSend = message -> {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the sending thread instead
                // of swallowing it; the message is still forwarded.
                Thread.currentThread().interrupt();
            }
            return originalSend.apply(message);
        };
        Whitebox.setInternalState(clientSession, "sendFunction", delayedSend);
    }

    /**
     * Builds and initialises a {@link MasterService} from a flat option array.
     *
     * <p>The context is initialised from the options before the service is
     * constructed; that order is kept as in the original.
     *
     * @param strArr option entries as produced by {@code OptionsBuilder.build()}
     * @return the initialised master service
     */
    private MasterService initMaster(String[] strArr) {
        Config config = ComputerContextUtil.initContext(
                ComputerContextUtil.convertToMap(strArr));
        MasterService master = new MasterService();
        master.init(config);
        return master;
    }

    /**
     * Builds and initialises a {@link WorkerService} from a flat option array.
     *
     * <p>The context is initialised from the options before the service is
     * constructed; that order is kept as in the original.
     *
     * @param strArr option entries as produced by {@code OptionsBuilder.build()}
     * @return the initialised worker service
     */
    private WorkerService initWorker(String[] strArr) {
        Config config = ComputerContextUtil.initContext(
                ComputerContextUtil.convertToMap(strArr));
        WorkerService worker = new WorkerService();
        worker.init(config);
        return worker;
    }
}
