package uk.gov.gchq.gaffer.accumulostore.operation.hdfs.handler.job.factory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.accumulostore.AccumuloStore;
import uk.gov.gchq.gaffer.accumulostore.operation.hdfs.handler.job.partitioner.GafferKeyRangePartitioner;
import uk.gov.gchq.gaffer.accumulostore.operation.hdfs.mapper.AddElementsFromHdfsMapper;
import uk.gov.gchq.gaffer.accumulostore.operation.hdfs.reducer.AccumuloKeyValueReducer;
import uk.gov.gchq.gaffer.accumulostore.utils.AccumuloStoreConstants;
import uk.gov.gchq.gaffer.accumulostore.utils.IngestUtils;
import uk.gov.gchq.gaffer.accumulostore.utils.TableUtils;
import uk.gov.gchq.gaffer.hdfs.operation.AddElementsFromHdfs;
import uk.gov.gchq.gaffer.hdfs.operation.handler.job.factory.AddElementsFromHdfsJobFactory;
import uk.gov.gchq.gaffer.hdfs.operation.handler.job.factory.JobFactory;
import uk.gov.gchq.gaffer.hdfs.operation.partitioner.NoPartitioner;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.StoreException;

/**
 * {@link AddElementsFromHdfsJobFactory} for {@link AccumuloStore}s.
 *
 * <p>Configures a MapReduce job that maps input with {@link AddElementsFromHdfsMapper}, combines and
 * reduces Accumulo {@link Key}/{@link Value} pairs with {@link AccumuloKeyValueReducer}, and writes the
 * result to the operation's output path using {@link AccumuloFileOutputFormat}. Unless the operation
 * requests {@link NoPartitioner}, reduce-side partitioning uses {@link GafferKeyRangePartitioner} driven
 * by either a user-provided splits file or one generated from the store's table.
 */
public class AccumuloAddElementsFromHdfsJobFactory implements AddElementsFromHdfsJobFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AccumuloAddElementsFromHdfsJobFactory.class);

    /**
     * Ensures the store's Accumulo table exists before the job runs.
     *
     * @param store the target store; must be an {@link AccumuloStore}
     * @throws StoreException if {@link TableUtils#ensureTableExists} fails
     */
    @Override
    public void prepareStore(final Store store) throws StoreException {
        TableUtils.ensureTableExists((AccumuloStore) store);
    }

    /**
     * Builds the {@link JobConf} for the ingest job.
     *
     * <p>Records the store schema (compact JSON), the mapper generator, the validate flag, the optional
     * map/reduce task counts and the class name of the store's key converter.
     *
     * @param addElementsFromHdfs the operation being executed
     * @param mapperGenerator     value stored under {@link JobFactory#MAPPER_GENERATOR}
     * @param store               the target store; must be an {@link AccumuloStore}
     * @return the populated job configuration
     * @throws IOException declared for interface compatibility
     */
    @Override
    public JobConf createJobConf(final AddElementsFromHdfs addElementsFromHdfs, final String mapperGenerator, final Store store) throws IOException {
        final JobConf jobConf = new JobConf(new Configuration());
        LOGGER.info("Setting up job conf");

        // Serialise the schema once and reuse it for both the config entry and the debug log.
        final String schemaJson = new String(store.getSchema().toCompactJson(), StandardCharsets.UTF_8);
        jobConf.set(JobFactory.SCHEMA, schemaJson);
        LOGGER.debug("Added {} {} to job conf", JobFactory.SCHEMA, schemaJson);

        jobConf.set(JobFactory.MAPPER_GENERATOR, mapperGenerator);
        LOGGER.info("Added {} of {} to job conf", JobFactory.MAPPER_GENERATOR, mapperGenerator);

        final boolean validate = addElementsFromHdfs.isValidate();
        jobConf.set(JobFactory.VALIDATE, String.valueOf(validate));
        LOGGER.info("Added {} option of {} to job conf", JobFactory.VALIDATE, validate);

        final Integer numMapTasks = addElementsFromHdfs.getNumMapTasks();
        if (null != numMapTasks) {
            jobConf.setNumMapTasks(numMapTasks);
            LOGGER.info("Set number of map tasks to {} on job conf", numMapTasks);
        }

        final Integer numReduceTasks = addElementsFromHdfs.getNumReduceTasks();
        if (null != numReduceTasks) {
            jobConf.setNumReduceTasks(numReduceTasks);
            LOGGER.info("Set number of reduce tasks to {} on job conf", numReduceTasks);
        }

        jobConf.set(AccumuloStoreConstants.ACCUMULO_ELEMENT_CONVERTER_CLASS,
                ((AccumuloStore) store).getKeyPackage().getKeyConverter().getClass().getName());
        return jobConf;
    }

    /**
     * Builds the human-readable MapReduce job name.
     *
     * @param mapperGenerator the mapper generator shown in the name
     * @param outputPath      the output path shown in the name
     * @return the job name
     */
    protected String getJobName(final String mapperGenerator, final String outputPath) {
        return "Ingest HDFS data: Generator=" + mapperGenerator + ", output=" + outputPath;
    }

    /**
     * Configures mapper, combiner, reducer, output and (unless disabled) partitioner on the job.
     *
     * <p>If the operation requests {@link NoPartitioner}, no partitioner is configured. Any other
     * non-null partitioner on the operation is replaced with {@link GafferKeyRangePartitioner}.
     *
     * @param job                 the job to configure
     * @param addElementsFromHdfs the operation being executed; its partitioner may be overwritten
     * @param mapperGenerator     the mapper generator, used in the job name
     * @param store               the target store; must be an {@link AccumuloStore}
     * @throws IOException if partitioner setup fails
     */
    @Override
    public void setupJob(final Job job, final AddElementsFromHdfs addElementsFromHdfs, final String mapperGenerator, final Store store) throws IOException {
        job.setJarByClass(getClass());
        job.setJobName(getJobName(mapperGenerator, addElementsFromHdfs.getOutputPath()));
        setupMapper(job);
        setupCombiner(job);
        setupReducer(job);
        setupOutput(job, addElementsFromHdfs);

        // Capture the requested partitioner BEFORE overwriting it so the warning names the right class.
        final Class<?> requestedPartitioner = addElementsFromHdfs.getPartitioner();
        if (NoPartitioner.class.equals(requestedPartitioner)) {
            return;
        }
        if (null != requestedPartitioner) {
            addElementsFromHdfs.setPartitioner(GafferKeyRangePartitioner.class);
            if (!GafferKeyRangePartitioner.class.equals(requestedPartitioner)) {
                LOGGER.warn("Partitioner class {} will be replaced with {}",
                        requestedPartitioner.getName(), GafferKeyRangePartitioner.class.getName());
            }
        }
        setupPartitioner(job, addElementsFromHdfs, (AccumuloStore) store);
    }

    /**
     * Sets {@link AddElementsFromHdfsMapper} as the mapper, emitting {@link Key}/{@link Value} pairs.
     *
     * @param job the job to configure
     */
    protected void setupMapper(final Job job) {
        job.setMapperClass(AddElementsFromHdfsMapper.class);
        job.setMapOutputKeyClass(Key.class);
        job.setMapOutputValueClass(Value.class);
    }

    /**
     * Sets {@link AccumuloKeyValueReducer} as the combiner.
     *
     * @param job the job to configure
     */
    protected void setupCombiner(final Job job) {
        job.setCombinerClass(AccumuloKeyValueReducer.class);
    }

    /**
     * Sets {@link AccumuloKeyValueReducer} as the reducer, emitting {@link Key}/{@link Value} pairs.
     *
     * @param job the job to configure
     */
    protected void setupReducer(final Job job) {
        job.setReducerClass(AccumuloKeyValueReducer.class);
        job.setOutputKeyClass(Key.class);
        job.setOutputValueClass(Value.class);
    }

    /**
     * Writes job output with {@link AccumuloFileOutputFormat} to the operation's output path.
     *
     * @param job                 the job to configure
     * @param addElementsFromHdfs the operation supplying the output path
     */
    protected void setupOutput(final Job job, final AddElementsFromHdfs addElementsFromHdfs) {
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(addElementsFromHdfs.getOutputPath()));
    }

    /**
     * Configures {@link GafferKeyRangePartitioner}, using the user's splits file when the operation
     * says so, otherwise generating one from the store's table.
     *
     * @param job                 the job to configure
     * @param addElementsFromHdfs the operation; its splits file path is defaulted if unset
     * @param accumuloStore       the target store
     * @throws IOException if the splits file cannot be read/written or the reducer bounds are invalid
     */
    protected void setupPartitioner(final Job job, final AddElementsFromHdfs addElementsFromHdfs, final AccumuloStore accumuloStore) throws IOException {
        if (null == addElementsFromHdfs.getSplitsFilePath()) {
            // NOTE(review): org.apache.hadoop.fs.Path rejects empty strings, so this default will likely
            // fail later at `new Path("")` rather than use the current directory — confirm and pick a real default.
            addElementsFromHdfs.setSplitsFilePath("");
            LOGGER.warn("HDFS splits file path not set - using the current directory as the default path.");
        }
        if (addElementsFromHdfs.isUseProvidedSplits()) {
            setUpPartitionerFromUserProvidedSplitsFile(job, addElementsFromHdfs);
        } else {
            setUpPartitionerGenerateSplitsFile(job, addElementsFromHdfs, accumuloStore);
        }
    }

    /**
     * Generates a splits file from the store's table and sizes the reduce phase from it.
     *
     * <p>The number of reducers is the number of splits plus one. If a maximum is set, at most
     * {@code max - 1} is passed to {@link IngestUtils#createSplitsFile}. If a minimum is set and not
     * reached, each range is divided into sub-bins so the reducer count is at least the minimum.
     *
     * @param job                 the job to configure
     * @param addElementsFromHdfs the operation supplying the splits path and min/max reducers
     * @param accumuloStore       the target store
     * @throws IOException if min/max are invalid (&lt; 1 or min &gt; max) or the splits file cannot be written
     * @throws RuntimeException wrapping a {@link StoreException} from the store connection
     */
    protected void setUpPartitionerGenerateSplitsFile(final Job job, final AddElementsFromHdfs addElementsFromHdfs, final AccumuloStore accumuloStore) throws IOException {
        final String splitsFilePath = addElementsFromHdfs.getSplitsFilePath();
        LOGGER.info("Creating splits file in location {} from table {}", splitsFilePath, accumuloStore.getTableName());

        // -1 means "not set" (see validateValue).
        final int maxReducers = validateValue(addElementsFromHdfs.getMaxReduceTasks());
        final int minReducers = validateValue(addElementsFromHdfs.getMinReduceTasks());
        if (maxReducers != -1 && minReducers != -1 && minReducers > maxReducers) {
            LOGGER.error("Minimum number of reducers must be less than or equal to the maximum number of reducers: minimum was {} maximum was {}", minReducers, maxReducers);
            throw new IOException("Minimum number of reducers must be less than or equal to the maximum number of reducers");
        }

        try {
            final FileSystem fs = FileSystem.get(job.getConfiguration());
            final Path splitsFile = new Path(splitsFilePath);
            final int numSplits;
            if (maxReducers == -1) {
                numSplits = IngestUtils.createSplitsFile(accumuloStore.getConnection(), accumuloStore.getTableName(), fs, splitsFile);
            } else {
                // Reducers = splits + 1, so presumably this caps the split count at maxReducers - 1.
                numSplits = IngestUtils.createSplitsFile(accumuloStore.getConnection(), accumuloStore.getTableName(), fs, splitsFile, maxReducers - 1);
            }

            int numReducers = numSplits + 1;
            LOGGER.info("Number of splits is {}; number of reducers is {}", numSplits, numReducers);
            if (minReducers != -1 && numReducers < minReducers) {
                LOGGER.info("Number of reducers is {} which is less than the specified minimum number of {}", numReducers, minReducers);
                // (min / n + 1) * n > min, so the sub-binned reducer count always reaches the minimum.
                final int numSubBins = (minReducers / numReducers) + 1;
                LOGGER.info("Setting number of subbins on GafferKeyRangePartitioner to {}", numSubBins);
                GafferKeyRangePartitioner.setNumSubBins(job, numSubBins);
                numReducers *= numSubBins;
                LOGGER.info("Number of reducers is {}", numReducers);
            }

            job.setNumReduceTasks(numReducers);
            job.setPartitionerClass(GafferKeyRangePartitioner.class);
            GafferKeyRangePartitioner.setSplitFile(job, splitsFilePath);
        } catch (final StoreException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Configures {@link GafferKeyRangePartitioner} from a user-provided splits file; reducers are
     * the number of splits in that file plus one. Any min/max reducer settings are ignored.
     *
     * @param job                 the job to configure
     * @param addElementsFromHdfs the operation supplying the splits path
     * @throws IOException if the splits file cannot be read or min/max values are &lt; 1
     */
    protected void setUpPartitionerFromUserProvidedSplitsFile(final Job job, final AddElementsFromHdfs addElementsFromHdfs) throws IOException {
        final String splitsFilePath = addElementsFromHdfs.getSplitsFilePath();
        final boolean reducerBoundsUnset = validateValue(addElementsFromHdfs.getMaxReduceTasks()) == -1
                && validateValue(addElementsFromHdfs.getMinReduceTasks()) == -1;
        if (reducerBoundsUnset) {
            LOGGER.info("Using splits file provided by user {}", splitsFilePath);
        } else {
            LOGGER.info("Using splits file provided by user {}, ignoring minReduceTasks and maxReduceTasks", splitsFilePath);
        }
        final int numSplits = IngestUtils.getNumSplits(FileSystem.get(job.getConfiguration()), new Path(splitsFilePath));
        job.setNumReduceTasks(numSplits + 1);
        job.setPartitionerClass(GafferKeyRangePartitioner.class);
        GafferKeyRangePartitioner.setSplitFile(job, splitsFilePath);
    }

    /**
     * Validates an optional positive count.
     *
     * @param num the value to check; may be {@code null}
     * @return {@code -1} if {@code num} is {@code null}, otherwise {@code num}
     * @throws IOException if {@code num} is non-null and less than 1
     */
    protected static int validateValue(final Integer num) throws IOException {
        if (null == num) {
            return -1;
        }
        final int value = num;
        if (value < 1) {
            LOGGER.error("Invalid field - must be >=1, got {}", value);
            throw new IOException("Invalid field - must be >=1, got " + value);
        }
        return value;
    }
}
