package moe.dare.briareus.yarn.sensei;

import com.google.common.util.concurrent.AtomicDouble;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import moe.dare.briareus.api.BriareusException;
import moe.dare.briareus.api.JvmStartFailedException;
import moe.dare.briareus.api.RemoteJvmOptions;
import moe.dare.briareus.api.RemoteJvmProcess;
import moe.dare.briareus.common.concurrent.ThreadFactoryBuilder;
import moe.dare.briareus.common.utils.Either;
import moe.dare.briareus.yarn.launch.LaunchContextFactory;
import moe.dare.briareus.yarn.reousrces.ResourceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:moe/dare/briareus/yarn/sensei/BriareusYarnSenseiContextImpl.class */
class BriareusYarnSenseiContextImpl implements BriareusYarnSenseiContext {
    private static final Logger log = LoggerFactory.getLogger(BriareusYarnSenseiContextImpl.class);
    private static final ExecutionTypeRequest GUARANTEED_EXECUTION_TYPE = ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true);
    private static final ThreadFactory HEARTBEAT_THREAD_FACTORY = ThreadFactoryBuilder.withPrefix("briareus-yarn-sensei-heartbeat-").deamon(false).build();
    private static final String CONTEXT_CLOSED_MSG = "Briareus Sensei context closed";
    private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
    private final NMClientAsync nmClientAsync;
    private final UserGroupInformation user;
    private final LaunchContextFactory launchContextFactory;
    private final ResourceFactory resourceFactory;
    private final Runnable shutdownRequestHandler;
    private volatile Resource maximumResourceCapability;
    private volatile ScheduledExecutorService heartBeat;
    private volatile boolean closed;
    private final ConcurrentMap<Long, CompletableFuture<Container>> allocatingContainers = new ConcurrentHashMap();
    private final ConcurrentMap<ContainerId, CompletableFuture<Void>> startingContainers = new ConcurrentHashMap();
    private final ConcurrentMap<ContainerId, CompletableFuture<Integer>> exitCodes = new ConcurrentHashMap();
    private final AtomicInteger requestCounter = new AtomicInteger();
    private final AtomicDouble progress = new AtomicDouble();
    private volatile ApplicationStatus finalStatus = ApplicationStatus.succeeded();

    /* JADX INFO: Access modifiers changed from: package-private */
    public BriareusYarnSenseiContextImpl(UserGroupInformation userGroupInformation, LaunchContextFactory launchContextFactory, ResourceFactory resourceFactory, Runnable runnable) {
        this.user = (UserGroupInformation) Objects.requireNonNull(userGroupInformation, "user");
        this.launchContextFactory = (LaunchContextFactory) Objects.requireNonNull(launchContextFactory, "launchContextFactory");
        this.resourceFactory = (ResourceFactory) Objects.requireNonNull(resourceFactory, "resourceFactory");
        this.shutdownRequestHandler = (Runnable) Objects.requireNonNull(runnable, "shutdownRequestHandler");
        NMTokenCache nMTokenCache = new NMTokenCache();
        NMCallbackHandler nMCallbackHandler = new NMCallbackHandler(this.startingContainers);
        this.amrmClient = (AMRMClient) userGroupInformation.doAs(AMRMClient::createAMRMClient);
        this.nmClientAsync = (NMClientAsync) userGroupInformation.doAs(() -> {
            return NMClientAsync.createNMClientAsync(nMCallbackHandler);
        });
        this.amrmClient.setNMTokenCache(nMTokenCache);
        this.nmClientAsync.getClient().setNMTokenCache(nMTokenCache);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startContext(Configuration configuration, String str, int i, String str2) {
        Objects.requireNonNull(configuration, "configuration");
        Objects.requireNonNull(str, "host");
        this.heartBeat = Executors.newSingleThreadScheduledExecutor(HEARTBEAT_THREAD_FACTORY);
        this.user.doAs(() -> {
            try {
                this.amrmClient.init(configuration);
                this.amrmClient.start();
                this.nmClientAsync.init(configuration);
                this.nmClientAsync.start();
                this.maximumResourceCapability = this.amrmClient.registerApplicationMaster(str, i, str2).getMaximumResourceCapability();
                return null;
            } catch (Exception e) {
                this.heartBeat.shutdownNow();
                BriareusException briareusException = new BriareusException("Can't start Briareus Sensei", e);
                Iterator it = Arrays.asList(this.amrmClient, this.nmClientAsync).iterator();
                while (it.hasNext()) {
                    try {
                        ((AbstractService) it.next()).stop();
                    } catch (Exception e2) {
                        briareusException.addSuppressed(e2);
                    }
                }
                throw briareusException;
            }
        });
        this.heartBeat.scheduleAtFixedRate(this::heartbeatYarn, 1L, 1L, TimeUnit.SECONDS);
    }

    public CompletionStage<RemoteJvmProcess> start(RemoteJvmOptions remoteJvmOptions) {
        ensureNotClosed();
        verifyOptions(remoteJvmOptions);
        AMRMClient.ContainerRequest createRequest = createRequest(remoteJvmOptions);
        CompletionStage<ContainerLaunchContext> create = this.launchContextFactory.create(remoteJvmOptions);
        return create.handle((v0, v1) -> {
            return Either.oneOfNullable(v0, v1);
        }).thenCombine(allocateContainer(createRequest), (either, container) -> {
            if (!either.isRight()) {
                return launchContainer(container, (ContainerLaunchContext) either.left());
            }
            log.warn("Stopping container {} before start", container.getId());
            this.amrmClient.releaseAssignedContainer(container.getId());
            throw new JvmStartFailedException("Can't prepare container context", (Throwable) either.right());
        }).thenCompose(completableFuture -> {
            return completableFuture;
        }).thenApply(remoteJvmProcess -> {
            return remoteJvmProcess;
        });
    }

    private void verifyOptions(RemoteJvmOptions remoteJvmOptions) {
        Objects.requireNonNull(remoteJvmOptions);
    }

    private AMRMClient.ContainerRequest createRequest(RemoteJvmOptions remoteJvmOptions) {
        int updateAndGet = this.requestCounter.updateAndGet(i -> {
            return Math.max(i + 1, 1);
        });
        return AMRMClient.ContainerRequest.newBuilder().allocationRequestId(updateAndGet).executionTypeRequest(GUARANTEED_EXECUTION_TYPE).capability(this.resourceFactory.resources(remoteJvmOptions, this.maximumResourceCapability)).priority(Priority.newInstance(updateAndGet)).build();
    }

    private CompletableFuture<Container> allocateContainer(AMRMClient.ContainerRequest containerRequest) {
        CompletableFuture<Container> completableFuture = new CompletableFuture<>();
        if (this.allocatingContainers.putIfAbsent(Long.valueOf(containerRequest.getAllocationRequestId()), completableFuture) != null) {
            throw new IllegalStateException("Allocation id " + containerRequest.getAllocationRequestId() + " already registered");
        }
        this.amrmClient.addContainerRequest(containerRequest);
        return completableFuture;
    }

    private CompletableFuture<RemoteJvmProcess> launchContainer(Container container, ContainerLaunchContext containerLaunchContext) {
        ContainerId id = container.getId();
        NodeId nodeId = container.getNodeId();
        try {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            CompletableFuture<Integer> completableFuture2 = new CompletableFuture<>();
            this.startingContainers.put(id, completableFuture);
            this.exitCodes.put(id, completableFuture2);
            this.nmClientAsync.startContainerAsync(container, containerLaunchContext);
            completableFuture.exceptionally(th -> {
                this.exitCodes.remove(id);
                completableFuture2.completeExceptionally(th);
                return null;
            });
            return completableFuture.thenApply(r11 -> {
                return new YarnContainerJvmProcess(this.nmClientAsync, id, nodeId, completableFuture2);
            });
        } catch (Exception e) {
            log.warn("Failed to start container container {}. Releasing.", id);
            this.amrmClient.releaseAssignedContainer(id);
            throw e;
        }
    }

    @Override // moe.dare.briareus.yarn.sensei.BriareusYarnSenseiContext
    public synchronized void setFinalStatus(ApplicationStatus applicationStatus) {
        Objects.requireNonNull(applicationStatus, "status");
        ensureNotClosed();
        this.finalStatus = applicationStatus;
    }

    @Override // moe.dare.briareus.yarn.sensei.BriareusYarnSenseiContext
    public void setProgress(double d) {
        if (d < 0.0d || d > 1.0d) {
            throw new IllegalArgumentException("Progress must be in range [0, 1], but was " + d);
        }
        ensureNotClosed();
        this.progress.set(d);
    }

    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        ArrayList arrayList = new ArrayList();
        try {
            this.nmClientAsync.stop();
        } catch (Exception e) {
            log.error("Node manager client stopped with errors");
            arrayList.add(e);
        }
        this.heartBeat.shutdownNow();
        try {
            this.heartBeat.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            log.warn("Interrupted during awaiting for hearBeatThread completion");
            Thread.currentThread().interrupt();
        }
        if (!this.heartBeat.isTerminated()) {
            log.error("Heartbeat thread is not terminated normally");
            arrayList.add(new IllegalStateException("Heartbeat thread not terminated"));
        }
        try {
            this.amrmClient.unregisterApplicationMaster(this.finalStatus.getFinalApplicationStatus(), this.finalStatus.getMessage(), this.finalStatus.getNewTrackUrl());
        } catch (Exception e3) {
            log.error("Can't unregister from YARN");
            arrayList.add(e3);
        }
        try {
            this.amrmClient.stop();
        } catch (Exception e4) {
            log.error("Can't stop amrm client");
            arrayList.add(e4);
        }
        if (arrayList.isEmpty()) {
            return;
        }
        BriareusException briareusException = new BriareusException("Context not closed properly", (Throwable) arrayList.get(0));
        Stream skip = arrayList.stream().skip(1L);
        briareusException.getClass();
        skip.forEachOrdered((v1) -> {
            r1.addSuppressed(v1);
        });
        throw briareusException;
    }

    private void heartbeatYarn() {
        Either either = (Either) this.user.doAs(() -> {
            try {
                return Either.left(this.amrmClient.allocate((float) this.progress.get()));
            } catch (Exception e) {
                return Either.right(e);
            }
        });
        if (either.isLeft()) {
            processHeartBeatResponse((AllocateResponse) either.left());
        } else {
            processHeartBeatException((Exception) either.right());
        }
    }

    private void processHeartBeatResponse(AllocateResponse allocateResponse) {
        try {
            allocateResponse.getAllocatedContainers().forEach(this::processAllocatedContainer);
            allocateResponse.getCompletedContainersStatuses().forEach(this::processCompletedContainer);
        } catch (Exception e) {
            log.error("Exception processing allocate response.", e);
        }
    }

    private void processAllocatedContainer(Container container) {
        long allocationRequestId = container.getAllocationRequestId();
        if (allocationRequestId <= 0) {
            allocationRequestId = container.getPriority().getPriority();
        }
        CompletableFuture<Container> remove = this.allocatingContainers.remove(Long.valueOf(allocationRequestId));
        if (remove == null) {
            log.warn("Unknown allocation request id {}. Releasing container {}.", Long.valueOf(allocationRequestId), container.getId());
            this.amrmClient.releaseAssignedContainer(container.getId());
            return;
        }
        remove.complete(container);
        ArrayList arrayList = new ArrayList(this.amrmClient.getMatchingRequests(allocationRequestId));
        AMRMClient<AMRMClient.ContainerRequest> aMRMClient = this.amrmClient;
        aMRMClient.getClass();
        arrayList.forEach(aMRMClient::removeContainerRequest);
    }

    private void processCompletedContainer(ContainerStatus containerStatus) {
        ContainerId containerId = containerStatus.getContainerId();
        CompletableFuture<Integer> remove = this.exitCodes.remove(containerId);
        if (remove != null) {
            remove.complete(Integer.valueOf(containerStatus.getExitStatus()));
        } else {
            log.error("Untracked container {} completed.", containerId);
        }
    }

    private void processHeartBeatException(Exception exc) {
        if (exc instanceof ApplicationAttemptNotFoundException) {
            log.warn("Resource manager asked Sensei to shutdown");
            this.shutdownRequestHandler.run();
        } else if (exc instanceof IOException) {
            log.warn("Io exception during YARN heartbeat. Possible RPC problems", exc);
        } else {
            log.error("Exception during heartbeat.", exc);
        }
    }

    private void ensureNotClosed() {
        if (this.closed) {
            throw new IllegalStateException(CONTEXT_CLOSED_MSG);
        }
    }
}
