package datafu.hourglass.jobs;

import datafu.hourglass.avro.CombinedAvroKeyInputFormat;
import datafu.hourglass.fs.DatePath;
import datafu.hourglass.fs.DateRange;
import datafu.hourglass.fs.PathUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

/* loaded from: input_file:datafu/hourglass/jobs/AbstractNonIncrementalJob.class */
/**
 * Base class for jobs that read the dated inputs covering a date range and process
 * them with a single Avro map/reduce job, writing one dated output directory.
 *
 * <p>Subclasses supply the mapper, reducer and (optionally) combiner classes together
 * with the Avro schemas for the map output key/value and the reduce output.
 *
 * <p>The {@code combine.inputs} property, when present in the constructor's
 * {@link Properties}, selects {@link CombinedAvroKeyInputFormat} instead of
 * {@link AvroKeyInputFormat} as the job's input format.
 */
public abstract class AbstractNonIncrementalJob extends TimeBasedJob {
    private final Logger _log = Logger.getLogger(AbstractNonIncrementalJob.class);
    private boolean _combineInputs;
    private Report _report;

    /** Base type for combiners: Avro key/value records in, Avro key/value records out. */
    public static abstract class BaseCombiner extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
    }

    /** Base type for mappers: Avro-keyed input records in, Avro key/value records out. */
    public static abstract class BaseMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
    }

    /** Base type for reducers: Avro key/value records in, Avro-keyed output records out. */
    public static abstract class BaseReducer extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
    }

    /**
     * Summary of a {@link AbstractNonIncrementalJob#run()} invocation. A fresh instance is
     * created at the start of each run; the job fields are only filled in once the
     * underlying job has completed successfully.
     */
    public static class Report {
        private String jobName;
        private String jobId;
        private Path countersPath;
        private final List<DatePath> inputFiles = new ArrayList<DatePath>();
        private DatePath outputFile;

        /** @return the name of the job that was run */
        public String getJobName() {
            return this.jobName;
        }

        /** @return the ID of the job that was run */
        public String getJobId() {
            return this.jobId;
        }

        /** @return the counters path reported by the job that was run */
        public Path getCountersPath() {
            return this.countersPath;
        }

        /** @return a read-only view of the dated inputs that were fed to the job */
        public List<DatePath> getInputFiles() {
            return Collections.unmodifiableList(this.inputFiles);
        }

        /** @return the dated output produced by the job */
        public DatePath getOutputFile() {
            return this.outputFile;
        }
    }

    /**
     * Creates the job and applies the {@code combine.inputs} property if it is present.
     *
     * @param str job name, passed through to the superclass
     * @param properties job configuration, passed through to the superclass
     * @throws IOException declared by the superclass constructor
     */
    public AbstractNonIncrementalJob(String str, Properties properties) throws IOException {
        super(str, properties);
        if (properties.containsKey("combine.inputs")) {
            setCombineInputs(Boolean.parseBoolean(properties.getProperty("combine.inputs")));
        }
    }

    /** @return whether {@link CombinedAvroKeyInputFormat} is used as the input format */
    public boolean getCombineInputs() {
        return this._combineInputs;
    }

    /**
     * @param z {@code true} to use {@link CombinedAvroKeyInputFormat}, {@code false} to use
     *          {@link AvroKeyInputFormat}
     */
    public void setCombineInputs(boolean z) {
        this._combineInputs = z;
    }

    /** @return the report for the most recent {@link #run()}, or {@code null} if never run */
    public Report getReport() {
        return this._report;
    }

    /**
     * Runs the job over every day in the planned date range.
     *
     * <p>The output directory is created if missing, the dated inputs under the single
     * input path are discovered, and each day of the range is mapped to its input. All
     * of those inputs are then submitted as one staged job whose output directory is
     * named after the last day processed. If a retention count is configured, older
     * dated outputs are pruned afterwards.
     *
     * @throws RuntimeException if no input path or more than one input path is configured,
     *         if no dated input exists, if a day in the range has no input, or if the
     *         job fails
     */
    @Override
    public void run() throws IOException, InterruptedException, ClassNotFoundException {
        _report = new Report();
        Calendar calendar = Calendar.getInstance(PathUtils.timeZone);

        if (!getFileSystem().exists(getOutputPath())) {
            getFileSystem().mkdirs(getOutputPath());
        }

        // Fail with a clear message instead of an IndexOutOfBoundsException from get(0).
        if (getInputPaths().isEmpty()) {
            throw new RuntimeException("No input path was specified");
        }
        if (getInputPaths().size() > 1) {
            throw new RuntimeException("Only a single input is supported");
        }

        List<DatePath> availableInputs = PathUtils.findNestedDatedPaths(getFileSystem(), getInputPaths().get(0));
        if (availableInputs.isEmpty()) {
            throw new RuntimeException("no input data available");
        }

        // Index the available inputs by date so each day of the range can be looked up.
        List<Date> availableDates = new ArrayList<Date>(availableInputs.size());
        Map<Date, DatePath> inputsByDate = new HashMap<Date, DatePath>();
        for (DatePath available : availableInputs) {
            availableDates.add(available.getDate());
            inputsByDate.put(available.getDate(), available);
        }

        DateRange dateRange = DateRangePlanner.getDateRange(getStartDate(), getEndDate(), availableDates, getDaysAgo(), getNumDays());

        // The input schema is read from the last available input, regardless of the range.
        DatePath latestInput = availableInputs.get(availableInputs.size() - 1);
        _log.info("Getting schema for input " + latestInput.getPath());
        Schema inputSchema = PathUtils.getSchemaFromPath(getFileSystem(), latestInput.getPath());

        ReduceEstimator estimator = new ReduceEstimator(getFileSystem(), getProperties());
        List<String> jobInputPaths = new ArrayList<String>();

        // Walk the range one day at a time; every day must have a matching input.
        Date current = dateRange.getBeginDate();
        while (current.compareTo(dateRange.getEndDate()) <= 0) {
            DatePath input = inputsByDate.get(current);
            if (input == null) {
                throw new RuntimeException(String.format("Missing input for %s", PathUtils.datedPathFormat.format(current)));
            }
            _log.info(String.format("Processing %s", input.getPath()));
            jobInputPaths.add(input.getPath().toString());
            estimator.addInputPath(input.getPath());
            _report.inputFiles.add(input);

            // The output is named after the last day actually processed.
            latestInput = input;

            calendar.setTime(current);
            calendar.add(Calendar.DAY_OF_MONTH, 1);
            current = calendar.getTime();
        }

        String outputDateName = PathUtils.datedPathFormat.format(latestInput.getDate());
        Path datedOutputPath = new Path(getOutputPath(), outputDateName);

        StagedOutputJob job = StagedOutputJob.createStagedJob(
                getConf(),
                getName() + "-" + outputDateName,
                jobInputPaths,
                "/tmp" + datedOutputPath.toString(),
                datedOutputPath.toString(),
                _log);
        job.setCountersParentPath(getCountersParentPath());

        if (_combineInputs) {
            job.setInputFormatClass(CombinedAvroKeyInputFormat.class);
        } else {
            job.setInputFormatClass(AvroKeyInputFormat.class);
        }
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        AvroJob.setInputKeySchema(job, inputSchema);
        AvroJob.setMapOutputKeySchema(job, getMapOutputKeySchema());
        AvroJob.setMapOutputValueSchema(job, getMapOutputValueSchema());
        AvroJob.setOutputKeySchema(job, getReduceOutputSchema());

        job.setNumReduceTasks(resolveNumReducers(estimator));
        job.setMapperClass(getMapperClass());
        job.setReducerClass(getReducerClass());
        if (isUseCombiner() && getCombinerClass() != null) {
            job.setCombinerClass(getCombinerClass());
        }

        config(job.getConfiguration());

        if (!job.waitForCompletion(true)) {
            _log.error("Job failed! Quitting...");
            throw new RuntimeException("Job failed");
        }

        _report.jobId = job.getJobID().toString();
        _report.jobName = job.getJobName();
        _report.countersPath = job.getCountersPath();
        _report.outputFile = new DatePath(latestInput.getDate(), datedOutputPath);

        if (getRetentionCount() != null) {
            PathUtils.keepLatestDatedPaths(getFileSystem(), getOutputPath(), getRetentionCount().intValue());
        }
    }

    /**
     * Chooses the reducer count: the explicitly configured value if there is one,
     * otherwise the estimate derived from the registered input paths. Logs the choice.
     */
    private int resolveNumReducers(ReduceEstimator estimator) throws IOException {
        if (getNumReducers() != null) {
            int fixed = getNumReducers().intValue();
            _log.info(String.format("Using %d reducers (fixed)", fixed));
            return fixed;
        }
        int computed = estimator.getNumReducers();
        _log.info(String.format("Using %d reducers (computed)", computed));
        return computed;
    }

    /** @return the Avro schema for the key emitted by the mapper */
    protected abstract Schema getMapOutputKeySchema();

    /** @return the Avro schema for the value emitted by the mapper */
    protected abstract Schema getMapOutputValueSchema();

    /** @return the Avro schema for the records written by the reducer */
    protected abstract Schema getReduceOutputSchema();

    /** @return the mapper implementation to run */
    public abstract Class<? extends BaseMapper> getMapperClass();

    /** @return the reducer implementation to run */
    public abstract Class<? extends BaseReducer> getReducerClass();

    /**
     * @return the combiner implementation to run, or {@code null} (the default) for none
     */
    public Class<? extends BaseCombiner> getCombinerClass() {
        return null;
    }
}
