package com.baidu.hugegraph.computer.core.worker;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.master.MasterService;
import com.baidu.hugegraph.computer.core.output.LimitedLogOutput;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.config.RpcOptions;
import com.baidu.hugegraph.testutil.Assert;
import com.baidu.hugegraph.util.Log;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.class */
public class WorkerServiceTest extends UnitTestBase {
    private static final Logger LOG = Log.logger(WorkerServiceTest.class);

    @Test
    public void testServiceWith1Worker() throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Throwable[] thArr = new Throwable[2];
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090", ComputerOptions.JOB_ID, "local_002", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.TRANSPORT_SERVER_PORT, "8086", ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName(), ComputerOptions.OUTPUT_CLASS, LimitedLogOutput.class.getName());
            MockWorkerService mockWorkerService = new MockWorkerService();
            try {
                try {
                    mockWorkerService.init(updateWithRequiredOptions);
                    mockWorkerService.execute();
                    mockWorkerService.close();
                    try {
                        mockWorkerService.close();
                    } catch (Throwable th) {
                        Assert.fail(th.getMessage());
                    }
                    countDownLatch.countDown();
                } catch (Throwable th2) {
                    LOG.error("Failed to start worker", th2);
                    thArr[0] = th2;
                    mockWorkerService.close();
                    try {
                        mockWorkerService.close();
                    } catch (Throwable th3) {
                        Assert.fail(th3.getMessage());
                    }
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                mockWorkerService.close();
                try {
                    mockWorkerService.close();
                } catch (Throwable th5) {
                    Assert.fail(th5.getMessage());
                }
                countDownLatch.countDown();
                throw th4;
            }
        });
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_SERVER_HOST, "localhost", RpcOptions.RPC_SERVER_PORT, "8090", ComputerOptions.JOB_ID, "local_002", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.MASTER_COMPUTATION_CLASS, MockMasterComputation.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            MasterService masterService = new MasterService();
            try {
                try {
                    masterService.init(updateWithRequiredOptions);
                    masterService.execute();
                    masterService.close();
                    try {
                        masterService.close();
                    } catch (Throwable th) {
                        Assert.fail(th.getMessage());
                    }
                    countDownLatch.countDown();
                } catch (Throwable th2) {
                    LOG.error("Failed to start master", th2);
                    thArr[1] = th2;
                    masterService.close();
                    try {
                        masterService.close();
                    } catch (Throwable th3) {
                        Assert.fail(th3.getMessage());
                    }
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                masterService.close();
                try {
                    masterService.close();
                } catch (Throwable th5) {
                    Assert.fail(th5.getMessage());
                }
                countDownLatch.countDown();
                throw th4;
            }
        });
        countDownLatch.await();
        newFixedThreadPool.shutdownNow();
        Assert.assertFalse(Arrays.asList(thArr).toString(), existError(thArr));
    }

    @Test
    public void testServiceWith2Workers() throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Throwable[] thArr = new Throwable[3];
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090", ComputerOptions.JOB_ID, "local_003", ComputerOptions.JOB_WORKERS_COUNT, "2", ComputerOptions.JOB_PARTITIONS_COUNT, "2", ComputerOptions.TRANSPORT_SERVER_PORT, "8086", ComputerOptions.WORKER_DATA_DIRS, "[job_8086]", ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation2.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            MockWorkerService mockWorkerService = new MockWorkerService();
            try {
                try {
                    mockWorkerService.init(updateWithRequiredOptions);
                    mockWorkerService.execute();
                    mockWorkerService.close();
                    countDownLatch.countDown();
                } catch (Throwable th) {
                    LOG.error("Failed to start worker", th);
                    thArr[0] = th;
                    mockWorkerService.close();
                    countDownLatch.countDown();
                }
            } catch (Throwable th2) {
                mockWorkerService.close();
                countDownLatch.countDown();
                throw th2;
            }
        });
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090", ComputerOptions.JOB_ID, "local_003", ComputerOptions.JOB_WORKERS_COUNT, "2", ComputerOptions.JOB_PARTITIONS_COUNT, "2", ComputerOptions.TRANSPORT_SERVER_PORT, "8087", ComputerOptions.WORKER_DATA_DIRS, "[job_8087]", ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation2.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            MockWorkerService mockWorkerService = new MockWorkerService();
            try {
                try {
                    mockWorkerService.init(updateWithRequiredOptions);
                    mockWorkerService.execute();
                    mockWorkerService.close();
                    countDownLatch.countDown();
                } catch (Throwable th) {
                    LOG.error("Failed to start worker", th);
                    thArr[1] = th;
                    mockWorkerService.close();
                    countDownLatch.countDown();
                }
            } catch (Throwable th2) {
                mockWorkerService.close();
                countDownLatch.countDown();
                throw th2;
            }
        });
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_SERVER_HOST, "localhost", RpcOptions.RPC_SERVER_PORT, "8090", ComputerOptions.JOB_ID, "local_003", ComputerOptions.JOB_WORKERS_COUNT, "2", ComputerOptions.JOB_PARTITIONS_COUNT, "2", ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.MASTER_COMPUTATION_CLASS, MockMasterComputation2.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            MasterService masterService = new MasterService();
            try {
                try {
                    masterService.init(updateWithRequiredOptions);
                    masterService.execute();
                    masterService.close();
                    countDownLatch.countDown();
                } catch (Throwable th) {
                    LOG.error("Failed to start master", th);
                    thArr[2] = th;
                    masterService.close();
                    countDownLatch.countDown();
                }
            } catch (Throwable th2) {
                masterService.close();
                countDownLatch.countDown();
                throw th2;
            }
        });
        countDownLatch.await();
        newFixedThreadPool.shutdownNow();
        Assert.assertFalse(Arrays.asList(thArr).toString(), existError(thArr));
    }

    @Test
    public void testFailToConnectEtcd() {
        Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_REMOTE_URL, "127.0.0.1:8090", ComputerOptions.BSP_ETCD_ENDPOINTS, "http://abc:8098", ComputerOptions.JOB_ID, "local_004", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation.class.getName());
        MockWorkerService mockWorkerService = new MockWorkerService();
        Assert.assertThrows(ComputerException.class, () -> {
            mockWorkerService.init(updateWithRequiredOptions);
            try {
                mockWorkerService.execute();
            } finally {
                mockWorkerService.close();
            }
        }, th -> {
            Assert.assertContains("Error while getting with key='BSP_MASTER_INIT_DONE'", th.getMessage());
            Assert.assertContains("UNAVAILABLE: unresolved address", th.getCause().getMessage());
        });
    }

    @Test
    public void testDataTransportManagerFail() throws InterruptedException {
    }
}
