package gobblin.compaction.mapreduce;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import gobblin.compaction.dataset.Dataset;
import gobblin.compaction.dataset.DatasetHelper;
import gobblin.compaction.event.CompactionSlaEventHelper;
import gobblin.configuration.State;
import gobblin.util.FileListUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/compaction/mapreduce/MRCompactorJobPropCreator.class */
public class MRCompactorJobPropCreator {
    private static final Logger LOG = LoggerFactory.getLogger(MRCompactorJobPropCreator.class);
    protected final Dataset dataset;
    protected final FileSystem fs;
    protected final State state;
    protected final boolean inputDeduplicated;
    protected final boolean outputDeduplicated;
    protected final double lateDataThresholdForRecompact;
    protected final boolean renameSourceDirEnabled;
    protected final boolean recompactFromInputPaths;
    protected final boolean recompactFromOutputPaths;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:gobblin/compaction/mapreduce/MRCompactorJobPropCreator$Builder.class */
    /**
     * Fluent builder collecting the collaborators needed to construct a
     * {@link MRCompactorJobPropCreator}.
     *
     * <p>No validation is performed here; the values are handed to the creator's
     * constructor as-is when {@link #build()} is called.
     */
    public static class Builder {
        /** Dataset the job properties will be created for. */
        Dataset dataset;
        /** File system used for all existence checks and listings. */
        FileSystem fs;
        /** Private copy of the state supplied through {@link #withState(State)}. */
        State state;
        /** Threshold value passed through to the creator unchanged. */
        double lateDataThresholdForRecompact;

        /**
         * Sets the dataset to create job properties for.
         *
         * @param dataset the dataset
         * @return this builder
         */
        public Builder withDataset(final Dataset dataset) {
            this.dataset = dataset;
            return this;
        }

        /**
         * Sets the file system to operate on.
         *
         * @param fileSystem the file system
         * @return this builder
         */
        public Builder withFileSystem(final FileSystem fileSystem) {
            this.fs = fileSystem;
            return this;
        }

        /**
         * Stores a copy of the given state: a fresh {@link State} is created and all
         * entries of {@code state} are added to it, so the builder does not keep a
         * reference to the caller's instance.
         *
         * @param state the state to copy
         * @return this builder
         */
        public Builder withState(final State state) {
            this.state = new State();
            this.state.addAll(state);
            return this;
        }

        /**
         * Sets the late-data threshold for recompaction.
         *
         * @param d the threshold value
         * @return this builder
         */
        Builder withLateDataThresholdForRecompact(final double d) {
            this.lateDataThresholdForRecompact = d;
            return this;
        }

        /**
         * Creates the {@link MRCompactorJobPropCreator} from the values collected so far.
         *
         * @return a new creator
         */
        public MRCompactorJobPropCreator build() {
            return new MRCompactorJobPropCreator(this);
        }
    }

    private MRCompactorJobPropCreator(Builder builder) {
        this.dataset = builder.dataset;
        this.fs = builder.fs;
        this.state = builder.state;
        this.lateDataThresholdForRecompact = builder.lateDataThresholdForRecompact;
        this.inputDeduplicated = this.state.getPropAsBoolean(MRCompactor.COMPACTION_INPUT_DEDUPLICATED, false);
        this.outputDeduplicated = this.state.getPropAsBoolean(MRCompactor.COMPACTION_OUTPUT_DEDUPLICATED, true);
        this.recompactFromInputPaths = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RECOMPACT_FROM_INPUT_FOR_LATE_DATA, false);
        this.recompactFromOutputPaths = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RECOMPACT_FROM_DEST_PATHS, false);
        this.renameSourceDirEnabled = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the job properties for this creator's dataset.
     *
     * <p>The dataset is skipped (empty list) when none of its input paths exists on
     * the file system, or when {@link #createJobProps(Dataset)} yields no dataset.
     * Otherwise the SLA timestamp is set and a single-element list is returned.
     *
     * @return an immutable list holding zero or one dataset
     * @throws IOException if creating the properties for the dataset fails
     */
    public List<Dataset> createJobProps() throws IOException {
        Optional<Path> existingInput = Iterables.tryFind(this.dataset.inputPaths(), new Predicate<Path>() {
            @Override
            public boolean apply(Path path) {
                try {
                    return MRCompactorJobPropCreator.this.fs.exists(path);
                } catch (IOException e) {
                    // Best effort: a path whose existence cannot be determined is
                    // treated as missing so the remaining paths are still probed.
                    LOG.error(String.format("Failed to check if %s exists", path), e);
                    return false;
                }
            }
        });
        if (!existingInput.isPresent()) {
            LOG.warn("Input folders {} do not exist. Skipping dataset {}", this.dataset.inputPaths(), this.dataset);
            return ImmutableList.of();
        }

        Optional<Dataset> compactionUnit = createJobProps(this.dataset);
        if (!compactionUnit.isPresent()) {
            return ImmutableList.of();
        }
        Dataset unit = compactionUnit.get();
        setCompactionSLATimestamp(unit);
        return ImmutableList.of(unit);
    }

    /**
     * Sets the upstream timestamp on this creator's state via
     * {@link CompactionSlaEventHelper}, using the dataset's
     * {@code COMPACTION_INPUT_PATH_TIME} job property plus one day in milliseconds.
     *
     * <p>Nothing happens unless recompaction from output paths is enabled or the
     * dataset has not been compacted yet, and the job properties contain the
     * input-path time.
     *
     * @param dataset dataset whose job properties are consulted
     */
    private void setCompactionSLATimestamp(Dataset dataset) {
        boolean eligible = this.recompactFromOutputPaths
            || !MRCompactor.datasetAlreadyCompacted(this.fs, dataset, this.renameSourceDirEnabled);
        if (!eligible) {
            return;
        }
        if (!dataset.jobProps().contains(MRCompactor.COMPACTION_INPUT_PATH_TIME)) {
            return;
        }
        long inputPathTime = dataset.jobProps().getPropAsLong(MRCompactor.COMPACTION_INPUT_PATH_TIME);
        long oneDayMillis = TimeUnit.DAYS.toMillis(1L);
        CompactionSlaEventHelper.setUpstreamTimeStamp(this.state, inputPathTime + oneDayMillis);
    }

    /**
     * Tells whether at least one of the dataset's late input paths exists and has
     * at least one entry directly beneath it.
     *
     * @param dataset dataset whose late input paths are probed
     * @return {@code true} as soon as one non-empty late path is found
     * @throws IOException if a file system call fails
     */
    private boolean latePathsFound(Dataset dataset) throws IOException, FileNotFoundException {
        for (Path latePath : dataset.inputLatePaths()) {
            if (!this.fs.exists(latePath)) {
                continue;
            }
            FileStatus[] entries = this.fs.listStatus(latePath);
            if (entries.length > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds the job properties for one dataset and attaches them to it.
     *
     * <ul>
     *   <li>With recompaction from output paths enabled and no late data present,
     *       the dataset is skipped.</li>
     *   <li>If the dataset was already compacted (and recompaction from output
     *       paths is off), the work is delegated to
     *       {@code obtainDatasetWithJobProps}.</li>
     *   <li>Otherwise, with source-directory renaming enabled, the input paths are
     *       replaced by the deepest unrenamed directories (the dataset is skipped
     *       when they hold no files); without renaming, late input paths are added
     *       where applicable.</li>
     * </ul>
     *
     * @param dataset dataset to prepare
     * @return the dataset with job properties set, or absent when it is skipped
     * @throws IOException if a file system call fails
     */
    protected Optional<Dataset> createJobProps(Dataset dataset) throws IOException {
        if (this.recompactFromOutputPaths && !latePathsFound(dataset)) {
            LOG.info(String.format("Skipping recompaction for %s since there is no late data in %s", dataset.inputPaths(), dataset.inputLatePaths()));
            return Optional.absent();
        }

        // Work on a copy so the creator's own state is never modified here.
        State jobProps = new State();
        jobProps.addAll(this.state);
        jobProps.setProp(MRCompactor.COMPACTION_ENABLE_SUCCESS_FILE, false);
        jobProps.setProp(MRCompactor.COMPACTION_INPUT_DEDUPLICATED, this.inputDeduplicated);
        jobProps.setProp(MRCompactor.COMPACTION_OUTPUT_DEDUPLICATED, this.outputDeduplicated);
        boolean shouldDeduplicate = !this.inputDeduplicated && this.outputDeduplicated;
        jobProps.setProp(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, shouldDeduplicate);

        boolean compactFromScratch = this.recompactFromOutputPaths
            || !MRCompactor.datasetAlreadyCompacted(this.fs, dataset, this.renameSourceDirEnabled);
        if (!compactFromScratch) {
            return obtainDatasetWithJobProps(jobProps, dataset);
        }

        if (!this.renameSourceDirEnabled) {
            addInputLateFilesForFirstTimeCompaction(jobProps, dataset);
        } else {
            Set<Path> unrenamedDirs =
                MRCompactor.getDeepestLevelUnrenamedDirsWithFileExistence(this.fs, dataset.inputPaths());
            Set<Path> pendingFiles = getAllFilePathsRecursively(unrenamedDirs);
            if (pendingFiles.isEmpty()) {
                return Optional.absent();
            }
            LOG.info("[{}] has unprocessed directories for first time compaction: {}", dataset.getDatasetName(), unrenamedDirs);
            dataset.overwriteInputPaths(unrenamedDirs);
            dataset.setRenamePaths(unrenamedDirs);
        }

        LOG.info(String.format("Created MR job properties for input %s and output %s.", dataset.inputPaths(), dataset.outputPath()));
        dataset.setJobProps(jobProps);
        return Optional.of(dataset);
    }

    /**
     * When the output is to be deduplicated and late input data exists, adds the
     * dataset's late input paths as additional inputs and forces
     * {@code COMPACTION_SHOULD_DEDUPLICATE} to {@code true} in the job properties.
     *
     * <p>The flag is tested before the file system is probed, so no file system
     * calls are made when the result could not matter.
     *
     * @param state   job properties being assembled; may be modified
     * @param dataset dataset whose late input paths are considered
     * @throws IOException if probing the late input paths fails
     */
    private void addInputLateFilesForFirstTimeCompaction(State state, Dataset dataset) throws IOException {
        // NOTE(review): late paths are only added when the output is deduplicated;
        // this mirrors the previous behavior -- confirm it is intended.
        if (!this.outputDeduplicated) {
            return;
        }
        if (!latePathsFound(dataset)) {
            return;
        }
        dataset.addAdditionalInputPaths(dataset.inputLatePaths());
        state.setProp(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
    }

    /**
     * Collects the paths of all files that {@link FileListUtils#listFilesRecursively}
     * reports for the given directories.
     *
     * @param set directories to list
     * @return a new mutable set of file paths; empty when no files are reported
     * @throws IOException if listing fails
     */
    private Set<Path> getAllFilePathsRecursively(Set<Path> set) throws IOException {
        Set<Path> filePaths = Sets.newHashSet();
        for (FileStatus status : FileListUtils.listFilesRecursively(this.fs, set)) {
            filePaths.add(status.getPath());
        }
        return filePaths;
    }

    /**
     * Prepares job properties for a dataset that has already been compacted.
     *
     * <ul>
     *   <li>With recompaction from input paths enabled, late input paths are added
     *       where applicable and the dataset is returned.</li>
     *   <li>Otherwise new data files are collected: with source-directory renaming
     *       enabled, from the deepest unrenamed directories; without it, files in
     *       the input and late input paths that are newer than the last
     *       compaction. If any are found, the job is flagged as a late-data
     *       movement task listing those files.</li>
     *   <li>If no new files are found, the dataset is only returned when late
     *       output data exists and the dataset reports it needs recompaction.</li>
     * </ul>
     *
     * @param state   job properties being assembled; may be modified
     * @param dataset the already-compacted dataset
     * @return the dataset with job properties set, or absent when nothing is to be done
     * @throws IOException if a file system call fails
     */
    private Optional<Dataset> obtainDatasetWithJobProps(State state, Dataset dataset) throws IOException {
        if (this.recompactFromInputPaths) {
            LOG.info(String.format("Will recompact for %s.", dataset.outputPath()));
            addInputLateFilesForFirstTimeCompaction(state, dataset);
        } else {
            Set<Path> newDataFiles = new HashSet<Path>();
            if (this.renameSourceDirEnabled) {
                Set<Path> deepestLevelUnrenamedDirsWithFileExistence = MRCompactor.getDeepestLevelUnrenamedDirsWithFileExistence(this.fs, dataset.inputPaths());
                if (deepestLevelUnrenamedDirsWithFileExistence.isEmpty()) {
                    LOG.info("[{}] doesn't have unprocessed directories", dataset.getDatasetName());
                } else {
                    Set<Path> allFilePathsRecursively = getAllFilePathsRecursively(deepestLevelUnrenamedDirsWithFileExistence);
                    if (allFilePathsRecursively.isEmpty()) {
                        LOG.info("[{}] has unprocessed directories but all empty: {}", dataset.getDatasetName(), deepestLevelUnrenamedDirsWithFileExistence);
                    } else {
                        dataset.setRenamePaths(deepestLevelUnrenamedDirsWithFileExistence);
                        newDataFiles.addAll(allFilePathsRecursively);
                        LOG.info("[{}] has unprocessed directories: {}", dataset.getDatasetName(), deepestLevelUnrenamedDirsWithFileExistence);
                    }
                }
            } else {
                newDataFiles = getNewDataInFolder(dataset.inputPaths(), dataset.outputPath());
                Set<Path> newDataFilesInLatePaths = getNewDataInFolder(dataset.inputLatePaths(), dataset.outputPath());
                newDataFiles.addAll(newDataFilesInLatePaths);
                // newDataFiles now contains every late file, so checking the late
                // set alone is sufficient.
                if (!newDataFilesInLatePaths.isEmpty()) {
                    dataset.addAdditionalInputPaths(dataset.inputLatePaths());
                }
            }
            if (!newDataFiles.isEmpty()) {
                LOG.info(String.format("Will copy %d new data files for %s", Integer.valueOf(newDataFiles.size()), dataset.outputPath()));
                state.setProp(MRCompactor.COMPACTION_JOB_LATE_DATA_MOVEMENT_TASK, true);
                state.setProp(MRCompactor.COMPACTION_JOB_LATE_DATA_FILES, Joiner.on(",").join(newDataFiles));
            } else {
                if (!isOutputLateDataExists(dataset)) {
                    return Optional.absent();
                }
                LOG.info("{} don't have new data, but previous late data still remains, check if it requires to move", dataset.getDatasetName());
                // The job properties are attached before the recompaction check runs;
                // the original statement order is preserved.
                dataset.setJobProps(state);
                dataset.checkIfNeedToRecompact(new DatasetHelper(dataset, this.fs, Lists.newArrayList("avro")));
                if (!dataset.needToRecompact()) {
                    return Optional.absent();
                }
                MRCompactor.modifyDatasetStateToRecompact(dataset);
            }
        }
        dataset.setJobProps(state);
        return Optional.of(dataset);
    }

    /**
     * Tells whether the dataset's late output path exists and has at least one
     * entry directly beneath it.
     *
     * @param dataset dataset whose late output path is probed
     * @return {@code true} if the path exists and its listing is non-empty
     * @throws IOException if a file system call fails
     */
    private boolean isOutputLateDataExists(Dataset dataset) throws IOException {
        if (!this.fs.exists(dataset.outputLatePath())) {
            return false;
        }
        FileStatus[] lateEntries = this.fs.listStatus(dataset.outputLatePath());
        return lateEntries.length > 0;
    }

    /**
     * Collects, across all given input folders, the files that the single-folder
     * overload reports as new relative to the given output folder.
     *
     * @param set  input folders to scan
     * @param path output folder passed through to the single-folder overload
     * @return a new mutable set with the union of the per-folder results
     * @throws IOException if a file system call fails
     */
    private Set<Path> getNewDataInFolder(Set<Path> set, Path path) throws IOException {
        Set<Path> newFiles = Sets.newHashSet();
        for (Path inputFolder : set) {
            newFiles.addAll(getNewDataInFolder(inputFolder, path));
        }
        return newFiles;
    }

    /**
     * Finds the files under {@code path} (listed recursively) whose modification
     * time is strictly after the compaction timestamp read from {@code path2}.
     *
     * @param path  input folder to scan
     * @param path2 output folder from which the compaction timestamp is read
     * @return a new mutable set of newer files; empty when either folder does not exist
     * @throws IOException if a file system call or reading the timestamp fails
     */
    private Set<Path> getNewDataInFolder(Path path, Path path2) throws IOException {
        Set<Path> newFiles = Sets.newHashSet();
        if (!this.fs.exists(path) || !this.fs.exists(path2)) {
            return newFiles;
        }
        DateTime lastCompactionTime = new DateTime(MRCompactor.readCompactionTimestamp(this.fs, path2));
        for (FileStatus fileStatus : FileListUtils.listFilesRecursively(this.fs, path)) {
            DateTime modificationTime = new DateTime(fileStatus.getModificationTime());
            if (modificationTime.isAfter(lastCompactionTime)) {
                LOG.info("[{}] {} is after {}", modificationTime.getMillis(), fileStatus.getPath(), lastCompactionTime.getMillis());
                newFiles.add(fileStatus.getPath());
            }
        }
        if (!newFiles.isEmpty()) {
            LOG.info(String.format("Found %d new files within folder %s which are more recent than the previous compaction start time of %s.", Integer.valueOf(newFiles.size()), path, lastCompactionTime));
        }
        return newFiles;
    }

    /**
     * Attaches this creator's state to the dataset as its job properties, marks the
     * dataset as skipped with the given cause, and returns it.
     *
     * @param th the failure passed to {@code Dataset.skip}
     * @return this creator's dataset
     */
    public Dataset createFailedJobProps(Throwable th) {
        final Dataset failed = this.dataset;
        failed.setJobProps(this.state);
        failed.skip(th);
        return failed;
    }
}
