package eu.stratosphere.nephele.jobmanager;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.client.AbstractJobResult;
import eu.stratosphere.nephele.client.JobCancelResult;
import eu.stratosphere.nephele.client.JobProgressResult;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.RecentJobEvent;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.executiongraph.GraphConversionException;
import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.executiongraph.JobStatusListener;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.instance.local.LocalInstanceManager;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.accumulators.AccumulatorManager;
import eu.stratosphere.nephele.jobmanager.archive.ArchiveListener;
import eu.stratosphere.nephele.jobmanager.archive.MemoryArchivist;
import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitWrapper;
import eu.stratosphere.nephele.jobmanager.web.WebInfoServer;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.multicast.MulticastManager;
import eu.stratosphere.nephele.profiling.JobManagerProfiler;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol;
import eu.stratosphere.nephele.protocols.InputSplitProviderProtocol;
import eu.stratosphere.nephele.protocols.JobManagerProtocol;
import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;
import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
import eu.stratosphere.nephele.topology.NetworkTopology;
import eu.stratosphere.nephele.types.IntegerRecord;
import eu.stratosphere.nephele.util.SerializableArrayList;
import eu.stratosphere.util.StringUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

/* loaded from: input_file:eu/stratosphere/nephele/jobmanager/JobManager.class */
public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol, JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol {
    /** Logger used by the job manager and its anonymous helper classes. */
    private static final Log LOG = LogFactory.getLog(JobManager.class);
    /** RPC server created and started in the constructor and stopped in {@link #shutdown()}. */
    private final Server jobManagerServer;
    /** Profiler loaded in the constructor when profiling is enabled in the global configuration; {@code null} otherwise. */
    private final JobManagerProfiler profiler;
    /** Collects job events; created in the constructor with the recommended client polling interval. */
    private final EventCollector eventCollector;
    /** In-memory archive registered with the event collector; {@code null} when "jobmanager.web.history" is not positive. */
    private final ArchiveListener archive;
    /** Jobs are registered here on submission and unregistered when they are cleaned up. */
    private final InputSplitManager inputSplitManager;
    /** Scheduler implementation loaded by class name in the constructor. */
    private final AbstractScheduler scheduler;
    /** Answers connection lookups for broadcast edges (see lookupConnectionInfo). */
    private final MulticastManager multicastManager;
    /** Created in the constructor; its uses are outside the visible part of this class. */
    private AccumulatorManager accumulatorManager;
    /** Instance manager chosen in the constructor according to the execution mode. */
    private InstanceManager instanceManager;
    /** Value of "jobclient.polling.interval" (default 2); NOTE(review): unit is presumably seconds - confirm. */
    private final int recommendedClientPollingInterval;
    /** Exit code passed to {@code System.exit} when start-up fails. */
    private static final int FAILURE_RETURN_CODE = 1;
    /** Set to {@code true} at the very end of {@link #shutdown()}. */
    private volatile boolean isShutDown;
    /** Web info server; NOTE(review): not assigned in the visible part of the class, presumably by startInfoServer() - confirm. */
    private WebInfoServer server;
    /** Cached thread pool used to run heartbeat reports and task manager kills off the calling thread. */
    private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
    /** Guard that lets only the first caller of {@link #shutdown()} perform the shutdown sequence. */
    private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);

    /* loaded from: input_file:eu/stratosphere/nephele/jobmanager/JobManager$ExecutionMode.class */
    /**
     * Execution mode the job manager is started in. The mode decides which
     * instance manager the constructor creates and is passed on when the
     * scheduler class name is looked up.
     */
    public enum ExecutionMode {
        /** The constructor instantiates a {@code LocalInstanceManager} directly. */
        LOCAL,
        /** The instance manager is loaded by the class name obtained from {@code JobManagerUtils}. */
        CLUSTER
    }

    /**
     * Creates the job manager: reads the RPC endpoint and history settings
     * from the global configuration, starts the RPC server, and loads the
     * instance manager, scheduler, multicast manager and (optionally) the
     * profiler.
     *
     * @param executionMode
     *        the mode that selects the instance manager and scheduler
     * @throws Exception
     *         if the configured RPC address cannot be resolved, the RPC server
     *         cannot be started, or one of the components cannot be loaded
     */
    public JobManager(ExecutionMode executionMode) throws Exception {
        final String configuredHost = GlobalConfiguration.getString("jobmanager.rpc.address", (String) null);
        InetAddress rpcAddress = null;
        if (configuredHost != null) {
            try {
                rpcAddress = InetAddress.getByName(configuredHost);
            } catch (UnknownHostException unknownHost) {
                throw new Exception("Cannot convert " + configuredHost + " to an IP address: " + unknownHost.getMessage(), unknownHost);
            }
        }
        final int rpcPort = GlobalConfiguration.getInteger("jobmanager.rpc.port", 6123);

        this.recommendedClientPollingInterval = GlobalConfiguration.getInteger("jobclient.polling.interval", 2);
        this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);

        // A non-positive history size disables the in-memory archive.
        final int historySize = GlobalConfiguration.getInteger("jobmanager.web.history", 5);
        if (historySize <= 0) {
            this.archive = null;
        } else {
            this.archive = new MemoryArchivist(historySize);
            this.eventCollector.registerArchivist(this.archive);
        }
        // The accumulator manager is sized to at most one entry.
        this.accumulatorManager = new AccumulatorManager(Math.min(1, historySize));
        this.inputSplitManager = new InputSplitManager();

        final InetSocketAddress rpcEndpoint = new InetSocketAddress(rpcAddress, rpcPort);
        try {
            this.jobManagerServer = RPC.getServer(this, rpcEndpoint.getHostName(), rpcEndpoint.getPort(), GlobalConfiguration.getInteger("jobmanager.rpc.numhandler", 8));
            this.jobManagerServer.start();
            LOG.info("Starting job manager in " + executionMode + " mode");

            if (executionMode != ExecutionMode.LOCAL) {
                final String instanceManagerName = JobManagerUtils.getInstanceManagerClassName(executionMode);
                LOG.info("Trying to load " + instanceManagerName + " as instance manager");
                this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerName);
                if (this.instanceManager == null) {
                    throw new Exception("Unable to load instance manager " + instanceManagerName);
                }
            } else {
                try {
                    this.instanceManager = new LocalInstanceManager();
                } catch (Throwable cause) {
                    throw new Exception("Cannot instantiate local instance manager: " + cause.getMessage(), cause);
                }
            }

            final String schedulerName = JobManagerUtils.getSchedulerClassName(executionMode);
            LOG.info("Trying to load " + schedulerName + " as scheduler");
            this.scheduler = JobManagerUtils.loadScheduler(schedulerName, this, this.instanceManager);
            if (this.scheduler == null) {
                throw new Exception("Unable to load scheduler " + schedulerName);
            }
            this.multicastManager = new MulticastManager(this.scheduler);

            if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
                this.profiler = ProfilingUtils.loadJobManagerProfiler(GlobalConfiguration.getString(ProfilingUtils.JOBMANAGER_CLASSNAME_KEY, "eu.stratosphere.nephele.profiling.impl.JobManagerProfilerImpl"), rpcAddress);
                if (this.profiler == null) {
                    throw new Exception("Cannot load profiler");
                }
            } else {
                this.profiler = null;
                LOG.debug("Profiler disabled");
            }
        } catch (IOException ioFailure) {
            throw new Exception("Cannot start RPC server: " + ioFailure.getMessage(), ioFailure);
        }
    }

    /**
     * Shuts down the job manager and its components. Only the first call
     * performs the shutdown; later or concurrent calls return immediately.
     * <p>
     * Components are stopped in this order: instance manager, profiler, RPC
     * server, executor service (waiting up to five seconds for running tasks),
     * event collector, scheduler. If the calling thread is interrupted while
     * waiting for the executor service, the remaining components are still
     * shut down and the thread's interrupt status is restored before this
     * method returns.
     */
    public void shutdown() {
        if (!this.isShutdownInProgress.compareAndSet(false, true)) {
            return;
        }

        boolean interrupted = false;

        if (this.instanceManager != null) {
            this.instanceManager.shutdown();
        }
        if (this.profiler != null) {
            this.profiler.shutdown();
        }
        if (this.jobManagerServer != null) {
            this.jobManagerServer.stop();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.debug(e);
                // Remember the interrupt, but finish the shutdown sequence first.
                interrupted = true;
            }
        }
        if (this.eventCollector != null) {
            this.eventCollector.shutdown();
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }

        this.isShutDown = true;
        LOG.debug("Shutdown of job manager completed");

        if (interrupted) {
            // Hand the interrupt back to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Logs the implementation version of this package together with the
     * abbreviated git commit id read from the {@code .version.properties}
     * class path resource. If the resource is missing, cannot be read, or does
     * not contain the property, the revision is reported as {@code <unknown>}.
     */
    private static void logVersionInformation() {
        final String version = JobManager.class.getPackage().getImplementationVersion();
        String revision = "<unknown>";

        final InputStream propertyStream = JobManager.class.getClassLoader().getResourceAsStream(".version.properties");
        if (propertyStream != null) {
            try {
                final Properties properties = new Properties();
                properties.load(propertyStream);
                // Keep the fallback when the property is absent instead of logging "null".
                revision = properties.getProperty("git.commit.id.abbrev", revision);
            } catch (IOException e) {
                LOG.info("Cannot determine code revision. Unable ro read version property file.");
            } finally {
                try {
                    propertyStream.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if closing a read-only resource stream fails.
                }
            }
        }
        LOG.info("Starting Stratosphere JobManager (Version: " + version + ", Rev:" + revision + ")");
    }

    /**
     * Entry point of the job manager process. Installs a console logger when
     * no log4j configuration is given, initializes the job manager from the
     * command line arguments, starts its info server, and then parks the main
     * thread.
     *
     * @param strArr
     *        the command line arguments, forwarded to {@link #initialize(String[])}
     */
    public static void main(String[] strArr) {
        final boolean loggingConfigured = System.getProperty("log4j.configuration") != null;
        if (!loggingConfigured) {
            // Fall back to a plain console logger on stderr at INFO level.
            final Logger root = Logger.getRootLogger();
            root.removeAllAppenders();
            final PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
            root.addAppender(new ConsoleAppender(layout, "System.err"));
            root.setLevel(Level.INFO);
        }

        try {
            final JobManager jobManager = initialize(strArr);
            jobManager.startInfoServer();
        } catch (Exception startupFailure) {
            LOG.fatal(startupFailure.getMessage(), startupFailure);
            System.exit(FAILURE_RETURN_CODE);
        }

        // Block the main thread on a private monitor that nobody notifies.
        final Object monitor = new Object();
        synchronized (monitor) {
            try {
                monitor.wait();
            } catch (InterruptedException ignored) {
                // An interrupt simply lets main() return.
            }
        }
    }

    /**
     * Parses the command line, loads the global configuration and creates the
     * job manager.
     * <p>
     * Recognized options are {@code configDir} (configuration directory) and
     * {@code executionMode} ({@code local} or {@code cluster}, defaulting to
     * {@code local}). Unparsable arguments or an unknown execution mode end
     * the process with the failure return code.
     *
     * @param strArr
     *        the command line arguments
     * @return the newly created job manager
     * @throws Exception
     *         if the job manager cannot be constructed
     */
    public static JobManager initialize(String[] strArr) throws Exception {
        logVersionInformation();

        // OptionBuilder accumulates settings statically until create() is called.
        OptionBuilder.withArgName("config directory");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify configuration directory.");
        final Option configDirOption = OptionBuilder.create("configDir");

        OptionBuilder.withArgName("execution mode");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify execution mode.");
        final Option executionModeOption = OptionBuilder.create("executionMode");

        final Options cliOptions = new Options();
        cliOptions.addOption(configDirOption);
        cliOptions.addOption(executionModeOption);

        CommandLine parsedArguments = null;
        try {
            parsedArguments = new GnuParser().parse(cliOptions, strArr);
        } catch (ParseException parseFailure) {
            LOG.error("CLI Parsing failed. Reason: " + parseFailure.getMessage());
            System.exit(FAILURE_RETURN_CODE);
        }

        final String configDir = parsedArguments.getOptionValue(configDirOption.getOpt(), (String) null);
        final String modeName = parsedArguments.getOptionValue(executionModeOption.getOpt(), "local");

        ExecutionMode executionMode = null;
        if ("cluster".equals(modeName)) {
            executionMode = ExecutionMode.CLUSTER;
        } else if ("local".equals(modeName)) {
            executionMode = ExecutionMode.LOCAL;
        } else {
            System.err.println("Unrecognized execution mode: " + modeName);
            System.exit(FAILURE_RETURN_CODE);
        }

        GlobalConfiguration.loadConfiguration(configDir);
        final JobManager jobManager = new JobManager(executionMode);

        // Publish the base directory (parent of the configuration directory) if one was given.
        final Configuration configuration = GlobalConfiguration.getConfiguration();
        final boolean hasConfigDir = configDir != null && new File(configDir).isDirectory();
        if (hasConfigDir) {
            configuration.setString("stratosphere.base.dir.path", configDir + "/..");
        }
        GlobalConfiguration.includeConfiguration(configuration);

        return jobManager;
    }

    /**
     * Validates the submitted job graph, converts it into an execution graph,
     * registers it with the event collector, profiler and input split manager,
     * and hands it to the scheduler. Specified by {@code JobManagementProtocol}.
     *
     * @param jobGraph
     *        the job to run; {@code null} is answered with an error result
     * @return a result with return code {@code SUCCESS} if the job was handed
     *         to the scheduler, otherwise an {@code ERROR} result describing
     *         the validation, conversion or scheduling problem
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph) throws IOException {
        if (jobGraph == null) {
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
        }
        final String defect = findJobGraphDefect(jobGraph);
        if (defect != null) {
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, defect);
        }

        // Profile the job only if a profiler is loaded and the job does not opt out.
        final boolean profilingEnabled = this.profiler != null
            && jobGraph.getJobConfiguration().getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true);

        LOG.info("Creating initial execution graph from job graph " + jobGraph.getName());
        try {
            final ExecutionGraph executionGraph = new ExecutionGraph(jobGraph, this.instanceManager);
            if (this.eventCollector != null) {
                this.eventCollector.registerJob(executionGraph, profilingEnabled, System.currentTimeMillis());
            }
            if (profilingEnabled) {
                this.profiler.registerProfilingJob(executionGraph);
                if (this.eventCollector != null) {
                    this.profiler.registerForProfilingData(executionGraph.getJobID(), this.eventCollector);
                }
            }
            this.inputSplitManager.registerJob(executionGraph);
            executionGraph.registerJobStatusListener(this);
            if (LOG.isInfoEnabled()) {
                LOG.info("Scheduling job " + jobGraph.getName());
            }
            try {
                this.scheduler.schedulJob(executionGraph);
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null);
            } catch (SchedulingException e) {
                // Undo the registrations made above before reporting the failure.
                unregisterJob(executionGraph);
                return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage());
            }
        } catch (GraphConversionException e) {
            return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage());
        }
    }

    /**
     * Runs the structural checks on a submitted job graph, in order: null
     * edges, weak connectivity, acyclicity, vertex degrees, and acyclicity of
     * the instance sharing dependency chain.
     *
     * @param jobGraph
     *        the non-null job graph to check
     * @return the error message for the first failed check, or {@code null}
     *         if the graph passed all of them
     */
    private static String findJobGraphDefect(JobGraph jobGraph) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitted job " + jobGraph.getName() + " is not null");
        }

        final AbstractJobVertex vertexWithNullEdge = jobGraph.findVertexWithNullEdges();
        if (vertexWithNullEdge != null) {
            return "Vertex " + vertexWithNullEdge.getName() + " has at least one null edge";
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitted job " + jobGraph.getName() + " has no null edges");
        }

        if (!jobGraph.isWeaklyConnected()) {
            return "Job graph is not weakly connected";
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("The graph of job " + jobGraph.getName() + " is weakly connected");
        }

        if (!jobGraph.isAcyclic()) {
            return "Job graph is not a DAG";
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("The graph of job " + jobGraph.getName() + " is acyclic");
        }

        final AbstractJobVertex vertexWithBadDegree = jobGraph.areVertexDegreesCorrect();
        if (vertexWithBadDegree != null) {
            return "Degree of vertex " + vertexWithBadDegree.getName() + " is incorrect";
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("All vertices of job " + jobGraph.getName() + " have the correct degree");
        }

        if (!jobGraph.isInstanceDependencyChainAcyclic()) {
            return "The dependency chain for instance sharing contains a cycle";
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("The dependency chain for instance sharing is acyclic");
        }
        return null;
    }

    /**
     * Returns the instance manager this job manager works with.
     *
     * @return the instance manager selected in the constructor
     */
    public InstanceManager getInstanceManager() {
        return instanceManager;
    }

    /**
     * Removes a job from the components it was registered with: the profiler
     * (if the job was profiled), the instance manager's pending requests, the
     * input split manager and the library cache.
     *
     * @param executionGraph
     *        the execution graph of the job to clean up
     */
    private void unregisterJob(ExecutionGraph executionGraph) {
        final boolean wasProfiled = this.profiler != null
            && executionGraph.getJobConfiguration().getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true);
        if (wasProfiled) {
            this.profiler.unregisterProfilingJob(executionGraph);
            if (this.eventCollector != null) {
                this.profiler.unregisterFromProfilingData(executionGraph.getJobID(), this.eventCollector);
            }
        }

        this.instanceManager.cancelPendingRequests(executionGraph.getJobID());

        if (this.inputSplitManager != null) {
            this.inputSplitManager.unregisterJob(executionGraph);
        }

        try {
            LibraryCacheManager.unregister(executionGraph.getJobID());
        } catch (IOException cacheFailure) {
            // A failed library cache cleanup is logged but does not abort the unregistration.
            if (LOG.isWarnEnabled()) {
                LOG.warn(cacheFailure);
            }
        }
    }

    /**
     * Forwards a heartbeat to the instance manager. The report is executed on
     * the executor service rather than on the calling thread. Specified by
     * {@code JobManagerProtocol}.
     *
     * @param instanceConnectionInfo
     *        connection information of the reporting instance
     * @param hardwareDescription
     *        hardware description sent along with the heartbeat
     */
    @Override
    public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo, final HardwareDescription hardwareDescription) {
        if (this.instanceManager == null) {
            return;
        }
        final Runnable heartbeatReport = new Runnable() {
            @Override
            public void run() {
                JobManager.this.instanceManager.reportHeartBeat(instanceConnectionInfo, hardwareDescription);
            }
        };
        this.executorService.execute(heartbeatReport);
    }

    /**
     * Applies a reported task state change to the matching execution vertex.
     * Problems (a {@code null} report, unknown job, unknown vertex) are logged
     * as errors and otherwise ignored. Specified by {@code JobManagerProtocol}.
     *
     * @param taskExecutionState
     *        the reported state change
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) throws IOException {
        if (taskExecutionState == null) {
            LOG.error("Received call to updateTaskExecutionState with executionState == null");
            return;
        }

        // Failures are additionally written to the job manager's own log.
        if (ExecutionState.FAILED == taskExecutionState.getExecutionState()) {
            LOG.error(taskExecutionState.getDescription());
        }

        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(taskExecutionState.getJobID());
        if (graph == null) {
            LOG.error("Cannot find execution graph for ID " + taskExecutionState.getJobID() + " to change state to " + taskExecutionState.getExecutionState());
            return;
        }

        final ExecutionVertex vertex = graph.getVertexByID(taskExecutionState.getID());
        if (vertex != null) {
            vertex.updateExecutionStateAsynchronously(taskExecutionState.getExecutionState(), taskExecutionState.getDescription());
            return;
        }
        LOG.error("Cannot find vertex with ID " + taskExecutionState.getID() + " of job " + graph.getJobID() + " to change state to " + taskExecutionState.getExecutionState());
    }

    /**
     * Triggers the cancellation of a job. The cancellation itself is submitted
     * as a command to the job's execution graph; this method returns once it
     * has been handed over. Specified by {@code JobManagementProtocol}.
     *
     * @param jobID
     *        the ID of the job to cancel
     * @return a {@code SUCCESS} result if the cancel command was handed over,
     *         or an {@code ERROR} result if no job with that ID is known to
     *         the scheduler
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public JobCancelResult cancelJob(JobID jobID) throws IOException {
        LOG.info("Trying to cancel job with ID " + jobID);

        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
        if (graph != null) {
            final Runnable cancelCommand = new Runnable() {
                @Override
                public void run() {
                    graph.updateJobStatus(InternalJobStatus.CANCELING, "Job canceled by user");
                    final TaskCancelResult failure = JobManager.this.cancelJob(graph);
                    if (failure != null) {
                        JobManager.LOG.error(failure.getDescription());
                    }
                }
            };
            graph.executeCommand(cancelCommand);
            LOG.info("Cancel of job " + jobID + " successfully triggered");
            return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
        }

        return new JobCancelResult(AbstractJobResult.ReturnCode.ERROR, "Cannot find job with ID " + jobID);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Cancels every task visited by an execution graph iterator that starts
     * at the graph's current execution stage.
     *
     * @param executionGraph
     *        the execution graph whose tasks are cancelled
     * @return the result of the last cancel call that did not succeed, or
     *         {@code null} if every cancel call succeeded
     */
    public TaskCancelResult cancelJob(ExecutionGraph executionGraph) {
        TaskCancelResult lastFailure = null;
        final int currentStage = executionGraph.getIndexOfCurrentExecutionStage();
        for (ExecutionGraphIterator vertices = new ExecutionGraphIterator(executionGraph, currentStage, false, true); vertices.hasNext();) {
            final TaskCancelResult outcome = vertices.next().cancelTask();
            if (outcome.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
                lastFailure = outcome;
            }
        }
        return lastFailure;
    }

    /**
     * Returns the events the event collector holds for a job. Specified by
     * {@code JobManagementProtocol}.
     *
     * @param jobID
     *        the ID of the job whose events are requested
     * @return a {@code SUCCESS} result carrying the events, or an
     *         {@code ERROR} result if there is no event collector
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public JobProgressResult getJobProgress(JobID jobID) throws IOException {
        if (this.eventCollector != null) {
            final SerializableArrayList events = new SerializableArrayList();
            this.eventCollector.getEventsForJob(jobID, events, false);
            return new JobProgressResult(AbstractJobResult.ReturnCode.SUCCESS, null, events);
        }
        return new JobProgressResult(AbstractJobResult.ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null);
    }

    /**
     * Resolves where the counterpart of a channel can be reached. Specified by
     * {@code ChannelLookupProtocol}.
     * <p>
     * The answer depends on the job status, on which end of the edge the
     * given channel is, and on the execution state of the vertex at the other
     * end. If that vertex runs on the caller's own instance the response
     * carries the counterpart's channel ID, otherwise it carries a
     * {@code RemoteReceiver} with the other instance's data address.
     *
     * @param instanceConnectionInfo
     *        connection information of the instance asking for the lookup
     * @param jobID
     *        the ID of the job the channel belongs to
     * @param channelID
     *        the ID of the channel whose counterpart is looked up
     * @return the lookup response: receiver found, not ready, not found, or
     *         job aborting
     */
    @Override
    public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo instanceConnectionInfo, JobID jobID, ChannelID channelID) {
        final ExecutionGraph executionGraph = this.scheduler.getExecutionGraphByID(jobID);
        if (executionGraph == null) {
            LOG.error("Cannot find execution graph to job ID " + jobID);
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }

        final InternalJobStatus jobStatus = executionGraph.getJobStatus();
        if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) {
            return ConnectionInfoLookupResponse.createJobIsAborting();
        }

        final ExecutionEdge edge = executionGraph.getEdgeByID(channelID);
        if (edge == null) {
            LOG.error("Cannot find execution edge associated with ID " + channelID);
            return ConnectionInfoLookupResponse.createReceiverNotFound();
        }

        if (channelID.equals(edge.getInputChannelID())) {
            // The lookup comes from the edge's input channel, so the counterpart
            // is the vertex that owns the output gate.
            final ExecutionVertex targetVertex = edge.getOutputGate().getVertex();
            final AbstractInstance targetInstance = targetVertex.getAllocatedResource().getInstance();
            if (targetInstance == null) {
                // Report the counterpart's channel ID (previously the gate index was
                // printed under the label "channel ID").
                LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputChannelID() + " but no instance assigned");
                return ConnectionInfoLookupResponse.createReceiverNotReady();
            }

            final ExecutionState targetState = targetVertex.getExecutionState();
            if (targetState == ExecutionState.FINISHED) {
                return ConnectionInfoLookupResponse.createReceiverFoundAndReady();
            }
            if (targetState != ExecutionState.RUNNING && targetState != ExecutionState.FINISHING) {
                return ConnectionInfoLookupResponse.createReceiverNotReady();
            }
            if (targetInstance.getInstanceConnectionInfo().equals(instanceConnectionInfo)) {
                // Both ends run on the same instance: answer with the local channel.
                return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
            }
            return createRemoteReceiverResponse(targetInstance, edge);
        }

        // Broadcast edges are resolved by the multicast manager.
        if (edge.isBroadcast()) {
            return this.multicastManager.lookupConnectionInfo(instanceConnectionInfo, jobID, channelID);
        }

        // Otherwise the counterpart is the vertex that owns the input gate.
        final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
        final ExecutionState targetState = targetVertex.getExecutionState();
        if (targetState != ExecutionState.RUNNING && targetState != ExecutionState.FINISHING && targetState != ExecutionState.FINISHED) {
            if (targetState == ExecutionState.ASSIGNED) {
                // The counterpart is assigned but not deployed yet: ask the scheduler to deploy it.
                executionGraph.executeCommand(new Runnable() {
                    @Override
                    public void run() {
                        JobManager.this.scheduler.deployAssignedVertices(targetVertex);
                    }
                });
            }
            return ConnectionInfoLookupResponse.createReceiverNotReady();
        }

        final AbstractInstance targetInstance = targetVertex.getAllocatedResource().getInstance();
        if (targetInstance == null) {
            LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
            return ConnectionInfoLookupResponse.createReceiverNotReady();
        }
        if (targetInstance.getInstanceConnectionInfo().equals(instanceConnectionInfo)) {
            // Both ends run on the same instance: answer with the local channel.
            return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
        }
        return createRemoteReceiverResponse(targetInstance, edge);
    }

    /**
     * Builds the "receiver found" response for a counterpart that runs on
     * another instance, addressed by that instance's address and data port.
     *
     * @param targetInstance
     *        the instance hosting the counterpart vertex
     * @param edge
     *        the edge whose connection ID identifies the connection
     * @return the response carrying the remote receiver
     */
    private static ConnectionInfoLookupResponse createRemoteReceiverResponse(AbstractInstance targetInstance, ExecutionEdge edge) {
        final InstanceConnectionInfo targetInfo = targetInstance.getInstanceConnectionInfo();
        final InetSocketAddress dataAddress = new InetSocketAddress(targetInfo.getAddress(), targetInfo.getDataPort());
        return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(dataAddress, edge.getConnectionID()));
    }

    /**
     * Returns the management graph of a job, looking first in the event
     * collector and then, if an archive exists, in the archive. Specified by
     * {@code ExtendedManagementProtocol}.
     *
     * @param jobID
     *        the ID of the job
     * @return the management graph of the job, never {@code null}
     * @throws IOException
     *         if neither the event collector nor the archive knows the job
     */
    @Override
    public ManagementGraph getManagementGraph(JobID jobID) throws IOException {
        ManagementGraph graph = this.eventCollector.getManagementGraph(jobID);
        if (graph != null) {
            return graph;
        }

        // Not known to the event collector: fall back to the archive.
        if (this.archive != null) {
            graph = this.archive.getManagementGraph(jobID);
        }
        if (graph == null) {
            throw new IOException("Cannot find job with ID " + jobID);
        }
        return graph;
    }

    /**
     * Returns the network topology the instance manager reports for a job.
     * Specified by {@code ExtendedManagementProtocol}.
     *
     * @param jobID
     *        the ID of the job
     * @return the network topology, or {@code null} if there is no instance
     *         manager
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
        return this.instanceManager == null ? null : this.instanceManager.getNetworkTopology(jobID);
    }

    /**
     * Returns the polling interval clients are advised to use, as read from
     * the configuration in the constructor. Specified by
     * {@code JobManagementProtocol}.
     *
     * @return the recommended polling interval wrapped in an {@code IntegerRecord}
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public IntegerRecord getRecommendedPollingInterval() throws IOException {
        final IntegerRecord interval = new IntegerRecord(this.recommendedClientPollingInterval);
        return interval;
    }

    /**
     * Returns the recent jobs known to the event collector. Specified by
     * {@code ExtendedManagementProtocol}.
     *
     * @return a list filled by the event collector with its recent job events
     * @throws IOException
     *         if there is no event collector
     */
    @Override
    public List<RecentJobEvent> getRecentJobs() throws IOException {
        if (this.eventCollector == null) {
            throw new IOException("No instance of the event collector found");
        }
        final SerializableArrayList recentJobs = new SerializableArrayList();
        this.eventCollector.getRecentJobs(recentJobs);
        return recentJobs;
    }

    /**
     * Returns the events the event collector holds for a job. Unlike
     * {@code getJobProgress}, the collector is queried with its third
     * argument set to {@code true}. Specified by
     * {@code ExtendedManagementProtocol}.
     *
     * @param jobID
     *        the ID of the job whose events are requested
     * @return a list filled by the event collector with the job's events
     * @throws IOException
     *         if there is no event collector
     */
    @Override
    public List<AbstractEvent> getEvents(JobID jobID) throws IOException {
        if (this.eventCollector == null) {
            throw new IOException("No instance of the event collector found");
        }
        final SerializableArrayList events = new SerializableArrayList();
        this.eventCollector.getEventsForJob(jobID, events, true);
        return events;
    }

    /**
     * Kills a single task of a job. The kill is submitted as a command to the
     * job's execution graph; an unknown job or vertex is logged as an error
     * and otherwise ignored. Specified by {@code ExtendedManagementProtocol}.
     *
     * @param jobID
     *        the ID of the job the task belongs to
     * @param managementVertexID
     *        the management vertex ID identifying the task
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public void killTask(JobID jobID, ManagementVertexID managementVertexID) throws IOException {
        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
        if (graph == null) {
            LOG.error("Cannot find execution graph for job " + jobID);
            return;
        }

        final ExecutionVertexID vertexID = ExecutionVertexID.fromManagementVertexID(managementVertexID);
        final ExecutionVertex victim = graph.getVertexByID(vertexID);
        if (victim == null) {
            LOG.error("Cannot find execution vertex with ID " + managementVertexID);
            return;
        }

        LOG.info("Killing task " + victim + " of job " + jobID);
        final Runnable killCommand = new Runnable() {
            @Override
            public void run() {
                final TaskKillResult outcome = victim.killTask();
                if (outcome.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
                    JobManager.LOG.error(outcome.getDescription());
                }
            }
        };
        graph.executeCommand(killCommand);
    }

    /**
     * Kills the task manager running on the named instance. The kill is run
     * on the executor service; an unknown instance name is logged as an error
     * and otherwise ignored. Specified by {@code ExtendedManagementProtocol}.
     *
     * @param stringRecord
     *        the name of the instance whose task manager is to be killed
     * @throws IOException
     *         declared by the protocol; not thrown directly by this method
     */
    @Override
    public void killInstance(StringRecord stringRecord) throws IOException {
        final String instanceName = stringRecord.toString();
        final AbstractInstance target = this.instanceManager.getInstanceByName(instanceName);
        if (target == null) {
            LOG.error("Cannot find instance with name " + stringRecord + " to kill it");
            return;
        }

        LOG.info("Killing task manager on instance " + target);
        final Runnable killCommand = new Runnable() {
            @Override
            public void run() {
                try {
                    target.killTaskManager();
                } catch (IOException killFailure) {
                    JobManager.LOG.error(killFailure);
                }
            }
        };
        this.executorService.execute(killCommand);
    }

    /**
     * Returns the current value of this job manager's {@code isShutDown} flag.
     *
     * @return the value of the {@code isShutDown} field
     */
    public boolean isShutDown() {
        final boolean shutDownFlag = this.isShutDown;
        return shutDownFlag;
    }

    /**
     * Returns the map of available instance types as reported by the instance manager.
     *
     * @return the instance manager's map of available instance types, or {@code null} if this job manager
     *         has no instance manager
     */
    @Override // eu.stratosphere.nephele.protocols.ExtendedManagementProtocol
    public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
        if (this.instanceManager == null) {
            // Without an instance manager there is nothing to report.
            return null;
        }
        return this.instanceManager.getMapOfAvailableInstanceTypes();
    }

    /**
     * Reacts to a status change of a job.
     * <p>
     * The new status is logged. A job that switched to {@code FAILING} is cancelled; a job that reached
     * {@code CANCELED}, {@code FAILED} or {@code FINISHED} is unregistered.
     *
     * @param executionGraph
     *        the execution graph of the job whose status changed
     * @param internalJobStatus
     *        the new status of the job
     * @param str
     *        additional text supplied by the caller; not used by this method
     */
    @Override // eu.stratosphere.nephele.executiongraph.JobStatusListener
    public void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus internalJobStatus, String str) {
        LOG.info("Status of job " + executionGraph.getJobName() + "(" + executionGraph.getJobID() + ") changed to " + internalJobStatus);

        final boolean reachedEndState = internalJobStatus == InternalJobStatus.CANCELED
            || internalJobStatus == InternalJobStatus.FAILED
            || internalJobStatus == InternalJobStatus.FINISHED;

        // FAILING is not one of the end states, so the two reactions are mutually exclusive.
        if (internalJobStatus == InternalJobStatus.FAILING) {
            cancelJob(executionGraph);
        } else if (reachedEndState) {
            unregisterJob(executionGraph);
        }
    }

    /**
     * Asks the instances that currently run tasks of the given job to log their buffer utilization.
     * <p>
     * The execution graph of the job is traversed and the allocated instance of every vertex in state
     * {@code RUNNING} or {@code FINISHING} is collected into a set. Instances of type {@link DummyInstance}
     * are reported as an error and left out. The {@code logBufferUtilization()} calls are then issued from
     * a task submitted to the executor service; the first {@link IOException} is logged and ends that task,
     * so instances not yet visited are skipped.
     *
     * @param jobID
     *        the ID of the job; if the scheduler knows no execution graph for it, the call does nothing
     * @throws IOException
     *         declared in the signature; the visible body does not throw it explicitly
     */
    @Override // eu.stratosphere.nephele.protocols.ExtendedManagementProtocol
    public void logBufferUtilization(JobID jobID) throws IOException {
        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
        if (graph == null) {
            return;
        }

        final HashSet<AbstractInstance> activeInstances = new HashSet<AbstractInstance>();
        final ExecutionGraphIterator vertices = new ExecutionGraphIterator(graph, true);
        while (vertices.hasNext()) {
            final ExecutionVertex vertex = vertices.next();
            final ExecutionState state = vertex.getExecutionState();
            if (state != ExecutionState.RUNNING && state != ExecutionState.FINISHING) {
                continue;
            }

            final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
            if (instance instanceof DummyInstance) {
                LOG.error("Found instance of type DummyInstance for vertex " + vertex.getName() + " (state " + state + ")");
                continue;
            }
            activeInstances.add(instance);
        }

        this.executorService.execute(new Runnable() {
            @Override
            public void run() {
                // The try block wraps the whole loop: the first failure is logged and stops the iteration.
                try {
                    for (AbstractInstance instance : activeInstances) {
                        instance.logBufferUtilization();
                    }
                } catch (IOException ioe) {
                    JobManager.LOG.error(ioe);
                }
            }
        });
    }

    /**
     * Deploys the given execution vertices on the given instance.
     * <p>
     * On the calling thread every vertex is switched to {@code STARTING}; an error is logged for each vertex
     * that was not in state {@code READY} beforehand. The submission itself runs in a task handed to the
     * executor service. That task
     * <ol>
     * <li>checks the library availability for the job on the instance (a failure is only logged),</li>
     * <li>builds one deployment descriptor per vertex and submits them to the instance,</li>
     * <li>marks all vertices as {@code FAILED} and stops if the submission throws an {@link IOException},</li>
     * <li>otherwise matches each submission result to its vertex and marks the vertex as {@code FAILED} if
     * the result's return code is not {@code SUCCESS}.</li>
     * </ol>
     * Results are expected in the same order as the vertices. If a result is out of order, or there are more
     * results than vertices, the vertex is looked up by ID instead; results without a matching vertex are
     * logged and skipped.
     *
     * @param jobID
     *        the ID of the job the vertices belong to
     * @param abstractInstance
     *        the instance the vertices are deployed on
     * @param list
     *        the vertices to deploy; if empty, an error is logged and nothing is deployed
     */
    @Override // eu.stratosphere.nephele.jobmanager.DeploymentManager
    public void deploy(final JobID jobID, final AbstractInstance abstractInstance, final List<ExecutionVertex> list) {
        if (list.isEmpty()) {
            LOG.error("Method 'deploy' called but list of vertices to be deployed is empty");
            return;
        }
        for (ExecutionVertex executionVertex : list) {
            if (executionVertex.getExecutionState() != ExecutionState.READY) {
                LOG.error("Expected vertex " + executionVertex + " to be in state READY but it is in state " + executionVertex.getExecutionState());
            }
            executionVertex.updateExecutionState(ExecutionState.STARTING, null);
        }
        this.executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    abstractInstance.checkLibraryAvailability(jobID);
                } catch (IOException e) {
                    JobManager.LOG.error("Cannot check library availability: " + StringUtils.stringifyException(e));
                }

                SerializableArrayList descriptors = new SerializableArrayList();
                for (ExecutionVertex vertex : list) {
                    descriptors.add(vertex.constructDeploymentDescriptor());
                    JobManager.LOG.info("Starting task " + vertex + " on " + vertex.getAllocatedResource().getInstance());
                }

                final List<TaskSubmissionResult> results;
                try {
                    results = abstractInstance.submitTasks(descriptors);
                } catch (IOException e) {
                    final String errorMessage = StringUtils.stringifyException(e);
                    for (ExecutionVertex vertex : list) {
                        vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, errorMessage);
                    }
                    // There is no result list to evaluate; continuing would dereference null.
                    return;
                }

                if (list.size() != results.size()) {
                    JobManager.LOG.error("size of submission result list does not match size of list with vertices to be deployed");
                }

                int position = 0;
                for (TaskSubmissionResult result : results) {
                    // Guard the positional lookup: the result list may be longer than the vertex list.
                    ExecutionVertex vertex = position < list.size() ? list.get(position) : null;
                    position++;

                    if (vertex == null || !vertex.getID().equals(result.getVertexID())) {
                        JobManager.LOG.error("Expected different order of objects in task result list");
                        vertex = null;
                        for (ExecutionVertex candidate : list) {
                            if (result.getVertexID().equals(candidate.getID())) {
                                vertex = candidate;
                                break;
                            }
                        }
                        if (vertex == null) {
                            JobManager.LOG.error("Cannot find execution vertex for vertex ID " + result.getVertexID());
                            // No vertex to update for this result.
                            continue;
                        }
                    }

                    if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
                        vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, result.getDescription());
                    }
                }
            }
        });
    }

    /**
     * Requests the next input split for the given vertex of the given job.
     * <p>
     * The execution graph is resolved through the scheduler and the vertex through that graph. If either
     * cannot be found, an error is logged and {@code null} is returned. Otherwise the input split manager is
     * asked for the next split and the answer is wrapped together with the job ID.
     *
     * @param jobID
     *        the ID of the job the vertex belongs to
     * @param executionVertexID
     *        the ID of the vertex requesting the split
     * @param integerRecord
     *        record whose integer value is passed on to the input split manager
     * @return a wrapper around the job ID and the split returned by the input split manager, or {@code null}
     *         if the execution graph or the vertex is unknown
     * @throws IOException
     *         declared in the signature; the visible body does not throw it explicitly
     */
    @Override // eu.stratosphere.nephele.protocols.InputSplitProviderProtocol
    public InputSplitWrapper requestNextInputSplit(JobID jobID, ExecutionVertexID executionVertexID, IntegerRecord integerRecord) throws IOException {
        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
        if (graph == null) {
            LOG.error("Cannot find execution graph to job ID " + jobID);
            return null;
        }

        final ExecutionVertex vertex = graph.getVertexByID(executionVertexID);
        if (vertex == null) {
            LOG.error("Cannot find execution vertex for vertex ID " + executionVertexID);
            return null;
        }

        return new InputSplitWrapper(jobID, this.inputSplitManager.getNextInputSplit(vertex, integerRecord.getValue()));
    }

    /**
     * Creates and starts the web info server.
     * <p>
     * The port is read from the global configuration under the key {@code jobmanager.web.port}, with
     * {@code 8081} as the fallback value. Any exception raised while reading the port, creating the server
     * or starting it is logged and not propagated.
     */
    public void startInfoServer() {
        final Configuration config = GlobalConfiguration.getConfiguration();
        try {
            final int port = config.getInteger("jobmanager.web.port", 8081);
            this.server = new WebInfoServer(config, port, this);
            this.server.start();
        } catch (FileNotFoundException notFound) {
            LOG.error(notFound.getMessage(), notFound);
        } catch (Exception failure) {
            LOG.error("Cannot instantiate info server: " + failure.getMessage(), failure);
        }
    }

    /**
     * Returns the jobs held by the archive.
     *
     * @return the list of jobs reported by the archive
     * @throws IOException
     *         if this job manager has no archive
     */
    public List<RecentJobEvent> getOldJobs() throws IOException {
        if (this.archive == null) {
            // The missing object here is the archive, not the event collector.
            throw new IOException("No instance of the job archive found");
        }
        return this.archive.getJobs();
    }

    /**
     * Returns the archive listener held by this job manager.
     *
     * @return the value of the {@code archive} field, which may be {@code null}
     */
    public ArchiveListener getArchive() {
        final ArchiveListener currentArchive = this.archive;
        return currentArchive;
    }

    /**
     * Returns the number of task trackers as reported by the instance manager.
     *
     * @return the value returned by the instance manager's {@code getNumberOfTaskTrackers()}
     */
    public int getNumberOfTaskTrackers() {
        final int trackerCount = this.instanceManager.getNumberOfTaskTrackers();
        return trackerCount;
    }

    /**
     * Passes the accumulators carried by the given event on to the accumulator manager.
     *
     * @param accumulatorEvent
     *        the event providing the job ID and the accumulators to process
     * @throws IOException
     *         declared in the signature; the visible body does not throw it explicitly
     */
    @Override // eu.stratosphere.nephele.protocols.AccumulatorProtocol
    public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) throws IOException {
        final JobID reportingJob = accumulatorEvent.getJobID();
        this.accumulatorManager.processIncomingAccumulators(reportingJob, accumulatorEvent.getAccumulators());
    }

    /**
     * Builds an accumulator event from the accumulators the accumulator manager holds for the given job.
     *
     * @param jobID
     *        the ID of the job whose accumulators are requested
     * @return a new event created from the job ID, the job's accumulators and the flag {@code false}
     * @throws IOException
     *         declared in the signature; the visible body does not throw it explicitly
     */
    @Override // eu.stratosphere.nephele.protocols.AccumulatorProtocol
    public AccumulatorEvent getAccumulatorResults(JobID jobID) throws IOException {
        final AccumulatorEvent results = new AccumulatorEvent(jobID, this.accumulatorManager.getJobAccumulators(jobID), false);
        return results;
    }
}
