package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TestingGcMonitor;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.FragmentResultCacheContext;
import com.facebook.presto.execution.ScheduledSplit;
import com.facebook.presto.execution.StageExecutionId;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.BufferState;
import com.facebook.presto.execution.buffer.OutputBufferInfo;
import com.facebook.presto.execution.buffer.OutputBufferMemoryManager;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.memory.MemoryPool;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.memory.QueryContext;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.MetadataUpdates;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.operator.FragmentResultCacheManager;
import com.facebook.presto.operator.OutputFactory;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.TaskStats;
import com.facebook.presto.spark.PrestoSparkAuthenticatorProvider;
import com.facebook.presto.spark.PrestoSparkSessionProperties;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutor;
import com.facebook.presto.spark.classloader_interface.IPrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkMutableRow;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskInputs;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.SerializedPrestoSparkTaskSource;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.PrestoSparkPageOutputOperator;
import com.facebook.presto.spark.execution.PrestoSparkRowBatch;
import com.facebook.presto.spark.execution.PrestoSparkRowOutputOperator;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.spiller.SpillSpaceTracker;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.OutputPartitioning;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.util.Failures;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.airlift.units.DataSize;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.CollectionAccumulator;
import org.joda.time.DateTime;
import scala.Tuple2;
import scala.collection.AbstractIterator;
import scala.collection.Iterator;

/* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory.class */
public class PrestoSparkTaskExecutorFactory implements IPrestoSparkTaskExecutorFactory {
    // Logs the received split count, total split size and the textual fragment plan when a task is created.
    private static final Logger log = Logger.get(PrestoSparkTaskExecutorFactory.class);
    // Used to rebuild the Session from the session representation carried by the task descriptor.
    private final SessionPropertyManager sessionPropertyManager;
    // Handed to the page output factory and to the remote source factory.
    private final BlockEncodingManager blockEncodingManager;
    // Only used here to render the plan fragment for logging.
    private final FunctionAndTypeManager functionAndTypeManager;
    // Decodes the serialized task descriptor passed to create()/doCreate().
    private final JsonCodec<PrestoSparkTaskDescriptor> taskDescriptorJsonCodec;
    // Decodes each (decompressed) serialized task source.
    private final JsonCodec<TaskSource> taskSourceJsonCodec;
    // Passed to every created executor, which uses it to serialize the final TaskInfo.
    private final JsonCodec<TaskInfo> taskInfoJsonCodec;
    // Shared by the QueryContext, TaskStateMachine, OutputBufferMemoryManager and PrestoSparkTaskExecution.
    private final Executor notificationExecutor;
    // Passed to the QueryContext.
    private final ScheduledExecutorService yieldExecutor;
    // Passed to PrestoSparkTaskExecution.
    private final ScheduledExecutorService memoryUpdateExecutor;
    // Turns the plan fragment into a LocalExecutionPlan.
    private final LocalExecutionPlanner localExecutionPlanner;
    // Converts RuntimeExceptions raised during creation/iteration before they are rethrown.
    private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
    // Passed to PrestoSparkTaskExecution together with the split monitor.
    private final TaskExecutor taskExecutor;
    private final SplitMonitor splitMonitor;
    // Immutable copy of the injected providers; their token authenticators are merged into the Session.
    private final Set<PrestoSparkAuthenticatorProvider> authenticatorProviders;
    // Used to build the FragmentResultCacheContext of each task.
    private final FragmentResultCacheManager fragmentResultCacheManager;
    // NodeMemoryConfig.getMaxQueryMemoryPerNode() when built through the @Inject constructor.
    private final DataSize maxUserMemory;
    // NodeMemoryConfig.getMaxQueryTotalMemoryPerNode(); also the size of the per-task MemoryPool.
    private final DataSize maxTotalMemory;
    // NodeSpillConfig.getMaxSpillPerNode(); also the limit given to the SpillSpaceTracker.
    private final DataSize maxSpillMemory;
    // TaskManagerConfig.getSinkMaxBufferSize(); byte limit given to the OutputBufferMemoryManager.
    private final DataSize sinkMaxBufferSize;
    // The four flags below come from TaskManagerConfig and are forwarded to QueryContext.addTaskContext().
    private final boolean perOperatorCpuTimerEnabled;
    private final boolean cpuTimerEnabled;
    private final boolean perOperatorAllocationTrackingEnabled;
    private final boolean allocationTrackingEnabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory$Output.class */
    /**
     * Immutable bundle describing how a task emits its results: the kind of buffer,
     * the buffer itself, the operator factory that writes into it and the supplier
     * that reads from it.
     *
     * @param <T> the Spark-facing output element type
     */
    public static class Output<T extends PrestoSparkTaskOutput> {
        private final OutputBufferType outputBufferType;
        private final PrestoSparkOutputBuffer<?> outputBuffer;
        private final OutputFactory outputFactory;
        private final OutputSupplier<T> outputSupplier;

        /**
         * @throws NullPointerException if any argument is null
         */
        private Output(OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> prestoSparkOutputBuffer, OutputFactory outputFactory, OutputSupplier<T> outputSupplier) {
            // requireNonNull already returns its argument with the right static type, so no casts are needed.
            this.outputBufferType = Objects.requireNonNull(outputBufferType, "outputBufferType is null");
            this.outputBuffer = Objects.requireNonNull(prestoSparkOutputBuffer, "outputBuffer is null");
            this.outputFactory = Objects.requireNonNull(outputFactory, "outputFactory is null");
            this.outputSupplier = Objects.requireNonNull(outputSupplier, "outputSupplier is null");
        }

        /** @return the kind of output buffer in use */
        public OutputBufferType getOutputBufferType() {
            return outputBufferType;
        }

        /** @return the buffer shared by the output operator and the supplier */
        public PrestoSparkOutputBuffer<?> getOutputBuffer() {
            return outputBuffer;
        }

        /** @return the factory handed to the local execution planner */
        public OutputFactory getOutputFactory() {
            return outputFactory;
        }

        /** @return the supplier the task executor pulls results from */
        public OutputSupplier<T> getOutputSupplier() {
            return outputSupplier;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory$OutputBufferType.class */
    /**
     * Kind of output buffer backing a task. The constant's name() is reported as the
     * buffer type in the OutputBufferInfo of the final TaskInfo.
     */
    public enum OutputBufferType {
        // Selected by configureOutput when the requested output class is PrestoSparkMutableRow.
        SPARK_ROW_OUTPUT_BUFFER,
        // Selected by configureOutput when the requested output class is PrestoSparkSerializedPage.
        SPARK_PAGE_OUTPUT_BUFFER
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory$OutputSupplier.class */
    /**
     * Pull-style source of task output consumed by PrestoSparkTaskExecutor.
     *
     * @param <T> the Spark-facing output element type
     */
    public interface OutputSupplier<T extends PrestoSparkTaskOutput> {
        /**
         * Returns the next (partition, output) pair, or null when there is no more output;
         * the executor treats null as end of stream and expects the task to be done.
         *
         * @throws InterruptedException if interrupted while waiting for output
         */
        Tuple2<MutablePartitionId, T> getNext() throws InterruptedException;

        /**
         * Returns the accumulated wall-clock time, in milliseconds, spent waiting on the
         * underlying output buffer; the executor subtracts it from the elapsed time it reports.
         */
        long getTimeSpentWaitingForOutputInMillis();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory$PageOutputSupplier.class */
    /**
     * Supplies serialized pages from a page output buffer. Every page is paired with the
     * same shared {@link MutablePartitionId} instance.
     */
    public static class PageOutputSupplier implements OutputSupplier<PrestoSparkSerializedPage> {
        private static final MutablePartitionId DEFAULT_PARTITION = new MutablePartitionId();
        private final PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> outputBuffer;
        private long timeSpentWaitingForOutputInMillis;

        private PageOutputSupplier(PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> prestoSparkOutputBuffer) {
            this.outputBuffer = Objects.requireNonNull(prestoSparkOutputBuffer, "outputBuffer is null");
        }

        /**
         * Takes the next buffered page and converts it to its Spark representation.
         *
         * @return the page paired with the default partition, or null when the buffer yields null
         * @throws InterruptedException if interrupted while waiting on the buffer
         */
        @Override // com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory.OutputSupplier
        public Tuple2<MutablePartitionId, PrestoSparkSerializedPage> getNext() throws InterruptedException {
            // Only the buffer read is timed; conversion below is not counted as waiting.
            final long waitStartedAt = System.currentTimeMillis();
            final PrestoSparkBufferedSerializedPage bufferedPage = outputBuffer.get();
            timeSpentWaitingForOutputInMillis += System.currentTimeMillis() - waitStartedAt;

            if (bufferedPage != null) {
                PrestoSparkSerializedPage page = PrestoSparkUtils.toPrestoSparkSerializedPage(bufferedPage.getSerializedPage());
                return new Tuple2<>(DEFAULT_PARTITION, page);
            }
            return null;
        }

        /** @return total milliseconds spent inside the buffer read so far */
        @Override // com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory.OutputSupplier
        public long getTimeSpentWaitingForOutputInMillis() {
            return timeSpentWaitingForOutputInMillis;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory$PrestoSparkTaskExecutor.class */
    /**
     * Scala iterator over the output of a single running task.
     *
     * <p>Elements are pulled from the {@link OutputSupplier}. When the supplier returns null the
     * task is expected to be done; the executor then publishes shuffle write statistics and the
     * serialized {@link TaskInfo} to the Spark accumulators (once), and rethrows the first task
     * failure if there is one.
     *
     * <p>The iteration state is kept in plain, unsynchronized fields.
     *
     * @param <T> the Spark-facing output element type
     */
    public static class PrestoSparkTaskExecutor<T extends PrestoSparkTaskOutput> extends AbstractIterator<Tuple2<MutablePartitionId, T>> implements IPrestoSparkTaskExecutor<T> {
        private final TaskContext taskContext;
        private final TaskStateMachine taskStateMachine;
        private final OutputSupplier<T> outputSupplier;
        private final JsonCodec<TaskInfo> taskInfoJsonCodec;
        private final CollectionAccumulator<SerializedTaskInfo> taskInfoCollector;
        private final CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector;
        private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
        private final OutputBufferType outputBufferType;
        private final PrestoSparkOutputBuffer<?> outputBuffer;
        private final UUID taskInstanceId;
        // Look-ahead element fetched by hasNext() and handed out by next().
        private Tuple2<MutablePartitionId, T> next;
        // Wall-clock time of the first pull; null until iteration starts.
        private Long start;
        private long processedRows;
        private long processedRowBatches;
        private long processedBytes;
        // Guards the accumulators so repeated hasNext() calls after exhaustion do not add duplicates.
        private boolean completionReported;

        private PrestoSparkTaskExecutor(TaskContext taskContext, TaskStateMachine taskStateMachine, OutputSupplier<T> outputSupplier, JsonCodec<TaskInfo> jsonCodec, CollectionAccumulator<SerializedTaskInfo> collectionAccumulator, CollectionAccumulator<PrestoSparkShuffleStats> collectionAccumulator2, PrestoSparkExecutionExceptionFactory prestoSparkExecutionExceptionFactory, OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> prestoSparkOutputBuffer) {
            this.taskInstanceId = UUID.randomUUID();
            this.taskContext = Objects.requireNonNull(taskContext, "taskContext is null");
            this.taskStateMachine = Objects.requireNonNull(taskStateMachine, "taskStateMachine is null");
            this.outputSupplier = Objects.requireNonNull(outputSupplier, "outputSupplier is null");
            this.taskInfoJsonCodec = Objects.requireNonNull(jsonCodec, "taskInfoJsonCodec is null");
            this.taskInfoCollector = Objects.requireNonNull(collectionAccumulator, "taskInfoCollector is null");
            this.shuffleStatsCollector = Objects.requireNonNull(collectionAccumulator2, "shuffleStatsCollector is null");
            this.executionExceptionFactory = Objects.requireNonNull(prestoSparkExecutionExceptionFactory, "executionExceptionFactory is null");
            this.outputBufferType = Objects.requireNonNull(outputBufferType, "outputBufferType is null");
            this.outputBuffer = Objects.requireNonNull(prestoSparkOutputBuffer, "outputBuffer is null");
        }

        /**
         * Fetches the look-ahead element if necessary.
         *
         * @return true if another element is available
         */
        @Override
        public boolean hasNext() {
            if (this.next == null) {
                this.next = computeNext();
            }
            return this.next != null;
        }

        /**
         * Returns the next element and clears the look-ahead slot.
         *
         * <p>This is the {@code scala.collection.Iterator#next()} implementation; the decompiler
         * had renamed it to {@code m11next}, leaving the abstract method unimplemented.
         *
         * @throws NoSuchElementException if the task output is exhausted
         */
        @Override
        public Tuple2<MutablePartitionId, T> next() {
            if (this.next == null) {
                this.next = computeNext();
            }
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            Tuple2<MutablePartitionId, T> result = this.next;
            this.next = null;
            return result;
        }

        /**
         * Decompiler-generated alias of {@link #next()}, kept so existing references keep compiling.
         *
         * @deprecated use {@link #next()}
         */
        @Deprecated
        public Tuple2<MutablePartitionId, T> m11next() {
            return next();
        }

        /**
         * Computes the next element. An interrupt restores the thread's interrupt flag, aborts the
         * task and surfaces as a RuntimeException; any other RuntimeException is converted by the
         * execution exception factory.
         */
        protected Tuple2<MutablePartitionId, T> computeNext() {
            try {
                return doComputeNext();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.taskStateMachine.abort();
                throw new RuntimeException(e);
            } catch (RuntimeException e2) {
                throw this.executionExceptionFactory.toPrestoSparkExecutionException(e2);
            }
        }

        private Tuple2<MutablePartitionId, T> doComputeNext() throws InterruptedException {
            if (this.start == null) {
                this.start = System.currentTimeMillis();
            }
            Tuple2<MutablePartitionId, T> output = this.outputSupplier.getNext();
            if (output != null) {
                T payload = output._2;
                this.processedRows += payload.getPositionCount();
                this.processedRowBatches++;
                this.processedBytes += payload.getSize();
                return output;
            }

            // A null output means the buffer is drained, which only happens once the task is done.
            Preconditions.checkState(this.taskStateMachine.getState().isDone(), "task is expected to be done");
            if (!this.completionReported) {
                reportCompletion();
                this.completionReported = true;
            }

            // Failures are rethrown on every call; only the accumulator updates are one-shot.
            LinkedBlockingQueue<Throwable> failureCauses = this.taskStateMachine.getFailureCauses();
            Throwable failure = failureCauses.peek();
            if (failure == null) {
                return null;
            }
            // Rethrows as-is when the failure is an Error, a RuntimeException or an InterruptedException.
            Throwables.propagateIfPossible(failure, InterruptedException.class);
            throw new RuntimeException(failure);
        }

        /** Publishes the shuffle write statistics and the compressed, JSON-serialized TaskInfo. */
        private void reportCompletion() {
            TaskId taskId = this.taskContext.getTaskId();
            // Time spent blocked on the output buffer is excluded from the reported elapsed time.
            long elapsedMillis = (System.currentTimeMillis() - this.start.longValue()) - this.outputSupplier.getTimeSpentWaitingForOutputInMillis();
            this.shuffleStatsCollector.add(new PrestoSparkShuffleStats(taskId.getStageExecutionId().getStageId().getId(), taskId.getId(), PrestoSparkShuffleStats.Operation.WRITE, this.processedRows, this.processedRowBatches, this.processedBytes, elapsedMillis));

            TaskInfo taskInfo = createTaskInfo(this.taskContext, this.taskStateMachine, this.taskInstanceId, this.outputBufferType, this.outputBuffer);
            byte[] compressedTaskInfo = PrestoSparkUtils.compress(this.taskInfoJsonCodec.toJsonBytes(taskInfo));
            this.taskInfoCollector.add(new SerializedTaskInfo(taskInfo.getTaskId().getStageExecutionId().getStageId().getId(), taskInfo.getTaskId().getId(), compressedTaskInfo));
        }

        /**
         * Builds the final TaskInfo snapshot. Failure details are included only when the task state
         * is FAILED; the output buffer is always reported as FINISHED with the row and page totals
         * taken from the buffer.
         */
        private static TaskInfo createTaskInfo(TaskContext taskContext, TaskStateMachine taskStateMachine, UUID uuid, OutputBufferType outputBufferType, PrestoSparkOutputBuffer<?> prestoSparkOutputBuffer) {
            TaskId taskId = taskContext.getTaskId();
            TaskState state = taskContext.getState();
            TaskStats taskStats = taskContext.getTaskStats();

            TaskStatus taskStatus = new TaskStatus(
                    uuid.getLeastSignificantBits(),
                    uuid.getMostSignificantBits(),
                    1L,
                    state,
                    // Placeholder URI: the ".invalid" host makes it explicit that it is not resolvable.
                    URI.create("http://fake.invalid/task/" + taskId),
                    taskContext.getCompletedDriverGroups(),
                    state == TaskState.FAILED ? Failures.toFailures(taskStateMachine.getFailureCauses()) : ImmutableList.of(),
                    taskStats.getQueuedPartitionedDrivers(),
                    taskStats.getRunningPartitionedDrivers(),
                    0.0d,
                    false,
                    taskStats.getPhysicalWrittenDataSizeInBytes(),
                    taskStats.getUserMemoryReservationInBytes(),
                    taskStats.getSystemMemoryReservationInBytes(),
                    taskStats.getPeakNodeTotalMemoryInBytes(),
                    taskStats.getFullGcCount(),
                    taskStats.getFullGcTimeInMillis());

            OutputBufferInfo outputBufferInfo = new OutputBufferInfo(
                    outputBufferType.name(),
                    BufferState.FINISHED,
                    false,
                    false,
                    0L,
                    0L,
                    prestoSparkOutputBuffer.getTotalRowsProcessed(),
                    prestoSparkOutputBuffer.getTotalPagesProcessed(),
                    ImmutableList.of());

            return new TaskInfo(taskId, taskStatus, DateTime.now(), outputBufferInfo, ImmutableSet.of(), taskStats, false, MetadataUpdates.DEFAULT_METADATA_UPDATES);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkTaskExecutorFactory$RowOutputSupplier.class */
    /**
     * Supplies individual rows by draining row batches from the output buffer one batch
     * at a time.
     */
    public static class RowOutputSupplier implements OutputSupplier<PrestoSparkMutableRow> {
        private final PrestoSparkOutputBuffer<PrestoSparkRowBatch> outputBuffer;
        // Supplier over the batch currently being drained; null when a new batch must be fetched.
        private PrestoSparkRowBatch.RowTupleSupplier currentRowTupleSupplier;
        private long timeSpentWaitingForOutputInMillis;

        private RowOutputSupplier(PrestoSparkOutputBuffer<PrestoSparkRowBatch> prestoSparkOutputBuffer) {
            this.outputBuffer = Objects.requireNonNull(prestoSparkOutputBuffer, "outputBuffer is null");
        }

        /**
         * Returns the next row, fetching further batches as needed; batches that yield no rows
         * are skipped.
         *
         * @return the next (partition, row) pair, or null when the buffer yields null
         * @throws InterruptedException if interrupted while waiting on the buffer
         */
        @Override // com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory.OutputSupplier
        public Tuple2<MutablePartitionId, PrestoSparkMutableRow> getNext() throws InterruptedException {
            while (true) {
                if (currentRowTupleSupplier == null) {
                    // Only the buffer read counts as waiting time.
                    final long waitStartedAt = System.currentTimeMillis();
                    final PrestoSparkRowBatch batch = outputBuffer.get();
                    timeSpentWaitingForOutputInMillis += System.currentTimeMillis() - waitStartedAt;
                    if (batch == null) {
                        return null;
                    }
                    currentRowTupleSupplier = batch.createRowTupleSupplier();
                }

                final Tuple2<MutablePartitionId, PrestoSparkMutableRow> row = currentRowTupleSupplier.getNext();
                if (row != null) {
                    return row;
                }
                // Current batch is exhausted; loop around to fetch the next one.
                currentRowTupleSupplier = null;
            }
        }

        /** @return total milliseconds spent inside the buffer read so far */
        @Override // com.facebook.presto.spark.execution.PrestoSparkTaskExecutorFactory.OutputSupplier
        public long getTimeSpentWaitingForOutputInMillis() {
            return timeSpentWaitingForOutputInMillis;
        }
    }

    /**
     * Injection entry point: unpacks the memory, spill and task-manager configuration objects
     * into the plain values expected by the main constructor.
     *
     * @throws NullPointerException if any of the three configuration objects is null
     */
    @Inject
    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, FunctionAndTypeManager functionAndTypeManager, JsonCodec<PrestoSparkTaskDescriptor> jsonCodec, JsonCodec<TaskSource> jsonCodec2, JsonCodec<TaskInfo> jsonCodec3, Executor executor, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory prestoSparkExecutionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> set, FragmentResultCacheManager fragmentResultCacheManager, TaskManagerConfig taskManagerConfig, NodeMemoryConfig nodeMemoryConfig, NodeSpillConfig nodeSpillConfig) {
        this(
                sessionPropertyManager,
                blockEncodingManager,
                functionAndTypeManager,
                jsonCodec,
                jsonCodec2,
                jsonCodec3,
                executor,
                scheduledExecutorService,
                scheduledExecutorService2,
                localExecutionPlanner,
                prestoSparkExecutionExceptionFactory,
                taskExecutor,
                splitMonitor,
                set,
                fragmentResultCacheManager,
                // Each config object is null-checked on first use; arguments are evaluated left to
                // right, so later uses of the same object are already known to be non-null.
                Objects.requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").getMaxQueryMemoryPerNode(),
                nodeMemoryConfig.getMaxQueryTotalMemoryPerNode(),
                Objects.requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getMaxSpillPerNode(),
                Objects.requireNonNull(taskManagerConfig, "taskManagerConfig is null").getSinkMaxBufferSize(),
                taskManagerConfig.isPerOperatorCpuTimerEnabled(),
                taskManagerConfig.isTaskCpuTimerEnabled(),
                taskManagerConfig.isPerOperatorAllocationTrackingEnabled(),
                taskManagerConfig.isTaskAllocationTrackingEnabled());
    }

    /**
     * Creates a factory from already-resolved collaborators and limits.
     *
     * @param jsonCodec codec for the task descriptor
     * @param jsonCodec2 codec for task sources
     * @param jsonCodec3 codec for task info
     * @param executor notification executor
     * @param scheduledExecutorService yield executor
     * @param scheduledExecutorService2 memory update executor
     * @param set authenticator providers; copied into an immutable set
     * @param dataSize maximum user memory
     * @param dataSize2 maximum total memory
     * @param dataSize3 maximum spill memory
     * @param dataSize4 maximum sink buffer size
     * @param z whether the per-operator CPU timer is enabled
     * @param z2 whether the task CPU timer is enabled
     * @param z3 whether per-operator allocation tracking is enabled
     * @param z4 whether task allocation tracking is enabled
     * @throws NullPointerException if any reference argument is null
     */
    public PrestoSparkTaskExecutorFactory(SessionPropertyManager sessionPropertyManager, BlockEncodingManager blockEncodingManager, FunctionAndTypeManager functionAndTypeManager, JsonCodec<PrestoSparkTaskDescriptor> jsonCodec, JsonCodec<TaskSource> jsonCodec2, JsonCodec<TaskInfo> jsonCodec3, Executor executor, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, LocalExecutionPlanner localExecutionPlanner, PrestoSparkExecutionExceptionFactory prestoSparkExecutionExceptionFactory, TaskExecutor taskExecutor, SplitMonitor splitMonitor, Set<PrestoSparkAuthenticatorProvider> set, FragmentResultCacheManager fragmentResultCacheManager, DataSize dataSize, DataSize dataSize2, DataSize dataSize3, DataSize dataSize4, boolean z, boolean z2, boolean z3, boolean z4) {
        this.sessionPropertyManager = Objects.requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
        this.blockEncodingManager = Objects.requireNonNull(blockEncodingManager, "blockEncodingManager is null");
        // The null-check messages name the field being assigned, like every other check here.
        this.functionAndTypeManager = Objects.requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
        this.taskDescriptorJsonCodec = Objects.requireNonNull(jsonCodec, "taskDescriptorJsonCodec is null");
        this.taskSourceJsonCodec = Objects.requireNonNull(jsonCodec2, "taskSourceJsonCodec is null");
        this.taskInfoJsonCodec = Objects.requireNonNull(jsonCodec3, "taskInfoJsonCodec is null");
        this.notificationExecutor = Objects.requireNonNull(executor, "notificationExecutor is null");
        this.yieldExecutor = Objects.requireNonNull(scheduledExecutorService, "yieldExecutor is null");
        this.memoryUpdateExecutor = Objects.requireNonNull(scheduledExecutorService2, "memoryUpdateExecutor is null");
        this.localExecutionPlanner = Objects.requireNonNull(localExecutionPlanner, "localExecutionPlanner is null");
        this.executionExceptionFactory = Objects.requireNonNull(prestoSparkExecutionExceptionFactory, "executionExceptionFactory is null");
        this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor is null");
        this.splitMonitor = Objects.requireNonNull(splitMonitor, "splitMonitor is null");
        // Defensive immutable copy so later changes to the caller's set are not observed.
        this.authenticatorProviders = ImmutableSet.copyOf(Objects.requireNonNull(set, "authenticatorProviders is null"));
        this.fragmentResultCacheManager = Objects.requireNonNull(fragmentResultCacheManager, "fragmentResultCacheManager is null");
        this.maxUserMemory = Objects.requireNonNull(dataSize, "maxUserMemory is null");
        this.maxTotalMemory = Objects.requireNonNull(dataSize2, "maxTotalMemory is null");
        this.maxSpillMemory = Objects.requireNonNull(dataSize3, "maxSpillMemory is null");
        this.sinkMaxBufferSize = Objects.requireNonNull(dataSize4, "sinkMaxBufferSize is null");
        this.perOperatorCpuTimerEnabled = z;
        this.cpuTimerEnabled = z2;
        this.perOperatorAllocationTrackingEnabled = z3;
        this.allocationTrackingEnabled = z4;
    }

    /**
     * Creates and starts a task, returning the iterator over its output. Delegates to
     * {@link #doCreate}; any RuntimeException it raises is converted by the execution
     * exception factory before being rethrown.
     */
    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> create(int i, int i2, SerializedPrestoSparkTaskDescriptor serializedPrestoSparkTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> iterator, PrestoSparkTaskInputs prestoSparkTaskInputs, CollectionAccumulator<SerializedTaskInfo> collectionAccumulator, CollectionAccumulator<PrestoSparkShuffleStats> collectionAccumulator2, Class<T> cls) {
        final IPrestoSparkTaskExecutor<T> executor;
        try {
            executor = doCreate(i, i2, serializedPrestoSparkTaskDescriptor, iterator, prestoSparkTaskInputs, collectionAccumulator, collectionAccumulator2, cls);
        } catch (RuntimeException failure) {
            throw this.executionExceptionFactory.toPrestoSparkExecutionException(failure);
        }
        return executor;
    }

    /**
     * Builds everything a task needs (session, memory contexts, remote inputs, output buffer and
     * local execution plan), starts the task on the given sources and returns the iterator over
     * its output.
     *
     * @param i used as the task id within the stage, as the seed of the pre-determined output
     *          partition, and passed to the remote source factory (presumably the Spark
     *          partition id; confirm against the interface)
     * @param i2 not used by this method
     * @param serializedPrestoSparkTaskDescriptor JSON bytes of the task descriptor
     * @param iterator compressed, JSON-serialized task sources
     * @param prestoSparkTaskInputs shuffle, broadcast and in-memory inputs keyed by source fragment id
     * @param collectionAccumulator receives the final serialized task info
     * @param collectionAccumulator2 receives shuffle statistics
     * @param cls requested output element class; selects the row or page output buffer
     * @throws IllegalArgumentException if a remote source has no input, has more than one kind of
     *         input, or the output class is not supported
     */
    public <T extends PrestoSparkTaskOutput> IPrestoSparkTaskExecutor<T> doCreate(int i, int i2, SerializedPrestoSparkTaskDescriptor serializedPrestoSparkTaskDescriptor, Iterator<SerializedPrestoSparkTaskSource> iterator, PrestoSparkTaskInputs prestoSparkTaskInputs, CollectionAccumulator<SerializedTaskInfo> collectionAccumulator, CollectionAccumulator<PrestoSparkShuffleStats> collectionAccumulator2, Class<T> cls) {
        PrestoSparkTaskDescriptor prestoSparkTaskDescriptor = (PrestoSparkTaskDescriptor) this.taskDescriptorJsonCodec.fromJson(serializedPrestoSparkTaskDescriptor.getBytes());
        // Merge the token authenticators of every provider into one map for the session.
        ImmutableMap.Builder builder = ImmutableMap.builder();
        this.authenticatorProviders.forEach(prestoSparkAuthenticatorProvider -> {
            builder.putAll(prestoSparkAuthenticatorProvider.getTokenAuthenticators());
        });
        Session session = prestoSparkTaskDescriptor.getSession().toSession(this.sessionPropertyManager, prestoSparkTaskDescriptor.getExtraCredentials(), builder.build());
        PlanFragment fragment = prestoSparkTaskDescriptor.getFragment();
        // The stage execution id is always 0; the fragment id doubles as the stage id.
        TaskId taskId = new TaskId(new StageExecutionId(new StageId(session.getQueryId(), fragment.getId().getId()), 0), i);
        List<TaskSource> taskSources = getTaskSources(iterator);
        log.info("Task [%s] received %d splits.", new Object[]{taskId, Integer.valueOf(taskSources.stream().mapToInt(taskSource -> {
            return taskSource.getSplits().size();
        }).sum())});
        // The total is only logged when every split reports its size.
        OptionalLong computeAllSplitsSize = computeAllSplitsSize(taskSources);
        if (computeAllSplitsSize.isPresent()) {
            log.info("Total split size: %s bytes.", new Object[]{Long.valueOf(computeAllSplitsSize.getAsLong())});
        }
        log.info(PlanPrinter.textPlanFragment(fragment, this.functionAndTypeManager, session, true));
        // A fresh QueryContext with its own memory pool and spill tracker is created for each task.
        QueryContext queryContext = new QueryContext(session.getQueryId(), this.maxUserMemory, this.maxTotalMemory, this.maxUserMemory, new MemoryPool(new MemoryPoolId("spark-executor-memory-pool"), this.maxTotalMemory), new TestingGcMonitor(), this.notificationExecutor, this.yieldExecutor, this.maxSpillMemory, new SpillSpaceTracker(this.maxSpillMemory));
        TaskStateMachine taskStateMachine = new TaskStateMachine(taskId, this.notificationExecutor);
        TaskContext addTaskContext = queryContext.addTaskContext(taskStateMachine, session, this.perOperatorCpuTimerEnabled, this.cpuTimerEnabled, this.perOperatorAllocationTrackingEnabled, this.allocationTrackingEnabled, false, FragmentResultCacheContext.createFragmentResultCacheContext(this.fragmentResultCacheManager, fragment.getRoot(), fragment.getPartitioningScheme(), session));
        // builder2: shuffle inputs per remote source node; builder3: broadcast/in-memory page iterators per node.
        ImmutableMap.Builder builder2 = ImmutableMap.builder();
        ImmutableMap.Builder builder3 = ImmutableMap.builder();
        for (RemoteSourceNode remoteSourceNode : fragment.getRemoteSourceNodes()) {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (PlanFragmentId planFragmentId : remoteSourceNode.getSourceFragmentIds()) {
                // Exactly one of the three input kinds is expected for each source fragment.
                Iterator iterator2 = (Iterator) prestoSparkTaskInputs.getShuffleInputs().get(planFragmentId.toString());
                Broadcast broadcast = (Broadcast) prestoSparkTaskInputs.getBroadcastInputs().get(planFragmentId.toString());
                List list = (List) prestoSparkTaskInputs.getInMemoryInputs().get(planFragmentId.toString());
                if (iterator2 != null) {
                    Preconditions.checkArgument(broadcast == null, "single remote source is not expected to accept different kind of inputs");
                    Preconditions.checkArgument(list == null, "single remote source is not expected to accept different kind of inputs");
                    arrayList.add(new PrestoSparkShuffleInput(planFragmentId.getId(), iterator2));
                } else if (broadcast != null) {
                    Preconditions.checkArgument(list == null, "single remote source is not expected to accept different kind of inputs");
                    arrayList2.add(((List) broadcast.value()).iterator());
                } else {
                    if (list == null) {
                        throw new IllegalArgumentException("Input not found for sourceFragmentId: " + planFragmentId);
                    }
                    arrayList2.add(list.iterator());
                }
            }
            // Nodes without inputs of a given kind are left out of the corresponding map.
            if (!arrayList.isEmpty()) {
                builder2.put(remoteSourceNode.getId(), arrayList);
            }
            if (!arrayList2.isEmpty()) {
                builder3.put(remoteSourceNode.getId(), arrayList2);
            }
        }
        // The memory context is looked up lazily through the supplier rather than captured up front.
        OutputBufferMemoryManager outputBufferMemoryManager = new OutputBufferMemoryManager(this.sinkMaxBufferSize.toBytes(), () -> {
            return queryContext.getTaskContextByTaskId(taskId).localSystemMemoryContext();
        }, this.notificationExecutor);
        Optional empty = Optional.empty();
        // For FIXED_ARBITRARY_DISTRIBUTION every row of this task goes to one partition derived from i.
        if (fragment.getPartitioningScheme().getPartitioning().getHandle().equals(SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION)) {
            int hashPartitionCount = SystemSessionProperties.getHashPartitionCount(session);
            empty = Optional.of(new OutputPartitioning(new PrestoSparkRowOutputOperator.PreDeterminedPartitionFunction(i % hashPartitionCount, hashPartitionCount), ImmutableList.of(), ImmutableList.of(), false, OptionalInt.empty()));
        }
        Output configureOutput = configureOutput(cls, this.blockEncodingManager, outputBufferMemoryManager, PrestoSparkSessionProperties.getShuffleOutputTargetAverageRowSize(session), empty);
        PrestoSparkOutputBuffer<?> outputBuffer = configureOutput.getOutputBuffer();
        LocalExecutionPlanner.LocalExecutionPlan plan = this.localExecutionPlanner.plan(addTaskContext, fragment.getRoot(), fragment.getPartitioningScheme(), fragment.getStageExecutionDescriptor(), fragment.getTableScanSchedulingOrder(), configureOutput.getOutputFactory(), new PrestoSparkRemoteSourceFactory(this.blockEncodingManager, builder2.build(), builder3.build(), i, collectionAccumulator2), prestoSparkTaskDescriptor.getTableWriteInfo(), true);
        // Registered before the task starts: once the task reaches a done state the buffer is told
        // that no more rows will arrive.
        taskStateMachine.addStateChangeListener(taskState -> {
            if (taskState.isDone()) {
                outputBuffer.setNoMoreRows();
            }
        });
        new PrestoSparkTaskExecution(taskStateMachine, addTaskContext, plan, this.taskExecutor, this.splitMonitor, this.notificationExecutor, this.memoryUpdateExecutor).start(taskSources);
        return new PrestoSparkTaskExecutor(addTaskContext, taskStateMachine, configureOutput.getOutputSupplier(), this.taskInfoJsonCodec, collectionAccumulator, collectionAccumulator2, this.executionExceptionFactory, configureOutput.getOutputBufferType(), outputBuffer);
    }

    /**
     * Sums the reported size of every split in the given task sources.
     *
     * @return the total in bytes, or empty as soon as any split does not report its size
     */
    private static OptionalLong computeAllSplitsSize(List<TaskSource> list) {
        long totalBytes = 0;
        for (TaskSource source : list) {
            for (ScheduledSplit scheduledSplit : source.getSplits()) {
                OptionalLong splitSize = scheduledSplit.getSplit().getConnectorSplit().getSplitSizeInBytes();
                if (!splitSize.isPresent()) {
                    // A single unknown size makes the total meaningless.
                    return OptionalLong.empty();
                }
                totalBytes += splitSize.getAsLong();
            }
        }
        return OptionalLong.of(totalBytes);
    }

    /**
     * Drains the iterator, decompressing and JSON-decoding each serialized task source.
     *
     * @return the decoded task sources, in iteration order, as an immutable list
     */
    private List<TaskSource> getTaskSources(Iterator<SerializedPrestoSparkTaskSource> iterator) {
        ImmutableList.Builder<TaskSource> sources = ImmutableList.builder();
        while (iterator.hasNext()) {
            SerializedPrestoSparkTaskSource serialized = iterator.next();
            byte[] json = PrestoSparkUtils.decompress(serialized.getBytes());
            sources.add(this.taskSourceJsonCodec.fromJson(json));
        }
        return sources.build();
    }

    /**
     * Builds the output buffer, output operator factory and output supplier matching the
     * requested output element class.
     *
     * @param cls PrestoSparkMutableRow for row output or PrestoSparkSerializedPage for page output
     * @param dataSize target average row size handed to the row output factory
     * @param optional optional pre-determined output partitioning handed to the row output factory
     * @throws IllegalArgumentException if cls is neither of the two supported classes
     */
    @SuppressWarnings("unchecked") // each cast is guarded by the equality check on cls just above it
    private static <T extends PrestoSparkTaskOutput> Output<T> configureOutput(Class<T> cls, BlockEncodingManager blockEncodingManager, OutputBufferMemoryManager outputBufferMemoryManager, DataSize dataSize, Optional<OutputPartitioning> optional) {
        if (cls.equals(PrestoSparkMutableRow.class)) {
            PrestoSparkOutputBuffer<PrestoSparkRowBatch> rowBuffer = new PrestoSparkOutputBuffer<>(outputBufferMemoryManager);
            OutputFactory rowOutputFactory = new PrestoSparkRowOutputOperator.PrestoSparkRowOutputFactory(rowBuffer, dataSize, optional);
            OutputSupplier<T> rowSupplier = (OutputSupplier<T>) new RowOutputSupplier(rowBuffer);
            return new Output<>(OutputBufferType.SPARK_ROW_OUTPUT_BUFFER, rowBuffer, rowOutputFactory, rowSupplier);
        }
        if (cls.equals(PrestoSparkSerializedPage.class)) {
            PrestoSparkOutputBuffer<PrestoSparkBufferedSerializedPage> pageBuffer = new PrestoSparkOutputBuffer<>(outputBufferMemoryManager);
            OutputFactory pageOutputFactory = new PrestoSparkPageOutputOperator.PrestoSparkPageOutputFactory(pageBuffer, blockEncodingManager);
            OutputSupplier<T> pageSupplier = (OutputSupplier<T>) new PageOutputSupplier(pageBuffer);
            return new Output<>(OutputBufferType.SPARK_PAGE_OUTPUT_BUFFER, pageBuffer, pageOutputFactory, pageSupplier);
        }
        throw new IllegalArgumentException("Unexpected output type: " + cls.getName());
    }
}
