package com.microsoft.reef.runtime.yarn.driver;

import com.google.protobuf.ByteString;
import com.microsoft.reef.proto.DriverRuntimeProtocol;
import com.microsoft.reef.proto.ReefServiceProtos;
import com.microsoft.reef.runtime.common.driver.DriverStatusManager;
import com.microsoft.reef.runtime.common.driver.evaluator.EvaluatorManager;
import com.microsoft.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
import com.microsoft.reef.runtime.yarn.util.YarnTypes;
import com.microsoft.reef.util.Optional;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.wake.remote.impl.ObjectSerializableCodec;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

/* loaded from: input_file:com/microsoft/reef/runtime/yarn/driver/YarnContainerManager.class */
final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = Logger.getLogger(YarnContainerManager.class.getName());
    /** Runtime name placed in the runtime status / runtime error messages built by this class. */
    private static final String RUNTIME_NAME = "YARN";
    /** Line prefix that marks a container addition in the evaluator change log. */
    private static final String ADD_FLAG = "+";
    /** Line prefix that marks a container removal in the evaluator change log. */
    private static final String REMOVE_FLAG = "-";
    /** YARN configuration used to initialise the YARN, resource-manager and node-manager clients. */
    private final YarnConfiguration yarnConf;
    /** Asynchronous client for the resource manager; this object is registered as its callback handler. */
    private final AMRMClientAsync resourceManager;
    /** Asynchronous client for the node managers; this object is registered as its callback handler. */
    private final NMClientAsync nodeManager;
    /** Sink for the node-descriptor, resource and runtime status events produced here. */
    private final REEFEventHandlers reefEventHandlers;
    /** Bookkeeping of the containers currently held by the driver, keyed by container id string. */
    private final Containers containers;
    /** Holder for the application master registration response obtained in {@code onStart()}. */
    private final ApplicationMasterRegistration registration;
    /** Counter incremented per requested container and decremented per allocated container. */
    private final ContainerRequestCounter containerRequestCounter;
    /** Notified when YARN asks the application master to shut down. */
    private final DriverStatusManager driverStatusManager;
    /** Supplies the tracking URL passed along when registering the application master. */
    private final TrackingURLProvider trackingURLProvider;
    /** Client used in {@code onStart()} to fetch the reports of the running nodes. */
    private final YarnClient yarnClient = YarnClient.createYarnClient();
    /** Requests received through {@code onContainerRequest} that have not yet been dequeued by an allocation. */
    private final Queue<AMRMClient.ContainerRequest> outstandingContainerRequests = new ConcurrentLinkedQueue();
    /** Memory of the request most recently handed to {@code resourceManager.addContainerRequest}; -1 before the first one. */
    private int lastContainerMemory = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.microsoft.reef.runtime.yarn.driver.YarnContainerManager$1, reason: invalid class name */
    /* loaded from: input_file:com/microsoft/reef/runtime/yarn/driver/YarnContainerManager$1.class */
    /**
     * Compiler-generated lookup table for a {@code switch} over {@link ContainerState}.
     *
     * <p>The array is indexed by {@code ContainerState.ordinal()} and holds the case label
     * to jump to. Only {@code COMPLETE} is mapped (to label 1); every other slot keeps the
     * default value 0, i.e. it selects the {@code default} branch of the switch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per enum constant known at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$yarn$api$records$ContainerState = new int[ContainerState.values().length];

        static {
            try {
                $SwitchMap$org$apache$hadoop$yarn$api$records$ContainerState[ContainerState.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: if the constant is missing at run time its slot simply stays 0.
            }
        }
    }

    @Inject
    YarnContainerManager(YarnConfiguration yarnConfiguration, @Parameter(YarnHeartbeatPeriod.class) int i, REEFEventHandlers rEEFEventHandlers, Containers containers, ApplicationMasterRegistration applicationMasterRegistration, ContainerRequestCounter containerRequestCounter, DriverStatusManager driverStatusManager, TrackingURLProvider trackingURLProvider) throws IOException {
        this.reefEventHandlers = rEEFEventHandlers;
        this.driverStatusManager = driverStatusManager;
        this.containers = containers;
        this.registration = applicationMasterRegistration;
        this.containerRequestCounter = containerRequestCounter;
        this.yarnConf = yarnConfiguration;
        this.trackingURLProvider = trackingURLProvider;
        this.yarnClient.init(this.yarnConf);
        this.resourceManager = AMRMClientAsync.createAMRMClientAsync(i, this);
        this.nodeManager = new NMClientAsyncImpl(this);
        LOG.log(Level.FINEST, "Instantiated YarnContainerManager");
    }

    /**
     * Resource-manager callback: forwards every reported status to {@code onContainerStatus}.
     *
     * @param list statuses of the containers reported by the resource manager
     */
    public final void onContainersCompleted(List<ContainerStatus> list) {
        for (final ContainerStatus completedStatus : list) {
            onContainerStatus(completedStatus);
        }
    }

    /**
     * Resource-manager callback: processes each newly allocated container as a fresh allocation.
     *
     * <p>A tag made of the current thread name (spaces replaced by underscores) and the current
     * time is logged before and after the batch so both log lines can be correlated.
     *
     * @param list the containers allocated by the resource manager
     */
    public final void onContainersAllocated(List<Container> list) {
        final String threadName = Thread.currentThread().getName().replace(' ', '_');
        final String batchTag = String.format("%s:%d", threadName, System.currentTimeMillis());

        final Object[] logArgs = {batchTag, list.size(), this.containerRequestCounter.get()};
        LOG.log(Level.FINE, "TIME: Allocated Containers {0} {1} of {2}", logArgs);

        for (final Container allocated : list) {
            handleNewContainer(allocated, false);
        }

        LOG.log(Level.FINE, "TIME: Processed Containers {0}", batchTag);
    }

    /**
     * Resource-manager callback for a shutdown request: publishes runtime state {@code DONE}
     * and then reports the shutdown to the driver status manager as an error.
     */
    public void onShutdownRequest() {
        final DriverRuntimeProtocol.RuntimeStatusProto doneStatus =
                DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
                        .setName(RUNTIME_NAME)
                        .setState(ReefServiceProtos.State.DONE)
                        .build();
        this.reefEventHandlers.onRuntimeStatus(doneStatus);

        final Exception shutdownCause = new Exception("Shutdown requested by YARN.");
        this.driverStatusManager.onError(shutdownCause);
    }

    /**
     * Resource-manager callback: forwards every updated node report to {@code onNodeReport}.
     *
     * @param list the node reports delivered by the resource manager
     */
    public void onNodesUpdated(List<NodeReport> list) {
        for (final NodeReport updatedNode : list) {
            onNodeReport(updatedNode);
        }
    }

    /**
     * Resource-manager callback asking for the application progress.
     *
     * @return always {@code 0}; no progress figure is computed here
     */
    public final float getProgress() {
        return 0f;
    }

    /**
     * Resource-manager callback for an asynchronous failure; handled as a runtime error.
     *
     * @param th the failure reported by the client
     */
    public final void onError(Throwable th) {
        this.onRuntimeError(th);
    }

    /**
     * Node-manager callback: once a known container has started, asynchronously asks the
     * node manager for its status. Containers not present in the registry are ignored.
     *
     * @param containerId id of the started container
     * @param map service data delivered with the callback; not used here
     */
    public final void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
        final Optional<Container> known = this.containers.getOptional(containerId.toString());
        if (!known.isPresent()) {
            return;
        }
        final Container started = known.get();
        this.nodeManager.getContainerStatusAsync(containerId, started.getNodeId());
    }

    /**
     * Node-manager callback delivering a container status; delegates to {@code onContainerStatus}.
     *
     * @param containerId id of the container (the status itself carries the id that is used)
     * @param containerStatus the reported status
     */
    public final void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
        this.onContainerStatus(containerStatus);
    }

    /**
     * Node-manager callback: publishes a {@code DONE} resource status for a stopped container
     * that is present in the registry. Unknown containers are ignored.
     *
     * @param containerId id of the stopped container
     */
    public final void onContainerStopped(ContainerId containerId) {
        final String id = containerId.toString();
        if (!this.containers.hasContainer(id)) {
            return;
        }
        final DriverRuntimeProtocol.ResourceStatusProto stopped =
                DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
                        .setIdentifier(id)
                        .setState(ReefServiceProtos.State.DONE)
                        .build();
        this.reefEventHandlers.onResourceStatus(stopped);
    }

    /**
     * Node-manager callback: starting the container failed; reported as a failed resource.
     *
     * @param containerId id of the affected container
     * @param th the failure
     */
    public final void onStartContainerError(ContainerId containerId, Throwable th) {
        this.handleContainerError(containerId, th);
    }

    /**
     * Node-manager callback: querying the container status failed; reported as a failed resource.
     *
     * @param containerId id of the affected container
     * @param th the failure
     */
    public final void onGetContainerStatusError(ContainerId containerId, Throwable th) {
        this.handleContainerError(containerId, th);
    }

    /**
     * Node-manager callback: stopping the container failed; reported as a failed resource.
     *
     * @param containerId id of the affected container
     * @param th the failure
     */
    public final void onStopContainerError(ContainerId containerId, Throwable th) {
        this.handleContainerError(containerId, th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the node manager to start the given container asynchronously.
     *
     * @param container the allocated container to start
     * @param containerLaunchContext the launch context to start it with
     */
    public void submit(Container container, ContainerLaunchContext containerLaunchContext) {
        final NMClientAsync client = this.nodeManager;
        client.startContainerAsync(container, containerLaunchContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gives a container back to YARN: removes it from the registry, releases it at the
     * resource manager, records the removal in the change log and publishes a fresh
     * runtime status.
     *
     * @param str id string of the container to release
     */
    public void release(String str) {
        LOG.log(Level.FINE, "Release container: {0}", str);

        final Container released = this.containers.removeAndGet(str);
        final ContainerId releasedId = released.getId();

        this.resourceManager.releaseAssignedContainer(releasedId);
        logContainerRemoval(releasedId.toString());
        updateRuntimeStatus();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the YARN clients, publishes the running nodes, registers the application master
     * and, on Hadoop 2.4.0 or later, processes containers kept from previous attempts.
     *
     * <p>A failure while fetching node reports or while registering is logged and handed to
     * {@code onRuntimeError}; the method then carries on with the next step.
     */
    public void onStart() {
        this.yarnClient.start();
        this.resourceManager.init(this.yarnConf);
        this.resourceManager.start();
        this.nodeManager.init(this.yarnConf);
        this.nodeManager.start();

        // Step 1: tell REEF about every node that is currently running.
        try {
            for (final NodeReport runningNode : this.yarnClient.getNodeReports(NodeState.RUNNING)) {
                onNodeReport(runningNode);
            }
        } catch (IOException | YarnException nodeReportFailure) {
            LOG.log(Level.WARNING, "Unable to fetch node reports from YARN.", nodeReportFailure);
            onRuntimeError(nodeReportFailure);
        }

        // Step 2: register this application master and keep the response.
        try {
            this.registration.setRegistration(
                    this.resourceManager.registerApplicationMaster("", 0, this.trackingURLProvider.getTrackingUrl()));
            LOG.log(Level.FINE, "YARN registration: {0}", this.registration);
        } catch (YarnException | IOException registrationFailure) {
            LOG.log(Level.WARNING, "Unable to register application master.", registrationFailure);
            onRuntimeError(registrationFailure);
        }

        // Step 3: containers from previous attempts are only looked at from this Hadoop version on.
        final String minimumVersion = "2.4.0";
        if (YarnTypes.isAtOrAfterVersion(minimumVersion)) {
            LOG.log(Level.FINEST, "Hadoop version is {0} or after with support to retain previous containers, processing previous containers.", minimumVersion);
            processPreviousContainers();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the runtime down.
     *
     * <p>If the resource-manager client is started: closes the REEF event handlers, unregisters
     * the application master with status {@code SUCCEEDED} and closes the client. If the
     * node-manager client is started, it is closed as well. Failures are logged as warnings
     * and not propagated.
     */
    public void onStop() {
        LOG.log(Level.FINE, "Stop Runtime: RM status {0}", this.resourceManager.getServiceState());

        final boolean resourceManagerStarted = Service.STATE.STARTED == this.resourceManager.getServiceState();
        if (resourceManagerStarted) {
            try {
                this.reefEventHandlers.close();
                this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
                this.resourceManager.close();
            } catch (Exception shutdownFailure) {
                LOG.log(Level.WARNING, "Error shutting down YARN application", shutdownFailure);
            }
        }

        final boolean nodeManagerStarted = Service.STATE.STARTED == this.nodeManager.getServiceState();
        if (nodeManagerStarted) {
            try {
                this.nodeManager.close();
            } catch (IOException closeFailure) {
                LOG.log(Level.WARNING, "Error closing YARN Node Manager", closeFailure);
            }
        }
    }

    /**
     * Converts a YARN node report into a node descriptor (id, host, port, memory, rack)
     * and publishes it to REEF.
     *
     * @param nodeReport the report to publish
     */
    private void onNodeReport(NodeReport nodeReport) {
        LOG.log(Level.FINE, "Send node descriptor: {0}", nodeReport);

        final DriverRuntimeProtocol.NodeDescriptorProto.Builder descriptor =
                DriverRuntimeProtocol.NodeDescriptorProto.newBuilder();
        descriptor.setIdentifier(nodeReport.getNodeId().toString());
        descriptor.setHostName(nodeReport.getNodeId().getHost());
        descriptor.setPort(nodeReport.getNodeId().getPort());
        descriptor.setMemorySize(nodeReport.getCapability().getMemory());
        descriptor.setRackName(nodeReport.getRackName());

        this.reefEventHandlers.onNodeDescriptor(descriptor.build());
    }

    /**
     * Publishes a {@code FAILED} resource status (exit code 1) for a container whose
     * node-manager operation failed.
     *
     * <p>The diagnostics text is the throwable's message. Many throwables carry no message
     * ({@code getMessage()} returns {@code null}) and generated protobuf string setters throw
     * {@code NullPointerException} on {@code null}, which would prevent the failure from ever
     * being reported. In that case the throwable's {@code toString()} is used instead.
     *
     * @param containerId id of the failed container
     * @param th the failure reported by the node-manager client
     */
    private void handleContainerError(ContainerId containerId, Throwable th) {
        final String message = th == null ? null : th.getMessage();
        // Never hand null to the protobuf builder; fall back to the class name based description.
        final String diagnostics = message != null ? message : String.valueOf(th);

        final DriverRuntimeProtocol.ResourceStatusProto.Builder status =
                DriverRuntimeProtocol.ResourceStatusProto.newBuilder().setIdentifier(containerId.toString());
        status.setState(ReefServiceProtos.State.FAILED);
        status.setExitCode(1);
        status.setDiagnostics(diagnostics);
        this.reefEventHandlers.onResourceStatus(status.build());
    }

    /**
     * Reconciles the containers the resource manager kept from previous attempts with the
     * containers expected according to the evaluator change log.
     *
     * <ul>
     *   <li>No previous containers: nothing to do.</li>
     *   <li>Fewer alive than expected: every expected container that is not alive is recorded
     *       as removed and reported as failed during restart.</li>
     *   <li>More alive than expected, or an alive container that was not expected:
     *       a {@code RuntimeException} is thrown.</li>
     * </ul>
     * Each alive, expected container is finally passed to {@code handleNewContainer} as a
     * recovered container.
     */
    private void processPreviousContainers() {
        final List<Container> survivors = this.registration.getRegistration().getContainersFromPreviousAttempts();
        if (survivors == null || survivors.isEmpty()) {
            return;
        }

        final int aliveCount = survivors.size();
        LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", aliveCount);
        this.driverStatusManager.setNumPreviousContainers(aliveCount);

        final Set<String> expectedIds = getExpectedContainersFromLogReplay();
        final int expectedCount = expectedIds.size();

        // The two size mismatches are mutually exclusive, so the fatal one can be checked first.
        if (expectedCount < aliveCount) {
            throw new RuntimeException("Expected only [" + expectedCount + "] containers but resource manager believe that [" + aliveCount + "] are outstanding for driver.");
        }

        if (expectedCount > aliveCount) {
            LOG.log(Level.WARNING, "Expected {0} containers while only {1} are still alive", new Object[]{expectedCount, aliveCount});

            final Set<String> aliveIds = new HashSet<>();
            for (final Container survivor : survivors) {
                aliveIds.add(survivor.getId().toString());
            }

            for (final String expectedId : expectedIds) {
                if (aliveIds.contains(expectedId)) {
                    continue;
                }
                logContainerRemoval(expectedId);
                LOG.log(Level.WARNING, "Expected container [{0}] not alive, must have failed during driver restart.", expectedId);
                informAboutConatinerFailureDuringRestart(expectedId);
            }
        }

        for (final Container survivor : survivors) {
            LOG.log(Level.FINE, "Previous container: [{0}]", survivor.toString());
            final String survivorId = survivor.getId().toString();
            if (!expectedIds.contains(survivorId)) {
                throw new RuntimeException("Not expecting container " + survivorId);
            }
            handleNewContainer(survivor, true);
        }
    }

    /**
     * Translates a YARN container status into a REEF resource status and publishes it.
     * Statuses of containers that are not in the registry are ignored.
     *
     * <p>A {@code COMPLETE} container is mapped by exit status: 0 to {@code DONE}, 143 to
     * {@code KILLED}, anything else to {@code FAILED}. The exit code is attached, the container
     * is dropped from the registry and the removal is written to the change log. Any other
     * container state is reported as {@code RUNNING}. Diagnostics are attached when present.
     *
     * @param containerStatus the status reported by YARN
     */
    private void onContainerStatus(ContainerStatus containerStatus) {
        final String id = containerStatus.getContainerId().toString();
        if (!this.containers.hasContainer(id)) {
            return;
        }

        LOG.log(Level.FINE, "Received container status: {0}", id);
        final DriverRuntimeProtocol.ResourceStatusProto.Builder status =
                DriverRuntimeProtocol.ResourceStatusProto.newBuilder().setIdentifier(id);

        switch (containerStatus.getState()) {
            case COMPLETE: {
                final int exitStatus = containerStatus.getExitStatus();
                LOG.log(Level.FINE, "Container completed: status {0}", exitStatus);
                if (exitStatus == 0) {
                    status.setState(ReefServiceProtos.State.DONE);
                } else if (exitStatus == 143) {
                    status.setState(ReefServiceProtos.State.KILLED);
                } else {
                    status.setState(ReefServiceProtos.State.FAILED);
                }
                status.setExitCode(exitStatus);
                this.containers.removeAndGet(id);
                logContainerRemoval(id);
                break;
            }
            default:
                LOG.info("Container running");
                status.setState(ReefServiceProtos.State.RUNNING);
                break;
        }

        final String diagnostics = containerStatus.getDiagnostics();
        if (diagnostics != null) {
            LOG.log(Level.FINE, "Container diagnostics: {0}", diagnostics);
            status.setDiagnostics(diagnostics);
        }

        this.reefEventHandlers.onResourceStatus(status.build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues the given container requests and, if the queue was empty beforehand, submits
     * the head of the queue to the resource manager. A fresh runtime status is published
     * afterwards.
     *
     * <p>Counter update, queueing and the submission happen while holding this object's monitor.
     *
     * @param containerRequestArr the requests to queue; may be empty
     */
    public void onContainerRequest(AMRMClient.ContainerRequest... containerRequestArr) {
        synchronized (this) {
            this.containerRequestCounter.incrementBy(containerRequestArr.length);

            // Remember whether anything was already queued before adding the new requests.
            final boolean queueWasEmpty = this.outstandingContainerRequests.isEmpty();

            for (final AMRMClient.ContainerRequest request : containerRequestArr) {
                final Object[] capability = {
                        request.getCapability().getMemory(),
                        request.getCapability().getVirtualCores()
                };
                LOG.log(Level.FINEST, "Container Request: memory = {0}, core number = {1}", capability);
                LOG.log(Level.FINEST, "Adding container request to local queue: {0}", request);
                this.outstandingContainerRequests.add(request);
            }

            if (queueWasEmpty && containerRequestArr.length > 0) {
                final AMRMClient.ContainerRequest first = this.outstandingContainerRequests.peek();
                LOG.log(Level.INFO, "Requesting first container from YARN: {0}", first);
                this.lastContainerMemory = first.getCapability().getMemory();
                this.resourceManager.addContainerRequest(first);
            }
        }
        updateRuntimeStatus();
    }

    /**
     * Handles a container handed to the driver by YARN.
     *
     * <p>Recovered containers ({@code z == true}) are only logged. For a fresh allocation the
     * request counter is decremented and then, while holding this object's monitor:
     * <ul>
     *   <li>if no request is queued, the container is unexpected and is released;</li>
     *   <li>otherwise the container is registered, the request at the head of the queue (the
     *       one that had been submitted) is dequeued and, when its memory equals
     *       {@code lastContainerMemory}, withdrawn from the AM-RM client; the next queued
     *       request, if any, is submitted; the allocation is published, written to the change
     *       log and a fresh runtime status is sent.</li>
     * </ul>
     *
     * <p>Fix: the head used to be peeked <em>before</em> it was dequeued, so the "next"
     * request was the very request that had just been satisfied. It was re-submitted to the
     * resource manager, the following request was not sent, and the {@code != null} check
     * after the dereference could never fail. The head is now dequeued first and the new head
     * is peeked afterwards.
     *
     * @param container the container reported by YARN
     * @param z {@code true} if the container was recovered from a previous driver attempt
     */
    private void handleNewContainer(Container container, boolean z) {
        LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId());
        if (z) {
            return;
        }
        synchronized (this) {
            this.containerRequestCounter.decrement();

            if (this.outstandingContainerRequests.isEmpty()) {
                LOG.log(Level.WARNING, "outstandingContainerRequests is empty upon allocation of unexpected container {0}, releasing...", container.getId());
                this.resourceManager.releaseAssignedContainer(container.getId());
                return;
            }

            this.containers.add(container);

            // The head of the queue is the request that was submitted and is satisfied by this container.
            final AMRMClient.ContainerRequest satisfied = this.outstandingContainerRequests.remove();
            final int satisfiedMemory = satisfied.getCapability().getMemory();
            if (this.lastContainerMemory == satisfiedMemory) {
                try {
                    LOG.log(Level.INFO, "Requesting same amount of memory {0}, make sure previous request {1} is removed from AMRM client queue", new Object[]{satisfiedMemory, satisfied});
                    this.resourceManager.removeContainerRequest(satisfied);
                } catch (Exception e) {
                    // Best effort: the client may no longer hold the request.
                    LOG.log(Level.WARNING, "Nothing to remove from Async AMRM client's queue, removal attempt failed with exception", e);
                }
            }

            // Only now look at the new head: this is the request that still has to be submitted.
            final AMRMClient.ContainerRequest next = this.outstandingContainerRequests.peek();
            if (next != null) {
                LOG.log(Level.INFO, "Requesting 1 additional container from YARN: {0}", next);
                this.resourceManager.addContainerRequest(next);
                this.lastContainerMemory = next.getCapability().getMemory();
            }

            LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}", new Object[]{container.getResource().getMemory(), container.getResource().getVirtualCores()});
            this.reefEventHandlers.onResourceAllocation(
                    DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
                            .setIdentifier(container.getId().toString())
                            .setNodeId(container.getNodeId().toString())
                            .setResourceMemory(container.getResource().getMemory())
                            .setVirtualCores(container.getResource().getVirtualCores())
                            .build());
            logContainerAddition(container.getId().toString());
            updateRuntimeStatus();
        }
    }

    /**
     * Publishes a {@code RUNNING} runtime status carrying the current value of the request
     * counter and the ids of all containers in the registry.
     */
    private void updateRuntimeStatus() {
        final DriverRuntimeProtocol.RuntimeStatusProto.Builder status =
                DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
                        .setName(RUNTIME_NAME)
                        .setState(ReefServiceProtos.State.RUNNING)
                        .setOutstandingContainerRequests(this.containerRequestCounter.get());

        for (final String allocatedId : this.containers.getContainerIds()) {
            status.addContainerAllocation(allocatedId);
        }

        this.reefEventHandlers.onRuntimeStatus(status.build());
    }

    /**
     * Handles a fatal runtime error: closes the REEF event handlers, unregisters the
     * application master with status {@code FAILED}, stops the resource-manager client and
     * publishes a {@code FAILED} runtime status that carries the serialized throwable.
     *
     * <p>Failures during close/unregister are logged and do not prevent the status from being
     * published; the resource-manager client is stopped in every case.
     *
     * <p>The error text is the throwable's message, or its {@code toString()} when it has no
     * message: generated protobuf string setters throw {@code NullPointerException} on
     * {@code null}, which previously made the error report itself fail for message-less
     * throwables.
     *
     * @param th the error that brought the runtime down
     */
    private void onRuntimeError(Throwable th) {
        final String rawMessage = th.getMessage();
        final String message = rawMessage != null ? rawMessage : th.toString();

        try {
            this.reefEventHandlers.close();
            this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.FAILED, message, null);
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Error shutting down YARN application", e);
        } finally {
            // Runs on success, on a logged failure and on any propagating error alike.
            this.resourceManager.stop();
        }

        final ReefServiceProtos.RuntimeErrorProto error =
                ReefServiceProtos.RuntimeErrorProto.newBuilder()
                        .setName(RUNTIME_NAME)
                        .setMessage(message)
                        .setException(ByteString.copyFrom(new ObjectSerializableCodec().encode(th)))
                        .build();

        final DriverRuntimeProtocol.RuntimeStatusProto.Builder status =
                DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
                        .setState(ReefServiceProtos.State.FAILED)
                        .setName(RUNTIME_NAME);
        status.setError(error);
        this.reefEventHandlers.onRuntimeStatus(status.build());
    }

    /**
     * Replays the evaluator change log and returns the ids of the containers that, according
     * to the log, should still be alive.
     *
     * <p>Each line starting with {@code ADD_FLAG} adds the id that follows the flag, each line
     * starting with {@code REMOVE_FLAG} removes it; other lines are ignored. A missing log file
     * yields an empty set.
     *
     * <p>The reader is opened in a try-with-resources block so it is closed on every path,
     * including when an inconsistent log aborts the replay with an exception (it used to be
     * closed only after a fully successful replay).
     *
     * @return the set of container ids added and not subsequently removed
     * @throws RuntimeException if an id is added twice, removed without being present, or the
     *         log cannot be read
     */
    private Set<String> getExpectedContainersFromLogReplay() {
        final Configuration configuration = new Configuration();
        configuration.setBoolean("dfs.support.append", true);
        configuration.setBoolean("dfs.support.broken.append", true);

        final Set<String> expected = new HashSet<>();
        try {
            final FileSystem fileSystem = FileSystem.get(configuration);
            final Path changeLog = new Path(getChangeLogLocation());
            if (!fileSystem.exists(changeLog)) {
                return expected;
            }

            try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(changeLog)))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.startsWith(ADD_FLAG)) {
                        final String added = line.substring(ADD_FLAG.length());
                        // Set#add returns false when the id is already present.
                        if (!expected.add(added)) {
                            throw new RuntimeException("Duplicated add container record found in the change log for container " + added);
                        }
                    } else if (line.startsWith(REMOVE_FLAG)) {
                        final String removed = line.substring(REMOVE_FLAG.length());
                        // Set#remove returns false when the id was not present.
                        if (!expected.remove(removed)) {
                            throw new RuntimeException("Change log includes record that try to remove non-exist or duplicate remove record for container + " + removed);
                        }
                    }
                }
            }
            return expected;
        } catch (IOException e) {
            throw new RuntimeException("Cannot read from log file", e);
        }
    }

    /**
     * Logs a warning and publishes a {@code FAILED} resource status (exit code 1, flagged as
     * coming from the previous driver) for a container that did not survive the driver restart.
     *
     * @param str id string of the container that failed
     */
    private void informAboutConatinerFailureDuringRestart(String str) {
        final String warning = "Container [" + str + "] has failed during driver restart process, FailedEvaluaorHandler will be triggered, but no additional evaluator can be requested due to YARN-2433.";
        LOG.log(Level.WARNING, warning);

        final DriverRuntimeProtocol.ResourceStatusProto.Builder failure =
                DriverRuntimeProtocol.ResourceStatusProto.newBuilder();
        failure.setIdentifier(str);
        failure.setState(ReefServiceProtos.State.FAILED);
        failure.setExitCode(1);
        failure.setDiagnostics("Container [" + str + "] failed during driver restart process.");
        failure.setIsFromPreviousDriver(true);

        this.reefEventHandlers.onResourceStatus(failure.build());
    }

    /**
     * Appends the given text to the evaluator change log, creating the file if it does not
     * exist yet.
     *
     * <p>If writing fails and the file already existed, the text is appended by the
     * read/delete/re-create fallback instead. If the file did not exist, the failure is fatal.
     *
     * <p>The writer is opened in a try-with-resources block so the underlying stream is closed
     * even when {@code write} fails; previously a failed write left the stream open right
     * before the fallback deleted and re-created the same file.
     *
     * @param str the text to append (callers pass one complete log line)
     * @throws RuntimeException if the file system cannot be obtained or a new log file cannot
     *         be written
     */
    private void writeToEvaluatorLog(String str) {
        final Configuration configuration = new Configuration();
        configuration.setBoolean("dfs.support.append", true);
        configuration.setBoolean("dfs.support.broken.append", true);

        try {
            final FileSystem fileSystem = FileSystem.get(configuration);
            final Path changeLog = new Path(getChangeLogLocation());

            // Stays false if the existence check itself fails, which makes that failure fatal.
            boolean logExists = false;
            try {
                logExists = fileSystem.exists(changeLog);
                try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
                        logExists ? fileSystem.append(changeLog) : fileSystem.create(changeLog)))) {
                    writer.write(str);
                }
            } catch (IOException e) {
                if (!logExists) {
                    throw new RuntimeException("Cannot open or write to log file", e);
                }
                // Append is not usable on this file system: rewrite the whole file instead.
                appendByDeleteAndCreate(fileSystem, changeLog, str);
            }
        } catch (IOException e2) {
            throw new RuntimeException("Cannot instantiate HDFS fs.", e2);
        }
    }

    /**
     * Emulates an append: reads the whole file, deletes it and re-creates it with the old
     * content followed by the new text.
     *
     * <p>The existing content is carried over as raw bytes. It used to be decoded into a
     * {@code String} and re-encoded with the platform charset, which can alter bytes that are
     * not valid in that charset. The appended text is still encoded with the platform charset,
     * as before.
     *
     * <p>NOTE(review): the file is deleted before the replacement is written, so a failure in
     * between loses the log; this ordering is unchanged.
     *
     * @param fileSystem the file system holding the file
     * @param path the file to extend
     * @param str the text to append
     * @throws RuntimeException if any of the file operations fails
     */
    private void appendByDeleteAndCreate(FileSystem fileSystem, Path path, String str) {
        try {
            final ByteArrayOutputStream content = new ByteArrayOutputStream();
            // copyBytes(..., true) closes both streams; closing a ByteArrayOutputStream has no
            // effect, so it can still be written to and read afterwards.
            IOUtils.copyBytes(fileSystem.open(path), content, 4096, true);

            final byte[] appended = str.getBytes();
            content.write(appended, 0, appended.length);

            fileSystem.delete(path, true);
            IOUtils.copyBytes(new ByteArrayInputStream(content.toByteArray()), fileSystem.create(path), 4096, true);
        } catch (IOException e) {
            throw new RuntimeException("Cannot append by read-append-delete-create with exception.", e);
        }
    }

    /**
     * Builds the path of the evaluator change log for the current job.
     *
     * @return {@code /ReefApplications/<job identifier>/evaluatorsChangesLog}
     */
    private String getChangeLogLocation() {
        final StringBuilder location = new StringBuilder("/ReefApplications/");
        location.append(EvaluatorManager.getJobIdentifier());
        location.append("/evaluatorsChangesLog");
        return location.toString();
    }

    /**
     * Records the addition of a container in the evaluator change log as one line:
     * the add flag, the id and the platform line separator.
     *
     * @param str id string of the added container
     */
    private void logContainerAddition(String str) {
        final String record = new StringBuilder()
                .append(ADD_FLAG)
                .append(str)
                .append(System.lineSeparator())
                .toString();
        writeToEvaluatorLog(record);
    }

    /**
     * Records the removal of a container in the evaluator change log as one line:
     * the remove flag, the id and the platform line separator.
     *
     * @param str id string of the removed container
     */
    private void logContainerRemoval(String str) {
        final String record = new StringBuilder()
                .append(REMOVE_FLAG)
                .append(str)
                .append(System.lineSeparator())
                .toString();
        writeToEvaluatorLog(record);
    }
}
