package alluxio.job.master;

import alluxio.conf.PropertyKey;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.job.JobConfig;
import alluxio.job.SleepJobConfig;
import alluxio.job.plan.PlanDefinitionRegistryRule;
import alluxio.job.plan.SleepPlanDefinition;
import alluxio.job.util.JobTestUtils;
import alluxio.job.wire.JobInfo;
import alluxio.job.wire.JobWorkerHealth;
import alluxio.job.wire.Status;
import alluxio.job.workflow.composite.CompositeConfig;
import alluxio.master.LocalAlluxioJobCluster;
import alluxio.master.job.JobMaster;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.wire.WorkerInfo;
import alluxio.worker.JobWorkerProcess;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:alluxio/job/master/JobMasterIntegrationTest.class */
/**
 * Integration tests for the job master, run against a {@link LocalAlluxioJobCluster} that is
 * started before and stopped after every test.
 */
public final class JobMasterIntegrationTest extends BaseIntegrationTest {
    /** Value configured for {@link PropertyKey#JOB_MASTER_WORKER_TIMEOUT}. */
    private static final long WORKER_TIMEOUT_MS = 2000;
    /** Value configured for {@link PropertyKey#JOB_MASTER_LOST_WORKER_INTERVAL}. */
    private static final long LOST_WORKER_INTERVAL_MS = 2000;
    /** Upper bound used when waiting for a worker to (re-)register with the job master. */
    private static final long REGISTRATION_TIMEOUT_MS = 10000L;

    private JobMaster mJobMaster;
    private JobWorkerProcess mJobWorker;
    private LocalAlluxioJobCluster mLocalAlluxioJobCluster;

    @Rule
    public LocalAlluxioClusterResource mLocalAlluxioClusterResource =
        new LocalAlluxioClusterResource.Builder()
            .setProperty(PropertyKey.JOB_MASTER_WORKER_HEARTBEAT_INTERVAL, 20)
            .setProperty(PropertyKey.JOB_MASTER_WORKER_TIMEOUT, WORKER_TIMEOUT_MS)
            .setProperty(PropertyKey.JOB_MASTER_LOST_WORKER_INTERVAL, LOST_WORKER_INTERVAL_MS)
            .build();

    /** Registers {@link SleepPlanDefinition} as the plan for {@link SleepJobConfig}. */
    @Rule
    public PlanDefinitionRegistryRule mJobRule =
        new PlanDefinitionRegistryRule(SleepJobConfig.class, new SleepPlanDefinition());

    /**
     * Starts a fresh job cluster and caches its job master and job worker.
     *
     * @throws Exception if the cluster fails to start
     */
    @Before
    public void before() throws Exception {
        this.mLocalAlluxioJobCluster = new LocalAlluxioJobCluster();
        this.mLocalAlluxioJobCluster.start();
        this.mJobMaster = this.mLocalAlluxioJobCluster.getMaster().getJobMaster();
        this.mJobWorker = this.mLocalAlluxioJobCluster.getWorker();
    }

    /**
     * Stops the job cluster started in {@link #before()}.
     *
     * @throws Exception if the cluster fails to stop
     */
    @After
    public void after() throws Exception {
        this.mLocalAlluxioJobCluster.stop();
    }

    /**
     * Runs a sleep job configured with a second argument of 2 and checks that the job reports two
     * children both right after submission and after completion.
     */
    @Test
    public void multipleTasksPerWorker() throws Exception {
        long jobId = this.mJobMaster.run(new SleepJobConfig(1L, 2));
        Assert.assertEquals(2L, this.mJobMaster.getStatus(jobId).getChildren().size());
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId, Status.COMPLETED);
        Assert.assertEquals(2L, this.mJobMaster.getStatus(jobId).getChildren().size());
    }

    /**
     * Submits ten sleep jobs against a job capacity of 1, retrying each submission until it is
     * accepted.
     */
    @Test
    @LocalAlluxioClusterResource.Config(confParams = {"alluxio.job.master.job.capacity", "1", "alluxio.job.master.finished.job.retention.time", "0"})
    public void flowControl() throws Exception {
        for (int i = 0; i < 10; i++) {
            while (true) {
                try {
                    this.mJobMaster.run(new SleepJobConfig(100L));
                    break;
                } catch (ResourceExhaustedException e) {
                    // Submission was rejected; back off and retry the same submission.
                    // NOTE(review): this retry loop has no upper bound of its own.
                    CommonUtils.sleepMs(100L);
                }
            }
        }
    }

    /**
     * Restarts the job master, waits for the worker to register again, then stops the worker and
     * checks that the master no longer lists any worker.
     */
    @Test
    public void restartMasterAndLoseWorker() throws Exception {
        long jobId = this.mJobMaster.run(new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId, Status.COMPLETED);
        this.mJobMaster.stop();
        this.mJobMaster.start(true);
        CommonUtils.waitFor("Worker to register with restarted job master",
            () -> !this.mJobMaster.getWorkerInfoList().isEmpty(),
            WaitForOptions.defaults().setTimeoutMs(REGISTRATION_TIMEOUT_MS));
        this.mJobWorker.stop();
        // Sleep for the configured worker timeout plus the lost-worker interval (4000 ms total)
        // before checking that the stopped worker is gone.
        CommonUtils.sleepMs(WORKER_TIMEOUT_MS + LOST_WORKER_INTERVAL_MS);
        Assert.assertTrue(this.mJobMaster.getWorkerInfoList().isEmpty());
    }

    /**
     * Restarts the job master and then the worker, and checks that exactly one worker is listed
     * and that its id differs from the one seen before the worker restart.
     */
    @Test
    @LocalAlluxioClusterResource.Config(confParams = {"alluxio.job.master.lost.worker.interval", "10000000"})
    public void restartMasterAndReregisterWorker() throws Exception {
        long jobId = this.mJobMaster.run(new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, jobId, Status.COMPLETED);
        this.mJobMaster.stop();
        this.mJobMaster.start(true);
        CommonUtils.waitFor("Worker to register with restarted job master",
            () -> !this.mJobMaster.getWorkerInfoList().isEmpty(),
            WaitForOptions.defaults().setTimeoutMs(REGISTRATION_TIMEOUT_MS));
        final long initialWorkerId = this.mJobMaster.getWorkerInfoList().get(0).getId();
        this.mLocalAlluxioJobCluster.restartWorker();
        CommonUtils.waitFor("Restarted worker to register with job master", () -> {
            List<WorkerInfo> workers = this.mJobMaster.getWorkerInfoList();
            // Done once a worker is listed whose id is not the pre-restart id.
            return !workers.isEmpty() && workers.get(0).getId() != initialWorkerId;
        }, WaitForOptions.defaults().setTimeoutMs(REGISTRATION_TIMEOUT_MS));
        Assert.assertEquals(1L, this.mJobMaster.getWorkerInfoList().size());
    }

    /**
     * Waits until exactly one worker health report is available, then checks that it has a
     * hostname and three load-average entries.
     */
    @Test
    public void getAllWorkerHealth() throws Exception {
        // Holds the most recent snapshot fetched inside the polling lambda.
        AtomicReference<List<JobWorkerHealth>> lastSnapshot = new AtomicReference<>();
        CommonUtils.waitFor("allWorkerHealth", () -> {
            List<JobWorkerHealth> allWorkerHealth = this.mJobMaster.getAllWorkerHealth();
            lastSnapshot.set(allWorkerHealth);
            return allWorkerHealth.size() == 1;
        });
        JobWorkerHealth workerHealth = lastSnapshot.get().get(0);
        Assert.assertNotNull(workerHealth.getHostname());
        Assert.assertEquals(3L, workerHealth.getLoadAverage().size());
    }

    /**
     * Sets the task pool size to 0 while two long sleep jobs are active and checks that a newly
     * submitted job has not finished 300 ms later while the two active tasks are still counted.
     */
    @Test
    @LocalAlluxioClusterResource.Config(confParams = {"alluxio.job.master.job.capacity", "20"})
    public void stopJobWorkerTasks() throws Exception {
        long longJob1 = this.mJobMaster.run(new SleepJobConfig(5000L));
        long longJob2 = this.mJobMaster.run(new SleepJobConfig(5000L));
        long shortJob1 = this.mJobMaster.run(new SleepJobConfig(1L));
        long shortJob2 = this.mJobMaster.run(new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, shortJob1, Status.COMPLETED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, shortJob2, Status.COMPLETED);
        Assert.assertFalse(this.mJobMaster.getStatus(longJob2).getStatus().isFinished());
        Assert.assertFalse(this.mJobMaster.getStatus(longJob1).getStatus().isFinished());
        Assert.assertEquals(2L, this.mJobMaster.getAllWorkerHealth().get(0).getNumActiveTasks());

        this.mJobMaster.setTaskPoolSize(0);
        long blockedJob = this.mJobMaster.run(new SleepJobConfig(1L));
        // Give the new pool size time to take effect before inspecting state.
        CommonUtils.sleepMs(300L);
        Assert.assertFalse(this.mJobMaster.getStatus(blockedJob).getStatus().isFinished());
        Assert.assertEquals(0L, this.mJobMaster.getAllWorkerHealth().get(0).getTaskPoolSize());
        Assert.assertEquals(2L, this.mJobMaster.getAllWorkerHealth().get(0).getNumActiveTasks());
    }

    /**
     * With the task pool size set to 1, checks that two jobs submitted behind a long-running job
     * remain in {@code CREATED} and complete only after the long-running job is cancelled.
     */
    @Test
    @LocalAlluxioClusterResource.Config(confParams = {"alluxio.job.master.job.capacity", "20"})
    public void throttleJobWorkerTasks() throws Exception {
        this.mJobMaster.setTaskPoolSize(1);
        long warmUpJob = this.mJobMaster.run(new SleepJobConfig(1L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, warmUpJob,
            Sets.newHashSet(Status.RUNNING, Status.COMPLETED));
        long longJob = this.mJobMaster.run(new SleepJobConfig(50000L));
        JobTestUtils.waitForJobStatus(this.mJobMaster, longJob, Status.RUNNING);
        JobTestUtils.waitForJobStatus(this.mJobMaster, warmUpJob, Status.COMPLETED);

        long queuedJob1 = this.mJobMaster.run(new SleepJobConfig(1L));
        long queuedJob2 = this.mJobMaster.run(new SleepJobConfig(1L));
        CommonUtils.sleepMs(300L);
        Assert.assertEquals(Status.RUNNING, this.mJobMaster.getStatus(longJob).getStatus());
        Assert.assertEquals(Status.CREATED, this.mJobMaster.getStatus(queuedJob1).getStatus());
        Assert.assertEquals(Status.CREATED, this.mJobMaster.getStatus(queuedJob2).getStatus());
        Assert.assertEquals(1L, this.mJobMaster.getAllWorkerHealth().get(0).getTaskPoolSize());
        Assert.assertEquals(1L, this.mJobMaster.getAllWorkerHealth().get(0).getNumActiveTasks());

        this.mJobMaster.cancel(longJob);
        JobTestUtils.waitForJobStatus(this.mJobMaster, queuedJob1, Status.COMPLETED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, queuedJob2, Status.COMPLETED);
    }

    /**
     * Cancels a composite job made of three long sleep jobs and checks that the composite job and
     * each of its three children reach {@code CANCELED}.
     */
    @Test
    public void cancel() throws Exception {
        CompositeConfig config = new CompositeConfig(
            Lists.<JobConfig>newArrayList(
                new SleepJobConfig(50000L),
                new SleepJobConfig(45000L),
                new SleepJobConfig(40000L)),
            false);
        long parentId = this.mJobMaster.run(config);
        List<JobInfo> children = this.mJobMaster.getStatus(parentId).getChildren();
        Assert.assertEquals(3L, children.size());
        // Capture the child ids before cancelling so each child can be awaited afterwards.
        long childId1 = children.get(0).getId();
        long childId2 = children.get(1).getId();
        long childId3 = children.get(2).getId();

        this.mJobMaster.cancel(parentId);
        JobTestUtils.waitForJobStatus(this.mJobMaster, parentId, Status.CANCELED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, childId1, Status.CANCELED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, childId2, Status.CANCELED);
        JobTestUtils.waitForJobStatus(this.mJobMaster, childId3, Status.CANCELED);
    }
}
