package moe.dare.briareus.yarn.launch.files;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import moe.dare.briareus.api.BriareusException;
import moe.dare.briareus.api.CacheableFileSource;
import moe.dare.briareus.api.FileEntry;
import moe.dare.briareus.common.concurrent.CompletableFutures;
import moe.dare.briareus.common.utils.Maps;
import moe.dare.briareus.common.utils.Pair;
import moe.dare.briareus.common.utils.Preconditions;
import moe.dare.briareus.yarn.YarnAwareFileSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:moe/dare/briareus/yarn/launch/files/DefaultFileUploadTool.class */
class DefaultFileUploadTool implements FileUploadTool {
    private static final String LOCK_FILE_NAME = ".lock";
    private final AtomicLong filesCounter = new AtomicLong();
    private final Map<Pair<FileEntry.Mode, CacheableFileSource>, CompletableFuture<LocalResource>> sharedFiles = new ConcurrentHashMap();
    private final Supplier<UserGroupInformation> user;
    private final Path directory;
    private final Configuration conf;
    private final Executor executor;
    private volatile boolean closed;
    private static final Logger log = LoggerFactory.getLogger(DefaultFileUploadTool.class);
    private static final FsPermission DIRECTORY_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
    private static final Map<FileEntry.Mode, LocalResourceType> RESOURCE_TYPES = Maps.enumMapOf(FileEntry.Mode.COPY, LocalResourceType.FILE, FileEntry.Mode.UNZIP, LocalResourceType.ARCHIVE);
    private static final Map<FileEntry.Mode, String> EXTENSIONS = Maps.enumMapOf(FileEntry.Mode.COPY, "", FileEntry.Mode.UNZIP, ".zip");

    /* JADX INFO: Access modifiers changed from: package-private */
    public static FileUploadTool create(Supplier<UserGroupInformation> supplier, Configuration configuration, Path path, Executor executor) {
        Objects.requireNonNull(supplier, "user");
        Objects.requireNonNull(path, "directory");
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(configuration, "configuration");
        prepareDirectoryAsUser(supplier, path, configuration);
        return new DefaultFileUploadTool(supplier, path, configuration, executor);
    }

    private DefaultFileUploadTool(Supplier<UserGroupInformation> supplier, Path path, Configuration configuration, Executor executor) {
        this.user = supplier;
        this.directory = path;
        this.conf = configuration;
        this.executor = executor;
    }

    @Override // moe.dare.briareus.yarn.launch.files.FileUploadTool
    public CompletableFuture<List<UploadedEntry>> upload(List<FileEntry> list) {
        Preconditions.checkState(!this.closed, "Upload tool closed");
        List list2 = (List) list.stream().map(fileEntry -> {
            return Pair.of(fileEntry, sharedOrProcess(fileEntry));
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        if (list2.stream().map((v0) -> {
            return v0.second();
        }).allMatch((v0) -> {
            return v0.isDone();
        })) {
            try {
                return CompletableFuture.completedFuture(blockingGet(list2));
            } catch (Exception e) {
                return CompletableFutures.failedCompletableFuture(e);
            }
        }
        AtomicInteger atomicInteger = new AtomicInteger(list2.size());
        CompletableFuture<List<UploadedEntry>> completableFuture = new CompletableFuture<>();
        BiConsumer biConsumer = (obj, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            } else if (atomicInteger.decrementAndGet() == 0) {
                try {
                    completableFuture.complete(blockingGet(list2));
                } catch (Exception e2) {
                    completableFuture.completeExceptionally(e2);
                }
            }
        };
        list2.stream().map((v0) -> {
            return v0.second();
        }).forEach(completableFuture2 -> {
            completableFuture2.whenComplete(biConsumer);
        });
        return completableFuture;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.sharedFiles.clear();
    }

    private CompletableFuture<LocalResource> sharedOrProcess(FileEntry fileEntry) {
        CacheableFileSource source = fileEntry.source();
        if (!(source instanceof CacheableFileSource)) {
            return processEntry(fileEntry);
        }
        Pair<FileEntry.Mode, CacheableFileSource> of = Pair.of(fileEntry.mode(), source);
        CompletableFuture<LocalResource> completableFuture = this.sharedFiles.get(of);
        if (completableFuture != null && completableFuture.isCompletedExceptionally()) {
            if (this.sharedFiles.remove(of, completableFuture)) {
                log.warn("Remove cached result of failed upload attempt for resource '{}' in mode '{}'.", source, fileEntry.mode());
            }
            completableFuture = null;
        }
        if (completableFuture == null) {
            return this.sharedFiles.computeIfAbsent(of, pair -> {
                return processEntry(fileEntry);
            });
        }
        log.debug("Reusing previous upload request ({}) for file {}", completableFuture, source);
        return completableFuture;
    }

    private CompletableFuture<LocalResource> processEntry(FileEntry fileEntry) {
        PrepareYarnAwareResourceAction createYarnAwarePrepareAction = fileEntry.source() instanceof YarnAwareFileSource ? createYarnAwarePrepareAction(fileEntry) : createCopyAction(fileEntry);
        return CompletableFuture.supplyAsync(() -> {
            return doAsUser(createYarnAwarePrepareAction);
        }, this.executor);
    }

    private PrepareYarnAwareResourceAction createYarnAwarePrepareAction(FileEntry fileEntry) {
        YarnAwareFileSource yarnAwareFileSource = (YarnAwareFileSource) fileEntry.source();
        LocalResourceType localResourceType = RESOURCE_TYPES.get(fileEntry.mode());
        return new PrepareYarnAwareResourceAction(this.conf, yarnAwareFileSource.resourcePath(), yarnAwareFileSource.resourceVisibility(), localResourceType);
    }

    private CopyAction createCopyAction(FileEntry fileEntry) {
        FileEntry.Mode mode = fileEntry.mode();
        LocalResourceType localResourceType = RESOURCE_TYPES.get(mode);
        Path path = new Path(this.directory, this.filesCounter.getAndIncrement() + "_" + fileEntry.name() + EXTENSIONS.get(mode));
        log.debug("{} will be uploaded to {}", fileEntry, path);
        return new CopyAction(this.conf, fileEntry.source(), path, localResourceType);
    }

    private LocalResource doAsUser(PrivilegedExceptionAction<LocalResource> privilegedExceptionAction) {
        try {
            return (LocalResource) this.user.get().doAs(privilegedExceptionAction);
        } catch (Exception e) {
            throw new BriareusException("Can't prepare resource", e);
        }
    }

    private static List<UploadedEntry> blockingGet(List<Pair<FileEntry, CompletableFuture<LocalResource>>> list) {
        return (List) list.stream().map(pair -> {
            return UploadedEntry.of((FileEntry) pair.first(), (LocalResource) ((CompletableFuture) pair.second()).join());
        }).collect(Collectors.toList());
    }

    private static void prepareDirectoryAsUser(Supplier<UserGroupInformation> supplier, Path path, Configuration configuration) {
        UserGroupInformation userGroupInformation = supplier.get();
        String shortUserName = userGroupInformation.getShortUserName();
        Path path2 = new Path(path, LOCK_FILE_NAME);
        userGroupInformation.doAs(() -> {
            try {
                FileSystem fileSystem = path.getFileSystem(configuration);
                if (!fileSystem.exists(path.getParent())) {
                    throw new IllegalStateException("Parent of directory " + path + " does not exists");
                }
                fileSystem.mkdirs(path, DIRECTORY_PERMISSION);
                FileStatus fileStatus = fileSystem.getFileStatus(path);
                if (!fileStatus.getOwner().equals(shortUserName)) {
                    log.error("Directory {} owner is: {}. Current user: {}", new Object[]{path, fileStatus.getOwner(), userGroupInformation});
                    throw new IllegalArgumentException("Directory " + path + " is owned by another user.");
                }
                if (fileSystem.listLocatedStatus(path).hasNext()) {
                    throw new IllegalArgumentException("Directory " + path + " not empty");
                }
                if (!fileSystem.createNewFile(path2)) {
                    log.error("Lock file {} creation failed", path2);
                    throw new IllegalArgumentException("Directory " + path + " is locked by another process");
                }
                if (!fileStatus.getPermission().equals(DIRECTORY_PERMISSION)) {
                    log.warn("Updating directory {} permissions from {} to {}", new Object[]{path, fileStatus.getPermission(), DIRECTORY_PERMISSION});
                    fileSystem.setPermission(path, DIRECTORY_PERMISSION);
                }
                return null;
            } catch (IOException e) {
                throw new BriareusException("Can't prepare directory " + path, e);
            }
        });
        log.info("Prepared directory {} for user {}", path, userGroupInformation);
    }

    @Override // moe.dare.briareus.yarn.launch.files.FileUploadTool
    public /* bridge */ /* synthetic */ CompletionStage upload(List list) {
        return upload((List<FileEntry>) list);
    }
}
