package eu.stratosphere.nephele.client;

import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.accumulators.AccumulatorHelper;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.AbstractJobResult;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.JobEvent;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobStatus;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.protocols.JobManagementProtocol;
import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/client/JobClient.class */
public class JobClient {
    private static final Log LOG = LogFactory.getLog(JobClient.class);
    private final JobManagementProtocol jobSubmitClient;
    private AccumulatorProtocol accumulatorProtocolProxy;
    private final JobGraph jobGraph;
    private final Configuration configuration;
    private final JobCleanUp jobCleanUp;
    private long lastProcessedEventSequenceNumber;
    private PrintStream console;

    /* loaded from: input_file:eu/stratosphere/nephele/client/JobClient$JobCleanUp.class */
    public static class JobCleanUp extends Thread {
        private final JobClient jobClient;

        public JobCleanUp(JobClient jobClient) {
            this.jobClient = jobClient;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.jobClient.close();
        }
    }

    public JobClient(JobGraph jobGraph) throws IOException {
        this(jobGraph, new Configuration());
    }

    public JobClient(JobGraph jobGraph, Configuration configuration) throws IOException {
        this.lastProcessedEventSequenceNumber = -1L;
        InetSocketAddress inetSocketAddress = new InetSocketAddress(configuration.getString("jobmanager.rpc.address", (String) null), configuration.getInteger("jobmanager.rpc.port", 6123));
        this.jobSubmitClient = (JobManagementProtocol) RPC.getProxy(JobManagementProtocol.class, inetSocketAddress, NetUtils.getSocketFactory());
        this.accumulatorProtocolProxy = (AccumulatorProtocol) RPC.getProxy(AccumulatorProtocol.class, inetSocketAddress, NetUtils.getSocketFactory());
        this.jobGraph = jobGraph;
        this.configuration = configuration;
        this.jobCleanUp = new JobCleanUp(this);
    }

    public JobClient(JobGraph jobGraph, Configuration configuration, InetSocketAddress inetSocketAddress) throws IOException {
        this.lastProcessedEventSequenceNumber = -1L;
        this.jobSubmitClient = (JobManagementProtocol) RPC.getProxy(JobManagementProtocol.class, inetSocketAddress, NetUtils.getSocketFactory());
        this.jobGraph = jobGraph;
        this.configuration = configuration;
        this.jobCleanUp = new JobCleanUp(this);
    }

    public void close() {
        synchronized (this.jobSubmitClient) {
            RPC.stopProxy(this.jobSubmitClient);
        }
        synchronized (this.accumulatorProtocolProxy) {
            RPC.stopProxy(this.accumulatorProtocolProxy);
        }
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public JobSubmissionResult submitJob() throws IOException {
        JobSubmissionResult submitJob;
        synchronized (this.jobSubmitClient) {
            submitJob = this.jobSubmitClient.submitJob(this.jobGraph);
        }
        return submitJob;
    }

    public JobCancelResult cancelJob() throws IOException {
        JobCancelResult cancelJob;
        synchronized (this.jobSubmitClient) {
            cancelJob = this.jobSubmitClient.cancelJob(this.jobGraph.getJobID());
        }
        return cancelJob;
    }

    public JobProgressResult getJobProgress() throws IOException {
        JobProgressResult jobProgress;
        synchronized (this.jobSubmitClient) {
            jobProgress = this.jobSubmitClient.getJobProgress(this.jobGraph.getJobID());
        }
        return jobProgress;
    }

    public JobExecutionResult submitJobAndWait() throws IOException, JobExecutionException {
        JobEvent jobEvent;
        JobStatus currentJobStatus;
        synchronized (this.jobSubmitClient) {
            JobSubmissionResult submitJob = this.jobSubmitClient.submitJob(this.jobGraph);
            if (submitJob.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
                LOG.error("ERROR: " + submitJob.getDescription());
                throw new JobExecutionException(submitJob.getDescription(), false);
            }
            Runtime.getRuntime().addShutdownHook(this.jobCleanUp);
        }
        try {
            long value = this.jobSubmitClient.getRecommendedPollingInterval().getValue() * 1000;
            try {
                Thread.sleep(value / 2);
            } catch (InterruptedException e) {
                Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                logErrorAndRethrow(StringUtils.stringifyException(e));
            }
            long j = -1;
            loop0: while (true) {
                if (Thread.interrupted()) {
                    logErrorAndRethrow("Job client has been interrupted");
                }
                try {
                    JobProgressResult jobProgress = getJobProgress();
                    if (jobProgress == null) {
                        logErrorAndRethrow("Returned job progress is unexpectedly null!");
                    }
                    if (jobProgress.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
                        logErrorAndRethrow("Could not retrieve job progress: " + jobProgress.getDescription());
                    }
                    Iterator<AbstractEvent> events = jobProgress.getEvents();
                    while (events.hasNext()) {
                        AbstractEvent next = events.next();
                        if (this.lastProcessedEventSequenceNumber < next.getSequenceNumber()) {
                            LOG.info(next.toString());
                            if (this.console != null) {
                                this.console.println(next.toString());
                            }
                            this.lastProcessedEventSequenceNumber = next.getSequenceNumber();
                            if (next instanceof JobEvent) {
                                jobEvent = (JobEvent) next;
                                currentJobStatus = jobEvent.getCurrentJobStatus();
                                if (currentJobStatus == JobStatus.SCHEDULED) {
                                    j = jobEvent.getTimestamp();
                                }
                                if (currentJobStatus == JobStatus.FINISHED) {
                                    Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                                    try {
                                        return new JobExecutionResult(jobEvent.getTimestamp() - j, AccumulatorHelper.toResultMap(getAccumulators().getAccumulators()));
                                    } catch (IOException e2) {
                                        Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                                        throw e2;
                                    }
                                }
                                if (currentJobStatus == JobStatus.CANCELED || currentJobStatus == JobStatus.FAILED) {
                                    break loop0;
                                }
                            } else {
                                continue;
                            }
                        }
                    }
                    try {
                        Thread.sleep(value);
                    } catch (InterruptedException e3) {
                        logErrorAndRethrow(StringUtils.stringifyException(e3));
                    }
                } catch (IOException e4) {
                    Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
                    throw e4;
                }
            }
            Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
            LOG.info(jobEvent.getOptionalMessage());
            if (currentJobStatus == JobStatus.CANCELED) {
                throw new JobExecutionException(jobEvent.getOptionalMessage(), true);
            }
            throw new JobExecutionException(jobEvent.getOptionalMessage(), false);
        } catch (IOException e5) {
            Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
            throw e5;
        }
    }

    public int getRecommendedPollingInterval() throws IOException {
        int value;
        synchronized (this.jobSubmitClient) {
            value = this.jobSubmitClient.getRecommendedPollingInterval().getValue();
        }
        return value;
    }

    private void logErrorAndRethrow(String str) throws IOException {
        LOG.error(str);
        throw new IOException(str);
    }

    public void setConsoleStreamForReporting(PrintStream printStream) {
        this.console = printStream;
    }

    private AccumulatorEvent getAccumulators() throws IOException {
        AccumulatorEvent accumulatorResults;
        synchronized (this.jobSubmitClient) {
            accumulatorResults = this.accumulatorProtocolProxy.getAccumulatorResults(this.jobGraph.getJobID());
        }
        return accumulatorResults;
    }
}
