package co.cask.cdap.internal.app.services.http.handlers;

import co.cask.cdap.AllProgramsApp;
import co.cask.cdap.AppWithSchedule;
import co.cask.cdap.AppWithStreamSizeSchedule;
import co.cask.cdap.ConcurrentWorkflowApp;
import co.cask.cdap.ConditionalWorkflowApp;
import co.cask.cdap.PauseResumeWorklowApp;
import co.cask.cdap.WorkflowAppWithErrorRuns;
import co.cask.cdap.WorkflowAppWithFork;
import co.cask.cdap.WorkflowAppWithScopedParameters;
import co.cask.cdap.api.schedule.ScheduleSpecification;
import co.cask.cdap.api.workflow.WorkflowActionNode;
import co.cask.cdap.api.workflow.WorkflowActionSpecification;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.config.PreferencesStore;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingRuntimeFileSet;
import co.cask.cdap.internal.app.services.http.AppFabricTestBase;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.RunRecord;
import co.cask.cdap.proto.ScheduledRuntime;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.proto.codec.ScheduleSpecificationCodec;
import co.cask.cdap.proto.codec.WorkflowActionSpecificationCodec;
import co.cask.cdap.test.XSlowTests;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/* loaded from: input_file:co/cask/cdap/internal/app/services/http/handlers/WorkflowHttpHandlerTest.class */
public class WorkflowHttpHandlerTest extends AppFabricTestBase {
    /** Gson instance with the codecs registered for schedule and workflow-action specifications. */
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(ScheduleSpecification.class, new ScheduleSpecificationCodec())
        .registerTypeAdapter(WorkflowActionSpecification.class, new WorkflowActionSpecificationCodec())
        .create();

    /** Generic type used to deserialize a JSON array into a {@code List<WorkflowActionNode>}. */
    protected static final Type LIST_WORKFLOWACTIONNODE_TYPE =
        new TypeToken<List<WorkflowActionNode>>() { }.getType();

    /**
     * Builds the v3 URL that lists the runs of a workflow, filtered by run status.
     *
     * @param str  namespace passed to {@code getVersionedAPIPath}
     * @param str2 application name
     * @param str3 workflow name
     * @param str4 value of the {@code status} query parameter
     * @return the versioned API path for the runs listing
     */
    private String getRunsUrl(String str, String str2, String str3, String str4) {
        String runsPath = String.format("apps/%s/workflows/%s/runs?status=%s", str2, str3, str4);
        return getVersionedAPIPath(runsPath, "v3", str);
    }

    /**
     * Polls {@link #runningProgramCount} until it reports the expected number of running nodes.
     *
     * @param program workflow whose current nodes are queried
     * @param str     run id of the workflow run
     * @param i       expected number of currently running nodes
     * @throws Exception whatever {@code Tasks.waitFor} raises when the expectation is not met
     */
    private void verifyRunningProgramCount(final Id.Program program, final String str, int i) throws Exception {
        Callable<Integer> currentCount = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return runningProgramCount(program, str);
            }
        };
        // 10 s / 50 ms are handed to Tasks.waitFor (presumably overall timeout and poll interval).
        Tasks.waitFor(Integer.valueOf(i), currentCount, 10L, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Queries the "current" endpoint of a workflow run and counts the nodes it reports.
     *
     * @param program workflow whose run is inspected
     * @param str     run id of the workflow run
     * @return number of nodes in the response, or {@code null} when the endpoint does not answer 200
     * @throws Exception if the HTTP call or the response parsing fails
     */
    public Integer runningProgramCount(Id.Program program, String str) throws Exception {
        String currentPath = String.format("apps/%s/workflows/%s/%s/current",
                                           program.getApplicationId(), program.getId(), str);
        HttpResponse response = doGet(getVersionedAPIPath(currentPath, program.getNamespaceId()));
        if (response.getStatusLine().getStatusCode() != 200) {
            // No usable answer; callers polling through Tasks.waitFor simply see a non-matching value.
            return null;
        }
        List<WorkflowActionNode> nodes =
            GSON.fromJson(EntityUtils.toString(response.getEntity()), LIST_WORKFLOWACTIONNODE_TYPE);
        return Integer.valueOf(nodes.size());
    }

    /**
     * Posts a suspend request for a workflow run and asserts the HTTP status of the reply.
     *
     * @param program workflow to suspend
     * @param str     run id of the workflow run
     * @param i       expected HTTP status code
     */
    private void suspendWorkflow(Id.Program program, String str, int i) throws Exception {
        String suspendPath = String.format("apps/%s/workflows/%s/runs/%s/suspend",
                                           program.getApplicationId(), program.getId(), str);
        HttpResponse response = doPost(getVersionedAPIPath(suspendPath, "v3", program.getNamespaceId()));
        Assert.assertEquals(i, response.getStatusLine().getStatusCode());
    }

    /**
     * Stores runtime arguments for a workflow through the REST API and reads them back.
     *
     * <p>Only the number of entries returned is compared with the number sent.
     *
     * @param program workflow the arguments belong to
     * @param map     runtime arguments to store
     */
    private void setAndTestRuntimeArgs(Id.Program program, Map<String, String> map) throws Exception {
        Type stringMapType = new TypeToken<Map<String, String>>() { }.getType();
        String argsPath = String.format("apps/%s/workflows/%s/runtimeargs",
                                        program.getApplicationId(), program.getId());
        String argsUrl = getVersionedAPIPath(argsPath, "v3", program.getNamespaceId());

        HttpResponse putResponse = doPut(argsUrl, GSON.toJson(map, stringMapType));
        Assert.assertEquals(200L, putResponse.getStatusLine().getStatusCode());

        HttpResponse getResponse = doGet(argsUrl);
        Assert.assertEquals(200L, getResponse.getStatusLine().getStatusCode());
        Map<String, String> storedArgs =
            GSON.fromJson(EntityUtils.toString(getResponse.getEntity()), stringMapType);
        Assert.assertEquals(map.size(), storedArgs.size());
    }

    /**
     * Posts a resume request for a workflow run and asserts the HTTP status of the reply.
     *
     * @param program workflow to resume
     * @param str     run id of the workflow run
     * @param i       expected HTTP status code
     */
    private void resumeWorkflow(Id.Program program, String str, int i) throws Exception {
        String resumePath = String.format("apps/%s/workflows/%s/runs/%s/resume",
                                          program.getApplicationId(), program.getId(), str);
        HttpResponse response = doPost(getVersionedAPIPath(resumePath, "v3", program.getNamespaceId()));
        Assert.assertEquals(i, response.getStatusLine().getStatusCode());
    }

    /**
     * Fetches the raw response of the "current" endpoint for a workflow run.
     *
     * @param program workflow whose run is inspected
     * @param str     run id of the workflow run
     * @return the unprocessed HTTP response
     */
    private HttpResponse getWorkflowCurrentStatus(Id.Program program, String str) throws Exception {
        String currentPath = String.format("apps/%s/workflows/%s/%s/current",
                                           program.getApplicationId(), program.getId(), str);
        String currentUrl = getVersionedAPIPath(currentPath, "v3", program.getNamespaceId());
        return doGet(currentUrl);
    }

    /**
     * Creates a new temporary folder containing a two-line {@code words.txt} file.
     *
     * @param str name of the folder to create under the test's temporary folder
     * @return absolute path of the created folder
     * @throws IOException if the folder or file cannot be written
     */
    private String createInput(String str) throws IOException {
        File inputDir = tmpFolder.newFolder(str);
        File wordsFile = new File(inputDir.getPath() + "/words.txt");
        BufferedWriter writer = new BufferedWriter(new FileWriter(wordsFile));
        try {
            writer.write("this text has");
            writer.newLine();
            writer.write("two words text inside");
        } finally {
            // Closing also flushes the buffered content to disk.
            writer.close();
        }
        return inputDir.getAbsolutePath();
    }

    /**
     * Reads a list of scheduled runtimes from a workflow sub-resource.
     *
     * @param program workflow to query
     * @param str     schedule name; not used when building the request
     * @param str2    last path segment to query (the tests pass {@code nextruntime} or {@code previousruntime})
     * @return the deserialized runtimes
     */
    @SuppressWarnings("unchecked")
    private List<ScheduledRuntime> getScheduledRunTime(Id.Program program, String str, String str2) throws Exception {
        String runtimePath = String.format("apps/%s/workflows/%s/%s",
                                           program.getApplicationId(), program.getId(), str2);
        HttpResponse response = doGet(getVersionedAPIPath(runtimePath, "v3", program.getNamespaceId()));
        Type runtimesType = new TypeToken<List<ScheduledRuntime>>() { }.getType();
        return (List<ScheduledRuntime>) readResponse(response, runtimesType);
    }

    /**
     * Builds the v3 URL of a schedule's status endpoint.
     *
     * @param str  namespace passed to {@code getVersionedAPIPath}
     * @param str2 application name
     * @param str3 schedule name
     * @return the versioned API path for the schedule status
     */
    private String getStatusURL(String str, String str2, String str3) throws Exception {
        String statusPath = String.format("apps/%s/schedules/%s/status", str2, str3);
        return getVersionedAPIPath(statusPath, "v3", str);
    }

    /**
     * Runs a workflow through two suspend/resume cycles and checks the reported state,
     * the number of running nodes and the HTTP status codes at each step.
     *
     * <p>The test coordinates with the workflow through marker files whose paths are passed as
     * runtime arguments: the test waits for a ".file" to appear and creates the matching
     * ".file.done" itself. All waits go through {@link #verifyFileExists} so that a workflow
     * which never writes its marker file fails the test instead of hanging it forever.
     */
    @Test
    public void testWorkflowPauseResume() throws Exception {
        File firstActionFile = new File(tmpFolder.newFolder() + "/firstsimpleaction.file");
        File firstActionDoneFile = new File(tmpFolder.newFolder() + "/firstsimpleaction.file.done");
        File forkedActionFile = new File(tmpFolder.newFolder() + "/forkedsimpleaction.file");
        File forkedActionDoneFile = new File(tmpFolder.newFolder() + "/forkedsimpleaction.file.done");
        File anotherForkedActionFile = new File(tmpFolder.newFolder() + "/anotherforkedsimpleaction.file");
        File anotherForkedActionDoneFile = new File(tmpFolder.newFolder() + "/anotherforkedsimpleaction.file.done");
        File lastActionFile = new File(tmpFolder.newFolder() + "/lastsimpleaction.file");
        File lastActionDoneFile = new File(tmpFolder.newFolder() + "/lastsimpleaction.file.done");

        Assert.assertEquals(200L,
            deploy(PauseResumeWorklowApp.class, "v3", "testnamespace2").getStatusLine().getStatusCode());
        Id.Program workflow = Id.Program.from("testnamespace2", "PauseResumeWorkflowApp",
                                              ProgramType.WORKFLOW, "PauseResumeWorkflow");

        Map<String, String> runtimeArgs = Maps.newHashMap();
        runtimeArgs.put("first.simple.action.file", firstActionFile.getAbsolutePath());
        runtimeArgs.put("first.simple.action.donefile", firstActionDoneFile.getAbsolutePath());
        runtimeArgs.put("forked.simple.action.file", forkedActionFile.getAbsolutePath());
        runtimeArgs.put("forked.simple.action.donefile", forkedActionDoneFile.getAbsolutePath());
        runtimeArgs.put("anotherforked.simple.action.file", anotherForkedActionFile.getAbsolutePath());
        runtimeArgs.put("anotherforked.simple.action.donefile", anotherForkedActionDoneFile.getAbsolutePath());
        runtimeArgs.put("last.simple.action.file", lastActionFile.getAbsolutePath());
        runtimeArgs.put("last.simple.action.donefile", lastActionDoneFile.getAbsolutePath());
        setAndTestRuntimeArgs(workflow, runtimeArgs);

        startProgram(workflow, 200);
        waitState(workflow, "RUNNING");
        List<RunRecord> runningRuns = getProgramRuns(workflow, "running");
        Assert.assertEquals(1, runningRuns.size());
        String runId = runningRuns.get(0).getPid();

        // Bounded wait for the first action's marker file, then expect exactly one running node.
        verifyFileExists(Lists.newArrayList(firstActionFile));
        verifyRunningProgramCount(workflow, runId, 1);

        // First suspend succeeds; suspending an already suspended run must answer 409.
        suspendWorkflow(workflow, runId, 200);
        waitState(workflow, "SUSPENDED");
        verifyProgramRuns(workflow, "suspended");
        suspendWorkflow(workflow, runId, 409);

        // Signal the first action; while suspended no further node may be reported as running.
        firstActionDoneFile.createNewFile();
        verifyRunningProgramCount(workflow, runId, 0);
        verifyProgramRuns(workflow, "suspended");

        // First resume succeeds; resuming an already running run must answer 409.
        resumeWorkflow(workflow, runId, 200);
        waitState(workflow, "RUNNING");
        verifyProgramRuns(workflow, "running");
        resumeWorkflow(workflow, runId, 409);

        // Both forked actions must have written their marker files.
        verifyFileExists(Lists.newArrayList(forkedActionFile, anotherForkedActionFile));
        verifyRunningProgramCount(workflow, runId, 2);

        suspendWorkflow(workflow, runId, 200);
        waitState(workflow, "SUSPENDED");
        verifyProgramRuns(workflow, "suspended");

        forkedActionDoneFile.createNewFile();
        anotherForkedActionDoneFile.createNewFile();
        verifyRunningProgramCount(workflow, runId, 0);
        verifyProgramRuns(workflow, "suspended");
        // The last action must not have written its marker file while the run is suspended.
        Assert.assertFalse(lastActionFile.exists());

        resumeWorkflow(workflow, runId, 200);
        waitState(workflow, "RUNNING");
        verifyFileExists(Lists.newArrayList(lastActionFile));
        verifyRunningProgramCount(workflow, runId, 1);

        lastActionDoneFile.createNewFile();
        verifyProgramRuns(workflow, "completed");
        waitState(workflow, "STOPPED");
    }

    /**
     * Resumes two schedules of the same workflow with concurrent runs enabled and checks that
     * at least two runs are active at once, each reporting a single {@code SimpleAction} node.
     *
     * <p>The wait for the two marker files goes through {@link #verifyFileExists}, so a schedule
     * that never fires fails the test instead of blocking it indefinitely.
     */
    @Test
    @Category({XSlowTests.class})
    public void testMultipleWorkflowInstances() throws Exception {
        File schedule1File = new File(tmpFolder.newFolder() + "/concurrentWorkflowSchedule1.file");
        File schedule2File = new File(tmpFolder.newFolder() + "/concurrentWorkflowSchedule2.file");
        File simpleActionDoneFile = new File(tmpFolder.newFolder() + "/simpleaction.file.done");

        Assert.assertEquals(200L,
            deploy(ConcurrentWorkflowApp.class, "v3", "default").getStatusLine().getStatusCode());
        Id.Program workflow = Id.Program.from("default", "ConcurrentWorkflowApp",
                                              ProgramType.WORKFLOW, "ConcurrentWorkflow");

        // The file locations and the concurrency flag are stored as preferences of the workflow.
        PreferencesStore preferencesStore = getInjector().getInstance(PreferencesStore.class);
        preferencesStore.setProperties("default", "ConcurrentWorkflowApp", ProgramType.WORKFLOW.getCategoryName(),
                                       "ConcurrentWorkflow",
                                       ImmutableMap.of("concurrent.runs.enabled", "true",
                                                       "concurrentWorkflowSchedule1.file", schedule1File.getAbsolutePath(),
                                                       "concurrentWorkflowSchedule2.file", schedule2File.getAbsolutePath(),
                                                       "done.file", simpleActionDoneFile.getAbsolutePath()));

        Assert.assertEquals(200L, resumeSchedule("default", "ConcurrentWorkflowApp", "concurrentWorkflowSchedule1"));
        Assert.assertEquals(200L, resumeSchedule("default", "ConcurrentWorkflowApp", "concurrentWorkflowSchedule2"));

        // Bounded wait until both marker files have been written.
        verifyFileExists(Lists.newArrayList(schedule1File, schedule2File));

        List<RunRecord> runningRuns = getProgramRuns(workflow, "running");
        Assert.assertTrue(runningRuns.size() >= 2);

        // Suspend every schedule of the workflow.
        for (ScheduleSpecification spec : getSchedules("default", "ConcurrentWorkflowApp", "ConcurrentWorkflow")) {
            Assert.assertEquals(200L,
                suspendSchedule("default", "ConcurrentWorkflowApp", spec.getSchedule().getName()));
        }

        HttpResponse firstStatus = getWorkflowCurrentStatus(workflow, runningRuns.get(0).getPid());
        Assert.assertEquals(200L, firstStatus.getStatusLine().getStatusCode());
        List<WorkflowActionNode> firstNodes =
            GSON.fromJson(EntityUtils.toString(firstStatus.getEntity()), LIST_WORKFLOWACTIONNODE_TYPE);
        Assert.assertEquals(1L, firstNodes.size());
        Assert.assertEquals("SimpleAction", firstNodes.get(0).getProgram().getProgramName());

        HttpResponse secondStatus = getWorkflowCurrentStatus(workflow, runningRuns.get(1).getPid());
        Assert.assertEquals(200L, secondStatus.getStatusLine().getStatusCode());
        List<WorkflowActionNode> secondNodes =
            GSON.fromJson(EntityUtils.toString(secondStatus.getEntity()), LIST_WORKFLOWACTIONNODE_TYPE);
        Assert.assertEquals(1L, secondNodes.size());
        Assert.assertEquals("SimpleAction", secondNodes.get(0).getProgram().getProgramName());

        // Create the done file, then delete the application.
        simpleActionDoneFile.createNewFile();
        deleteApplication(60, getVersionedAPIPath("apps/ConcurrentWorkflowApp", "v3", "default"), 200);
    }

    /**
     * Polls until every file in the given list exists.
     *
     * @param list files that are all expected to appear
     * @throws Exception whatever {@code Tasks.waitFor} raises when the files do not all appear
     */
    private void verifyFileExists(final List<File> list) throws Exception {
        Callable<Boolean> allFilesPresent = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                for (File candidate : list) {
                    if (!candidate.exists()) {
                        return Boolean.FALSE;
                    }
                }
                return Boolean.TRUE;
            }
        };
        // 180 s / 50 ms are handed to Tasks.waitFor (presumably overall timeout and poll interval).
        Tasks.waitFor(Boolean.TRUE, allFilesPresent, 180L, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Starts a forking workflow three times: stops it during the first action, stops it while
     * both fork branches are running, and finally lets it run to completion.
     *
     * <p>The test coordinates with the workflow through marker files whose paths are passed as
     * runtime arguments.
     */
    @Test
    public void testWorkflowForkApp() throws Exception {
        File firstFile = new File(tmpFolder.newFolder() + "/firstsimpleaction.file");
        File firstDoneFile = new File(tmpFolder.newFolder() + "/firstsimpleaction.file.done");
        File oneFile = new File(tmpFolder.newFolder() + "/onesimpleaction.file");
        File oneDoneFile = new File(tmpFolder.newFolder() + "/onesimpleaction.file.done");
        File anotherFile = new File(tmpFolder.newFolder() + "/anothersimpleaction.file");
        File anotherDoneFile = new File(tmpFolder.newFolder() + "/anothersimpleaction.file.done");

        Map<String, String> runtimeArgs = Maps.newHashMap();
        runtimeArgs.put("first.simple.action.file", firstFile.getAbsolutePath());
        runtimeArgs.put("first.simple.action.donefile", firstDoneFile.getAbsolutePath());
        runtimeArgs.put("one.simple.action.file", oneFile.getAbsolutePath());
        runtimeArgs.put("one.simple.action.donefile", oneDoneFile.getAbsolutePath());
        runtimeArgs.put("another.simple.action.file", anotherFile.getAbsolutePath());
        runtimeArgs.put("another.simple.action.donefile", anotherDoneFile.getAbsolutePath());

        Assert.assertEquals(200L,
            deploy(WorkflowAppWithFork.class, "v3", "testnamespace2").getStatusLine().getStatusCode());
        Id.Program workflow = Id.Program.from("testnamespace2", "WorkflowAppWithFork",
                                              ProgramType.WORKFLOW, "WorkflowWithFork");
        setAndTestRuntimeArgs(workflow, runtimeArgs);

        // Run 1: stop while the first action is the only running node.
        startProgram(workflow, 200);
        waitState(workflow, "RUNNING");
        List<RunRecord> firstRuns = getProgramRuns(workflow, "running");
        Assert.assertTrue(firstRuns.size() == 1);
        String firstRunId = firstRuns.get(0).getPid();
        verifyFileExists(Lists.newArrayList(firstFile));
        verifyRunningProgramCount(workflow, firstRunId, 1);
        stopProgram(workflow, 200);
        verifyProgramRuns(workflow, "killed");
        firstFile.delete();

        // Run 2: must get a fresh run id; stop while both fork branches are running.
        startProgram(workflow, 200);
        waitState(workflow, "RUNNING");
        List<RunRecord> secondRuns = getProgramRuns(workflow, "running");
        Assert.assertTrue(secondRuns.size() == 1);
        RunRecord secondRun = secondRuns.get(0);
        Assert.assertFalse(firstRunId.equals(secondRun.getPid()));
        String secondRunId = secondRun.getPid();
        verifyFileExists(Lists.newArrayList(firstFile));
        verifyRunningProgramCount(workflow, secondRunId, 1);
        firstDoneFile.createNewFile();
        verifyFileExists(Lists.newArrayList(oneFile, anotherFile));
        verifyRunningProgramCount(workflow, secondRunId, 2);
        stopProgram(workflow, 200);
        // After the stop, the "current" endpoint for that run answers 404.
        Assert.assertEquals(404L, getWorkflowCurrentStatus(workflow, secondRunId).getStatusLine().getStatusCode());
        verifyProgramRuns(workflow, "killed", 1);

        // Remove the marker files left behind by the stopped run before starting again.
        firstFile.delete();
        firstDoneFile.delete();
        oneFile.delete();
        anotherFile.delete();

        // Run 3: let every action finish.
        startProgram(workflow, 200);
        waitState(workflow, "RUNNING");
        List<RunRecord> thirdRuns = getProgramRuns(workflow, "running");
        Assert.assertTrue(thirdRuns.size() == 1);
        String thirdRunId = thirdRuns.get(0).getPid();
        verifyFileExists(Lists.newArrayList(firstFile));
        verifyRunningProgramCount(workflow, thirdRunId, 1);
        firstDoneFile.createNewFile();
        verifyFileExists(Lists.newArrayList(oneFile, anotherFile));
        verifyRunningProgramCount(workflow, thirdRunId, 2);
        oneDoneFile.createNewFile();
        anotherDoneFile.createNewFile();
        verifyProgramRuns(workflow, "completed");
    }

    /**
     * Runs a workflow with scoped runtime arguments and verifies the run records it produces.
     *
     * <p>Each of the four inner programs must have exactly one completed run whose
     * {@code workflowrunid} property equals the workflow's run id, and the workflow's own
     * properties {@code "0"}..{@code "3"} must hold the run ids of those inner programs.
     */
    @Test
    @Category({XSlowTests.class})
    public void testWorkflowScopedArguments() throws Exception {
        Assert.assertEquals(200L,
            deploy(WorkflowAppWithScopedParameters.class, "v3", "testnamespace2").getStatusLine().getStatusCode());
        Id.Program workflow = Id.Program.from("testnamespace2", "WorkflowAppWithScopedParameters",
                                              ProgramType.WORKFLOW, "OneWorkflow");

        Map<String, String> runtimeArgs = Maps.newHashMap();
        runtimeArgs.put("debug", "true");
        runtimeArgs.put("mapreduce.*.debug", "false");
        runtimeArgs.put("mapreduce.OneMR.debug", "true");
        runtimeArgs.put("input.path", createInput("ProgramInput"));
        runtimeArgs.put("mapreduce.OneMR.input.path", createInput("OneMRInput"));
        runtimeArgs.put("mapreduce.AnotherMR.input.path", createInput("AnotherMRInput"));
        runtimeArgs.put("spark.*.input.path", createInput("SparkInput"));
        runtimeArgs.put(AppWithMapReduceUsingRuntimeFileSet.OUTPUT_PATH,
                        new File(tmpFolder.newFolder(), "ProgramOutput").getAbsolutePath());
        runtimeArgs.put("mapreduce.OneMR.output.path",
                        new File(tmpFolder.newFolder(), "OneMROutput").getAbsolutePath());
        runtimeArgs.put("spark.AnotherSpark.output.path",
                        new File(tmpFolder.newFolder(), "AnotherSparkOutput").getAbsolutePath());
        runtimeArgs.put("mapreduce.*.processing.time", "1HR");
        runtimeArgs.put("dataset.Purchase.cache.seconds", "30");
        runtimeArgs.put("dataset.UserProfile.schema.property", "constant");
        runtimeArgs.put("dataset.unknown.dataset", "false");
        runtimeArgs.put("dataset.*.read.timeout", "60");
        setAndTestRuntimeArgs(workflow, runtimeArgs);

        startProgram(workflow, 200);
        verifyProgramRuns(workflow, "completed");

        List<RunRecord> workflowRuns = getProgramRuns(workflow, "completed");
        List<RunRecord> oneMRRuns = getProgramRuns(
            Id.Program.from("testnamespace2", "WorkflowAppWithScopedParameters", ProgramType.MAPREDUCE, "OneMR"),
            "completed");
        List<RunRecord> anotherMRRuns = getProgramRuns(
            Id.Program.from("testnamespace2", "WorkflowAppWithScopedParameters", ProgramType.MAPREDUCE, "AnotherMR"),
            "completed");
        List<RunRecord> oneSparkRuns = getProgramRuns(
            Id.Program.from("testnamespace2", "WorkflowAppWithScopedParameters", ProgramType.SPARK, "OneSpark"),
            "completed");
        List<RunRecord> anotherSparkRuns = getProgramRuns(
            Id.Program.from("testnamespace2", "WorkflowAppWithScopedParameters", ProgramType.SPARK, "AnotherSpark"),
            "completed");

        Assert.assertEquals(1L, workflowRuns.size());
        Assert.assertEquals(1L, oneMRRuns.size());
        Assert.assertEquals(1L, anotherMRRuns.size());
        Assert.assertEquals(1L, oneSparkRuns.size());
        Assert.assertEquals(1L, anotherSparkRuns.size());

        Map<?, ?> workflowProps = workflowRuns.get(0).getProperties();
        Map<?, ?> oneMRProps = oneMRRuns.get(0).getProperties();
        Map<?, ?> anotherMRProps = anotherMRRuns.get(0).getProperties();
        Map<?, ?> oneSparkProps = oneSparkRuns.get(0).getProperties();
        Map<?, ?> anotherSparkProps = anotherSparkRuns.get(0).getProperties();

        // Every inner program run must point back at the workflow run that launched it.
        for (Map<?, ?> innerProps : Lists.newArrayList(oneMRProps, anotherMRProps, oneSparkProps, anotherSparkProps)) {
            Assert.assertNotNull(innerProps.get("workflowrunid"));
            Assert.assertEquals(workflowRuns.get(0).getPid(), innerProps.get("workflowrunid"));
        }

        // The workflow run records the inner run ids under the keys "0".."3".
        Assert.assertEquals(workflowProps.get("0"), oneMRRuns.get(0).getPid());
        Assert.assertEquals(workflowProps.get("1"), oneSparkRuns.get(0).getPid());
        Assert.assertEquals(workflowProps.get("2"), anotherMRRuns.get(0).getPid());
        Assert.assertEquals(workflowProps.get("3"), anotherSparkRuns.get(0).getPid());
    }

    /**
     * Exercises the schedule life cycle of a workflow: resume, next and previous runtime,
     * suspend (no new runs may complete), resume again, and lookups of an unknown schedule
     * and of a namespace that does not contain the application.
     */
    @Test
    public void testWorkflowSchedules() throws Exception {
        Assert.assertEquals(200L,
            deploy(AppWithSchedule.class, "v3", "testnamespace2").getStatusLine().getStatusCode());
        Id.Program workflow = Id.Program.from("testnamespace2", "AppWithSchedule",
                                              ProgramType.WORKFLOW, "SampleWorkflow");

        Map<String, String> runtimeArgs = Maps.newHashMap();
        runtimeArgs.put("someKey", "someWorkflowValue");
        runtimeArgs.put("workflowKey", "workflowValue");
        setAndTestRuntimeArgs(workflow, runtimeArgs);

        List<ScheduleSpecification> specs = getSchedules("testnamespace2", "AppWithSchedule", "SampleWorkflow");
        Assert.assertEquals(1L, specs.size());
        String scheduleName = specs.get(0).getSchedule().getName();
        Assert.assertNotNull(scheduleName);
        Assert.assertFalse(scheduleName.isEmpty());

        Assert.assertEquals(200L, resumeSchedule("testnamespace2", "AppWithSchedule", "SampleSchedule"));

        // The next scheduled run must lie after the moment the query was issued.
        long beforeQuery = System.currentTimeMillis();
        List<ScheduledRuntime> nextRuns = getScheduledRunTime(workflow, scheduleName, "nextruntime");
        Assert.assertTrue(nextRuns.get(0).getId().contains(scheduleName));
        long nextRunTime = Long.valueOf(nextRuns.get(0).getTime()).longValue();
        Assert.assertTrue(nextRunTime > beforeQuery);

        verifyProgramRuns(workflow, "completed");
        List<ScheduledRuntime> previousRuns = getScheduledRunTime(workflow, scheduleName, "previousruntime");
        Assert.assertEquals(1L, previousRuns.size());

        String scheduleStatusUrl = getStatusURL("testnamespace2", "AppWithSchedule", scheduleName);
        scheduleStatusCheck(5, scheduleStatusUrl, "SCHEDULED");

        Assert.assertEquals(200L, suspendSchedule("testnamespace2", "AppWithSchedule", scheduleName));
        scheduleStatusCheck(5, scheduleStatusUrl, "SUSPENDED");

        // While suspended, the number of completed runs must stay the same over a 10 s window.
        TimeUnit.SECONDS.sleep(2L);
        int completedBefore = getProgramRuns(workflow, "completed").size();
        TimeUnit.SECONDS.sleep(10L);
        int completedAfter = getProgramRuns(workflow, "completed").size();
        Assert.assertEquals(completedBefore, completedAfter);

        Assert.assertEquals(200L, resumeSchedule("testnamespace2", "AppWithSchedule", scheduleName));
        verifyProgramRuns(workflow, "completed", completedAfter);
        scheduleStatusCheck(5, scheduleStatusUrl, "SCHEDULED");

        // Unknown schedule name.
        scheduleStatusCheck(5, getStatusURL("testnamespace2", "AppWithSchedule", "invalid"), "NOT_FOUND");

        Assert.assertEquals(200L, suspendSchedule("testnamespace2", "AppWithSchedule", scheduleName));
        scheduleStatusCheck(5, scheduleStatusUrl, "SUSPENDED");

        // Same application and schedule names, but in a different namespace.
        scheduleStatusCheck(5, getStatusURL("testnamespace1", "AppWithSchedule", scheduleName), "NOT_FOUND");
        Assert.assertEquals(404L, suspendSchedule("testnamespace1", "AppWithSchedule", scheduleName));
        Assert.assertEquals(404L, resumeSchedule("testnamespace1", "AppWithSchedule", scheduleName));

        TimeUnit.SECONDS.sleep(2L);
    }

    /**
     * Exercises stream-size schedules: posts data to the stream, checks the schedules' status,
     * verifies that no new runs complete while the schedules are suspended, and checks lookups
     * of an unknown schedule and of a namespace that does not contain the application.
     */
    @Test
    public void testStreamSizeSchedules() throws Exception {
        // Payload of 10000 repetitions of a 10-character chunk.
        StringBuilder payloadBuilder = new StringBuilder();
        for (int chunk = 0; chunk < 10000; chunk++) {
            payloadBuilder.append("dddddddddd");
        }
        String payload = payloadBuilder.toString();

        Assert.assertEquals(200L,
            deploy(AppWithStreamSizeSchedule.class, "v3", "testnamespace2").getStatusLine().getStatusCode());
        Assert.assertEquals(200L, resumeSchedule("testnamespace2", "AppWithStreamSizeSchedule", "SampleSchedule1"));
        Assert.assertEquals(200L, resumeSchedule("testnamespace2", "AppWithStreamSizeSchedule", "SampleSchedule2"));

        List<ScheduleSpecification> specs =
            getSchedules("testnamespace2", "AppWithStreamSizeSchedule", "SampleWorkflow");
        Assert.assertEquals(2L, specs.size());
        String firstSchedule = specs.get(0).getSchedule().getName();
        String secondSchedule = specs.get(1).getSchedule().getName();
        Assert.assertNotNull(firstSchedule);
        Assert.assertFalse(firstSchedule.isEmpty());

        // Set the stream's notification threshold to 1 and read it back.
        String propertiesUrl =
            String.format("/v3/namespaces/%s/streams/%s/properties", "testnamespace2", AllProgramsApp.STREAM_NAME);
        Assert.assertEquals(200L,
            doPut(propertiesUrl, "{'notification.threshold.mb': 1}").getStatusLine().getStatusCode());
        String streamUrl = String.format("/v3/namespaces/%s/streams/%s", "testnamespace2", AllProgramsApp.STREAM_NAME);
        StreamProperties streamProperties =
            new Gson().fromJson(EntityUtils.toString(doGet(streamUrl).getEntity()), StreamProperties.class);
        Assert.assertEquals(1L, streamProperties.getNotificationThresholdMB().intValue());

        for (int post = 0; post < 12; post++) {
            Assert.assertEquals(200L, doPost(streamUrl, payload).getStatusLine().getStatusCode());
        }
        TimeUnit.SECONDS.sleep(10L);

        String completedRunsUrl =
            getRunsUrl("testnamespace2", "AppWithStreamSizeSchedule", "SampleWorkflow", "completed");
        scheduleHistoryRuns(5, completedRunsUrl, 0);

        String firstStatusUrl = getStatusURL("testnamespace2", "AppWithStreamSizeSchedule", firstSchedule);
        String secondStatusUrl = getStatusURL("testnamespace2", "AppWithStreamSizeSchedule", secondSchedule);
        scheduleStatusCheck(5, firstStatusUrl, "SCHEDULED");
        scheduleStatusCheck(5, secondStatusUrl, "SCHEDULED");

        Assert.assertEquals(200L, suspendSchedule("testnamespace2", "AppWithStreamSizeSchedule", firstSchedule));
        Assert.assertEquals(200L, suspendSchedule("testnamespace2", "AppWithStreamSizeSchedule", secondSchedule));
        scheduleStatusCheck(5, firstStatusUrl, "SUSPENDED");
        scheduleStatusCheck(5, secondStatusUrl, "SUSPENDED");

        // With both schedules suspended, posting more data must not add completed runs.
        TimeUnit.SECONDS.sleep(2L);
        int runsBefore = getRuns(completedRunsUrl);
        for (int post = 0; post < 12; post++) {
            Assert.assertEquals(200L, doPost(streamUrl, payload).getStatusLine().getStatusCode());
        }
        TimeUnit.SECONDS.sleep(5L);
        int runsAfter = getRuns(completedRunsUrl);
        Assert.assertEquals(runsBefore, runsAfter);

        Assert.assertEquals(200L, resumeSchedule("testnamespace2", "AppWithStreamSizeSchedule", firstSchedule));
        scheduleHistoryRuns(5, completedRunsUrl, runsAfter);
        scheduleStatusCheck(5, firstStatusUrl, "SCHEDULED");

        // Unknown schedule name.
        scheduleStatusCheck(5, getStatusURL("testnamespace2", "AppWithStreamSizeSchedule", "invalid"), "NOT_FOUND");

        Assert.assertEquals(200L, suspendSchedule("testnamespace2", "AppWithStreamSizeSchedule", firstSchedule));
        scheduleStatusCheck(5, firstStatusUrl, "SUSPENDED");

        // Same application and schedule names, but in a different namespace.
        scheduleStatusCheck(5, getStatusURL("testnamespace1", "AppWithStreamSizeSchedule", firstSchedule),
                            "NOT_FOUND");
        Assert.assertEquals(404L, suspendSchedule("testnamespace1", "AppWithStreamSizeSchedule", firstSchedule));
        Assert.assertEquals(404L, resumeSchedule("testnamespace1", "AppWithStreamSizeSchedule", firstSchedule));

        TimeUnit.SECONDS.sleep(2L);
    }

    /**
     * Checks that a scheduled workflow first produces a completed run and, once the
     * {@code ThrowError} preference is set to {@code "true"}, a failed run.
     */
    @Test
    public void testWorkflowRuns() throws Exception {
        Assert.assertEquals(200L,
            deploy(WorkflowAppWithErrorRuns.class, "v3", "testnamespace2").getStatusLine().getStatusCode());
        Assert.assertEquals(200L, resumeSchedule("testnamespace2", "WorkflowAppWithErrorRuns", "SampleSchedule"));

        Id.Program workflow = Id.Program.from("testnamespace2", "WorkflowAppWithErrorRuns",
                                              ProgramType.WORKFLOW, "WorkflowWithErrorRuns");
        verifyProgramRuns(workflow, "completed");

        // Set the ThrowError preference on the workflow, then expect a failed run.
        PreferencesStore preferencesStore = getInjector().getInstance(PreferencesStore.class);
        preferencesStore.setProperties("testnamespace2", "WorkflowAppWithErrorRuns",
                                       ProgramType.WORKFLOW.getCategoryName(), "WorkflowWithErrorRuns",
                                       ImmutableMap.of("ThrowError", "true"));
        verifyProgramRuns(workflow, "failed");

        Assert.assertEquals(200L, suspendSchedule("testnamespace2", "WorkflowAppWithErrorRuns", "SampleSchedule"));
    }

    /**
     * Creates a new temporary folder containing a {@code data.txt} file with two kinds of lines.
     *
     * <p>The file holds {@code i} colon-separated lines followed by {@code i2} space-separated
     * lines (presumably well-formed and malformed records for the workflow's condition; the
     * consumer is not visible here).
     *
     * <p>The writer is closed exactly once, after both groups of lines have been written. The
     * previous version closed it inside the first loop, so any write after the first record
     * failed on a closed stream, and with {@code i == 0} the writer was never closed or flushed.
     *
     * @param str name of the folder to create under the test's temporary folder
     * @param i   number of colon-separated lines to write
     * @param i2  number of space-separated lines to write
     * @return absolute path of the created folder
     * @throws IOException if the folder or file cannot be written
     */
    private String createConditionInput(String str, int i, int i2) throws IOException {
        File inputDir = tmpFolder.newFolder(str);
        File dataFile = new File(inputDir.getPath() + "/data.txt");
        BufferedWriter writer = new BufferedWriter(new FileWriter(dataFile));
        try {
            for (int line = 0; line < i; line++) {
                writer.write("Afname:ALname:A:B");
                writer.newLine();
            }
            for (int line = 0; line < i2; line++) {
                writer.write("Afname ALname A B");
                writer.newLine();
            }
        } finally {
            // Closing also flushes the buffered lines to disk.
            writer.close();
        }
        return inputDir.getAbsolutePath();
    }

    /**
     * Runs a conditional workflow twice with different inputs and checks which fork runs.
     *
     * <p>The first input (2 colon-separated, 12 space-separated lines) is expected to drive the
     * three-way "else" fork; the second (10 and 2) the two-way "if" fork. After each run the
     * completed-run counts of the workflow and of the {@code RecordVerifier} and
     * {@code ClassicWordCount} MapReduce programs are verified.
     *
     * <p>Waiting for the fork marker files goes through {@link #verifyFileExists}, so a workflow
     * that never reaches the fork fails the test instead of hanging it forever.
     */
    @Test
    @Category({XSlowTests.class})
    public void testWorkflowCondition() throws Exception {
        Assert.assertEquals(200L,
            deploy(ConditionalWorkflowApp.class, "v3", "testnamespace2").getStatusLine().getStatusCode());
        Id.Program workflow = Id.Program.from("testnamespace2", "ConditionalWorkflowApp",
                                              ProgramType.WORKFLOW, "ConditionalWorkflow");

        File ifForkOneFile = new File(tmpFolder.newFolder() + "/iffork_one.file");
        File ifForkOneDoneFile = new File(tmpFolder.newFolder() + "/iffork_one.file.done");
        File ifForkAnotherFile = new File(tmpFolder.newFolder() + "/iffork_another.file");
        File ifForkAnotherDoneFile = new File(tmpFolder.newFolder() + "/iffork_another.file.done");
        File elseForkOneFile = new File(tmpFolder.newFolder() + "/elsefork_one.file");
        File elseForkOneDoneFile = new File(tmpFolder.newFolder() + "/elsefork_one.file.done");
        File elseForkAnotherFile = new File(tmpFolder.newFolder() + "/elsefork_another.file");
        File elseForkAnotherDoneFile = new File(tmpFolder.newFolder() + "/elsefork_another.file.done");
        File elseForkThirdFile = new File(tmpFolder.newFolder() + "/elsefork_third.file");
        File elseForkThirdDoneFile = new File(tmpFolder.newFolder() + "/elsefork_third.file.done");

        Map<String, String> runtimeArgs = Maps.newHashMap();
        runtimeArgs.put("iffork_one.simple.action.file", ifForkOneFile.getAbsolutePath());
        runtimeArgs.put("iffork_one.simple.action.donefile", ifForkOneDoneFile.getAbsolutePath());
        runtimeArgs.put("iffork_another.simple.action.file", ifForkAnotherFile.getAbsolutePath());
        runtimeArgs.put("iffork_another.simple.action.donefile", ifForkAnotherDoneFile.getAbsolutePath());
        runtimeArgs.put("elsefork_one.simple.action.file", elseForkOneFile.getAbsolutePath());
        runtimeArgs.put("elsefork_one.simple.action.donefile", elseForkOneDoneFile.getAbsolutePath());
        runtimeArgs.put("elsefork_another.simple.action.file", elseForkAnotherFile.getAbsolutePath());
        runtimeArgs.put("elsefork_another.simple.action.donefile", elseForkAnotherDoneFile.getAbsolutePath());
        runtimeArgs.put("elsefork_third.simple.action.file", elseForkThirdFile.getAbsolutePath());
        runtimeArgs.put("elsefork_third.simple.action.donefile", elseForkThirdDoneFile.getAbsolutePath());
        runtimeArgs.put("inputPath", createConditionInput("ConditionProgramInput", 2, 12));
        runtimeArgs.put("outputPath", new File(tmpFolder.newFolder(), "ConditionProgramOutput").getAbsolutePath());
        setAndTestRuntimeArgs(workflow, runtimeArgs);

        // First run: all three "else" fork branches must write their marker files.
        startProgram(workflow, 200);
        verifyFileExists(Lists.newArrayList(elseForkOneFile, elseForkAnotherFile, elseForkThirdFile));

        List<RunRecord> firstRunning = getProgramRuns(workflow, "running");
        Assert.assertEquals(1, firstRunning.size());
        verifyRunningProgramCount(workflow, firstRunning.get(0).getPid(), 3);

        elseForkOneDoneFile.createNewFile();
        elseForkAnotherDoneFile.createNewFile();
        elseForkThirdDoneFile.createNewFile();
        verifyProgramRuns(workflow, "completed");

        Id.Program recordVerifier = Id.Program.from("testnamespace2", "ConditionalWorkflowApp",
                                                    ProgramType.MAPREDUCE, "RecordVerifier");
        Id.Program wordCount = Id.Program.from("testnamespace2", "ConditionalWorkflowApp",
                                               ProgramType.MAPREDUCE, "ClassicWordCount");
        List<RunRecord> workflowCompleted = getProgramRuns(workflow, "completed");
        List<RunRecord> verifierCompleted = getProgramRuns(recordVerifier, "completed");
        List<RunRecord> wordCountCompleted = getProgramRuns(wordCount, "completed");
        Assert.assertEquals(1L, workflowCompleted.size());
        Assert.assertEquals(1L, verifierCompleted.size());
        Assert.assertEquals(0L, wordCountCompleted.size());

        // Second run: new input and per-program output paths; both "if" fork branches must start.
        runtimeArgs.put("inputPath", createConditionInput("AnotherConditionProgramInput", 10, 2));
        runtimeArgs.put("mapreduce.RecordVerifier.outputPath",
                        new File(tmpFolder.newFolder(), "ConditionProgramOutput").getAbsolutePath());
        runtimeArgs.put("mapreduce.ClassicWordCount.outputPath",
                        new File(tmpFolder.newFolder(), "ConditionProgramOutput").getAbsolutePath());
        setAndTestRuntimeArgs(workflow, runtimeArgs);

        startProgram(workflow, 200);
        verifyFileExists(Lists.newArrayList(ifForkOneFile, ifForkAnotherFile));

        List<RunRecord> secondRunning = getProgramRuns(workflow, "running");
        Assert.assertEquals(1, secondRunning.size());
        verifyRunningProgramCount(workflow, secondRunning.get(0).getPid(), 2);

        ifForkOneDoneFile.createNewFile();
        ifForkAnotherDoneFile.createNewFile();
        verifyProgramRuns(workflow, "completed", 1);

        workflowCompleted = getProgramRuns(workflow, "completed");
        verifierCompleted = getProgramRuns(recordVerifier, "completed");
        wordCountCompleted = getProgramRuns(wordCount, "completed");
        Assert.assertEquals(2L, workflowCompleted.size());
        Assert.assertEquals(2L, verifierCompleted.size());
        Assert.assertEquals(1L, wordCountCompleted.size());
    }
}
