package gobblin.compaction.mapreduce;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import gobblin.compaction.dataset.Dataset;
import gobblin.compaction.dataset.DatasetHelper;
import gobblin.compaction.event.CompactionSlaEventHelper;
import gobblin.metrics.GobblinMetrics;
import gobblin.metrics.event.EventSubmitter;
import gobblin.util.ExecutorsUtils;
import gobblin.util.FileListUtils;
import gobblin.util.HadoopUtils;
import gobblin.util.RecordCountProvider;
import gobblin.util.WriterUtils;
import gobblin.util.executors.ScalingThreadPoolExecutor;
import gobblin.util.recordcount.LateFileRecordCountProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a {@link Runnable} that compacts one {@link Dataset}.
 *
 * <p>{@link #run()} either copies late data files into the dataset's output location or submits a
 * Hadoop MapReduce job and publishes its output. Subclasses supply the job's input/output formats,
 * mapper, reducer, key/value classes and applicable file extensions through the abstract methods.
 * Runners are ordered by {@link #compareTo(MRCompactorJobRunner)} using the dataset priority.
 *
 * <p>Decompiled from: input_file:gobblin/compaction/mapreduce/MRCompactorJobRunner.class
 */
public abstract class MRCompactorJobRunner implements Runnable, Comparable<MRCompactorJobRunner> {
    private static final Logger LOG = LoggerFactory.getLogger(MRCompactorJobRunner.class);
    // Common textual prefix of the job-level property keys declared below.
    private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
    // Property holding the permission used when creating the parent of the output directory.
    public static final String COMPACTION_JOB_OUTPUT_DIR_PERMISSION = "compaction.job.output.dir.permission";
    // Property holding the target output file size; input size is divided by it to size the reducers.
    public static final String COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE = "compaction.job.target.output.file.size";
    // 536870912 = 512 * 1024 * 1024.
    public static final long DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE = 536870912;
    // Property capping the size-derived number of reducers.
    public static final String COMPACTION_JOB_MAX_NUM_REDUCERS = "compaction.job.max.num.reducers";
    public static final int DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS = 900;
    // When true, an already existing output directory does not prevent compaction.
    private static final String COMPACTION_JOB_OVERWRITE_OUTPUT_DIR = "compaction.job.overwrite.output.dir";
    private static final boolean DEFAULT_COMPACTION_JOB_OVERWRITE_OUTPUT_DIR = false;
    // When true, output is not published if an input file was modified after the compaction timestamp.
    private static final String COMPACTION_JOB_ABORT_UPON_NEW_DATA = "compaction.job.abort.upon.new.data";
    private static final boolean DEFAULT_COMPACTION_JOB_ABORT_UPON_NEW_DATA = false;
    // Maximum size of the thread pool used to copy late data files.
    private static final String COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE = "compaction.job.copy.latedata.thread.pool.size";
    private static final int DEFAULT_COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE = 5;
    // When true, a reducer count other than 1 is replaced by Primes.nextPrime of that count.
    public static final String COMPACTION_JOB_USE_PRIME_REDUCERS = "compaction.job.use.prime.reducers";
    public static final boolean DEFAULT_COMPACTION_JOB_USE_PRIME_REDUCERS = true;
    // Name given to every Hadoop job configured by this class.
    public static final String HADOOP_JOB_NAME = "Gobblin MR Compaction";
    // Sleep between two job-completion polls in submitAndWait.
    private static final long MR_JOB_CHECK_COMPLETE_INTERVAL_MS = 5000;
    protected final Dataset dataset;
    protected final FileSystem fs;
    // Permission passed to WriterUtils.mkdirsWithRecursivePermission for the output parent directory.
    protected final FsPermission perm;
    // When false, configureJob forces the job to zero reduce tasks.
    protected final boolean shouldDeduplicate;
    // Selects outputLatePath() (true) or outputPath() (false) as the destination for late data files.
    protected final boolean outputDeduplicated;
    protected final boolean recompactFromDestPaths;
    protected final boolean recompactAllData;
    // When true, run() renames the source directory instead of writing a completion marker file.
    protected final boolean renameSourceDir;
    protected final boolean usePrimeReducers;
    protected final EventSubmitter eventSubmitter;
    private final RecordCountProvider inputRecordCountProvider;
    private final RecordCountProvider outputRecordCountProvider;
    private final LateFileRecordCountProvider lateInputRecordCountProvider;
    private final LateFileRecordCountProvider lateOutputRecordCountProvider;
    private final DatasetHelper datasetHelper;
    private final int copyLateDataThreadPoolSize;
    // Written by proceed()/abort() and read by the thread executing run(), hence volatile.
    private volatile Policy policy = Policy.DO_NOT_PUBLISH_DATA;
    // Outcome of run(); read through status(), hence volatile.
    private volatile Status status = Status.RUNNING;
    // Per-directory cache of the file listing produced by getApplicableFilePaths.
    private final Cache<Path, List<Path>> applicablePathCache;
    /**
     * Instruction given to a runner from outside while it executes.
     *
     * <p>Decompiled from: input_file:gobblin/compaction/mapreduce/MRCompactorJobRunner$Policy.class
     */
    public enum Policy {
        // Set by proceed(); the only value for which shouldPublishData can return true.
        DO_PUBLISH_DATA,
        // Initial value of the policy field; job output is not published.
        DO_NOT_PUBLISH_DATA,
        // Set by abort(); submitAndWait kills the running job when it observes this value.
        ABORT_ASAP
    }

    /**
     * Outcome of a runner as reported by {@code status()}.
     *
     * <p>Decompiled from: input_file:gobblin/compaction/mapreduce/MRCompactorJobRunner$Status.class
     */
    public enum Status {
        // Set by run() when the job output is not published.
        ABORTED,
        // Set by run() after late data was copied, output was published, or existing output was kept.
        COMMITTED,
        // Initial value of the status field.
        RUNNING
    }

    /**
     * Creates a runner for one dataset, reading every tunable from {@code dataset.jobProps()}.
     *
     * <p>{@code MRCompactor.COMPACTION_SHOULD_DEDUPLICATE} must be present; the other properties read
     * here have defaults. The two record count providers are instantiated reflectively from class
     * names taken from the job properties.
     *
     * <p>NOTE: the decompiler reports that this constructor was originally declared
     * {@code protected}.
     *
     * @param dataset the dataset this runner compacts
     * @param fileSystem the file system used for all path operations of this runner
     * @throws IllegalArgumentException if {@code MRCompactor.COMPACTION_SHOULD_DEDUPLICATE} is not set
     * @throws RuntimeException if any statement of the trailing try block fails
     */
    public MRCompactorJobRunner(Dataset dataset, FileSystem fileSystem) {
        this.dataset = dataset;
        this.fs = fileSystem;
        this.perm = HadoopUtils.deserializeFsPermission(this.dataset.jobProps(), COMPACTION_JOB_OUTPUT_DIR_PERMISSION, FsPermission.getDefault());
        this.recompactFromDestPaths = this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_RECOMPACT_FROM_DEST_PATHS, false);
        this.recompactAllData = this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_RECOMPACT_ALL_DATA, true);
        this.renameSourceDir = this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED, false);
        // Deduplication has no default: fail fast when the property is absent.
        Preconditions.checkArgument(this.dataset.jobProps().contains(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE), String.format("Missing property %s for dataset %s", MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, this.dataset));
        this.shouldDeduplicate = this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE);
        this.outputDeduplicated = this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_OUTPUT_DEDUPLICATED, true);
        this.usePrimeReducers = this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_USE_PRIME_REDUCERS, true);
        // Events are submitted to the metric context registered under the "job.name" property.
        this.eventSubmitter = new EventSubmitter.Builder(GobblinMetrics.get(this.dataset.jobProps().getProp("job.name")).getMetricContext(), MRCompactor.COMPACTION_TRACKING_EVENTS_NAMESPACE).build();
        this.copyLateDataThreadPoolSize = this.dataset.jobProps().getPropAsInt(COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE, DEFAULT_COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE);
        try {
            // Both providers need a public no-argument constructor because they are created via newInstance().
            this.inputRecordCountProvider = (RecordCountProvider) Class.forName(this.dataset.jobProps().getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER, MRCompactor.DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER)).newInstance();
            this.outputRecordCountProvider = (RecordCountProvider) Class.forName(this.dataset.jobProps().getProp(MRCompactor.COMPACTION_OUTPUT_RECORD_COUNT_PROVIDER, MRCompactor.DEFAULT_COMPACTION_OUTPUT_RECORD_COUNT_PROVIDER)).newInstance();
            this.lateInputRecordCountProvider = new LateFileRecordCountProvider(this.inputRecordCountProvider);
            this.lateOutputRecordCountProvider = new LateFileRecordCountProvider(this.outputRecordCountProvider);
            this.applicablePathCache = CacheBuilder.newBuilder().maximumSize(2000L).build();
            // NOTE(review): getApplicableFileExtensions() is abstract and runs here before any subclass
            // constructor body, so implementations must not depend on subclass instance state.
            this.datasetHelper = new DatasetHelper(this.dataset, this.fs, getApplicableFileExtensions());
        } catch (Exception e) {
            // The message names RecordCountProvider, but any failure in the try block lands here.
            throw new RuntimeException("Failed to instantiate RecordCountProvider", e);
        }
    }

    /**
     * Executes the compaction of this runner's dataset.
     *
     * <p>Two modes exist, selected by {@code MRCompactor.COMPACTION_JOB_LATE_DATA_MOVEMENT_TASK}:
     * <ul>
     *   <li>late data movement: the listed late files with an applicable extension are copied to the
     *       late (or regular) output path and the status becomes {@code COMMITTED};</li>
     *   <li>regular compaction: a MapReduce job is configured, submitted and awaited; its output is
     *       published only if {@code shouldPublishData} agrees, otherwise the status becomes
     *       {@code ABORTED}.</li>
     * </ul>
     * After a successful, non-early-returning pass the source directory is renamed or a completion
     * marker is written, and a record count event is submitted.
     *
     * <p>Any {@link Throwable} is rethrown through {@code Throwables.propagate}; the status field is
     * not updated in that case.
     */
    @Override // java.lang.Runnable
    public void run() {
        Configuration confFromState = HadoopUtils.getConfFromState(this.dataset.jobProps());
        // Turn on output compression unless one of the two compression keys is already configured.
        if (confFromState.get("mapreduce.output.fileoutputformat.compress") == null && confFromState.get("mapred.output.compress") == null) {
            confFromState.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        }
        // Default to not cancelling delegation tokens on job completion unless explicitly configured.
        if (confFromState.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
            confFromState.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
        }
        try {
            DateTime compactionTimestamp = getCompactionTimestamp();
            LOG.info("MR Compaction Job Timestamp " + compactionTimestamp.getMillis());
            if (this.dataset.jobProps().getPropAsBoolean(MRCompactor.COMPACTION_JOB_LATE_DATA_MOVEMENT_TASK, false)) {
                // Late data movement: no MR job, only a file copy.
                ArrayList newArrayList = Lists.newArrayList();
                for (String str : this.dataset.jobProps().getPropAsList(MRCompactor.COMPACTION_JOB_LATE_DATA_FILES)) {
                    // Files whose extension is not applicable are silently skipped.
                    if (FilenameUtils.isExtension(str, getApplicableFileExtensions())) {
                        newArrayList.add(new Path(str));
                    }
                }
                Path outputLatePath = this.outputDeduplicated ? this.dataset.outputLatePath() : this.dataset.outputPath();
                LOG.info(String.format("Copying %d late data files to %s", Integer.valueOf(newArrayList.size()), outputLatePath));
                // The destination is only created here when it is the dedicated late output path.
                if (this.outputDeduplicated && !this.fs.exists(outputLatePath) && !this.fs.mkdirs(outputLatePath)) {
                    throw new RuntimeException(String.format("Failed to create late data output directory: %s.", outputLatePath.toString()));
                }
                copyDataFiles(outputLatePath, newArrayList);
                if (this.outputDeduplicated) {
                    this.dataset.checkIfNeedToRecompact(this.datasetHelper);
                }
                this.status = Status.COMMITTED;
            } else {
                // Existing output that may not be overwritten counts as done; the early return skips
                // the completion marker and the record count event below.
                if (this.fs.exists(this.dataset.outputPath()) && !canOverwriteOutputDir()) {
                    LOG.warn(String.format("Output paths %s exists. Will not compact %s.", this.dataset.outputPath(), this.dataset.inputPaths()));
                    this.status = Status.COMMITTED;
                    return;
                }
                addJars(confFromState);
                Job job = Job.getInstance(confFromState);
                configureJob(job);
                submitAndWait(job);
                // Also reached when submitAndWait killed the job: the policy is then not DO_PUBLISH_DATA.
                if (!shouldPublishData(compactionTimestamp)) {
                    LOG.info("Data not published for input folder " + this.dataset.inputPaths() + " due to incompleteness");
                    this.status = Status.ABORTED;
                    return;
                }
                if (this.recompactAllData || !this.recompactFromDestPaths) {
                    // Replace the output directory with the job's temporary output.
                    moveTmpPathToOutputPath();
                    if (this.recompactFromDestPaths) {
                        deleteFilesByPaths(this.dataset.additionalInputPaths());
                    }
                } else {
                    // Append the new files to the existing output, then remove the consumed inputs.
                    addFilesInTmpPathToOutputPath();
                    deleteFilesByPaths(this.dataset.inputPaths());
                }
                submitSlaEvent(job);
                LOG.info("Successfully published data for input folder " + this.dataset.inputPaths());
                this.status = Status.COMMITTED;
            }
            // Record completion either by renaming the source directory or by writing a marker file.
            if (this.renameSourceDir) {
                MRCompactor.renameSourceDirAsCompactionComplete(this.fs, this.dataset);
            } else {
                markOutputDirAsCompleted(compactionTimestamp);
            }
            submitRecordsCountsEvent();
        } catch (Throwable th) {
            // Surface every failure, checked or not, to the caller of run().
            throw Throwables.propagate(th);
        }
    }

    /**
     * Determines the timestamp associated with this compaction run, in the configured time zone.
     *
     * <p>Without recompaction from destination paths this is the current time. Otherwise it is the
     * latest modification time among all files below the input paths, falling back to the current
     * time when no file is found.
     *
     * @return the compaction timestamp
     * @throws IOException if listing the input files fails
     */
    private DateTime getCompactionTimestamp() throws IOException {
        String zoneId = this.dataset.jobProps().getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE);
        DateTimeZone zone = DateTimeZone.forID(zoneId);
        if (this.recompactFromDestPaths) {
            long latestModification = Long.MIN_VALUE;
            Iterator<?> files = FileListUtils.listFilesRecursively(this.fs, getInputPaths()).iterator();
            while (files.hasNext()) {
                long modified = ((FileStatus) files.next()).getModificationTime();
                if (modified > latestModification) {
                    latestModification = modified;
                }
            }
            // Long.MIN_VALUE still in place means that no file was listed.
            if (latestModification != Long.MIN_VALUE) {
                return new DateTime(latestModification, zone);
            }
        }
        return new DateTime(zone);
    }

    /**
     * Copies the given files into {@code path} in parallel and waits for all copies to finish.
     *
     * <p>Each destination file name is derived from the source name by restoring the original file
     * path, converting it from the input to the output record count naming, and building a late
     * file path inside {@code path}.
     *
     * <p>The executor is shut down exactly once, after all submissions and waits. Shutting it down
     * inside the submission loop would make every submission after the first one be rejected.
     *
     * @param path destination directory
     * @param list files to copy
     * @throws IOException if any copy fails or the calling thread is interrupted while waiting
     */
    private void copyDataFiles(final Path path, List<Path> list) throws IOException {
        ScalingThreadPoolExecutor executor = ScalingThreadPoolExecutor.newScalingThreadPool(0, this.copyLateDataThreadPoolSize, 100L, ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of(this.dataset.getName() + "-copy-data")));
        List<Future<Void>> pendingCopies = new ArrayList<Future<Void>>(list.size());
        try {
            for (final Path source : list) {
                pendingCopies.add(executor.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        Path target = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(MRCompactorJobRunner.this.outputRecordCountProvider.convertPath(LateFileRecordCountProvider.restoreFilePath(source), MRCompactorJobRunner.this.inputRecordCountProvider).getName(), MRCompactorJobRunner.this.fs, path);
                        HadoopUtils.copyPath(MRCompactorJobRunner.this.fs, source, MRCompactorJobRunner.this.fs, target, true, MRCompactorJobRunner.this.fs.getConf());
                        MRCompactorJobRunner.LOG.debug(String.format("Copied %s to %s.", source, target));
                        return null;
                    }
                }));
            }
            for (Future<Void> pendingCopy : pendingCopies) {
                pendingCopy.get();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("Failed to copy file.", e);
        } catch (ExecutionException e) {
            throw new IOException("Failed to copy file.", e);
        } finally {
            ExecutorsUtils.shutdownExecutorService(executor, Optional.of(LOG));
        }
    }

    /**
     * Tells whether an already existing output directory may be replaced or extended.
     *
     * @return {@code true} if overwriting is enabled through the job properties or the runner
     *         recompacts from destination paths
     */
    private boolean canOverwriteOutputDir() {
        boolean overwriteRequested = this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_OVERWRITE_OUTPUT_DIR, DEFAULT_COMPACTION_JOB_OVERWRITE_OUTPUT_DIR);
        if (overwriteRequested) {
            return true;
        }
        return this.recompactFromDestPaths;
    }

    /**
     * Adds every entry of the directory named by {@code MRCompactor.COMPACTION_JARS} to the job
     * class path. Does nothing when the property is not set.
     *
     * @param configuration the job configuration to extend
     * @throws IOException if the directory cannot be listed or an entry cannot be added
     */
    private void addJars(Configuration configuration) throws IOException {
        if (!this.dataset.jobProps().contains(MRCompactor.COMPACTION_JARS)) {
            return;
        }
        Path jarDirectory = new Path(this.dataset.jobProps().getProp(MRCompactor.COMPACTION_JARS));
        FileStatus[] entries = this.fs.listStatus(jarDirectory);
        for (FileStatus entry : entries) {
            DistributedCache.addFileToClassPath(entry.getPath(), configuration, this.fs);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Applies the complete job setup: name, input and output paths, mapper and reducer.
     *
     * <p>When deduplication is disabled the reducer count chosen by {@code configureReducer} is
     * overridden with zero.
     *
     * @param job the job to configure
     * @throws IOException if path or reducer configuration fails
     */
    public void configureJob(Job job) throws IOException {
        job.setJobName(HADOOP_JOB_NAME);
        configureInputAndOutputPaths(job);
        configureMapper(job);
        configureReducer(job);
        if (!this.shouldDeduplicate) {
            // Without deduplication no reduce phase is wanted.
            job.setNumReduceTasks(0);
        }
    }

    /**
     * Registers all input paths on the job and points its output at the dataset's temporary output
     * path, which is deleted recursively beforehand.
     *
     * @param job the job to configure
     * @throws IOException if a path cannot be added or the temporary output cannot be deleted
     */
    private void configureInputAndOutputPaths(Job job) throws IOException {
        for (Path inputPath : getInputPaths()) {
            FileInputFormat.addInputPath(job, inputPath);
        }
        // Remove leftovers of a previous attempt before the job writes there.
        this.fs.delete(this.dataset.outputTmpPath(), true);
        FileOutputFormat.setOutputPath(job, this.dataset.outputTmpPath());
    }

    /**
     * Returns the union of the dataset's regular and additional input paths.
     *
     * <p>The explicit {@code <Path>} type witness is required: without it the builder's element
     * type is inferred as {@code Object} and the result is not assignable to {@code Set<Path>}.
     *
     * @return an immutable set containing every input path once
     */
    private Set<Path> getInputPaths() {
        return ImmutableSet.<Path>builder()
                .addAll(this.dataset.inputPaths())
                .addAll(this.dataset.additionalInputPaths())
                .build();
    }

    /**
     * Returns the dataset this runner was created for.
     *
     * @return the dataset passed to the constructor
     */
    public Dataset getDataset() {
        return dataset;
    }

    /**
     * Configures the map side of the job by invoking the subclass hooks in a fixed order:
     * input format, mapper class, map output key class, map output value class.
     *
     * @param job the job to configure
     */
    protected void configureMapper(Job job) {
        setInputFormatClass(job);
        setMapperClass(job);
        setMapOutputKeyClass(job);
        setMapOutputValueClass(job);
    }

    /**
     * Configures the reduce side of the job by invoking the subclass hooks in a fixed order
     * (output format, reducer class, output key class, output value class) and then choosing the
     * number of reducers.
     *
     * @param job the job to configure
     * @throws IOException if the number of reducers cannot be determined
     */
    protected void configureReducer(Job job) throws IOException {
        setOutputFormatClass(job);
        setReducerClass(job);
        setOutputKeyClass(job);
        setOutputValueClass(job);
        setNumberOfReducers(job);
    }

    /**
     * Hook called first by {@code configureMapper}; presumably sets the job's input format class
     * (inferred from the name, implementations are not visible here).
     */
    protected abstract void setInputFormatClass(Job job);

    /** Hook called by {@code configureMapper}; presumably sets the job's mapper class. */
    protected abstract void setMapperClass(Job job);

    /** Hook called by {@code configureMapper}; presumably sets the map output key class. */
    protected abstract void setMapOutputKeyClass(Job job);

    /** Hook called by {@code configureMapper}; presumably sets the map output value class. */
    protected abstract void setMapOutputValueClass(Job job);

    /** Hook called first by {@code configureReducer}; presumably sets the job's output format class. */
    protected abstract void setOutputFormatClass(Job job);

    /** Hook called by {@code configureReducer}; presumably sets the job's reducer class. */
    protected abstract void setReducerClass(Job job);

    /** Hook called by {@code configureReducer}; presumably sets the job's output key class. */
    protected abstract void setOutputKeyClass(Job job);

    /** Hook called by {@code configureReducer}; presumably sets the job's output value class. */
    protected abstract void setOutputValueClass(Job job);

    /**
     * Returns the file extensions this runner handles. The result selects late data files in
     * {@code run()}, filters file names by suffix in {@code getApplicableFilePaths}, and is passed
     * to the {@code DatasetHelper} in the constructor.
     *
     * <p>NOTE(review): called from the base class constructor, so an implementation must not rely
     * on subclass fields being initialized.
     */
    protected abstract Collection<String> getApplicableFileExtensions();

    /**
     * Chooses the number of reduce tasks from the total input size.
     *
     * <p>The count is {@code inputSize / targetFileSize + 1}, capped by the configured maximum. If
     * prime reducers are enabled and the count is not 1, it is replaced by
     * {@code Primes.nextPrime(count)}.
     *
     * @param job the job whose reducer count is set
     * @throws IOException if the input size cannot be determined
     */
    protected void setNumberOfReducers(Job job) throws IOException {
        long inputBytes = getInputSize();
        long targetBytesPerFile = getTargetFileSize();
        int sizeBasedCount = Ints.checkedCast(inputBytes / targetBytesPerFile) + 1;
        int reducerCount = Math.min(sizeBasedCount, getMaxNumReducers());
        if (reducerCount != 1 && this.usePrimeReducers) {
            reducerCount = Primes.nextPrime(reducerCount);
        }
        job.setNumReduceTasks(reducerCount);
    }

    /**
     * Sums the content length reported by the file system for every input path.
     *
     * @return the total length over all input paths
     * @throws IOException if a content summary cannot be obtained
     */
    private long getInputSize() throws IOException {
        long totalLength = 0;
        for (Path inputPath : getInputPaths()) {
            totalLength += this.fs.getContentSummary(inputPath).getLength();
        }
        return totalLength;
    }

    /**
     * Reads the target output file size from the job properties.
     *
     * @return the configured value, or {@code DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE}
     */
    private long getTargetFileSize() {
        long targetSize = dataset.jobProps().getPropAsLong(COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE, DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE);
        return targetSize;
    }

    /**
     * Reads the upper bound for the size-derived reducer count from the job properties.
     *
     * @return the configured value, or {@code DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS}
     */
    private int getMaxNumReducers() {
        int maxReducers = dataset.jobProps().getPropAsInt(COMPACTION_JOB_MAX_NUM_REDUCERS, DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
        return maxReducers;
    }

    /**
     * Submits the job, registers it with {@code MRCompactor} and polls until it completes.
     *
     * <p>If the policy becomes {@code ABORT_ASAP} while the job is still running, the job is killed
     * and the method returns normally. A job that completes unsuccessfully raises an exception.
     *
     * @param job the fully configured job
     * @throws RuntimeException if the job completes but was not successful
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void submitAndWait(Job job) throws ClassNotFoundException, IOException, InterruptedException {
        job.submit();
        MRCompactor.addRunningHadoopJob(this.dataset, job);
        LOG.info(String.format("MR job submitted for dataset %s, input %s, url: %s", this.dataset, getInputPaths(), job.getTrackingURL()));
        boolean finished = job.isComplete();
        while (!finished) {
            if (Policy.ABORT_ASAP == this.policy) {
                LOG.info(String.format("MR job for dataset %s, input %s killed due to input data incompleteness. Will try again later", this.dataset, getInputPaths()));
                job.killJob();
                return;
            }
            Thread.sleep(MR_JOB_CHECK_COMPLETE_INTERVAL_MS);
            finished = job.isComplete();
        }
        if (job.isSuccessful()) {
            return;
        }
        throw new RuntimeException(String.format("MR job failed for topic %s, input %s, url: %s", this.dataset, getInputPaths(), job.getTrackingURL()));
    }

    /**
     * Decides whether the job output may be published.
     *
     * <p>Publishing requires the policy {@code DO_PUBLISH_DATA}. If aborting upon new data is
     * enabled, it additionally requires that no input file was modified after {@code dateTime}.
     *
     * @param dateTime the compaction timestamp to compare file modification times against
     * @return {@code true} if the output should be published
     * @throws IOException if the input files cannot be listed
     */
    private boolean shouldPublishData(DateTime dateTime) throws IOException {
        if (this.policy != Policy.DO_PUBLISH_DATA) {
            return false;
        }
        boolean abortUponNewData = this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_ABORT_UPON_NEW_DATA, DEFAULT_COMPACTION_JOB_ABORT_UPON_NEW_DATA);
        if (abortUponNewData) {
            for (Path inputPath : getInputPaths()) {
                if (findNewDataSinceCompactionStarted(inputPath, dateTime)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Checks whether any file below {@code path} was modified after {@code dateTime}.
     *
     * @param path the input folder to scan recursively
     * @param dateTime the reference timestamp
     * @return {@code true} as soon as one newer file is found, {@code false} otherwise
     * @throws IOException if the folder cannot be listed
     */
    private boolean findNewDataSinceCompactionStarted(Path path, DateTime dateTime) throws IOException {
        for (FileStatus candidate : FileListUtils.listFilesRecursively(this.fs, path)) {
            DateTime modifiedAt = new DateTime(candidate.getModificationTime());
            if (!modifiedAt.isAfter(dateTime)) {
                continue;
            }
            LOG.info(String.format("Found new file %s in input folder %s after compaction started. Will abort compaction.", candidate.getPath(), path));
            return true;
        }
        return false;
    }

    /**
     * Writes the completion marker file into the dataset's output directory.
     *
     * <p>The file is named {@code MRCompactor.COMPACTION_COMPLETE_FILE_NAME} and contains the
     * compaction timestamp in milliseconds as a single {@code long}.
     *
     * @param dateTime the compaction timestamp to record
     * @throws IOException if the marker cannot be created, written or closed
     */
    private void markOutputDirAsCompleted(DateTime dateTime) throws IOException {
        Path markerFile = new Path(this.dataset.outputPath(), MRCompactor.COMPACTION_COMPLETE_FILE_NAME);
        // try-with-resources closes the stream on every path and keeps a write failure as the
        // primary exception, with a close failure attached as suppressed.
        try (FSDataOutputStream marker = this.fs.create(markerFile)) {
            marker.writeLong(dateTime.getMillis());
        }
    }

    /**
     * Replaces the output directory with the job's temporary output directory.
     *
     * <p>The existing output path is deleted recursively, its parent is created with the
     * configured permission, and the temporary path is renamed to the output path.
     *
     * @throws IOException if the rename reports failure or a file system call fails
     */
    private void moveTmpPathToOutputPath() throws IOException {
        Path tmpOutput = this.dataset.outputTmpPath();
        Path finalOutput = this.dataset.outputPath();
        LOG.info(String.format("Moving %s to %s", tmpOutput, finalOutput));
        this.fs.delete(finalOutput, true);
        WriterUtils.mkdirsWithRecursivePermission(this.fs, finalOutput.getParent(), this.perm);
        boolean renamed = this.fs.rename(tmpOutput, finalOutput);
        if (!renamed) {
            throw new IOException(String.format("Unable to move %s to %s", tmpOutput, finalOutput));
        }
    }

    /**
     * Moves every applicable file from the temporary output path into the existing output
     * directory, leaving the files already there untouched. Destination names are built by the
     * late output record count provider.
     *
     * @throws IOException if a rename reports failure or a file system call fails
     */
    private void addFilesInTmpPathToOutputPath() throws IOException {
        List<Path> producedFiles = getApplicableFilePaths(this.dataset.outputTmpPath());
        for (Path source : producedFiles) {
            String fileName = source.getName();
            LOG.info(String.format("Adding %s to %s", source.toString(), this.dataset.outputPath()));
            Path destination = this.lateOutputRecordCountProvider.constructLateFilePath(fileName, this.fs, this.dataset.outputPath());
            boolean moved = this.fs.rename(source, destination);
            if (!moved) {
                throw new IOException(String.format("Unable to move %s to %s", source.toString(), destination.toString()));
            }
        }
    }

    /**
     * Deletes each given path via {@code HadoopUtils.deletePathAndEmptyAncestors}.
     *
     * @param set the paths to delete
     * @throws IOException if a deletion fails
     */
    private void deleteFilesByPaths(Set<Path> set) throws IOException {
        for (Path doomed : set) {
            HadoopUtils.deletePathAndEmptyAncestors(this.fs, doomed, true);
        }
    }

    /** Allows this runner to publish its output by switching the policy to {@code DO_PUBLISH_DATA}. */
    public void proceed() {
        policy = Policy.DO_PUBLISH_DATA;
    }

    /**
     * Asks this runner to stop by switching the policy to {@code ABORT_ASAP}; a job that is still
     * being polled is killed on the next poll.
     */
    public void abort() {
        policy = Policy.ABORT_ASAP;
    }

    /**
     * Returns the current outcome of this runner.
     *
     * @return {@code RUNNING} until {@code run()} records {@code COMMITTED} or {@code ABORTED}
     */
    public Status status() {
        return status;
    }

    /**
     * Orders runners by descending dataset priority: a runner with a higher priority compares as
     * smaller.
     *
     * @param mRCompactorJobRunner the runner to compare with
     * @return a negative, zero or positive value per {@link Double#compare(double, double)}
     */
    @Override
    public int compareTo(MRCompactorJobRunner mRCompactorJobRunner) {
        double otherPriority = mRCompactorJobRunner.dataset.priority();
        double ownPriority = this.dataset.priority();
        // Arguments are swapped on purpose to obtain descending order.
        return Double.compare(otherPriority, ownPriority);
    }

    /**
     * Lists, recursively, the files below {@code path} whose name ends with one of the applicable
     * file extensions. Results are cached per path, so later changes below an already listed path
     * are not reflected.
     *
     * @param path the directory to list
     * @return the matching file paths, empty if {@code path} does not exist
     * @throws IOException if the listing fails
     */
    private List<Path> getApplicableFilePaths(final Path path) throws IOException {
        Callable<List<Path>> lister = new Callable<List<Path>>() {
            @Override
            public List<Path> call() throws Exception {
                List<Path> matches = new ArrayList<Path>();
                if (MRCompactorJobRunner.this.fs.exists(path)) {
                    PathFilter byExtension = new PathFilter() {
                        @Override
                        public boolean accept(Path candidate) {
                            for (String extension : MRCompactorJobRunner.this.getApplicableFileExtensions()) {
                                if (candidate.getName().endsWith(extension)) {
                                    return true;
                                }
                            }
                            return false;
                        }
                    };
                    Iterator<?> statuses = FileListUtils.listFilesRecursively(MRCompactorJobRunner.this.fs, path, byExtension).iterator();
                    while (statuses.hasNext()) {
                        matches.add(((FileStatus) statuses.next()).getPath());
                    }
                }
                return matches;
            }
        };
        try {
            return this.applicablePathCache.get(path, lister);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    /**
     * Submits the "compaction completed" event with the late and regular output record counts and
     * the recompaction flag. Best effort: any failure is logged as a warning and not propagated.
     *
     * @param job the finished job the event refers to
     */
    private void submitSlaEvent(Job job) {
        try {
            CompactionSlaEventHelper.getEventSubmitterBuilder(this.dataset, Optional.of(job), this.fs)
                    .eventSubmitter(this.eventSubmitter)
                    .eventName(CompactionSlaEventHelper.COMPACTION_COMPLETED_EVENT_NAME)
                    .additionalMetadata(
                            CompactionSlaEventHelper.LATE_RECORD_COUNT,
                            Long.toString(this.lateOutputRecordCountProvider.getRecordCount(getApplicableFilePaths(this.dataset.outputLatePath()))))
                    .additionalMetadata(
                            CompactionSlaEventHelper.REGULAR_RECORD_COUNT,
                            Long.toString(this.outputRecordCountProvider.getRecordCount(getApplicableFilePaths(this.dataset.outputPath()))))
                    .additionalMetadata(
                            CompactionSlaEventHelper.RECOMPATED_METADATA_NAME,
                            Boolean.toString(this.dataset.needToRecompact()))
                    .build()
                    .submit();
        } catch (Throwable failure) {
            // Event submission must never fail the compaction itself.
            LOG.warn("Failed to submit compcation completed event:" + failure, failure);
        }
    }

    /**
     * Submits the record count event carrying the output path, the late and regular output record
     * counts from the dataset helper, and the recompaction flag. Best effort: any failure is logged
     * as a warning and not propagated.
     */
    private void submitRecordsCountsEvent() {
        try {
            CompactionSlaEventHelper.getEventSubmitterBuilder(this.dataset, Optional.absent(), this.fs)
                    .eventSubmitter(this.eventSubmitter)
                    .eventName(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT)
                    .additionalMetadata(
                            CompactionSlaEventHelper.DATASET_OUTPUT_PATH,
                            this.dataset.outputPath().toString())
                    .additionalMetadata(
                            CompactionSlaEventHelper.LATE_RECORD_COUNT,
                            Long.toString(this.datasetHelper.getLateOutputRecordCount()))
                    .additionalMetadata(
                            CompactionSlaEventHelper.REGULAR_RECORD_COUNT,
                            Long.toString(this.datasetHelper.getOutputRecordCount()))
                    .additionalMetadata(
                            CompactionSlaEventHelper.NEED_RECOMPACT,
                            Boolean.toString(this.dataset.needToRecompact()))
                    .build()
                    .submit();
        } catch (Throwable failure) {
            // Event submission must never fail the compaction itself.
            LOG.warn("Failed to submit late event count:" + failure, failure);
        }
    }
}
