package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.ProgramLifecycle;
import co.cask.cdap.api.ProgramState;
import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.Resources;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.annotation.TransactionControl;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.batch.DatasetOutputCommitter;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.CConfigurationUtil;
import co.cask.cdap.common.conf.ConfigurationUtil;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.CombineClassLoader;
import co.cask.cdap.common.lang.WeakReferenceDelegatorClassLoader;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.twill.HadoopClassExcluder;
import co.cask.cdap.common.utils.DirUtils;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.transaction.RetryingLongTransactionSystemClient;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.util.hbase.HBaseDDLExecutorFactory;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.internal.app.runtime.LocalizationUtils;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.batch.RawComparatorWrapper;
import co.cask.cdap.internal.app.runtime.batch.dataset.UnsupportedOutputFormat;
import co.cask.cdap.internal.app.runtime.batch.dataset.input.MapperInput;
import co.cask.cdap.internal.app.runtime.batch.dataset.input.MultipleInputs;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputs;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputsMainOutputWrapper;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.ProvidedOutput;
import co.cask.cdap.internal.app.runtime.batch.distributed.ContainerLauncherGenerator;
import co.cask.cdap.internal.app.runtime.batch.distributed.MapReduceContainerHelper;
import co.cask.cdap.internal.app.runtime.batch.distributed.MapReduceContainerLauncher;
import co.cask.cdap.internal.app.runtime.batch.stream.MapReduceStreamInputFormat;
import co.cask.cdap.internal.app.runtime.batch.stream.StreamInputFormatProvider;
import co.cask.cdap.internal.app.runtime.distributed.LocalizeResource;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.security.Action;
import co.cask.cdap.security.spi.authentication.AuthenticationContext;
import co.cask.cdap.security.spi.authorization.AuthorizationEnforcer;
import co.cask.cdap.security.store.SecureStoreUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.inject.Injector;
import com.google.inject.ProvisionException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.api.ClassAcceptor;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.ApplicationBundler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MapReduceRuntimeService.class */
final class MapReduceRuntimeService extends AbstractExecutionThreadService {
    // Hadoop configuration key for the file-system umask mode.
    private static final String HADOOP_UMASK_PROPERTY = "fs.permissions.umask-mode";
    // Not referenced in the visible part of this class.
    // NOTE(review): presumably declared only so the class is loaded/traced as a dependency - confirm.
    private MRClientSecurityInfo mrClientSecurityInfo;

    // Collaborators supplied through the constructor.
    private final Injector injector;
    private final CConfiguration cConf;
    private final Configuration hConf;
    private final MapReduce mapReduce;
    private final MapReduceSpecification specification;
    private final Location programJarLocation;
    private final BasicMapReduceContext context;
    private final NamespacedLocationFactory locationFactory;
    private final StreamAdmin streamAdmin;
    // The constructor wraps the supplied client in a RetryingLongTransactionSystemClient.
    private final TransactionSystemClient txClient;
    private final AuthorizationEnforcer authorizationEnforcer;
    private final AuthenticationContext authenticationContext;

    // Submitted job and its long transaction; both are assigned at the end of a successful startUp().
    private Job job;
    private Transaction transaction;
    // Clean-up chain that startUp() extends as it creates resources (temp dir, class loader, temp location).
    private Runnable cleanupTask;
    // Class loader created in startUp() for the job.
    private ClassLoader classLoader;
    // Set by triggerShutdown(); read by run() and getProgramState().
    private volatile boolean stopRequested;
    private static final Logger LOG = LoggerFactory.getLogger(MapReduceRuntimeService.class);
    // Matches both spellings "programatically" and "programmatically".
    private static final Pattern PROGRAMATIC_SOURCE_PATTERN = Pattern.compile("program{1,2}atically");

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MapReduceRuntimeService$TaskType.class */
    /**
     * The kinds of MapReduce task whose resources can be configured, each carrying the Hadoop
     * configuration keys for container memory, JVM options and (when available) virtual cores.
     */
    private enum TaskType {
        MAP("mapreduce.map.memory.mb", "mapreduce.map.java.opts"),
        REDUCE("mapreduce.reduce.memory.mb", "mapreduce.reduce.java.opts");

        private final String memoryConfKey;
        private final String javaOptsKey;
        /** Value of {@code Job.<NAME>_CPU_VCORES}, or {@code null} when that constant cannot be read. */
        private final String vcoreConfKey;

        /**
         * @param str  configuration key for the task memory in MB
         * @param str2 configuration key for the task JVM options
         */
        TaskType(String str, String str2) {
            memoryConfKey = str;
            javaOptsKey = str2;

            // The vcores key is looked up reflectively; any failure simply disables vcore configuration.
            String resolvedVcoreKey;
            try {
                resolvedVcoreKey = Job.class.getField(name() + "_CPU_VCORES").get(null).toString();
            } catch (Exception ignored) {
                // Deliberately ignored: without the constant the vcore setting is skipped.
                resolvedVcoreKey = null;
            }
            vcoreConfKey = resolvedVcoreKey;
        }

        /**
         * Writes the memory, JVM heap and virtual-core settings for this task type into the configuration.
         * The maximum heap is set to 80% of the requested memory. Does nothing when {@code resources}
         * is {@code null}.
         *
         * @param configuration the job configuration to update
         * @param resources     the requested resources, may be {@code null}
         */
        public void setResources(Configuration configuration, @Nullable Resources resources) {
            if (resources != null) {
                configuration.setInt(memoryConfKey, resources.getMemoryMB());
                int heapMb = (int) (resources.getMemoryMB() * 0.8d);
                configuration.set(javaOptsKey, "-Xmx" + heapMb + "m");
                if (vcoreConfKey != null) {
                    configuration.setInt(vcoreConfKey, resources.getVirtualCores());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the runtime service for a single MapReduce program run.
     *
     * <p>The supplied transaction client is not used directly; it is wrapped in a
     * {@link RetryingLongTransactionSystemClient} that uses the retry strategy of the given context.
     */
    public MapReduceRuntimeService(Injector injector, CConfiguration cConfiguration, Configuration configuration, MapReduce mapReduce, MapReduceSpecification mapReduceSpecification, BasicMapReduceContext basicMapReduceContext, Location location, NamespacedLocationFactory namespacedLocationFactory, StreamAdmin streamAdmin, TransactionSystemClient transactionSystemClient, AuthorizationEnforcer authorizationEnforcer, AuthenticationContext authenticationContext) {
        // Configuration and wiring.
        this.injector = injector;
        this.cConf = cConfiguration;
        this.hConf = configuration;

        // The program being run.
        this.mapReduce = mapReduce;
        this.specification = mapReduceSpecification;
        this.context = basicMapReduceContext;
        this.programJarLocation = location;

        // Platform services.
        this.locationFactory = namespacedLocationFactory;
        this.streamAdmin = streamAdmin;
        this.authorizationEnforcer = authorizationEnforcer;
        this.authenticationContext = authenticationContext;
        this.txClient = new RetryingLongTransactionSystemClient(transactionSystemClient, basicMapReduceContext.getRetryStrategy());
    }

    /**
     * Returns the service name, {@code "MapReduceRunner-"} followed by the name from the program specification.
     * The same name is used for the thread created by {@code executor()}.
     */
    protected String getServiceName() {
        String programName = this.specification.getName();
        return "MapReduceRunner-" + programName;
    }

    /**
     * Prepares and submits the MapReduce job.
     *
     * <p>In order, this method: creates a local temp directory; creates the Hadoop job and its class loader;
     * runs the user initialization via {@code beforeSubmit}; localizes user resources; overrides the job name;
     * creates a temp location directory; sets task resources; wraps the mapper, reducer, partitioner and
     * comparators; builds the job jar; when the job is not local, ships the program/launcher/logback/extra jars
     * and sets the application classpath; finally starts a long transaction and submits the job.
     *
     * <p>Every resource created along the way is added to {@code cleanupTask}, which is run if start-up fails
     * for any reason, including a {@link LinkageError}.
     *
     * @throws Exception if any preparation step or the submission fails; a {@link LinkageError} is wrapped in
     *     an {@link Exception}, and a {@link TransactionFailureException} is passed through
     *     {@code Transactions.propagate}
     */
    protected void startUp() throws Exception {
        File tempDir = createTempDirectory();
        this.cleanupTask = createCleanupTask(tempDir);
        try {
            Job newJob = createJob(new File(tempDir, "mapreduce"));
            Configuration mapredConf = newJob.getConfiguration();
            this.classLoader = new MapReduceClassLoader(this.injector, this.cConf, mapredConf, this.context.getProgram().getClassLoader(), this.context.getApplicationSpecification().getPlugins(), this.context.getPluginInstantiator());
            this.cleanupTask = createCleanupTask(this.cleanupTask, this.classLoader);
            // The configuration only holds a weak delegate to the class loader created above.
            mapredConf.setClassLoader(new WeakReferenceDelegatorClassLoader(this.classLoader));
            ClassLoaders.setContextClassLoader(mapredConf.getClassLoader());
            this.context.setJob(newJob);
            beforeSubmit(newJob);
            Map<String, String> localizedUserResources = localizeUserResources(newJob, tempDir);

            String userJobName = newJob.getJobName();
            if (!userJobName.isEmpty()) {
                LOG.warn("Job name {} is being overridden.", userJobName);
            }
            newJob.setJobName(getJobName(this.context));

            Location tempLocation = createTempLocationDirectory();
            this.cleanupTask = createCleanupTask(this.cleanupTask, tempLocation);
            if (!MapReduceTaskContextProvider.isLocal(mapredConf)) {
                Location pluginArchive = createPluginArchive(tempLocation);
                if (pluginArchive != null) {
                    newJob.addCacheArchive(pluginArchive.toURI());
                    mapredConf.set("cdap.program.plugin.archive", pluginArchive.getName());
                }
            }

            TaskType.MAP.setResources(mapredConf, this.context.getMapperResources());
            TaskType.REDUCE.setResources(mapredConf, this.context.getReducerResources());
            MapperWrapper.wrap(newJob);
            ReducerWrapper.wrap(newJob);
            PartitionerWrapper.wrap(newJob);
            RawComparatorWrapper.CombinerGroupComparatorWrapper.wrap(newJob);
            RawComparatorWrapper.GroupComparatorWrapper.wrap(newJob);
            RawComparatorWrapper.KeyComparatorWrapper.wrap(newJob);

            File jobJar = buildJobJar(newJob, tempDir);
            newJob.setJar(jobJar.toURI().toString());

            Location programJar = this.programJarLocation;
            if (!MapReduceTaskContextProvider.isLocal(mapredConf)) {
                programJar = copyProgramJar(tempLocation);
                newJob.addCacheFile(programJar.toURI());
                Location launcherJar = createLauncherJar(tempLocation);
                newJob.addCacheFile(launcherJar.toURI());

                // Classpath order: launcher jar, logback jar (if any), sorted job.jar libraries,
                // job.jar classes, then the extra jars from the CDAP configuration.
                ArrayList<String> classpath = new ArrayList<>();
                classpath.add(launcherJar.getName());
                Location logbackJar = ProgramRunners.createLogbackJar(tempLocation.append("logback.xml.jar"));
                if (logbackJar != null) {
                    newJob.addCacheFile(logbackJar.toURI());
                    classpath.add(logbackJar.getName());
                    mapredConf.set("yarn.app.mapreduce.am.env", "CDAP_LOG_DIR=<LOG_DIR>");
                    mapredConf.set("mapreduce.map.env", "CDAP_LOG_DIR=<LOG_DIR>");
                    mapredConf.set("mapreduce.reduce.env", "CDAP_LOG_DIR=<LOG_DIR>");
                }
                classpath.addAll(collectJobJarLibEntries(jobJar));
                classpath.add("job.jar/classes");
                for (URI extraJar : CConfigurationUtil.getExtraJars(this.cConf)) {
                    if ("file".equals(extraJar.getScheme())) {
                        // Local files are copied to the temp location before being added to the cache.
                        newJob.addCacheFile(copyFileToLocation(new File(extraJar.getPath()), tempLocation).toURI());
                    } else {
                        newJob.addCacheFile(extraJar);
                    }
                    classpath.add(LocalizationUtils.getLocalizedName(extraJar));
                }
                MapReduceContainerHelper.addMapReduceClassPath(mapredConf, classpath);
                String joinedClasspath = Joiner.on(",").join(classpath);
                mapredConf.set("mapreduce.application.classpath", joinedClasspath);
                mapredConf.set("yarn.application.classpath", joinedClasspath);
            }

            MapReduceContextConfig contextConfig = new MapReduceContextConfig(mapredConf);
            Transaction tx = this.txClient.startLong();
            try {
                contextConfig.set(this.context, this.cConf, tx, programJar.toURI(), localizedUserResources);
                newJob.submit();
                LOG.debug("Submitted MapReduce Job: {}.", this.context);
                this.job = newJob;
                this.transaction = tx;
            } catch (Throwable t) {
                // The long transaction must not be left open when the submission fails.
                Transactions.invalidateQuietly(this.txClient, tx);
                throw t;
            }
        } catch (LinkageError e) {
            // A LinkageError is rethrown wrapped in a plain Exception. The resources created so far are
            // released first; a failure of the clean-up itself must not hide the original error.
            Exception wrapped = new Exception(e.getMessage(), e);
            try {
                this.cleanupTask.run();
            } catch (Throwable cleanupFailure) {
                wrapped.addSuppressed(cleanupFailure);
            }
            throw wrapped;
        } catch (Throwable t) {
            this.cleanupTask.run();
            if (t instanceof TransactionFailureException) {
                throw ((Exception) Transactions.propagate((TransactionFailureException) t, Exception.class));
            }
            throw t;
        }
    }

    /**
     * Lists the {@code lib/*.jar} entries of the job jar as {@code "job.jar/<entry name>"}, sorted
     * lexicographically. The jar file is closed before returning.
     */
    private static ArrayList<String> collectJobJarLibEntries(File jobJar) throws IOException {
        ArrayList<String> libEntries = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jobJar)) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                String entryName = entries.nextElement().getName();
                if (entryName.startsWith("lib/") && entryName.endsWith(".jar")) {
                    libEntries.add("job.jar/" + entryName);
                }
            }
        }
        Collections.sort(libEntries);
        return libEntries;
    }

    /**
     * Polls the submitted job once per second until it completes, reporting statistics on every poll,
     * then reports the statistics one final time and waits two more seconds.
     *
     * <p>Unless a stop was requested, an unsuccessful job makes this method fail.
     * NOTE(review): the final two-second sleep presumably gives the last metrics time to be emitted - confirm.
     *
     * @throws IllegalStateException if the job did not succeed and no stop was requested
     */
    protected void run() throws Exception {
        MapReduceMetricsWriter metricsWriter = new MapReduceMetricsWriter(this.job, this.context);
        for (boolean done = this.job.isComplete(); !done; done = this.job.isComplete()) {
            metricsWriter.reportStats();
            TimeUnit.SECONDS.sleep(1L);
        }

        String outcomeSuffix = this.job.isSuccessful() ? " successfully" : "";
        LOG.info("MapReduce Job completed{}. Job details: [{}]", outcomeSuffix, this.context);
        metricsWriter.reportStats();
        TimeUnit.SECONDS.sleep(2L);

        // A job killed on request is not treated as a failure here.
        if (!this.stopRequested) {
            Preconditions.checkState(this.job.isSuccessful(), "MapReduce JobId %s failed", this.job.getStatus().getJobID());
        }
    }

    /**
     * Finishes the job's long transaction and releases everything created in {@code startUp()}.
     *
     * <p>For a successful job, a message carrying the transaction write pointer is published to the topic
     * configured under {@code data.event.topic} in the system namespace, and the transaction is committed.
     * For an unsuccessful job the transaction is invalidated. Whatever the outcome, {@code destroy} is then
     * called exactly once, the context is closed and the clean-up task is run.
     *
     * @throws TransactionFailureException if the transaction cannot be committed
     * @throws Exception if finishing the transaction, {@code destroy} or the clean-up fails
     */
    protected void shutDown() throws Exception {
        boolean success = this.job.isSuccessful();
        String failureInfo = this.job.getStatus().getFailureInfo();
        try {
            if (success) {
                LOG.debug("Committing MapReduce Job transaction: {}", this.context);
                this.context.getMessagingService().publish(StoreRequestBuilder.of(NamespaceId.SYSTEM.topic(this.cConf.get("data.event.topic"))).setTransaction(Long.valueOf(this.transaction.getWritePointer())).build());
                if (!this.txClient.commit(this.transaction)) {
                    LOG.warn("MapReduce Job transaction failed to commit");
                    throw new TransactionFailureException("Failed to commit transaction for MapReduce " + this.context.toString());
                }
            } else {
                this.txClient.invalidate(this.transaction.getWritePointer());
            }
        } finally {
            // Nested finally blocks guarantee that each step runs once, even if an earlier one throws.
            try {
                destroy(success, failureInfo);
            } finally {
                try {
                    this.context.close();
                } finally {
                    this.cleanupTask.run();
                }
            }
        }
    }

    /**
     * Records that a stop was requested and kills the job if one was submitted and has not completed yet.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the job state cannot be read or the
     *     job cannot be killed
     */
    protected void triggerShutdown() {
        this.stopRequested = true;
        Job submittedJob = this.job;
        if (submittedJob == null) {
            // Nothing was submitted, so there is nothing to kill.
            return;
        }
        try {
            if (!submittedJob.isComplete()) {
                submittedJob.killJob();
            }
        } catch (IOException e) {
            LOG.error("Failed to kill MapReduce job {}", this.context, e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Returns an executor that runs each task on a new daemon thread named after this service.
     * The thread sets the logging context of the program before running the task.
     */
    protected Executor executor() {
        return new Executor() {
            @Override
            public void execute(@Nonnull final Runnable runnable) {
                Runnable withLoggingContext = new Runnable() {
                    @Override
                    public void run() {
                        LoggingContextAccessor.setLoggingContext(MapReduceRuntimeService.this.context.getLoggingContext());
                        runnable.run();
                    }
                };
                Thread serviceThread = new Thread(withLoggingContext);
                serviceThread.setName(MapReduceRuntimeService.this.getServiceName());
                serviceThread.setDaemon(true);
                serviceThread.start();
            }
        };
    }

    /**
     * Creates the Hadoop job from a copy of the Hadoop configuration.
     *
     * <p>For a local job, the local, system, staging and temp directories are redirected to sub-directories
     * of {@code file}. With security enabled, the job-history address is unset, AM access is left enabled
     * and all credentials of the current user are added to the job.
     *
     * @param file base directory for the directories used by a local job
     * @return the new, not yet submitted job
     * @throws IOException if the job cannot be created or the current user cannot be determined
     */
    private Job createJob(File file) throws IOException {
        Job newJob = Job.getInstance(new Configuration(this.hConf));
        Configuration jobConf = newJob.getConfiguration();

        if (MapReduceTaskContextProvider.isLocal(jobConf)) {
            // Configuration key -> sub-directory of the base directory.
            String[][] localDirs = {
                {"mapreduce.cluster.local.dir", "local"},
                {"mapreduce.jobtracker.system.dir", "system"},
                {"mapreduce.jobtracker.staging.root.dir", "staging"},
                {"mapreduce.cluster.temp.dir", "temp"},
            };
            for (String[] keyAndDir : localDirs) {
                jobConf.set(keyAndDir[0], new File(file, keyAndDir[1]).getAbsolutePath());
            }
        }

        if (UserGroupInformation.isSecurityEnabled()) {
            jobConf.unset("mapreduce.jobhistory.address");
            jobConf.setBoolean("mapreduce.job.am-access-disabled", false);
            Credentials userCredentials = UserGroupInformation.getCurrentUser().getCredentials();
            LOG.debug("Running in secure mode; adding all user credentials: {}", userCredentials.getAllTokens());
            newJob.getCredentials().addAll(userCredentials);
        }

        // No command-line arguments are passed; the parser is still run over the configuration.
        // NOTE(review): presumably done for the parser's side effects on the configuration - confirm.
        new GenericOptionsParser(jobConf, (String[]) null).getRemainingArgs();
        return newJob;
    }

    /**
     * Creates the local temporary directory for this run.
     *
     * <p>The directory is {@code <local.data.dir>/<app.temp.dir>/runner/<type>.<namespace>.<app>.<program>.<runId>},
     * with the program type in lower case. A failure to create it is logged as a warning rather than thrown,
     * so callers still receive the path.
     *
     * @return the (absolute) temporary directory
     */
    private File createTempDirectory() {
        Id.Program id = this.context.getProgram().getId().toId();
        File appTempDir = new File(this.cConf.get("local.data.dir"), this.cConf.get("app.temp.dir")).getAbsoluteFile();
        File runnerDir = new File(appTempDir, "runner");
        String runDirName = String.format("%s.%s.%s.%s.%s", id.getType().name().toLowerCase(), id.getNamespaceId(), id.getApplicationId(), id.getId(), this.context.getRunId().getId());
        File tempDir = new File(runnerDir, runDirName);
        // mkdirs() also returns false when the directory already exists, hence the second check.
        if (!tempDir.mkdirs() && !tempDir.isDirectory()) {
            LOG.warn("Failed to create local temporary directory {}", tempDir);
        }
        return tempDir;
    }

    /**
     * Creates the temporary location directory for this run under the location of the program's namespace.
     *
     * <p>The relative path is {@code <app.temp.dir>/<type>.<namespace>.<app>.<program>.<runId>}, with the
     * program type in lower case. A failure to create it is logged as a warning rather than thrown.
     *
     * @return the temporary location directory
     * @throws IOException if the location cannot be resolved, created or inspected
     */
    private Location createTempLocationDirectory() throws IOException {
        ProgramId id = this.context.getProgram().getId();
        String relativePath = String.format("%s/%s.%s.%s.%s.%s", this.cConf.get("app.temp.dir"), id.getType().name().toLowerCase(), id.getNamespace(), id.getApplication(), id.getProgram(), this.context.getRunId().getId());
        Location tempLocation = this.locationFactory.get(id.getNamespaceId()).append(relativePath);
        // mkdirs() may return false for a directory that already exists, hence the second check.
        if (!tempLocation.mkdirs() && !tempLocation.isDirectory()) {
            LOG.warn("Failed to create temporary location directory {}", tempLocation);
        }
        return tempLocation;
    }

    /**
     * Runs the user initialization and then completes the job's input and output settings.
     *
     * <p>The transaction control of the program's {@code initialize} method decides how {@code doInitialize}
     * is called: directly for {@code EXPLICIT}, otherwise inside a transaction. Afterwards the inputs,
     * outputs, output classes and map output classes are set with the job's class loader as the context
     * class loader; the previous context class loader is always restored.
     *
     * @param job the job about to be submitted
     * @throws Exception if the initialization or any of the job settings fails
     */
    private void beforeSubmit(final Job job) throws Exception {
        TransactionControl txControl;
        if (this.mapReduce instanceof AbstractMapReduce) {
            txControl = Transactions.getTransactionControl(TransactionControl.IMPLICIT, AbstractMapReduce.class, this.mapReduce, "initialize", new Class[0]);
        } else if (this.mapReduce instanceof ProgramLifecycle) {
            txControl = Transactions.getTransactionControl(TransactionControl.IMPLICIT, MapReduce.class, this.mapReduce, "initialize", new Class[]{MapReduceContext.class});
        } else {
            txControl = TransactionControl.IMPLICIT;
        }

        if (txControl == TransactionControl.EXPLICIT) {
            doInitialize(job);
        } else {
            Transactionals.execute(this.context, new TxRunnable() {
                @Override
                public void run(DatasetContext datasetContext) throws Exception {
                    MapReduceRuntimeService.this.doInitialize(job);
                }
            }, Exception.class);
        }

        ClassLoader previousClassLoader = ClassLoaders.setContextClassLoader(job.getConfiguration().getClassLoader());
        try {
            TypeToken<Mapper> mapperType = setInputsIfNeeded(job);
            setOutputsIfNeeded(job);
            setOutputClassesIfNeeded(job, mapperType);
            setMapOutputClassesIfNeeded(job, mapperType);
        } finally {
            ClassLoaders.setContextClassLoader(previousClassLoader);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls the program's initialization method with the job's class loader as the context class loader.
     *
     * <p>The program state is set to {@code INITIALIZING} before the call and to {@code RUNNING} after it
     * returns normally. A {@code ProgramLifecycle} program gets {@code initialize}, any other program gets
     * {@code beforeSubmit}. The previous context class loader is always restored.
     *
     * @param job the job whose configuration supplies the class loader
     * @throws Exception if the program's initialization method fails
     */
    public void doInitialize(Job job) throws Exception {
        this.context.setState(new ProgramState(ProgramStatus.INITIALIZING, (String) null));
        ClassLoader previousClassLoader = ClassLoaders.setContextClassLoader(job.getConfiguration().getClassLoader());
        try {
            if (!(this.mapReduce instanceof ProgramLifecycle)) {
                this.mapReduce.beforeSubmit(this.context);
            } else {
                this.mapReduce.initialize(this.context);
            }
            this.context.setState(new ProgramState(ProgramStatus.RUNNING, (String) null));
        } finally {
            ClassLoaders.setContextClassLoader(previousClassLoader);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Notifies one output of the job's outcome, if that output is a {@link DatasetOutputCommitter}.
     *
     * <p>Exactly one callback is made: {@code onSuccess()} when {@code z} is {@code true} and no failure has
     * been recorded yet, otherwise {@code onFailure()}. Anything thrown by the callback is logged and
     * recorded in {@code atomicReference}: as the failure cause if none is set, otherwise as a suppressed
     * exception of the existing cause. This method itself never throws the callback's exception.
     *
     * @param z                    whether the job (so far) succeeded
     * @param str                  name of the output, used in the error message
     * @param outputFormatProvider the output to notify; ignored unless it is a {@code DatasetOutputCommitter}
     * @param atomicReference      holder of the first failure, read and updated by this method
     */
    public void commitOutput(boolean z, String str, OutputFormatProvider outputFormatProvider, AtomicReference<Exception> atomicReference) {
        if (!(outputFormatProvider instanceof DatasetOutputCommitter)) {
            return;
        }
        DatasetOutputCommitter committer = (DatasetOutputCommitter) outputFormatProvider;
        // Decided once up front so that the error message names the callback that was really invoked.
        boolean commit = z && atomicReference.get() == null;
        try {
            if (commit) {
                committer.onSuccess();
            } else {
                committer.onFailure();
            }
        } catch (Throwable th) {
            LOG.error(String.format("Error from %s method of output dataset %s.", commit ? "onSuccess" : "onFailure", str), th);
            Exception recorded = atomicReference.get();
            if (recorded != null) {
                recorded.addSuppressed(th);
            } else {
                atomicReference.set(th instanceof Exception ? (Exception) th : new RuntimeException(th));
            }
        }
    }

    /**
     * Maps the outcome of the run to a program state. A requested stop takes precedence and yields
     * {@code KILLED}; otherwise the state is {@code COMPLETED} on success and {@code FAILED} on failure.
     *
     * @param z   whether the run succeeded
     * @param str failure information, attached only to the {@code FAILED} state
     */
    private ProgramState getProgramState(boolean z, String str) {
        if (this.stopRequested) {
            return new ProgramState(ProgramStatus.KILLED, (String) null);
        }
        if (z) {
            return new ProgramState(ProgramStatus.COMPLETED, (String) null);
        }
        return new ProgramState(ProgramStatus.FAILED, str);
    }

    /**
     * Commits the dataset outputs and then runs the program's destroy hook.
     *
     * <p>Step 1, in a transaction: every output is notified through {@code commitOutput}. If the job succeeded
     * but an output fails to commit, all outputs are notified of the failure once and the iteration stops.
     * A recorded failure is thrown inside the transaction so that the transaction fails as well.
     *
     * <p>Step 2: the program state is set from the combined outcome, and {@code doDestroy} is called, inside a
     * transaction when the transaction control of {@code destroy} is {@code IMPLICIT}, directly otherwise.
     * Errors of this step are only logged.
     *
     * @param z   whether the job succeeded
     * @param str failure information of the job, used for the program state
     * @throws Exception the failure recorded while committing the outputs, if any
     */
    private void destroy(final boolean z, String str) throws Exception {
        final AtomicReference<Exception> failureCause = new AtomicReference<>();
        try {
            this.context.execute(new TxRunnable() {
                @Override
                public void run(DatasetContext datasetContext) throws Exception {
                    ClassLoader previousClassLoader = ClassLoaders.setContextClassLoader(MapReduceRuntimeService.this.job.getConfiguration().getClassLoader());
                    try {
                        for (Map.Entry<String, ProvidedOutput> output : MapReduceRuntimeService.this.context.getOutputs().entrySet()) {
                            MapReduceRuntimeService.this.commitOutput(z, output.getKey(), output.getValue().getOutputFormatProvider(), failureCause);
                            if (z && failureCause.get() != null) {
                                // The job succeeded but this output did not commit: fail every output
                                // once, then stop so that no output is notified again.
                                for (ProvidedOutput toFail : MapReduceRuntimeService.this.context.getOutputs().values()) {
                                    MapReduceRuntimeService.this.commitOutput(false, toFail.getAlias(), toFail.getOutputFormatProvider(), failureCause);
                                }
                                break;
                            }
                        }
                        Exception cause = failureCause.get();
                        if (cause != null) {
                            // Cleared here; the catch below records the resulting transaction failure instead.
                            failureCause.set(null);
                            throw cause;
                        }
                    } finally {
                        ClassLoaders.setContextClassLoader(previousClassLoader);
                    }
                }
            });
        } catch (TransactionFailureException e) {
            LOG.error("Transaction failure when committing dataset outputs", e);
            Exception recorded = failureCause.get();
            if (recorded != null) {
                recorded.addSuppressed(e);
            } else {
                failureCause.set(e);
            }
        }

        final boolean succeeded = z && failureCause.get() == null;
        this.context.setState(getProgramState(succeeded, str));
        try {
            TransactionControl txControl = this.mapReduce instanceof ProgramLifecycle
                ? Transactions.getTransactionControl(TransactionControl.IMPLICIT, MapReduce.class, this.mapReduce, "destroy", new Class[0])
                : TransactionControl.IMPLICIT;
            if (TransactionControl.IMPLICIT == txControl) {
                this.context.execute(new TxRunnable() {
                    @Override
                    public void run(DatasetContext datasetContext) throws Exception {
                        MapReduceRuntimeService.this.doDestroy(succeeded);
                    }
                });
            } else {
                doDestroy(succeeded);
            }
        } catch (Throwable th) {
            // Log the underlying cause of a transaction failure, except for conflicts.
            Throwable toLog = th;
            if ((th instanceof TransactionFailureException) && th.getCause() != null && !(th instanceof TransactionConflictException)) {
                toLog = th.getCause();
            }
            LOG.warn("Error executing the destroy method of the MapReduce program {}", this.context.getProgram().getName(), toLog);
        }

        Exception failure = failureCause.get();
        if (failure != null) {
            throw failure;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls the program's finishing method with the job's class loader as the context class loader:
     * {@code destroy()} for a {@code ProgramLifecycle} program, {@code onFinish} for any other program.
     * The previous context class loader is always restored.
     *
     * @param z whether the run succeeded; only passed on to {@code onFinish}
     * @throws Exception if the program's method fails
     */
    public void doDestroy(boolean z) throws Exception {
        ClassLoader previousClassLoader = ClassLoaders.setContextClassLoader(this.job.getConfiguration().getClassLoader());
        try {
            if (!(this.mapReduce instanceof ProgramLifecycle)) {
                this.mapReduce.onFinish(z, this.context);
            } else {
                this.mapReduce.destroy();
            }
        } finally {
            ClassLoaders.setContextClassLoader(previousClassLoader);
        }
    }

    /**
     * Verifies that a second mapper has the same output key and value types as the first one.
     *
     * @param cls   the first mapper class, used only in the error message
     * @param entry the output key/value types of the first mapper
     * @param cls2  the mapper class to compare against
     * @throws IllegalArgumentException if the output key or value types differ
     */
    private void assertConsistentTypes(Class<? extends Mapper> cls, Map.Entry<Class, Class> entry, Class<? extends Mapper> cls2) {
        Map.Entry<Class, Class> otherTypes = getMapperOutputKeyValueTypes(cls2);
        if (entry.getKey().equals(otherTypes.getKey()) && entry.getValue().equals(otherTypes.getValue())) {
            return;
        }
        String message = String.format("Type mismatch in output type of mappers: %s and %s. Map output key types: %s and %s. Map output value types: %s and %s.", cls, cls2, entry.getKey(), otherTypes.getKey(), entry.getValue(), otherTypes.getValue());
        throw new IllegalArgumentException(message);
    }

    /**
     * Resolves the given mapper class against {@code Mapper} and returns the raw classes of its third and
     * fourth type arguments as a (key, value) pair.
     *
     * @param cls the mapper class to inspect
     * @return an entry whose key is the raw output key class and whose value is the raw output value class
     */
    private Map.Entry<Class, Class> getMapperOutputKeyValueTypes(Class<? extends Mapper> cls) {
        ParameterizedType mapperType = (ParameterizedType) resolveClass(cls, Mapper.class).getType();
        Type[] typeArguments = mapperType.getActualTypeArguments();
        Class outputKeyClass = TypeToken.of(typeArguments[2]).getRawType();
        Class outputValueClass = TypeToken.of(typeArguments[3]).getRawType();
        return new AbstractMap.SimpleEntry<Class, Class>(outputKeyClass, outputValueClass);
    }

    /**
     * Registers every mapper input of the context with the job.
     *
     * <p>Each input uses its own mapper if it has one, otherwise the mapper class of the job. All mappers must
     * have the same output key and value types as the first one. For a stream input, the stream decoder is
     * set up and read access to the stream is enforced before the input is added.
     *
     * @param job the job to add the inputs to
     * @return the resolved type of the first input's mapper; when there are no inputs, or that mapper is the
     *     job's own mapper class, the type resolved from the {@code mapreduce.job.map.class} setting
     * @throws IOException if the read access check fails
     * @throws ClassNotFoundException declared for reading the mapper class from the job
     * @throws IllegalArgumentException if the mappers have inconsistent output types
     */
    @Nullable
    private TypeToken<Mapper> setInputsIfNeeded(Job job) throws IOException, ClassNotFoundException {
        Class<? extends Mapper> jobMapper = job.getMapperClass();
        Class<? extends Mapper> firstMapper = null;
        Map.Entry<Class, Class> firstMapperOutputTypes = null;

        for (Map.Entry<String, MapperInput> namedInput : this.context.getMapperInputs().entrySet()) {
            MapperInput input = namedInput.getValue();
            InputFormatProvider provider = input.getInputFormatProvider();
            Map<String, String> inputConf = input.getInputFormatConfiguration();

            Class<? extends Mapper> effectiveMapper;
            if (input.getMapper() == null) {
                effectiveMapper = jobMapper;
            } else {
                effectiveMapper = input.getMapper();
            }

            if (firstMapper != null) {
                assertConsistentTypes(firstMapper, firstMapperOutputTypes, effectiveMapper);
            } else {
                firstMapper = effectiveMapper;
                firstMapperOutputTypes = getMapperOutputKeyValueTypes(effectiveMapper);
            }

            if (provider instanceof StreamInputFormatProvider) {
                StreamInputFormatProvider streamProvider = (StreamInputFormatProvider) provider;
                // The input's own mapper (possibly null) is passed here, not the effective one.
                setDecoderForStream(streamProvider, job, inputConf, input.getMapper());
                try {
                    this.authorizationEnforcer.enforce(streamProvider.getStreamId(), this.authenticationContext.getPrincipal(), Action.READ);
                } catch (Exception e) {
                    Throwables.propagateIfPossible(e, IOException.class);
                    throw new IOException(e);
                }
            }

            MultipleInputs.addInput(job, namedInput.getKey(), input.getInputFormatClassName(), inputConf, effectiveMapper);
        }

        if (firstMapper == null || firstMapper == jobMapper) {
            return resolveClass(job.getConfiguration(), "mapreduce.job.map.class", Mapper.class);
        }
        return resolveClass(firstMapper, Mapper.class);
    }

    /**
     * Configures the decoder type of a stream input and records the program's usage of the stream.
     *
     * The decoder type is the mapper's (or, failing that, the reducer's) input value type, with
     * {@code StreamEvent} as the default. Failures while registering usage or lineage access are
     * logged as warnings and do not abort the job setup.
     *
     * @param streamInputFormatProvider the stream input being configured
     * @param job the job being configured
     * @param map the input format configuration handed to the provider's {@code setDecoderType}
     * @param cls the mapper class declared for this input, or {@code null} to use the job's configuration
     */
    private void setDecoderForStream(StreamInputFormatProvider streamInputFormatProvider, Job job, Map<String, String> map, Class<? extends Mapper> cls) {
        Configuration jobConf = job.getConfiguration();
        TypeToken<Mapper> mapperType = null;
        if (cls != null) {
            mapperType = resolveClass(cls, Mapper.class);
        }
        Type decoderValueType = getInputValueType(jobConf, StreamEvent.class, mapperType);
        streamInputFormatProvider.setDecoderType(map, decoderValueType);

        StreamId streamId = streamInputFormatProvider.getStreamId();
        try {
            this.streamAdmin.register(ImmutableList.of(this.context.getProgram().getId()), streamId);
            this.streamAdmin.addAccess(
                this.context.getProgram().getId().run(this.context.getRunId().getId()),
                streamId,
                AccessType.READ);
        } catch (Exception e) {
            // Best effort: usage registration must not fail the job.
            LOG.warn("Failed to register usage {} -> {}", new Object[]{this.context.getProgram().getId(), streamId, e});
        }
    }

    /**
     * Configures the job's output format from the outputs provided by the context.
     *
     * Output permissions are reconciled first. With no outputs nothing else is configured. With
     * exactly one output, its configuration and output format class are set directly on the job
     * configuration. With several outputs, the job uses {@code MultipleOutputsMainOutputWrapper}
     * with {@code UnsupportedOutputFormat} as the root format, and each output is added as a named output.
     *
     * @param job the job being configured
     */
    private void setOutputsIfNeeded(Job job) {
        Map<String, ProvidedOutput> providedOutputs = this.context.getOutputs();
        fixOutputPermissions(job, providedOutputs);
        LOG.debug("Using as output for MapReduce Job: {}", providedOutputs.keySet());

        if (providedOutputs.isEmpty()) {
            return;
        }

        if (providedOutputs.size() != 1) {
            MultipleOutputsMainOutputWrapper.setRootOutputFormat(
                job, UnsupportedOutputFormat.class.getName(), new HashMap<String, String>());
            job.setOutputFormatClass(MultipleOutputsMainOutputWrapper.class);
            for (Map.Entry<String, ProvidedOutput> outputEntry : providedOutputs.entrySet()) {
                ProvidedOutput output = outputEntry.getValue();
                MultipleOutputs.addNamedOutput(
                    job,
                    output.getAlias(),
                    output.getOutputFormatClassName(),
                    job.getOutputKeyClass(),
                    job.getOutputValueClass(),
                    output.getOutputFormatConfiguration());
            }
            return;
        }

        // Single output: apply it straight onto the job configuration.
        ProvidedOutput onlyOutput = providedOutputs.values().iterator().next();
        ConfigurationUtil.setAll(onlyOutput.getOutputFormatConfiguration(), job.getConfiguration());
        job.getConfiguration().set("mapreduce.job.outputformat.class", onlyOutput.getOutputFormatClassName());
    }

    /**
     * Reconciles the umask configured by individual outputs with the umask of the job configuration.
     *
     * Three cases are distinguished:
     * <ol>
     *   <li>The job configuration's umask was set programmatically: it wins, and the umask is stripped
     *       from every output that declares one.</li>
     *   <li>Otherwise, if every output declares a umask and they all agree: that umask is set in the job
     *       configuration and the outputs are left untouched.</li>
     *   <li>Otherwise: the job configuration's umask is kept and the umask is stripped from every output
     *       that declares one.</li>
     * </ol>
     * Stripping replaces the affected entries of {@code map} with copies whose configuration lacks the
     * umask property, so {@code map} is modified in place.
     *
     * @param job the job whose configuration is consulted and possibly updated
     * @param map the outputs by name; entries may be replaced
     */
    private void fixOutputPermissions(Job job, Map<String, ProvidedOutput> map) {
        Configuration jobConf = job.getConfiguration();
        HashSet<String> outputsWithUmask = new HashSet<String>();
        HashSet<String> distinctUmasks = new HashSet<String>();
        for (Map.Entry<String, ProvidedOutput> outputEntry : map.entrySet()) {
            String umask = outputEntry.getValue().getOutputFormatConfiguration().get(HADOOP_UMASK_PROPERTY);
            if (umask == null) {
                continue;
            }
            outputsWithUmask.add(outputEntry.getKey());
            distinctUmasks.add(umask);
        }

        boolean everyOutputHasUmask = outputsWithUmask.size() == map.size();
        boolean outputsAgree = distinctUmasks.size() == 1;
        boolean jobConfIsProgrammatic = isProgrammaticConfig(jobConf, HADOOP_UMASK_PROPERTY);
        String jobConfUmask = jobConf.get(HADOOP_UMASK_PROPERTY);

        boolean stripOutputUmasks;
        if (jobConfIsProgrammatic) {
            stripOutputUmasks = !outputsWithUmask.isEmpty();
            if (stripOutputUmasks) {
                LOG.info("Overriding permissions of outputs {} because a umask of {} was set programmatically in the job configuration.", outputsWithUmask, jobConfUmask);
            }
        } else if (everyOutputHasUmask && outputsAgree) {
            String agreedUmask = distinctUmasks.iterator().next();
            jobConf.set(HADOOP_UMASK_PROPERTY, agreedUmask);
            LOG.debug("Setting umask of {} in job configuration because all outputs {} agree on it.", agreedUmask, outputsWithUmask);
            stripOutputUmasks = false;
        } else {
            stripOutputUmasks = !outputsWithUmask.isEmpty();
            if (stripOutputUmasks) {
                LOG.warn("Overriding permissions of outputs {} because they configure different permissions. Falling back to default umask of {} in job configuration.", outputsWithUmask, jobConfUmask);
            }
        }

        if (!stripOutputUmasks) {
            return;
        }
        for (String outputName : outputsWithUmask) {
            ProvidedOutput original = map.get(outputName);
            HashMap<String, String> confWithoutUmask = new HashMap<String, String>(original.getOutputFormatConfiguration());
            confWithoutUmask.remove(HADOOP_UMASK_PROPERTY);
            ProvidedOutput replacement = new ProvidedOutput(
                original.getAlias(),
                original.getOutputFormatProvider(),
                original.getOutputFormatClassName(),
                confWithoutUmask);
            map.put(outputName, replacement);
        }
    }

    /**
     * Determines the input value type of the job's mapper or, when no mapper is available, its reducer.
     *
     * The type is the second actual type argument (index 1) of the resolved {@code Mapper}/{@code Reducer}
     * supertype. The default is returned when that supertype is not parameterized, or when the argument
     * is still the class's own unresolved second type parameter.
     *
     * @param configuration the job configuration used to look up the mapper and reducer classes
     * @param type the default type to fall back to
     * @param typeToken the mapper type to use, or {@code null} to resolve it from the configuration
     * @return the resolved input value type, or {@code type} if it cannot be determined
     * @throws IllegalArgumentException if neither a mapper nor a reducer can be resolved
     */
    @VisibleForTesting
    static Type getInputValueType(Configuration configuration, Type type, @Nullable TypeToken<?> typeToken) {
        TypeToken<?> mapperOrReducer = typeToken;
        if (mapperOrReducer == null) {
            mapperOrReducer = resolveClass(configuration, "mapreduce.job.map.class", Mapper.class);
        }
        if (mapperOrReducer == null) {
            mapperOrReducer = resolveClass(configuration, "mapreduce.job.reduce.class", Reducer.class);
        }
        Preconditions.checkArgument(mapperOrReducer != null, "Neither a Mapper nor a Reducer is configured for the MapReduce job.");

        Type resolvedType = mapperOrReducer.getType();
        if (!(resolvedType instanceof ParameterizedType)) {
            return type;
        }

        Type inputValueType = ((ParameterizedType) resolvedType).getActualTypeArguments()[1];
        // An argument equal to the raw class's own type parameter means nothing concrete was bound.
        boolean unresolved = inputValueType instanceof TypeVariable
            && inputValueType.equals(mapperOrReducer.getRawType().getTypeParameters()[1]);
        return unresolved ? type : inputValueType;
    }

    /**
     * Builds the job name as {@code <runId>.<programType>.<namespace>.<application>.<program>},
     * where the program type is the lower-cased name of {@code ProgramType.MAPREDUCE}.
     *
     * @param basicMapReduceContext the context supplying the program id and run id
     * @return the dot-separated job name
     */
    private String getJobName(BasicMapReduceContext basicMapReduceContext) {
        Id.Program programId = basicMapReduceContext.getProgram().getId().toId();
        String runId = basicMapReduceContext.getRunId().getId();
        String programType = ProgramType.MAPREDUCE.name().toLowerCase();
        return String.format(
            "%s.%s.%s.%s.%s",
            runId,
            programType,
            programId.getNamespaceId(),
            programId.getApplicationId(),
            programId.getId());
    }

    /**
     * Creates the {@code job.jar} file for the given job inside the given directory.
     *
     * In local mode an empty jar is written. Otherwise a bundle is built that contains the core
     * MapReduce runtime classes, optionally the job's input/output format classes (and stream decoder),
     * the KMS secure store class when applicable, and the HBase utility classes when available.
     * Classes under {@code org.apache.spark}, classes coming from a {@code spark-assembly} classpath
     * entry, and whatever {@code HadoopClassExcluder} rejects are left out.
     *
     * @param job the job the jar is built for
     * @param file the directory in which {@code job.jar} is created
     * @return the created jar file
     * @throws IOException if writing the jar fails
     * @throws URISyntaxException declared by the signature
     */
    private File buildJobJar(Job job, File file) throws IOException, URISyntaxException {
        File jobJar = new File(file, "job.jar");
        LOG.debug("Creating Job jar: {}", jobJar);

        if (MapReduceTaskContextProvider.isLocal(job.getConfiguration())) {
            // Local mode only needs an empty jar.
            JarOutputStream emptyJar = new JarOutputStream(new FileOutputStream(jobJar));
            emptyJar.close();
            return jobJar;
        }

        final HadoopClassExcluder hadoopClassExcluder = new HadoopClassExcluder();
        ClassAcceptor acceptor = new ClassAcceptor() {
            @Override
            public boolean accept(String str, URL url, URL url2) {
                boolean fromSpark = str.startsWith("org.apache.spark") || url2.toString().contains("spark-assembly");
                return !fromSpark && hadoopClassExcluder.accept(str, url, url2);
            }
        };
        ApplicationBundler bundler = new ApplicationBundler(acceptor);

        HashSet<Class<?>> bundledClasses = Sets.newHashSet();
        bundledClasses.add(MapReduce.class);
        bundledClasses.add(MapperWrapper.class);
        bundledClasses.add(ReducerWrapper.class);
        bundledClasses.add(SLF4JBridgeHandler.class);

        if (this.cConf.getBoolean("mapreduce.include.custom.format.classes")) {
            try {
                Class inputFormatClass = job.getInputFormatClass();
                bundledClasses.add(inputFormatClass);
                if (MapReduceStreamInputFormat.class.isAssignableFrom(inputFormatClass)) {
                    Class decoderClass = MapReduceStreamInputFormat.getDecoderClass(job.getConfiguration());
                    if (decoderClass != null) {
                        bundledClasses.add(decoderClass);
                    }
                }
            } catch (Throwable t) {
                LOG.debug("InputFormat class not found: {}", t.getMessage(), t);
            }
            try {
                bundledClasses.add(job.getOutputFormatClass());
            } catch (Throwable t) {
                LOG.debug("OutputFormat class not found: {}", t.getMessage(), t);
            }
        }

        if (SecureStoreUtils.isKMSBacked(this.cConf) && SecureStoreUtils.isKMSCapable()) {
            bundledClasses.add(SecureStoreUtils.getKMSSecureStore());
        }

        Class<?> ddlExecutorClass = new HBaseDDLExecutorFactory(this.cConf, this.hConf).get().getClass();
        try {
            bundledClasses.add(HBaseTableUtilFactory.getHBaseTableUtilClass(this.cConf));
            bundledClasses.add(ddlExecutorClass);
        } catch (ProvisionException e) {
            LOG.warn("Not including HBaseTableUtil classes in submitted Job Jar since they are not available");
        }

        // Bundle with a context class loader that can also see the DDL executor's classes,
        // and always put the previous context class loader back.
        ClassLoader bundlingClassLoader =
            new CombineClassLoader(getClass().getClassLoader(), Collections.singleton(ddlExecutorClass.getClassLoader()));
        ClassLoader previousClassLoader = ClassLoaders.setContextClassLoader(bundlingClassLoader);
        try {
            bundler.createBundle(Locations.toLocation(jobJar), bundledClasses);
        } finally {
            ClassLoaders.setContextClassLoader(previousClassLoader);
        }
        LOG.debug("Built MapReduce Job Jar at {}", jobJar.toURI());
        return jobJar;
    }

    /**
     * Looks up a class configured under the given key and resolves it against a supertype.
     *
     * @param configuration the configuration to read the class from
     * @param str the configuration key holding the class name
     * @param cls the supertype the configured class is expected to implement or extend
     * @param <V> the supertype
     * @return the type token of {@code cls} as seen from the configured class, or {@code null} if the
     *         configuration yields no class for the key
     */
    @VisibleForTesting
    @Nullable
    static <V> TypeToken<V> resolveClass(Configuration configuration, String str, Class<V> cls) {
        Class configuredClass = configuration.getClass(str, (Class) null, cls);
        return configuredClass == null ? null : resolveClass(configuredClass, cls);
    }

    /**
     * Resolves the generic supertype {@code cls2} as seen from the concrete class {@code cls}.
     *
     * @param cls the concrete class
     * @param cls2 the supertype to resolve
     * @param <V> the supertype
     * @return the type token of {@code cls2} with type arguments resolved in the context of {@code cls}
     */
    @SuppressWarnings("unchecked")
    private static <V> TypeToken<V> resolveClass(Class<? extends V> cls, Class<V> cls2) {
        // getSupertype is declared to return TypeToken<? super T>, which is not assignable to
        // TypeToken<V> without a cast. The cast is safe because the token was requested for cls2 itself.
        return (TypeToken<V>) TypeToken.of(cls).getSupertype(cls2);
    }

    /**
     * Derives the job's output key and value classes from the reducer, or from the mapper when no
     * reducer is configured.
     *
     * The third and fourth type arguments (indices 2 and 3) of the resolved type are used. A class
     * is only set when the corresponding property was not set programmatically in the configuration.
     * Nothing is done if no type is available or the type is not parameterized.
     *
     * @param job the job being configured
     * @param typeToken the mapper type used when no reducer is configured; may be {@code null}
     */
    private void setOutputClassesIfNeeded(Job job, @Nullable TypeToken<?> typeToken) {
        Configuration jobConf = job.getConfiguration();
        TypeToken<?> reducerType = resolveClass(jobConf, "mapreduce.job.reduce.class", Reducer.class);
        TypeToken<?> outputSource = reducerType != null ? reducerType : typeToken;
        if (outputSource == null) {
            return;
        }
        if (!(outputSource.getType() instanceof ParameterizedType)) {
            return;
        }

        Type[] typeArgs = ((ParameterizedType) outputSource.getType()).getActualTypeArguments();
        if (!isProgrammaticConfig(jobConf, "mapreduce.job.output.key.class")) {
            Class outputKeyClass = TypeToken.of(typeArgs[2]).getRawType();
            LOG.debug("Set output key class to {}", outputKeyClass);
            job.setOutputKeyClass(outputKeyClass);
        }
        if (!isProgrammaticConfig(jobConf, "mapreduce.job.output.value.class")) {
            Class outputValueClass = TypeToken.of(typeArgs[3]).getRawType();
            LOG.debug("Set output value class to {}", outputValueClass);
            job.setOutputValueClass(outputValueClass);
        }
    }

    /**
     * Derives the job's map output key and value classes.
     *
     * With a mapper type, the mapper's output types are used, i.e. type arguments at indices 2 and 3.
     * Without one (reducer-only job), the reducer's input types are used, i.e. type arguments at
     * indices 0 and 1 of the configured reducer. A class is only set when the corresponding property
     * was not set programmatically in the configuration. Nothing is done if no type is available or
     * the type is not parameterized.
     *
     * @param job the job being configured
     * @param typeToken the mapper type, or {@code null} if the job has no mapper
     */
    private void setMapOutputClassesIfNeeded(Job job, @Nullable TypeToken<?> typeToken) {
        Configuration configuration = job.getConfiguration();
        TypeToken<?> sourceType = typeToken;
        // Positions of the key/value types in the type arguments; default to the mapper's outputs.
        int keyIndex = 2;
        int valueIndex = 3;
        if (sourceType == null) {
            // Reducer-only job: use the reducer's input key/value types instead.
            sourceType = resolveClass(configuration, "mapreduce.job.reduce.class", Reducer.class);
            keyIndex = 0;
            valueIndex = 1;
        }
        if (sourceType == null || !(sourceType.getType() instanceof ParameterizedType)) {
            return;
        }
        Type[] actualTypeArguments = ((ParameterizedType) sourceType.getType()).getActualTypeArguments();
        if (!isProgrammaticConfig(configuration, "mapreduce.map.output.key.class")) {
            Class<?> mapOutputKeyClass = TypeToken.of(actualTypeArguments[keyIndex]).getRawType();
            LOG.debug("Set map output key class to {}", mapOutputKeyClass);
            job.setMapOutputKeyClass(mapOutputKeyClass);
        }
        if (!isProgrammaticConfig(configuration, "mapreduce.map.output.value.class")) {
            Class<?> mapOutputValueClass = TypeToken.of(actualTypeArguments[valueIndex]).getRawType();
            LOG.debug("Set map output value class to {}", mapOutputValueClass);
            job.setMapOutputValueClass(mapOutputValueClass);
        }
    }

    /**
     * Tells whether a configuration property was set programmatically, judged by whether its first
     * reported property source matches {@code PROGRAMATIC_SOURCE_PATTERN}.
     *
     * @param configuration the configuration to inspect
     * @param str the property name
     * @return {@code true} if the property has at least one source and the first one matches the pattern
     */
    private boolean isProgrammaticConfig(Configuration configuration, String str) {
        String[] sources = configuration.getPropertySources(str);
        if (sources == null || sources.length == 0) {
            return false;
        }
        return PROGRAMATIC_SOURCE_PATTERN.matcher(sources[0]).matches();
    }

    /**
     * Copies the context's plugin archive, if there is one, under the given location.
     *
     * @param location the target directory location
     * @return the location of the copied archive, or {@code null} if the context has no plugin archive
     * @throws IOException if the copy fails
     */
    @Nullable
    private Location createPluginArchive(Location location) throws IOException {
        File archive = this.context.getPluginArchive();
        return archive == null ? null : copyFileToLocation(archive, location);
    }

    /**
     * Copies a local file to a temporary {@code .jar} location derived from the file's name
     * under the given location.
     *
     * @param file the local file to copy
     * @param location the parent location
     * @return the temporary location the file was copied to
     * @throws IOException if the copy fails
     */
    private Location copyFileToLocation(File file, Location location) throws IOException {
        Location named = location.append(file.getName());
        Location target = named.getTempFile(".jar");
        Files.copy(file, Locations.newOutputSupplier(target));
        return target;
    }

    /**
     * Copies the program jar to {@code program.jar} under the given location.
     *
     * @param location the target directory location
     * @return the location of the copied program jar
     * @throws IOException if the copy fails
     */
    private Location copyProgramJar(Location location) throws IOException {
        Location programJarCopy = location.append("program.jar");
        ByteStreams.copy(
            Locations.newInputSupplier(this.programJarLocation),
            Locations.newOutputSupplier(programJarCopy));
        LOG.debug("Copied Program Jar to {}, source: {}", programJarCopy, this.programJarLocation);
        return programJarCopy;
    }

    /**
     * Generates {@code launcher.jar} under the given location, passing the MapReduce application
     * master and YARN child class names together with {@code MapReduceContainerLauncher} to
     * {@code ContainerLauncherGenerator}.
     *
     * @param location the target directory location
     * @return the location of the generated launcher jar
     * @throws IOException if generating the jar fails
     */
    private Location createLauncherJar(Location location) throws IOException {
        Location launcherJar = location.append("launcher.jar");
        ContainerLauncherGenerator.generateLauncherJar(
            Arrays.asList(
                "org.apache.hadoop.mapreduce.v2.app.MRAppMaster",
                "org.apache.hadoop.mapred.YarnChild"),
            MapReduceContainerLauncher.class,
            Locations.newOutputSupplier(launcherJar));
        return launcherJar;
    }

    /**
     * Creates a task that releases the given resources in order when it is run.
     *
     * Each non-null resource is handled by type: a {@code File} directory has its contents deleted,
     * any other {@code File} is deleted, a {@code Location} is deleted quietly (with the recursive
     * flag set), an {@code AutoCloseable} is closed, and a {@code Runnable} is run. Other types are
     * ignored. A failure on one resource is logged as a warning and does not stop the remaining cleanup.
     *
     * @param objArr the resources to clean up; {@code null} elements are skipped
     * @return the cleanup task
     */
    private Runnable createCleanupTask(final Object... objArr) {
        return new Runnable() {
            @Override
            public void run() {
                for (Object resource : objArr) {
                    if (resource == null) {
                        continue;
                    }
                    try {
                        if (resource instanceof File) {
                            File localFile = (File) resource;
                            if (localFile.isDirectory()) {
                                DirUtils.deleteDirectoryContents(localFile);
                            } else {
                                localFile.delete();
                            }
                        } else if (resource instanceof Location) {
                            Locations.deleteQuietly((Location) resource, true);
                        } else if (resource instanceof AutoCloseable) {
                            ((AutoCloseable) resource).close();
                        } else if (resource instanceof Runnable) {
                            ((Runnable) resource).run();
                        }
                    } catch (Throwable t) {
                        MapReduceRuntimeService.LOG.warn("Exception when cleaning up resource {}", resource, t);
                    }
                }
            }
        };
    }

    /**
     * Makes the user-requested resources of the context available to the job.
     *
     * In local mode each resource is localized into the given directory and mapped to the absolute
     * path of the localized file. Otherwise the resource URI, with its fragment set to the resource
     * name, is added to the job as a cache archive or cache file, and the resource is mapped to its
     * own name.
     *
     * @param job the job being configured
     * @param file the directory used for localization in local mode
     * @return a map from resource name to the path (local mode) or name (otherwise) it is available under
     * @throws IOException if localizing a resource fails
     */
    private Map<String, String> localizeUserResources(Job job, File file) throws IOException {
        HashMap<String, String> localizedResources = new HashMap<String, String>();
        for (Map.Entry<String, LocalizeResource> resourceEntry : this.context.getResourcesToLocalize().entrySet()) {
            String resourceName = resourceEntry.getKey();
            String localizedPath;
            if (!MapReduceTaskContextProvider.isLocal(job.getConfiguration())) {
                URI sourceUri = resourceEntry.getValue().getURI();
                URI cacheUri;
                try {
                    // The fragment carries the name under which the resource is exposed.
                    cacheUri = new URI(
                        sourceUri.getScheme(),
                        sourceUri.getAuthority(),
                        sourceUri.getPath(),
                        sourceUri.getQuery(),
                        resourceName);
                } catch (URISyntaxException e) {
                    throw Throwables.propagate(e);
                }
                if (resourceEntry.getValue().isArchive()) {
                    job.addCacheArchive(cacheUri);
                } else {
                    job.addCacheFile(cacheUri);
                }
                localizedPath = resourceName;
            } else {
                localizedPath = LocalizationUtils
                    .localizeResource(resourceEntry.getKey(), resourceEntry.getValue(), file)
                    .getAbsolutePath();
            }
            LOG.debug("MapReduce Localizing file {} {}", resourceEntry.getKey(), resourceEntry.getValue());
            localizedResources.put(resourceName, localizedPath);
        }
        return localizedResources;
    }
}
