package eu.stratosphere.sopremo.server;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.rpc.RPCService;
import eu.stratosphere.sopremo.execution.ExecutionRequest;
import eu.stratosphere.sopremo.execution.ExecutionResponse;
import eu.stratosphere.sopremo.execution.LibraryTransferAgent;
import eu.stratosphere.sopremo.execution.SopremoExecutionProtocol;
import eu.stratosphere.sopremo.execution.SopremoID;
import eu.stratosphere.util.StringUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/sopremo/server/SopremoServer.class */
public class SopremoServer implements SopremoExecutionProtocol, Closeable {
    private RPCService rpcService;
    private final Configuration configuration;
    private InetSocketAddress serverAddress;
    private InetSocketAddress jobManagerAddress;
    private final ScheduledExecutorService executorService;
    private final Map<SopremoID, SopremoJobInfo> jobInfo;
    private boolean stopped;
    private final LibraryTransferAgent libraryTransferAgent;
    private static final Log LOG = LogFactory.getLog(SopremoServer.class);
    private static final int SLEEPINTERVAL = 1000;

    public SopremoServer() {
        this(GlobalConfiguration.getConfiguration());
    }

    public SopremoServer(Configuration configuration) {
        this.executorService = createExecutor();
        this.jobInfo = new HashMap();
        this.stopped = false;
        this.libraryTransferAgent = new LibraryTransferAgent();
        this.configuration = configuration;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.rpcService != null) {
            this.rpcService.shutDown();
            this.rpcService = null;
        }
        this.executorService.shutdownNow();
    }

    public ExecutionResponse execute(ExecutionRequest executionRequest) {
        SopremoID generate = SopremoID.generate();
        LOG.info("Receive execution request for job " + generate);
        SopremoJobInfo sopremoJobInfo = new SopremoJobInfo(generate, executionRequest, this.configuration);
        this.jobInfo.put(generate, sopremoJobInfo);
        if (executionRequest.getQuery() == null) {
            sopremoJobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "No plan submitted");
        } else if (executionRequest.getMode() == null) {
            sopremoJobInfo.setStatusAndDetail(ExecutionResponse.ExecutionState.ERROR, "No mode set");
        } else {
            this.executorService.submit(new SopremoExecutionThread(sopremoJobInfo, getJobManagerAddress()));
        }
        return getState(generate);
    }

    public InetSocketAddress getJobManagerAddress() {
        InetSocketAddress inetSocketAddress = this.jobManagerAddress;
        if (inetSocketAddress == null) {
            inetSocketAddress = new InetSocketAddress(this.configuration.getString("jobmanager.rpc.address", "localhost"), this.configuration.getInteger("jobmanager.rpc.port", 6123));
        }
        return inetSocketAddress;
    }

    public LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest libraryCacheProfileRequest) throws IOException {
        return this.libraryTransferAgent.getLibraryCacheProfile(libraryCacheProfileRequest);
    }

    public Object getMetaData(SopremoID sopremoID, String str) {
        return this.jobInfo.get(sopremoID).getMetaData(str);
    }

    public InetSocketAddress getServerAddress() {
        InetSocketAddress inetSocketAddress = this.serverAddress;
        if (inetSocketAddress == null) {
            inetSocketAddress = new InetSocketAddress(this.configuration.getString("sopremo.rpc.address", "localhost"), this.configuration.getInteger("sopremo.rpc.port", 6201));
        }
        return inetSocketAddress;
    }

    public ExecutionResponse getState(SopremoID sopremoID) {
        SopremoJobInfo sopremoJobInfo = this.jobInfo.get(sopremoID);
        return sopremoJobInfo == null ? new ExecutionResponse(sopremoID, ExecutionResponse.ExecutionState.ERROR, "Unknown job") : new ExecutionResponse(sopremoID, sopremoJobInfo.getStatus(), sopremoJobInfo.getDetail());
    }

    public boolean isStopped() {
        return this.stopped;
    }

    public void setJobManagerAddress(InetSocketAddress inetSocketAddress) {
        if (inetSocketAddress == null) {
            throw new NullPointerException("jobManagerAddress must not be null");
        }
        this.jobManagerAddress = inetSocketAddress;
    }

    public void setServerAddress(InetSocketAddress inetSocketAddress) {
        if (inetSocketAddress == null) {
            throw new NullPointerException("rpcServerAddress must not be null");
        }
        this.serverAddress = inetSocketAddress;
    }

    public void start() throws IOException {
        startServer();
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: eu.stratosphere.sopremo.server.SopremoServer.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                SopremoServer.this.stop();
                SopremoServer.this.close();
            }
        });
    }

    public void stop() {
        this.stopped = true;
    }

    public void updateLibraryCache(LibraryCacheUpdate libraryCacheUpdate) throws IOException {
        this.libraryTransferAgent.updateLibraryCache(libraryCacheUpdate);
    }

    private ScheduledThreadPoolExecutor createExecutor() {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        scheduledThreadPoolExecutor.setMaximumPoolSize(1);
        return scheduledThreadPoolExecutor;
    }

    private void startServer() throws IOException {
        this.rpcService = new RPCService(getServerAddress().getPort(), 2);
        this.rpcService.setProtocolCallbackHandler(SopremoExecutionProtocol.class, this);
    }

    public static void main(String[] strArr) {
        OptionBuilder.withArgName("config directory");
        OptionBuilder.hasArg();
        OptionBuilder.withDescription("Specify configuration directory.");
        Option create = OptionBuilder.create("configDir");
        Options options = new Options();
        options.addOption(create);
        try {
            GlobalConfiguration.loadConfiguration(new GnuParser().parse(options, strArr).getOptionValue(create.getOpt(), (String) null));
        } catch (ParseException e) {
            LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
            System.exit(1);
        }
        SopremoServer sopremoServer = new SopremoServer();
        try {
            sopremoServer.start();
            while (!Thread.interrupted()) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e2) {
                }
            }
            sopremoServer.close();
        } catch (IOException e3) {
            LOG.error("Cannot start Sopremo server: " + StringUtils.stringifyException(e3));
            sopremoServer.close();
        }
    }
}
