package com.marklogic.contentpump;

import com.marklogic.contentpump.utilities.ReflectionUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/* loaded from: input_file:com/marklogic/contentpump/LocalJobRunner.class */
public class LocalJobRunner implements ConfigConstants {
    /** Logger shared by the runner and its nested task, monitor and reader classes. */
    public static final Log LOG = LogFactory.getLog(LocalJobRunner.class);
    /** Default thread count; the constructor starts from this same value (4) when the thread-count option is absent. */
    public static final int DEFAULT_THREAD_COUNT = 4;
    /** The job executed by {@link #run()}; also the source of the input/output format and mapper classes. */
    private LocalJob job;
    /** Worker pool; created by the constructor only when threadCount > 1, otherwise null and run() executes splits on the calling thread. */
    private ExecutorService pool;
    /** Per-split progress in whole percent, one entry per input split; allocated in run() and updated by TrackingRecordReader. */
    private AtomicInteger[] progress;
    /** System.currentTimeMillis() captured at construction; used by run() to log the total execution time. */
    private long startTime;
    /** Value of the THREADS_PER_SPLIT command-line option; 0 when the option is absent. */
    private int threadsPerSplit;
    /** Size of the thread pool: the THREAD_COUNT command-line option, or 4 when it is absent. */
    private int threadCount;
    /** Result of assignThreads for the split most recently scheduled by run(). */
    private int availableThreads = 1;
    /** Read from configuration key CONF_MIN_THREADS (default 1); assignThreads returns it when splitCount * minThreads exceeds threadCount. */
    private int minThreads;
    /** The command being executed; run() asks it for the runtime mapper class of a split. */
    private Command cmd;
    /** Reporter created at the start of run() and handed to every mapper context; its counters are logged when the job ends. */
    private ContentPumpReporter reporter;

    /* loaded from: input_file:com/marklogic/contentpump/LocalJobRunner$LocalMapTask.class */
    /**
     * Callable that runs the map phase of a single input split on the
     * runner's thread pool.
     *
     * <p>{@link #call()} never throws: a failure while setting up or running
     * the mapper is logged, after which the task still closes whatever reader
     * and writer it managed to create and commits its output if a committer
     * was obtained.
     */
    public class LocalMapTask<INKEY, INVALUE, OUTKEY, OUTVALUE> implements Callable<Object> {
        private final InputFormat<INKEY, INVALUE> inputFormat;
        private final OutputFormat<OUTKEY, OUTVALUE> outputFormat;
        /** Mapper instance; created reflectively from {@link #mapperClass} inside {@link #call()}. */
        private Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper;
        private final Configuration conf;
        /** Index of the split; used as the map task number of the task attempt id. */
        private final int id;
        private final InputSplit split;
        /** Progress slot (whole percent) updated by the tracking reader while the split is read. */
        private final AtomicInteger pctProgress;
        private final ContentPumpReporter reporter;
        /** Mapper class to instantiate; stays null if the job's mapper class could not be loaded. */
        private Class<? extends Mapper<?, ?, ?, ?>> mapperClass;
        /** Thread count handed to the mapper when it is a MultithreadedMapper. */
        private int threadCount = 0;

        /**
         * Creates a task for one split. The mapper class defaults to the job's
         * mapper class; if that class cannot be loaded the error is logged and
         * the mapper class is left null until {@link #setMapperClass} is called.
         *
         * @param inputFormat input format used to create the record reader
         * @param outputFormat output format used to create the writer and committer
         * @param configuration job configuration
         * @param i index of the split (map task number)
         * @param inputSplit the split to process
         * @param contentPumpReporter reporter passed to the mapper context
         * @param atomicInteger progress slot for this split, in whole percent
         */
        public LocalMapTask(InputFormat<INKEY, INVALUE> inputFormat, OutputFormat<OUTKEY, OUTVALUE> outputFormat, Configuration configuration, int i, InputSplit inputSplit, ContentPumpReporter contentPumpReporter, AtomicInteger atomicInteger) {
            this.inputFormat = inputFormat;
            this.outputFormat = outputFormat;
            this.conf = configuration;
            this.id = i;
            this.split = inputSplit;
            this.pctProgress = atomicInteger;
            this.reporter = contentPumpReporter;
            try {
                this.mapperClass = LocalJobRunner.this.job.getMapperClass();
            } catch (ClassNotFoundException e) {
                LocalJobRunner.LOG.error("Mapper class not found", e);
            }
        }

        /** @return the thread count that will be given to a MultithreadedMapper */
        public int getThreadCount() {
            return this.threadCount;
        }

        /** @param i thread count to give to a MultithreadedMapper */
        public void setThreadCount(int i) {
            this.threadCount = i;
        }

        /** @return the mapper class this task will instantiate (may be null, see constructor) */
        public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() {
            return this.mapperClass;
        }

        /** @param cls mapper class to instantiate instead of the job's default */
        public void setMapperClass(Class<? extends Mapper<?, ?, ?, ?>> cls) {
            this.mapperClass = cls;
        }

        /**
         * Reads the split through a progress-tracking reader, runs the mapper
         * over it, then closes the reader and writer and commits the task.
         *
         * @return always {@code null}; failures are logged, never thrown
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public Object call() {
            TaskAttemptContext taskContext = null;
            Mapper.Context mapperContext = null;
            TrackingRecordReader trackingReader = null;
            RecordWriter writer = null;
            OutputCommitter committer = null;
            JobID jobID = new JobID();
            TaskAttemptID attemptId = new TaskAttemptID(new TaskID(jobID.getJtIdentifier(), jobID.getId(), TaskType.MAP, this.id), 0);
            try {
                taskContext = ReflectionUtil.createTaskAttemptContext(this.conf, attemptId);
                RecordReader reader = this.inputFormat.createRecordReader(this.split, taskContext);
                writer = this.outputFormat.getRecordWriter(taskContext);
                committer = this.outputFormat.getOutputCommitter(taskContext);
                trackingReader = new TrackingRecordReader(reader, this.pctProgress);
                this.mapper = (Mapper) ReflectionUtils.newInstance(this.mapperClass, this.conf);
                mapperContext = ReflectionUtil.createMapperContext(this.mapper, this.conf, attemptId, trackingReader, writer, committer, this.reporter, this.split);
                trackingReader.initialize(this.split, mapperContext);
                if (this.mapperClass == MultithreadedMapper.class) {
                    ((MultithreadedMapper) this.mapper).setThreadCount(this.threadCount);
                    ((MultithreadedMapper) this.mapper).setThreadPool(LocalJobRunner.this.pool);
                }
                this.mapper.run(mapperContext);
            } catch (Throwable th) {
                if (LocalJobRunner.LOG.isDebugEnabled()) {
                    LocalJobRunner.LOG.debug("Error running task: ", th);
                } else {
                    LocalJobRunner.LOG.error("Error running task: ");
                    LocalJobRunner.LOG.error(th.getMessage());
                }
                // run() may be blocked in pool.wait() after submitting this
                // task; wake it so a failed task cannot stall scheduling.
                try {
                    synchronized (LocalJobRunner.this.pool) {
                        LocalJobRunner.this.pool.notify();
                    }
                } catch (Throwable th2) {
                    LocalJobRunner.LOG.error(th2);
                }
            }
            if (trackingReader != null) {
                try {
                    trackingReader.close();
                } catch (Throwable th3) {
                    LocalJobRunner.LOG.error("Error closing reader: " + th3.getMessage());
                    if (LocalJobRunner.LOG.isDebugEnabled()) {
                        LocalJobRunner.LOG.debug(th3);
                    }
                }
            }
            if (writer != null) {
                // The mapper context is null when setup failed after the writer
                // was created; close with the task attempt context instead of
                // handing the writer a null context.
                TaskAttemptContext closeContext = mapperContext != null ? mapperContext : taskContext;
                try {
                    writer.close(closeContext);
                } catch (Throwable th4) {
                    LocalJobRunner.LOG.error("Error closing writer: " + th4.getMessage());
                    if (LocalJobRunner.LOG.isDebugEnabled()) {
                        LocalJobRunner.LOG.debug(th4);
                    }
                }
            }
            // The committer is null when setup failed before it was obtained;
            // there is nothing to commit then, and calling it would only log a
            // misleading "Error committing task: null".
            if (committer != null) {
                try {
                    committer.commitTask(taskContext);
                } catch (Throwable th5) {
                    LocalJobRunner.LOG.error("Error committing task: " + th5.getMessage());
                    if (LocalJobRunner.LOG.isDebugEnabled()) {
                        LocalJobRunner.LOG.debug(th5);
                    }
                }
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/marklogic/contentpump/LocalJobRunner$Monitor.class */
    /**
     * Background thread that logs the job's overall progress roughly once per
     * second, only when the formatted percentage has changed since the last
     * report, and logs one final figure when it stops.
     *
     * <p>The thread stops when {@code ContentPump.shutdown} is set, when the
     * job reports done, or when it is interrupted.
     */
    public class Monitor extends Thread {
        /** Last progress line written to the log; null until the first report. */
        private String lastReport;

        Monitor() {
        }

        /** Formats the current overall progress as the line written to the log. */
        private String currentReport() {
            return " completed " + StringUtils.formatPercent(LocalJobRunner.this.computeProgress(), 0);
        }

        @Override
        public void run() {
            while (!ContentPump.shutdown && !interrupted() && !LocalJobRunner.this.job.done()) {
                try {
                    Thread.sleep(1000L);
                    if (ContentPump.shutdown) {
                        break;
                    }
                    String report = currentReport();
                    if (!report.equals(this.lastReport)) {
                        LocalJobRunner.LOG.info(report);
                        this.lastReport = report;
                    }
                } catch (InterruptedException e) {
                    // An interrupt is the request to stop monitoring. The
                    // exception has already cleared the interrupt flag, so
                    // leave the loop here instead of relying on job.done().
                    break;
                } catch (Throwable th) {
                    LocalJobRunner.LOG.error("Error in monitor thread", th);
                }
            }
            // Final figure, unless it was already the last line reported.
            String finalReport = currentReport();
            if (!finalReport.equals(this.lastReport)) {
                LocalJobRunner.LOG.info(finalReport);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/marklogic/contentpump/LocalJobRunner$SplitLengthComparator.class */
    /**
     * Orders input splits by descending length, so that sorting with this
     * comparator puts the largest split first.
     */
    public static class SplitLengthComparator implements Comparator<InputSplit> {
        private SplitLengthComparator() {
        }

        /**
         * @return 1 when the first split is shorter than the second, -1 when
         *         it is longer, 0 when both have the same length
         * @throws RuntimeException wrapping the IOException or
         *         InterruptedException raised while reading a split length
         */
        @Override
        public int compare(InputSplit inputSplit, InputSplit inputSplit2) {
            final long[] lengths = lengthsOf(inputSplit, inputSplit2);
            if (lengths[0] == lengths[1]) {
                return 0;
            }
            return lengths[0] < lengths[1] ? 1 : -1;
        }

        /** Reads both split lengths, first then second, wrapping checked failures. */
        private static long[] lengthsOf(InputSplit first, InputSplit second) {
            try {
                long firstLength = first.getLength();
                long secondLength = second.getLength();
                return new long[] {firstLength, secondLength};
            } catch (IOException ioFailure) {
                throw new RuntimeException("exception in compare", ioFailure);
            } catch (InterruptedException interrupted) {
                throw new RuntimeException("exception in compare", interrupted);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/marklogic/contentpump/LocalJobRunner$TrackingRecordReader.class */
    public class TrackingRecordReader<K, V> extends RecordReader<K, V> {
        private final RecordReader<K, V> real;
        private AtomicInteger pctProgress;

        TrackingRecordReader(RecordReader<K, V> recordReader, AtomicInteger atomicInteger) {
            this.real = recordReader;
            this.pctProgress = atomicInteger;
        }

        public void close() throws IOException {
            this.real.close();
        }

        public K getCurrentKey() throws IOException, InterruptedException {
            return (K) this.real.getCurrentKey();
        }

        public V getCurrentValue() throws IOException, InterruptedException {
            return (V) this.real.getCurrentValue();
        }

        public float getProgress() throws IOException, InterruptedException {
            return this.real.getProgress();
        }

        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.real.initialize(inputSplit, taskAttemptContext);
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            boolean nextKeyValue = this.real.nextKeyValue();
            this.pctProgress.set((int) (getProgress() * 100.0f));
            return nextKeyValue;
        }
    }

    public LocalJobRunner(LocalJob localJob, CommandLine commandLine, Command command) {
        this.threadsPerSplit = 0;
        this.minThreads = 1;
        this.job = localJob;
        this.cmd = command;
        this.threadCount = 4;
        if (commandLine.hasOption(ConfigConstants.THREAD_COUNT)) {
            this.threadCount = Integer.parseInt(commandLine.getOptionValue(ConfigConstants.THREAD_COUNT));
        }
        if (this.threadCount > 1) {
            this.pool = Executors.newFixedThreadPool(this.threadCount);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Thread pool size: " + this.threadCount);
            }
        }
        if (commandLine.hasOption(ConfigConstants.THREADS_PER_SPLIT)) {
            this.threadsPerSplit = Integer.parseInt(commandLine.getOptionValue(ConfigConstants.THREADS_PER_SPLIT));
        }
        this.minThreads = localJob.getConfiguration().getInt(ConfigConstants.CONF_MIN_THREADS, this.minThreads);
        this.startTime = System.currentTimeMillis();
    }

    /**
     * Runs the job in this JVM.
     *
     * <p>Steps: compute the input splits and sort them largest first, check
     * the output specification, start the progress monitor, then process every
     * split. With a thread pool each split becomes a {@link LocalMapTask};
     * without one each split is mapped on the calling thread. Finally the pool
     * is drained, the job is marked SUCCEEDED and the counters and total
     * execution time are logged.
     *
     * <p>Any {@link Exception} raised after the input format has been
     * instantiated is logged and turns the job state to FAILED instead of
     * being propagated. The message names the step that failed: getting the
     * splits, checking the output specification, or running the job. On every
     * exit path the thread pool is shut down and the monitor is woken, so a
     * failed job leaves no idle worker threads behind.
     *
     * @throws Exception if the input format class cannot be loaded or instantiated
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <INKEY, INVALUE, OUTKEY, OUTVALUE, T extends InputSplit> void run() throws Exception {
        Configuration configuration = this.job.getConfiguration();
        this.reporter = new ContentPumpReporter();
        InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(this.job.getInputFormatClass(), configuration);
        Monitor monitor = null;
        try {
            List splits;
            InputSplit[] sortedSplits;
            try {
                splits = inputFormat.getSplits(this.job);
                sortedSplits = (InputSplit[]) splits.toArray(new InputSplit[splits.size()]);
                // Largest split first.
                Arrays.sort(sortedSplits, new SplitLengthComparator());
            } catch (Exception e) {
                failJob("Error getting input splits: ", e);
                return;
            }

            OutputFormat outputFormat = (OutputFormat) ReflectionUtils.newInstance(this.job.getOutputFormatClass(), configuration);
            Class mapperClass = this.job.getMapperClass();
            Mapper mapper = (Mapper) ReflectionUtils.newInstance(mapperClass, configuration);
            try {
                outputFormat.checkOutputSpecs(this.job);
            } catch (Exception e) {
                failJob("Error checking output specification: ", e);
                return;
            }

            this.progress = new AtomicInteger[splits.size()];
            for (int i = 0; i < this.progress.length; i++) {
                this.progress[i] = new AtomicInteger();
            }
            this.job.setJobState(JobStatus.State.RUNNING);
            monitor = new Monitor();
            monitor.start();

            for (int i = 0; i < sortedSplits.length && !ContentPump.shutdown; i++) {
                if (this.pool != null) {
                    submitSplit(inputFormat, outputFormat, configuration, mapperClass, sortedSplits, i);
                } else {
                    // The mapper of one split is passed to the context
                    // creation of the next, as the runner has always done.
                    mapper = runSplitInline(inputFormat, outputFormat, configuration, mapper, sortedSplits[i], i);
                }
            }

            if (this.pool != null) {
                this.pool.shutdown();
                while (!this.pool.awaitTermination(1L, TimeUnit.HOURS)) {
                    // Tasks still running: keep waiting.
                }
            }
            this.job.setJobState(JobStatus.State.SUCCEEDED);
            monitor.interrupt();
            monitor.join(1000L);
            logCounters();
            LOG.info("Total execution time: " + ((System.currentTimeMillis() - this.startTime) / 1000) + " sec");
        } catch (Exception e) {
            failJob("Error running job: ", e);
        } finally {
            // Wake the monitor on failure paths too; interrupting a thread
            // that has already finished has no effect.
            if (monitor != null) {
                monitor.interrupt();
            }
            releasePool();
        }
    }

    /** Logs a failure (stack trace only at debug level) and marks the job FAILED. */
    private void failJob(String message, Exception cause) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(message, cause);
        } else {
            LOG.error(message);
            LOG.error(cause.getMessage());
        }
        this.job.setJobState(JobStatus.State.FAILED);
    }

    /** Shuts the pool down if it is still accepting work; tasks already submitted keep running. */
    private void releasePool() {
        if (this.pool != null && !this.pool.isShutdown()) {
            this.pool.shutdown();
        }
    }

    /**
     * Wraps one split in a LocalMapTask, picks its mapper class and thread
     * count, and submits it to the pool. For a MultithreadedMapper task the
     * caller then waits on the pool monitor until notified.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void submitSplit(InputFormat inputFormat, OutputFormat outputFormat, Configuration configuration, Class mapperClass, InputSplit[] sortedSplits, int index) throws Exception {
        LocalMapTask task = new LocalMapTask(inputFormat, outputFormat, configuration, index, sortedSplits[index], this.reporter, this.progress[index]);
        this.availableThreads = assignThreads(index, sortedSplits.length);
        Class runtimeMapperClass = this.job.getMapperClass();
        if (this.availableThreads > 1 && this.availableThreads != this.threadsPerSplit) {
            if (runtimeMapperClass != MultithreadedMapper.class) {
                runtimeMapperClass = this.cmd.getRuntimeMapperClass(this.job, mapperClass, this.threadsPerSplit, this.availableThreads);
            }
            if (runtimeMapperClass != mapperClass) {
                task.setMapperClass(runtimeMapperClass);
            }
            if (runtimeMapperClass == MultithreadedMapper.class) {
                task.setThreadCount(this.availableThreads);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Thread Count for Split#" + index + " : " + this.availableThreads);
                }
            }
        }
        if (runtimeMapperClass == MultithreadedMapper.class) {
            // Submit and wait under the same lock so the wake-up cannot be
            // missed. LocalMapTask notifies on failure; presumably
            // MultithreadedMapper notifies once it has taken its threads
            // from the shared pool -- TODO confirm.
            synchronized (this.pool) {
                this.pool.submit(task);
                this.pool.wait();
            }
        } else {
            this.pool.submit(task);
        }
    }

    /**
     * Maps one split on the calling thread. Whether or not the mapper fails,
     * the reader and writer are closed and the task is committed afterwards;
     * a mapper failure is then propagated.
     *
     * @return the mapper instance created for this split
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Mapper runSplitInline(InputFormat inputFormat, OutputFormat outputFormat, Configuration configuration, Mapper previousMapper, InputSplit split, int index) throws Exception {
        JobID jobID = new JobID();
        TaskAttemptID attemptId = new TaskAttemptID(new TaskID(jobID.getJtIdentifier(), jobID.getId(), TaskType.MAP, index), 0);
        TaskAttemptContext taskContext = ReflectionUtil.createTaskAttemptContext(configuration, attemptId);
        RecordReader reader = inputFormat.createRecordReader(split, taskContext);
        RecordWriter writer = outputFormat.getRecordWriter(taskContext);
        OutputCommitter committer = outputFormat.getOutputCommitter(taskContext);
        TrackingRecordReader trackingReader = new TrackingRecordReader(reader, this.progress[index]);
        Mapper.Context mapperContext = ReflectionUtil.createMapperContext(previousMapper, configuration, attemptId, trackingReader, writer, committer, this.reporter, split);
        trackingReader.initialize(split, mapperContext);
        Class splitMapperClass = this.job.getMapperClass();
        mapperContext.getConfiguration().setClass(ConfigConstants.CONF_MAPREDUCE_JOB_MAP_CLASS, splitMapperClass, Mapper.class);
        Mapper mapper = (Mapper) ReflectionUtils.newInstance(splitMapperClass, mapperContext.getConfiguration());
        try {
            mapper.run(mapperContext);
        } finally {
            closeAndCommit(trackingReader, writer, mapperContext, committer, taskContext);
        }
        return mapper;
    }

    /**
     * Closes the reader, closes the writer and commits the task, in that
     * order. Each step logs its own failure and never prevents the next one.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void closeAndCommit(TrackingRecordReader trackingReader, RecordWriter writer, Mapper.Context mapperContext, OutputCommitter committer, TaskAttemptContext taskContext) {
        try {
            trackingReader.close();
        } catch (Throwable th) {
            LOG.error("Error closing reader: " + th.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug(th);
            }
        }
        try {
            writer.close(mapperContext);
        } catch (Throwable th) {
            LOG.error("Error closing writer: " + th.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug(th);
            }
        }
        try {
            committer.commitTask(taskContext);
        } catch (Throwable th) {
            LOG.error("Error committing task: " + th.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug(th);
            }
        }
    }

    /** Logs every counter group of the reporter followed by its counters. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void logCounters() {
        Iterator it = this.reporter.counters.iterator();
        while (it.hasNext()) {
            CounterGroup<Counter> counterGroup = (CounterGroup) it.next();
            LOG.info(counterGroup.getDisplayName() + ": ");
            for (Counter counter : counterGroup) {
                LOG.info(counter.getDisplayName() + ": " + counter.getValue());
            }
        }
    }

    /**
     * Decides how many threads the split at the given position gets.
     *
     * <p>Rules, first match wins: an explicit threads-per-split setting; the
     * whole pool when there is a single split; {@code minThreads} when the
     * pool cannot give every split that many; otherwise an even share of the
     * pool, with one extra thread for the splits that take up the remainder.
     *
     * @param splitIndex position of the split in scheduling order
     * @param splitCount total number of splits (must not be 0)
     * @return number of threads for that split
     */
    private int assignThreads(int splitIndex, int splitCount) {
        if (this.threadsPerSplit > 0) {
            return this.threadsPerSplit;
        }
        if (splitCount == 1) {
            return this.threadCount;
        }
        if (splitCount * this.minThreads > this.threadCount) {
            return this.minThreads;
        }
        final int evenShare = this.threadCount / splitCount;
        final int leftover = this.threadCount % splitCount;
        final boolean takesExtraThread = splitIndex % this.threadCount < leftover;
        return takesExtraThread ? evenShare + 1 : evenShare;
    }

    /**
     * Computes the overall progress of the job as the mean of the per-split
     * percentages.
     *
     * @return a fraction between 0 and 1 in steps of 0.01 (the mean percentage
     *         is truncated to a whole number first); 1.0 when there are no splits
     */
    public double computeProgress() {
        final AtomicInteger[] slots = this.progress;
        final int splitCount = slots.length;
        if (splitCount == 0) {
            return 1.0d;
        }
        long percentSum = 0;
        for (int idx = 0; idx < splitCount; idx++) {
            percentSum += slots[idx].longValue();
        }
        // Integer division on purpose: the mean is kept in whole percent.
        final long meanPercent = percentSum / splitCount;
        return meanPercent / 100.0d;
    }

    /**
     * Gives access to the reporter that collects the job's counters.
     *
     * @return the reporter created by {@link #run()}, or {@code null} if
     *         {@code run()} has not been called yet
     */
    public ContentPumpReporter getReporter() {
        return reporter;
    }
}
