package eu.stratosphere.yarn;

import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.jobmanager.JobManager;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.util.Records;

/* loaded from: input_file:eu/stratosphere/yarn/ApplicationMaster.class */
public class ApplicationMaster {
    private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

    /* loaded from: input_file:eu/stratosphere/yarn/ApplicationMaster$JobManagerRunner.class */
    public static class JobManagerRunner extends Thread {
        private String pathToNepheleConfig;
        private JobManager jm;

        public JobManagerRunner(String str) {
            super("Job manager runner");
            this.pathToNepheleConfig = "";
            this.pathToNepheleConfig = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.jm = JobManager.initialize(new String[]{"-executionMode", "cluster", "-configDir", this.pathToNepheleConfig});
            this.jm.startInfoServer();
            this.jm.runTaskLoop();
        }

        public void shutdown() {
            this.jm.shutdown();
        }
    }

    public static void main(String[] strArr) throws Exception {
        Configuration initializeYarnConfiguration = Utils.initializeYarnConfiguration();
        FileSystem fileSystem = FileSystem.get(initializeYarnConfiguration);
        Map<String, String> map = System.getenv();
        String str = map.get(ApplicationConstants.Environment.PWD.key());
        String str2 = map.get(ApplicationConstants.Environment.NM_HOST.key());
        int intValue = Integer.valueOf(map.get(Client.ENV_APP_ID)).intValue();
        String str3 = map.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
        String str4 = map.get(ApplicationConstants.Environment.NM_HOST.key());
        String str5 = map.get(Client.STRATOSPHERE_JAR_PATH);
        int intValue2 = Integer.valueOf(map.get(Client.ENV_TM_COUNT)).intValue();
        int intValue3 = Integer.valueOf(map.get(Client.ENV_TM_MEMORY)).intValue();
        int intValue4 = Integer.valueOf(map.get(Client.ENV_TM_CORES)).intValue();
        int i = (int) (intValue3 * 0.7d);
        if (str == null) {
            throw new RuntimeException("Current directory unknown");
        }
        if (str2 == null) {
            throw new RuntimeException("Own hostname (" + ApplicationConstants.Environment.NM_HOST + ") not set.");
        }
        LOG.info("Working directory " + str);
        String str6 = str + "/resources/web-docs-infoserver";
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(str + "/stratosphere-conf.yaml")));
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(str + "/stratosphere-conf-modified.yaml"));
        while (true) {
            String readLine = bufferedReader.readLine();
            if (readLine == null) {
                break;
            }
            if (readLine.contains("jobmanager.rpc.address")) {
                bufferedWriter.append((CharSequence) ("jobmanager.rpc.address: " + str2 + "\n"));
            } else if (readLine.contains("jobmanager.web.rootpath")) {
                bufferedWriter.append((CharSequence) "jobmanager.web.rootpath: \n");
            } else if (str3 == null || !readLine.contains("taskmanager.tmp.dirs")) {
                bufferedWriter.append((CharSequence) (readLine + "\n"));
            } else {
                bufferedWriter.append((CharSequence) ("taskmanager.tmp.dirs: " + str3 + "\n"));
            }
        }
        bufferedWriter.append((CharSequence) ("jobmanager.rpc.address: " + str2 + "\n"));
        bufferedWriter.append((CharSequence) ("jobmanager.web.rootpath: " + str6 + "\n"));
        if (str3 != null) {
            bufferedWriter.append((CharSequence) ("taskmanager.tmp.dirs: " + str3 + "\n"));
        }
        bufferedWriter.close();
        bufferedReader.close();
        if (!new File(str + "/stratosphere-conf-modified.yaml").exists()) {
            LOG.warn("modified yaml does not exist!");
        }
        Utils.copyJarContents("resources/web-docs-infoserver", ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
        JobManagerRunner jobManagerRunner = new JobManagerRunner(str + "/stratosphere-conf-modified.yaml");
        LOG.info("Starting JobManager");
        jobManagerRunner.start();
        AMRMClient createAMRMClient = AMRMClient.createAMRMClient();
        createAMRMClient.init(initializeYarnConfiguration);
        createAMRMClient.start();
        NMClient createNMClient = NMClient.createNMClient();
        createNMClient.init(initializeYarnConfiguration);
        createNMClient.start();
        LOG.info("registering ApplicationMaster");
        createAMRMClient.registerApplicationMaster(str4, 0, "http://" + str4 + ":" + GlobalConfiguration.getString("jobmanager.web.port", "undefined"));
        Priority priority = (Priority) Records.newRecord(Priority.class);
        priority.setPriority(0);
        Resource resource = (Resource) Records.newRecord(Resource.class);
        resource.setMemory(intValue3);
        resource.setVirtualCores(intValue4);
        for (int i2 = 0; i2 < intValue2; i2++) {
            AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(resource, (String[]) null, (String[]) null, priority);
            LOG.info("Requesting TaskManager container " + i2);
            createAMRMClient.addContainerRequest(containerRequest);
        }
        LocalResource localResource = (LocalResource) Records.newRecord(LocalResource.class);
        LocalResource localResource2 = (LocalResource) Records.newRecord(LocalResource.class);
        Utils.registerLocalResource(fileSystem, new Path(str5), localResource);
        Utils.setupLocalResource(initializeYarnConfiguration, fileSystem, intValue, new Path("file://" + str + "/stratosphere-conf-modified.yaml"), localResource2);
        LOG.info("Prepared localresource for modified yaml: " + localResource2);
        int i3 = 0;
        int i4 = 0;
        while (i3 < intValue2) {
            AllocateResponse allocate = createAMRMClient.allocate(0.0f);
            for (Container container : allocate.getAllocatedContainers()) {
                i3++;
                ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
                String str7 = "$JAVA_HOME/bin/java -Xmx" + i + "m  eu.stratosphere.nephele.taskmanager.TaskManager -configDir .  1><LOG_DIR>/stdout 2><LOG_DIR>/stderr";
                containerLaunchContext.setCommands(Collections.singletonList(str7));
                LOG.info("Starting TM with command=" + str7);
                HashMap hashMap = new HashMap(2);
                hashMap.put("stratosphere.jar", localResource);
                hashMap.put("stratosphere-conf.yaml", localResource2);
                containerLaunchContext.setLocalResources(hashMap);
                HashMap hashMap2 = new HashMap();
                Utils.setupEnv(initializeYarnConfiguration, hashMap2);
                containerLaunchContext.setEnvironment(hashMap2);
                LOG.info("Launching container " + i3);
                createNMClient.startContainer(container, containerLaunchContext);
            }
            Iterator it = allocate.getCompletedContainersStatuses().iterator();
            while (it.hasNext()) {
                i4++;
                LOG.info("Completed container " + ((ContainerStatus) it.next()).getContainerId() + ". Total Completed:" + i4);
            }
            Thread.sleep(100L);
        }
        while (i4 < intValue2) {
            Iterator it2 = createAMRMClient.allocate(i4 / intValue2).getCompletedContainersStatuses().iterator();
            while (it2.hasNext()) {
                i4++;
                LOG.info("Completed container " + ((ContainerStatus) it2.next()).getContainerId() + ". Total Completed:" + i4);
            }
            Thread.sleep(5000L);
        }
        jobManagerRunner.shutdown();
        jobManagerRunner.join(500L);
        createAMRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    }
}
