package com.facebook.presto.spark.execution;

import com.facebook.airlift.json.Codec;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.cost.FragmentStatsProvider;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateTimer;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spark.ErrorClassifier;
import com.facebook.presto.spark.PrestoSparkMetadataStorage;
import com.facebook.presto.spark.PrestoSparkQueryData;
import com.facebook.presto.spark.PrestoSparkQueryExecutionFactory;
import com.facebook.presto.spark.PrestoSparkQueryStatusInfo;
import com.facebook.presto.spark.PrestoSparkServiceWaitTimeMetrics;
import com.facebook.presto.spark.PrestoSparkTaskDescriptor;
import com.facebook.presto.spark.classloader_interface.MutablePartitionId;
import com.facebook.presto.spark.classloader_interface.PrestoSparkSerializedPage;
import com.facebook.presto.spark.classloader_interface.PrestoSparkShuffleStats;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskExecutorFactoryProvider;
import com.facebook.presto.spark.classloader_interface.SerializedTaskInfo;
import com.facebook.presto.spark.execution.task.PrestoSparkTaskExecutorFactory;
import com.facebook.presto.spark.node.PrestoSparkNodePartitioningManager;
import com.facebook.presto.spark.planner.IterativePlanFragmenter;
import com.facebook.presto.spark.planner.PrestoSparkPlanFragmenter;
import com.facebook.presto.spark.planner.PrestoSparkQueryPlanner;
import com.facebook.presto.spark.planner.PrestoSparkRddFactory;
import com.facebook.presto.spark.planner.optimizers.AdaptivePlanOptimizers;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.PlanFragmenterUtils;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.planPrinter.PlanPrinter;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.units.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.spark.MapOutputStatistics;
import org.apache.spark.SimpleFutureAction;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.ThreadUtils;
import scala.Function1;
import scala.Tuple2;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.impl.ExecutionContextImpl;
import scala.runtime.AbstractFunction1;
import scala.util.Try;

/* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.class */
public class PrestoSparkAdaptiveQueryExecution extends AbstractPrestoSparkQueryExecution {
    private static final Logger log = Logger.get(PrestoSparkAdaptiveQueryExecution.class);
    // Produces the sub-plans that are ready to run; built once in the constructor.
    private final IterativePlanFragmenter iterativePlanFragmenter;
    // Optimizers applied to the remaining plan after every completed fragment.
    private final List<PlanOptimizer> adaptivePlanOptimizers;
    // Passed to the adaptive optimizers (and used to derive their TypeProvider view).
    private final VariableAllocator variableAllocator;
    // Shared between the iterative fragmenter and the adaptive optimizers.
    private final PlanNodeIdAllocator idAllocator;
    // Receives the runtime statistics derived from a completed fragment's map output statistics.
    private final FragmentStatsProvider fragmentStatsProvider;
    // Ids of fragments that completed successfully; initialized as a concurrent key set and
    // queried by the iterative fragmenter through a "contains" predicate.
    private final Set<PlanFragmentId> executedFragments;
    // Completion events published by publishFragmentCompletionEvent() and polled by doExecute().
    private final BlockingQueue<FragmentCompletionEvent> fragmentEventQueue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution$FragmentCompletionEvent.class */
    public class FragmentCompletionEvent {
        protected final PlanFragmentId fragmentId;

        private FragmentCompletionEvent(PlanFragmentId planFragmentId) {
            this.fragmentId = planFragmentId;
        }

        public PlanFragmentId getFragmentId() {
            return this.fragmentId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution$FragmentCompletionFailureEvent.class */
    public class FragmentCompletionFailureEvent extends FragmentCompletionEvent {
        private Throwable executionError;

        private FragmentCompletionFailureEvent(PlanFragmentId planFragmentId, Throwable th) {
            super(planFragmentId);
            this.executionError = th;
        }

        public Throwable getExecutionError() {
            return this.executionError;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution$FragmentCompletionSuccessEvent.class */
    public class FragmentCompletionSuccessEvent extends FragmentCompletionEvent {
        private Optional<MapOutputStatistics> mapOutputStats;

        private FragmentCompletionSuccessEvent(PlanFragmentId planFragmentId, Optional<MapOutputStatistics> optional) {
            super(planFragmentId);
            this.mapOutputStats = optional;
        }

        public Optional<MapOutputStatistics> getMapOutputStats() {
            return this.mapOutputStats;
        }
    }

    /**
     * Creates an adaptive query execution.
     *
     * <p>Every argument except {@code adaptivePlanOptimizers}, {@code variableAllocator},
     * {@code planNodeIdAllocator} and {@code fragmentStatsProvider} is forwarded unchanged to the
     * superclass constructor. Those four are retained here and must not be null.
     *
     * @param adaptivePlanOptimizers source of the optimizers applied after each completed fragment
     * @param variableAllocator allocator handed to the adaptive optimizers
     * @param planNodeIdAllocator id allocator shared by the fragmenter and the optimizers
     * @param fragmentStatsProvider sink for runtime statistics of completed fragments
     * @throws NullPointerException if any of the four parameters above is null
     */
    public PrestoSparkAdaptiveQueryExecution(JavaSparkContext javaSparkContext, Session session, QueryMonitor queryMonitor, CollectionAccumulator<SerializedTaskInfo> collectionAccumulator, CollectionAccumulator<PrestoSparkShuffleStats> collectionAccumulator2, PrestoSparkTaskExecutorFactory prestoSparkTaskExecutorFactory, PrestoSparkTaskExecutorFactoryProvider prestoSparkTaskExecutorFactoryProvider, QueryStateTimer queryStateTimer, WarningCollector warningCollector, String str, PrestoSparkQueryPlanner.PlanAndMore planAndMore, Optional<String> optional, Codec<TaskInfo> codec, JsonCodec<PrestoSparkTaskDescriptor> jsonCodec, JsonCodec<PrestoSparkQueryStatusInfo> jsonCodec2, JsonCodec<PrestoSparkQueryData> jsonCodec3, PrestoSparkRddFactory prestoSparkRddFactory, TransactionManager transactionManager, PagesSerde pagesSerde, PrestoSparkExecutionExceptionFactory prestoSparkExecutionExceptionFactory, Duration duration, long j, PrestoSparkMetadataStorage prestoSparkMetadataStorage, Optional<String> optional2, Optional<String> optional3, TempStorage tempStorage, NodeMemoryConfig nodeMemoryConfig, FeaturesConfig featuresConfig, QueryManagerConfig queryManagerConfig, Set<PrestoSparkServiceWaitTimeMetrics> set, Optional<ErrorClassifier> optional4, PrestoSparkPlanFragmenter prestoSparkPlanFragmenter, Metadata metadata, PartitioningProviderManager partitioningProviderManager, HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker, AdaptivePlanOptimizers adaptivePlanOptimizers, VariableAllocator variableAllocator, PlanNodeIdAllocator planNodeIdAllocator, FragmentStatsProvider fragmentStatsProvider, Optional<CollectionAccumulator<Map<String, Long>>> optional5) {
        super(
                javaSparkContext,
                session,
                queryMonitor,
                collectionAccumulator,
                collectionAccumulator2,
                prestoSparkTaskExecutorFactory,
                prestoSparkTaskExecutorFactoryProvider,
                queryStateTimer,
                warningCollector,
                str,
                planAndMore,
                optional,
                codec,
                jsonCodec,
                jsonCodec2,
                jsonCodec3,
                prestoSparkRddFactory,
                transactionManager,
                pagesSerde,
                prestoSparkExecutionExceptionFactory,
                duration,
                j,
                prestoSparkMetadataStorage,
                optional2,
                optional3,
                tempStorage,
                nodeMemoryConfig,
                featuresConfig,
                queryManagerConfig,
                set,
                optional4,
                prestoSparkPlanFragmenter,
                metadata,
                partitioningProviderManager,
                historyBasedPlanStatisticsTracker,
                optional5);
        this.executedFragments = ConcurrentHashMap.newKeySet();
        // Diamond instead of the raw type so the queue stays type-checked.
        this.fragmentEventQueue = new LinkedBlockingQueue<>();
        this.fragmentStatsProvider = Objects.requireNonNull(fragmentStatsProvider, "fragmentStatsProvider is null");
        this.adaptivePlanOptimizers = Objects.requireNonNull(adaptivePlanOptimizers, "adaptivePlanOptimizers is null").getAdaptiveOptimizers();
        this.variableAllocator = Objects.requireNonNull(variableAllocator, "variableAllocator is null");
        this.idAllocator = Objects.requireNonNull(planNodeIdAllocator, "idAllocator is null");
        // Must run last: the fragmenter is built from the fields assigned above.
        this.iterativePlanFragmenter = createIterativePlanFragmenter();
    }

    /**
     * Builds the fragmenter used to carve ready-to-run sub-plans out of the query plan.
     *
     * <p>The fragmenter is given a live view of {@link #executedFragments} through a bound
     * {@code contains} method reference, so fragments recorded later by {@code doExecute()} are
     * visible to it without rebuilding the fragmenter.
     *
     * @return a new fragmenter bound to this execution's plan, session and allocators
     */
    private IterativePlanFragmenter createIterativePlanFragmenter() {
        return new IterativePlanFragmenter(
                this.planAndMore.getPlan(),
                // Bound method reference; the receiver is null-checked when the reference is created.
                this.executedFragments::contains,
                this.metadata,
                new PlanChecker(this.featuresConfig, false),
                new SqlParser(),
                this.idAllocator,
                new PrestoSparkNodePartitioningManager(this.partitioningProviderManager),
                this.queryManagerConfig,
                this.session,
                this.warningCollector,
                false);
    }

    /**
     * Executes the query adaptively.
     *
     * <p>While the fragmenter reports a remaining plan, each iteration:
     * <ol>
     *   <li>submits every ready fragment and registers a callback that publishes a completion
     *       event once its map output statistics future completes (fragments without such a
     *       future are reported as completed immediately, with no statistics);</li>
     *   <li>waits for one completion event, failing on timeout or on a failure event;</li>
     *   <li>records the fragment as executed and stores its runtime statistics, if any;</li>
     *   <li>runs the adaptive optimizers over the remaining plan and asks the fragmenter for the
     *       next set of ready fragments.</li>
     * </ol>
     * Once no plan remains, the single remaining fragment is executed as the final fragment.
     *
     * @return the serialized result pages of the final fragment
     * @throws SparkException if a fragment fails with a Spark error
     * @throws TimeoutException propagated from the timeout computation and final collection
     */
    @Override // com.facebook.presto.spark.execution.AbstractPrestoSparkQueryExecution
    protected List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> doExecute() throws SparkException, TimeoutException {
        this.queryStateTimer.beginRunning();
        log.info("Using AdaptiveQueryExecutor");
        log.info(String.format("Logical plan : %s", PlanPrinter.textLogicalPlan(this.planAndMore.getPlan().getRoot(), this.planAndMore.getPlan().getTypes(), this.planAndMore.getPlan().getStatsAndCosts(), this.metadata.getFunctionAndTypeManager(), this.session, 0)));
        this.queryMonitor.queryUpdatedEvent(PrestoSparkQueryExecutionFactory.createQueryInfo(
                this.session,
                this.query,
                QueryState.PLANNING,
                Optional.of(this.planAndMore),
                this.sparkQueueName,
                Optional.empty(),
                this.queryStateTimer,
                Optional.of(PrestoSparkQueryExecutionFactory.createStageInfo(
                        this.session.getQueryId(),
                        this.planFragmenter.fragmentQueryPlan(this.session, this.planAndMore.getPlan(), this.warningCollector),
                        (List<TaskInfo>) ImmutableList.of())),
                this.warningCollector));

        IterativePlanFragmenter.PlanAndFragments planAndFragments = this.iterativePlanFragmenter.createReadySubPlans(this.planAndMore.getPlan().getRoot());
        // The callback executor is only needed when at least one adaptive iteration will run.
        // The explicit cast is harmless if the factory already returns the service type and is
        // required if its Java-visible return type is the implementation class.
        // NOTE(review): this executor is never shut down here — confirm that is intended.
        ExecutionContextExecutorService callbackExecutor = planAndFragments.hasRemainingPlan()
                ? (ExecutionContextExecutorService) ExecutionContextImpl.fromExecutorService(ThreadUtils.newDaemonCachedThreadPool("AdaptiveExecution", 16, 60), (Function1) null)
                : null;
        TableWriteInfo tableWriteInfo = getTableWriteInfo(this.session, this.planAndMore.getPlan().getRoot());

        while (planAndFragments.hasRemainingPlan()) {
            List<SubPlan> readyFragments = planAndFragments.getReadyFragments();
            Set<PlanFragmentId> rootChildFragmentIds = getRootChildNodeFragmentIDs(planAndFragments.getRemainingPlan().get());
            for (SubPlan readyFragment : readyFragments) {
                log.info(String.format("Executing fragment : %s", PlanPrinter.textPlanFragment(readyFragment.getFragment(), this.metadata.getFunctionAndTypeManager(), this.session, true)));
                // Fragments feeding a coordinator-only root are asked to produce serialized pages.
                Optional<Class<?>> outputType = Optional.empty();
                if (isCoordinatorOnly(this.planAndMore.getPlan()) && rootChildFragmentIds.contains(readyFragment.getFragment().getId())) {
                    outputType = Optional.of(PrestoSparkSerializedPage.class);
                }
                final SubPlan partitionedFragment = configureOutputPartitioning(this.session, readyFragment, this.planAndMore.getPhysicalResourceSettings().getHashPartitionCount());
                Optional<SimpleFutureAction<MapOutputStatistics>> statisticsFuture = executeFragment(partitionedFragment, tableWriteInfo, outputType).getMapOutputStatisticsFutureAction();
                if (statisticsFuture.isPresent()) {
                    statisticsFuture.get().onComplete(new AbstractFunction1<Try<MapOutputStatistics>, Void>() {
                        @Override
                        public Void apply(Try<MapOutputStatistics> result) {
                            if (result.isSuccess()) {
                                PrestoSparkAdaptiveQueryExecution.this.publishFragmentCompletionEvent(new FragmentCompletionSuccessEvent(partitionedFragment.getFragment().getId(), Optional.ofNullable(result.get())));
                                return null;
                            }
                            PrestoSparkAdaptiveQueryExecution.this.publishFragmentCompletionEvent(new FragmentCompletionFailureEvent(partitionedFragment.getFragment().getId(), (Throwable) result.failed().get()));
                            return null;
                        }
                    }, callbackExecutor);
                } else {
                    log.info("Fragment %s will not get executed now either because there was no exchange involved (a broadcast is present) or because of an unknown issue.", readyFragment.getFragment().getId());
                    publishFragmentCompletionEvent(new FragmentCompletionSuccessEvent(partitionedFragment.getFragment().getId(), Optional.empty()));
                }
            }

            FragmentCompletionSuccessEvent completedFragment = awaitSuccessfulFragmentCompletion();
            this.executedFragments.add(completedFragment.getFragmentId());
            RuntimeStatistics.createRuntimeStats(completedFragment.getMapOutputStats()).ifPresent(stats ->
                    this.fragmentStatsProvider.putStats(this.session.getQueryId(), completedFragment.getFragmentId(), stats));

            // Re-optimize what is left of the plan now that new runtime statistics may exist.
            PlanNode remainingPlan = planAndFragments.getRemainingPlan().get();
            PlanNode optimizedPlan = remainingPlan;
            for (PlanOptimizer optimizer : this.adaptivePlanOptimizers) {
                optimizedPlan = optimizer.optimize(optimizedPlan, this.session, TypeProvider.viewOf(this.variableAllocator.getVariables()), this.variableAllocator, this.idAllocator, this.warningCollector).getPlanNode();
            }
            if (!optimizedPlan.equals(remainingPlan)) {
                log.info("adaptive plan optimizations triggered");
            }
            planAndFragments = this.iterativePlanFragmenter.createReadySubPlans(optimizedPlan);
        }

        Verify.verify(planAndFragments.getReadyFragments().size() == 1, "The last step of the adaptive execution is expected to have a single fragment remaining.");
        SubPlan finalFragment = planAndFragments.getReadyFragments().get(0);
        setFinalFragmentedPlan(finalFragment);
        return executeFinalFragment(finalFragment, tableWriteInfo);
    }

    /**
     * Waits for the next fragment completion event and returns it if it reports success.
     *
     * @return the next completion event, which is guaranteed to be a success event
     * @throws SparkException if the event is a failure caused by a {@link SparkException}
     * @throws TimeoutException propagated from the timeout computation
     */
    private FragmentCompletionSuccessEvent awaitSuccessfulFragmentCompletion() throws SparkException, TimeoutException {
        FragmentCompletionEvent event;
        try {
            event = this.fragmentEventQueue.poll(PrestoSparkUtils.computeNextTimeout(this.queryCompletionDeadline), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (event == null) {
            throw this.executionExceptionFactory.toPrestoSparkExecutionException(new RuntimeException("Adaptive query execution failed due to timeout."));
        }
        if (event instanceof FragmentCompletionFailureEvent) {
            Throwable executionError = ((FragmentCompletionFailureEvent) event).getExecutionError();
            // Rethrows SparkException, RuntimeException and Error as-is; anything else is wrapped.
            Throwables.propagateIfPossible(executionError, SparkException.class);
            throw new UncheckedExecutionException(executionError);
        }
        Verify.verify(event instanceof FragmentCompletionSuccessEvent, "Unexpected FragmentCompletionEvent type: %s", event.getClass().getSimpleName());
        return (FragmentCompletionSuccessEvent) event;
    }

    /**
     * Collects the ids of the fragments that feed the given plan through remote source nodes.
     *
     * <p>The search starts at {@code planNode} and does not descend below any exchange node whose
     * scope is {@code REMOTE_STREAMING}; every {@link RemoteSourceNode} found contributes all of
     * its source fragment ids.
     *
     * @param planNode root of the plan to search
     * @return the distinct source fragment ids of the remote source nodes found
     */
    private static Set<PlanFragmentId> getRootChildNodeFragmentIDs(PlanNode planNode) {
        List<PlanNode> remoteSources = PlanNodeSearcher.searchFrom(planNode)
                .recurseOnlyWhen(node -> !(node instanceof ExchangeNode
                        && ((ExchangeNode) node).getScope() == ExchangeNode.Scope.REMOTE_STREAMING))
                .where(node -> node instanceof RemoteSourceNode)
                .findAll();
        return remoteSources.stream()
                .map(node -> ((RemoteSourceNode) node).getSourceFragmentIds())
                .flatMap(List::stream)
                .collect(Collectors.toSet());
    }

    /**
     * Tells whether the plan's root is an {@link OutputNode} whose source is reported as a
     * coordinator-only distribution by {@code PlanFragmenterUtils}.
     *
     * @param plan the plan to inspect
     * @return the result of the coordinator-only check on the output node's source, or
     *         {@code false} when the root is not an {@link OutputNode}
     */
    private boolean isCoordinatorOnly(Plan plan) {
        PlanNode root = plan.getRoot();
        if (root instanceof OutputNode) {
            // getSource() is specific to OutputNode, so the root must be cast before the call.
            return PlanFragmenterUtils.isCoordinatorOnlyDistribution(((OutputNode) root).getSource());
        }
        return false;
    }

    /**
     * Places a fragment completion event on the queue polled by {@code doExecute()}.
     *
     * <p>Called both from the driver loop and from the completion callbacks. (Decompiler notes
     * indicate the method was originally private; the access level is left as found.)
     *
     * @param fragmentCompletionEvent the event to publish
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *         interrupted while enqueueing; the thread's interrupt flag is restored first
     */
    public void publishFragmentCompletionEvent(FragmentCompletionEvent fragmentCompletionEvent) {
        try {
            this.fragmentEventQueue.put(fragmentCompletionEvent);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of this thread can still react to it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes the last remaining fragment and returns its result pages.
     *
     * <p>When the fragment's partitioning equals {@code COORDINATOR_DISTRIBUTION}, the RDDs
     * already registered for its child fragments are gathered by fragment id and handed to
     * {@code collectPages}. Otherwise an RDD is created for the fragment and collected with the
     * remaining query timeout.
     *
     * @param subPlan the final fragment together with its children
     * @param tableWriteInfo table write information passed through to the execution calls
     * @return the serialized result pages
     */
    private List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> executeFinalFragment(SubPlan subPlan, TableWriteInfo tableWriteInfo) throws SparkException, TimeoutException {
        boolean coordinatorDistributed = subPlan.getFragment().getPartitioning().equals(SystemPartitioningHandle.COORDINATOR_DISTRIBUTION);
        if (coordinatorDistributed) {
            // Raw map kept on purpose: the RDD value type is not visible from this file.
            HashMap childRdds = new HashMap();
            for (SubPlan child : subPlan.getChildren()) {
                PlanFragmentId childId = child.getFragment().getId();
                childRdds.put(childId, getRdd(childId).get());
            }
            return collectPages(tableWriteInfo, subPlan.getFragment(), childRdds);
        }
        long timeoutMillis = PrestoSparkUtils.computeNextTimeout(this.queryCompletionDeadline);
        return createRddForSubPlan(subPlan, tableWriteInfo, Optional.of(PrestoSparkSerializedPage.class))
                .collectAndDestroyDependenciesWithTimeout(timeoutMillis, TimeUnit.MILLISECONDS, this.waitTimeMetrics);
    }
}
