package eu.stratosphere.sopremo.server;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionResult;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.sopremo.execution.ExecutionRequest;
import eu.stratosphere.sopremo.execution.ExecutionResponse;
import eu.stratosphere.sopremo.io.Sink;
import eu.stratosphere.sopremo.operator.SopremoPlan;
import eu.stratosphere.util.StringUtils;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/sopremo/server/SopremoExecutionThread.class */
public class SopremoExecutionThread implements Runnable {
    private final SopremoJobInfo jobInfo;
    private final InetSocketAddress jobManagerAddress;
    private static final Log LOG = LogFactory.getLog(SopremoExecutionThread.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: eu.stratosphere.sopremo.server.SopremoExecutionThread$1, reason: invalid class name */
    /* loaded from: input_file:eu/stratosphere/sopremo/server/SopremoExecutionThread$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$eu$stratosphere$sopremo$execution$ExecutionRequest$ExecutionMode = new int[ExecutionRequest.ExecutionMode.values().length];

        static {
            try {
                $SwitchMap$eu$stratosphere$sopremo$execution$ExecutionRequest$ExecutionMode[ExecutionRequest.ExecutionMode.RUN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$eu$stratosphere$sopremo$execution$ExecutionRequest$ExecutionMode[ExecutionRequest.ExecutionMode.RUN_WITH_STATISTICS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public SopremoExecutionThread(SopremoJobInfo sopremoJobInfo, InetSocketAddress inetSocketAddress) {
        this.jobInfo = sopremoJobInfo;
        this.jobManagerAddress = inetSocketAddress;
    }

    @Override // java.lang.Runnable
    public void run() {
        processPlan();
    }

    JobGraph getJobGraph(Plan plan) {
        return new NepheleJobGraphGenerator().compileJobGraph(new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), this.jobManagerAddress).compile(plan));
    }

    private JobExecutionResult executePlan(SopremoPlan sopremoPlan) {
        try {
            JobGraph jobGraph = getJobGraph(sopremoPlan.asPactPlan());
            try {
                for (String str : this.jobInfo.getInitialRequest().getQuery().getRequiredPackages()) {
                    Path contains = LibraryCacheManager.contains(str);
                    if (contains == null) {
                        LOG.error("Could not find associated packages " + str + " of job " + this.jobInfo.getJobId());
                        this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "Could not find associated packages: " + str);
                        return null;
                    }
                    jobGraph.addJar(contains);
                }
                try {
                    this.jobInfo.getConfiguration().setString("jobmanager.rpc.address", this.jobManagerAddress.getHostString());
                    this.jobInfo.getConfiguration().setInteger("jobmanager.rpc.port", this.jobManagerAddress.getPort());
                    JobClient jobClient = new JobClient(jobGraph, this.jobInfo.getConfiguration());
                    try {
                        this.jobInfo.setJobClient(jobClient);
                        this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.RUNNING, "");
                        return jobClient.submitJobAndWait();
                    } catch (Exception e) {
                        LOG.error("The job was not successfully executed " + this.jobInfo.getJobId(), e);
                        this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "The job was not successfully executed: " + StringUtils.stringifyException(e));
                        return null;
                    }
                } catch (Exception e2) {
                    LOG.error("Could not open job manager " + this.jobInfo.getJobId(), e2);
                    this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "Could not open job manager: " + StringUtils.stringifyException(e2));
                    return null;
                }
            } catch (Exception e3) {
                LOG.error("Could not find retrieve package information from library manager " + this.jobInfo.getJobId(), e3);
                this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "Could not find retrieve package information from library manager: " + StringUtils.stringifyException(e3));
                return null;
            }
        } catch (Exception e4) {
            LOG.error("Could not generate job graph " + this.jobInfo.getJobId(), e4);
            this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "Could not generate job graph: " + StringUtils.stringifyException(e4));
            return null;
        }
    }

    private void gatherStatistics(SopremoPlan sopremoPlan, JobExecutionResult jobExecutionResult) {
        StringBuilder sb = new StringBuilder();
        sb.append("Executed in ").append(jobExecutionResult.getNetRuntime()).append(" ms");
        for (Sink sink : sopremoPlan.getContainedOperators()) {
            if (sink instanceof Sink) {
                try {
                    String outputPath = sink.getOutputPath();
                    sb.append("\n").append("Sink ").append(outputPath).append(": ").append(FileSystem.get(new URI(outputPath)).getFileStatus(new Path(outputPath)).getLen()).append(" B");
                } catch (Exception e) {
                    LOG.warn("While gathering statistics", e);
                }
            }
        }
        this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.FINISHED, sb.toString());
    }

    private void processPlan() {
        try {
            LOG.info("Starting job " + this.jobInfo.getJobId());
            SopremoPlan query = this.jobInfo.getInitialRequest().getQuery();
            JobExecutionResult executePlan = executePlan(query);
            if (executePlan != null) {
                switch (AnonymousClass1.$SwitchMap$eu$stratosphere$sopremo$execution$ExecutionRequest$ExecutionMode[this.jobInfo.getInitialRequest().getMode().ordinal()]) {
                    case 1:
                        this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.FINISHED, "");
                        break;
                    case 2:
                        gatherStatistics(query, executePlan);
                        break;
                }
                LOG.info(String.format("Finished job %s in %s ms", this.jobInfo.getJobId(), executePlan));
            }
        } catch (Throwable th) {
            LOG.error("Cannot process plan " + this.jobInfo.getJobId(), th);
            this.jobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "Cannot process plan: " + StringUtils.stringifyException(th));
        }
    }
}
