package com.facebook.presto.execution.scheduler;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.SessionTestUtils;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.MockRemoteTaskFactory;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.TestSqlTaskManager;
import com.facebook.presto.failureDetector.NoOpFailureDetector;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.PrestoNode;
import com.facebook.presto.metadata.TableHandle;
import com.facebook.presto.operator.StageExecutionStrategy;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.connector.NotPartitionedPartitionHandle;
import com.facebook.presto.spi.type.VarcharType;
import com.facebook.presto.split.ConnectorAwareSplitSource;
import com.facebook.presto.split.SplitSource;
import com.facebook.presto.sql.planner.Partitioning;
import com.facebook.presto.sql.planner.PartitioningScheme;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.StageExecutionPlan;
import com.facebook.presto.sql.planner.Symbol;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.TableScanNode;
import com.facebook.presto.testing.TestingMetadata;
import com.facebook.presto.testing.TestingSplit;
import com.facebook.presto.testing.TestingTransactionHandle;
import com.facebook.presto.util.FinalizerService;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.airlift.concurrent.Threads;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.class */
public class TestSourcePartitionedScheduler {
    /** Output buffer id (0) registered on every stage built by {@code createSqlStageExecution}. */
    public static final OutputBuffers.OutputBufferId OUT = new OutputBuffers.OutputBufferId(0);
    /** Connector id shared by the registered nodes, the table handle and the split source in these tests. */
    private static final ConnectorId CONNECTOR_ID = new ConnectorId("connector_id");
    /** Cached thread pool handed to the mock task factory and to each stage; shut down in {@code destroyExecutor}. */
    private final ExecutorService queryExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("stageExecutor-%s"));
    /** Two-thread scheduled pool handed to the mock task factory; shut down in {@code destroyExecutor}. */
    private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed("stageScheduledExecutor-%s"));
    /** Supplies the stage location used when constructing a {@code SqlStageExecution}. */
    private final LocationFactory locationFactory = new TestSqlTaskManager.MockLocationFactory();
    /** Default node manager; the constructor registers three nodes ("other1".."other3") on it. */
    private final InMemoryNodeManager nodeManager = new InMemoryNodeManager();
    /** Passed to every {@code NodeTaskMap}; started in {@code setUp} and destroyed in {@code destroyExecutor}. */
    private final FinalizerService finalizerService = new FinalizerService();

    /* loaded from: input_file:com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler$QueuedSplitSource.class */
    /**
     * Split source for tests whose splits are produced on demand via {@link #addSplits(int)}.
     *
     * <p>Batches are only handed out once {@code notEmptyFuture} has completed, which happens
     * when splits are added. After a batch empties the queue (and the source is not closed),
     * a fresh future is installed so that later readers wait for the next {@code addSplits}.
     */
    private static class QueuedSplitSource implements ConnectorSplitSource {
        private final Supplier<ConnectorSplit> splitFactory;
        private final LinkedBlockingQueue<ConnectorSplit> queue = new LinkedBlockingQueue<>();
        // Replaced (under the monitor) each time the queue is drained empty; see getBatch.
        private CompletableFuture<?> notEmptyFuture = new CompletableFuture<>();
        private boolean closed;

        /**
         * @param supplier factory invoked once per split added; must not be null
         * @throws NullPointerException if {@code supplier} is null
         */
        public QueuedSplitSource(Supplier<ConnectorSplit> supplier) {
            this.splitFactory = Objects.requireNonNull(supplier, "splitFactory is null");
        }

        /**
         * Enqueues {@code i} new splits and completes the current not-empty future.
         * Does nothing once the source has been closed.
         *
         * @param i number of splits to create and enqueue
         */
        synchronized void addSplits(int i) {
            if (this.closed) {
                return;
            }
            for (int added = 0; added < i; added++) {
                this.queue.add(this.splitFactory.get());
                // Completing an already-completed future is a no-op, so repeating this per split is harmless.
                this.notEmptyFuture.complete(null);
            }
        }

        /**
         * Returns a future that, once splits are available, yields up to {@code i} queued splits
         * together with the finished flag as observed after the batch was taken.
         *
         * @param connectorPartitionHandle must equal {@code NotPartitionedPartitionHandle.NOT_PARTITIONED}
         * @param i maximum number of splits in the returned batch
         * @throws IllegalArgumentException if another partition handle is supplied
         */
        public CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle connectorPartitionHandle, int i) {
            Preconditions.checkArgument(connectorPartitionHandle.equals(NotPartitionedPartitionHandle.NOT_PARTITIONED), "partitionHandle must be NOT_PARTITIONED");
            return this.notEmptyFuture
                    .thenApply(ignored -> getBatch(i))
                    .thenApply(splits -> new ConnectorSplitSource.ConnectorSplitBatch(splits, isFinished()));
        }

        /**
         * Removes up to {@code i} splits from the queue and returns them as an immutable list.
         */
        private synchronized List<ConnectorSplit> getBatch(int i) {
            List<ConnectorSplit> drained = new ArrayList<>(i);
            this.queue.drainTo(drained, i);
            // Queue exhausted but source still open: install a new future so the next reader
            // waits for addSplits instead of receiving an empty batch immediately.
            if (this.queue.isEmpty() && !this.closed && this.notEmptyFuture.isDone()) {
                this.notEmptyFuture = new CompletableFuture<>();
            }
            return ImmutableList.copyOf(drained);
        }

        /**
         * @return {@code true} only when the source has been closed and no splits remain queued
         */
        public synchronized boolean isFinished() {
            return this.closed && this.queue.isEmpty();
        }

        /** Marks the source closed; subsequent {@link #addSplits(int)} calls are ignored. */
        public synchronized void close() {
            this.closed = true;
        }
    }

    /**
     * Registers three nodes ("other1", "other2", "other3") for {@code CONNECTOR_ID}
     * on the shared node manager used by most tests.
     */
    public TestSourcePartitionedScheduler() {
        Node[] workers = {
                new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false),
                new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false),
                new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false),
        };
        nodeManager.addNode(CONNECTOR_ID, workers);
    }

    /** Starts the finalizer service that backs every {@code NodeTaskMap} created by the tests. */
    @BeforeClass
    public void setUp() {
        finalizerService.start();
    }

    /**
     * Releases class-level resources: both executors are shut down immediately,
     * then the finalizer service is destroyed.
     */
    @AfterClass(alwaysRun = true)
    public void destroyExecutor() {
        queryExecutor.shutdownNow();
        scheduledExecutor.shutdownNow();
        finalizerService.destroy();
    }

    /**
     * With an empty split source, a single {@code schedule()} call is expected to finish
     * unblocked and to create exactly one task.
     */
    @Test
    public void testScheduleNoSplits() {
        StageExecutionPlan plan = createPlan(createFixedSplitSource(0, TestingSplit::createRemoteSplit));
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        SqlStageExecution stage = createSqlStageExecution(plan, taskMap);
        StageScheduler scheduler = getSourcePartitionedScheduler(plan, stage, nodeManager, taskMap, 1);

        ScheduleResult result = scheduler.schedule();

        Assert.assertTrue(result.isFinished());
        Assert.assertTrue(result.getBlocked().isDone());
        Assert.assertEquals(result.getNewTasks().size(), 1);

        stage.abort();
    }

    /**
     * Schedules 60 splits with a batch size of 1. Each of the first three calls is expected to
     * create one task; the 60th call finishes the scheduler with 20 splits on every task.
     */
    @Test
    public void testScheduleSplitsOneAtATime() {
        StageExecutionPlan plan = createPlan(createFixedSplitSource(60, TestingSplit::createRemoteSplit));
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        SqlStageExecution stage = createSqlStageExecution(plan, taskMap);
        StageScheduler scheduler = getSourcePartitionedScheduler(plan, stage, nodeManager, taskMap, 1);

        for (int call = 0; call < 60; call++) {
            ScheduleResult result = scheduler.schedule();

            // Only the final call reports completion; the scheduler never blocks here.
            Assert.assertEquals(result.isFinished(), call == 59);
            Assert.assertTrue(result.getBlocked().isDone());

            // One new task per call until all three nodes have a task.
            boolean stillCreatingTasks = call < 3;
            Assert.assertEquals(result.getNewTasks().size(), stillCreatingTasks ? 1 : 0);
            Assert.assertEquals(stage.getAllTasks().size(), stillCreatingTasks ? call + 1 : 3);

            assertPartitionedSplitCount(stage, Math.min(call + 1, 60));
        }

        for (RemoteTask task : stage.getAllTasks()) {
            Assert.assertEquals(task.getPartitionedSplitCount(), 20);
        }

        stage.abort();
    }

    /**
     * Schedules 60 splits with a batch size of 7. All three tasks are expected on the first
     * call, and the ninth call (index 8) finishes with 20 splits on every task.
     */
    @Test
    public void testScheduleSplitsBatched() {
        StageExecutionPlan plan = createPlan(createFixedSplitSource(60, TestingSplit::createRemoteSplit));
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        SqlStageExecution stage = createSqlStageExecution(plan, taskMap);
        StageScheduler scheduler = getSourcePartitionedScheduler(plan, stage, nodeManager, taskMap, 7);

        for (int call = 0; call <= 8; call++) {
            ScheduleResult result = scheduler.schedule();

            Assert.assertEquals(result.isFinished(), call == 8);
            Assert.assertTrue(result.getBlocked().isDone());

            // Every task is created by the very first batch.
            Assert.assertEquals(result.getNewTasks().size(), call == 0 ? 3 : 0);
            Assert.assertEquals(stage.getAllTasks().size(), 3);

            // Seven splits per call, capped by the 60 available.
            assertPartitionedSplitCount(stage, Math.min((call + 1) * 7, 60));
        }

        for (RemoteTask task : stage.getAllTasks()) {
            Assert.assertEquals(task.getPartitionedSplitCount(), 20);
        }

        stage.abort();
    }

    /**
     * Offers 80 splits one at a time. The test expects the scheduler to place 60 splits
     * (20 per task) and then report a pending blocked future; after the first task's splits
     * are cleared, 20 further calls place the remaining splits and finish.
     */
    @Test
    public void testScheduleSplitsBlock() {
        StageExecutionPlan plan = createPlan(createFixedSplitSource(80, TestingSplit::createRemoteSplit));
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        SqlStageExecution stage = createSqlStageExecution(plan, taskMap);
        StageScheduler scheduler = getSourcePartitionedScheduler(plan, stage, nodeManager, taskMap, 1);

        // Phase 1: 61 calls; the last one (index 60) cannot place a split and is blocked.
        for (int call = 0; call <= 60; call++) {
            ScheduleResult result = scheduler.schedule();

            Assert.assertFalse(result.isFinished());
            Assert.assertEquals(result.getBlocked().isDone(), call != 60);

            boolean stillCreatingTasks = call < 3;
            Assert.assertEquals(result.getNewTasks().size(), stillCreatingTasks ? 1 : 0);
            Assert.assertEquals(stage.getAllTasks().size(), stillCreatingTasks ? call + 1 : 3);

            assertPartitionedSplitCount(stage, Math.min(call + 1, 60));
        }

        for (RemoteTask task : stage.getAllTasks()) {
            Assert.assertEquals(task.getPartitionedSplitCount(), 20);
        }

        // Free up capacity by clearing the splits of the first task.
        MockRemoteTaskFactory.MockRemoteTask firstTask = (MockRemoteTaskFactory.MockRemoteTask) stage.getAllTasks().get(0);
        firstTask.clearSplits();

        // Phase 2: 40 splits remain assigned; each call adds one more until 60 is reached again.
        for (int call = 0; call < 20; call++) {
            ScheduleResult result = scheduler.schedule();

            Assert.assertEquals(result.isFinished(), call == 19);
            Assert.assertTrue(result.getBlocked().isDone());
            Assert.assertEquals(result.getNewTasks().size(), 0);
            Assert.assertEquals(stage.getAllTasks().size(), 3);

            assertPartitionedSplitCount(stage, Math.min(call + 41, 60));
        }

        for (RemoteTask task : stage.getAllTasks()) {
            Assert.assertEquals(task.getPartitionedSplitCount(), 20);
        }

        stage.abort();
    }

    /**
     * Uses a split source that starts empty: the first {@code schedule()} call is expected to
     * create all three tasks but remain blocked until a split is added to the source.
     */
    @Test
    public void testScheduleSlowSplitSource() {
        QueuedSplitSource splits = new QueuedSplitSource(TestingSplit::createRemoteSplit);
        StageExecutionPlan plan = createPlan(splits);
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
        SqlStageExecution stage = createSqlStageExecution(plan, taskMap);
        StageScheduler scheduler = getSourcePartitionedScheduler(plan, stage, nodeManager, taskMap, 1);

        // No splits are available yet, so scheduling blocks.
        ScheduleResult result = scheduler.schedule();
        Assert.assertFalse(result.isFinished());
        Assert.assertFalse(result.getBlocked().isDone());
        Assert.assertEquals(result.getNewTasks().size(), 3);
        Assert.assertEquals(stage.getAllTasks().size(), 3);

        // Adding a split must complete the previously returned blocked future.
        splits.addSplits(1);
        Assert.assertTrue(result.getBlocked().isDone());
    }

    /**
     * Scheduling against a node manager with no registered nodes must fail with a
     * {@code PrestoException} carrying {@code NO_NODES_AVAILABLE}.
     */
    @Test
    public void testNoNodes() {
        try {
            NodeTaskMap taskMap = new NodeTaskMap(finalizerService);
            NodeSchedulerConfig config = new NodeSchedulerConfig().setIncludeCoordinator(false);
            // A fresh node manager: nothing has been registered for CONNECTOR_ID.
            NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), new InMemoryNodeManager(), config, taskMap);

            StageExecutionPlan plan = createPlan(createFixedSplitSource(20, TestingSplit::createRemoteSplit));
            SqlStageExecution stage = createSqlStageExecution(plan, taskMap);

            PlanNodeId sourceNodeId = (PlanNodeId) Iterables.getOnlyElement(plan.getSplitSources().keySet());
            SplitSource source = (SplitSource) Iterables.getOnlyElement(plan.getSplitSources().values());
            NodeSelector selector = nodeScheduler.createNodeSelector(CONNECTOR_ID);

            StageScheduler scheduler = SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler(
                    stage,
                    sourceNodeId,
                    source,
                    new DynamicSplitPlacementPolicy(selector, stage::getAllTasks),
                    2);
            scheduler.schedule();

            Assert.fail("expected PrestoException");
        } catch (PrestoException e) {
            Assert.assertEquals(e.getErrorCode(), StandardErrorCode.NO_NODES_AVAILABLE.toErrorCode());
        }
    }

    /**
     * First stage: 15 splits over three nodes, expected as 5 per task. A fourth node is then
     * added and a second stage with 5 splits (sharing the same {@code NodeTaskMap}) is expected
     * to create a single task holding all 5 splits.
     */
    @Test
    public void testBalancedSplitAssignment() {
        InMemoryNodeManager localNodeManager = new InMemoryNodeManager();
        Node[] initialNodes = {
                new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN, false),
                new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN, false),
                new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false),
        };
        localNodeManager.addNode(CONNECTOR_ID, initialNodes);
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);

        // First stage: everything is scheduled in one batch of up to 200 splits.
        StageExecutionPlan firstPlan = createPlan(createFixedSplitSource(15, TestingSplit::createRemoteSplit));
        SqlStageExecution firstStage = createSqlStageExecution(firstPlan, taskMap);
        StageScheduler firstScheduler = getSourcePartitionedScheduler(firstPlan, firstStage, localNodeManager, taskMap, 200);

        ScheduleResult firstResult = firstScheduler.schedule();
        Assert.assertTrue(firstResult.isFinished());
        Assert.assertTrue(firstResult.getBlocked().isDone());
        Assert.assertEquals(firstResult.getNewTasks().size(), 3);
        Assert.assertEquals(firstStage.getAllTasks().size(), 3);
        for (RemoteTask task : firstStage.getAllTasks()) {
            Assert.assertEquals(task.getPartitionedSplitCount(), 5);
        }

        // Add a node that has no splits yet, then schedule a second stage.
        Node[] extraNode = {new PrestoNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN, false)};
        localNodeManager.addNode(CONNECTOR_ID, extraNode);

        StageExecutionPlan secondPlan = createPlan(createFixedSplitSource(5, TestingSplit::createRemoteSplit));
        SqlStageExecution secondStage = createSqlStageExecution(secondPlan, taskMap);
        StageScheduler secondScheduler = getSourcePartitionedScheduler(secondPlan, secondStage, localNodeManager, taskMap, 200);

        ScheduleResult secondResult = secondScheduler.schedule();
        Assert.assertTrue(secondResult.isFinished());
        Assert.assertTrue(secondResult.getBlocked().isDone());
        Assert.assertEquals(secondResult.getNewTasks().size(), 1);
        Assert.assertEquals(secondStage.getAllTasks().size(), 1);
        RemoteTask onlyTask = (RemoteTask) secondStage.getAllTasks().get(0);
        Assert.assertEquals(onlyTask.getPartitionedSplitCount(), 5);

        firstStage.abort();
        secondStage.abort();
    }

    /**
     * First stage places 60 splits (20 per task) on the shared nodes. A second stage using the
     * same {@code NodeTaskMap} is then expected to create all three tasks with zero splits each
     * and to report that it is not finished.
     */
    @Test
    public void testBlockCausesFullSchedule() {
        NodeTaskMap taskMap = new NodeTaskMap(finalizerService);

        // First stage: 60 splits in a single batch.
        StageExecutionPlan firstPlan = createPlan(createFixedSplitSource(60, TestingSplit::createRemoteSplit));
        SqlStageExecution firstStage = createSqlStageExecution(firstPlan, taskMap);
        StageScheduler firstScheduler = getSourcePartitionedScheduler(firstPlan, firstStage, nodeManager, taskMap, 200);

        ScheduleResult firstResult = firstScheduler.schedule();
        Assert.assertTrue(firstResult.isFinished());
        Assert.assertTrue(firstResult.getBlocked().isDone());
        Assert.assertEquals(firstResult.getNewTasks().size(), 3);
        Assert.assertEquals(firstStage.getAllTasks().size(), 3);
        for (RemoteTask task : firstStage.getAllTasks()) {
            Assert.assertEquals(task.getPartitionedSplitCount(), 20);
        }

        // Second stage: tasks are created on every node even though no split gets assigned.
        StageExecutionPlan secondPlan = createPlan(createFixedSplitSource(5, TestingSplit::createRemoteSplit));
        SqlStageExecution secondStage = createSqlStageExecution(secondPlan, taskMap);
        StageScheduler secondScheduler = getSourcePartitionedScheduler(secondPlan, secondStage, nodeManager, taskMap, 200);

        ScheduleResult secondResult = secondScheduler.schedule();
        Assert.assertFalse(secondResult.isFinished());
        Assert.assertTrue(secondResult.getBlocked().isDone());
        Assert.assertEquals(secondResult.getNewTasks().size(), 3);
        Assert.assertEquals(secondStage.getAllTasks().size(), 3);
        for (RemoteTask task : secondStage.getAllTasks()) {
            Assert.assertEquals(task.getPartitionedSplitCount(), 0);
        }

        firstStage.abort();
        secondStage.abort();
    }

    /**
     * Asserts that the partitioned split counts of all tasks in the stage sum to {@code i}.
     *
     * @param sqlStageExecution stage whose tasks are inspected
     * @param i expected total number of partitioned splits
     */
    private static void assertPartitionedSplitCount(SqlStageExecution sqlStageExecution, int i) {
        int total = 0;
        for (RemoteTask task : sqlStageExecution.getAllTasks()) {
            total += task.getPartitionedSplitCount();
        }
        Assert.assertEquals(total, i);
    }

    /**
     * Builds a source-partitioned stage scheduler over the plan's single split source.
     *
     * <p>The node scheduler is configured without the coordinator, with at most 20 splits per
     * node and 0 pending splits per task.
     *
     * @param stageExecutionPlan plan containing exactly one split source
     * @param sqlStageExecution stage the scheduler drives; also supplies the current task list
     * @param internalNodeManager source of candidate nodes
     * @param nodeTaskMap task bookkeeping passed to the node scheduler
     * @param i split batch size passed to the scheduler
     */
    private static StageScheduler getSourcePartitionedScheduler(StageExecutionPlan stageExecutionPlan, SqlStageExecution sqlStageExecution, InternalNodeManager internalNodeManager, NodeTaskMap nodeTaskMap, int i) {
        NodeSchedulerConfig config = new NodeSchedulerConfig()
                .setIncludeCoordinator(false)
                .setMaxSplitsPerNode(20)
                .setMaxPendingSplitsPerTask(0);
        NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), internalNodeManager, config, nodeTaskMap);

        PlanNodeId sourceNodeId = (PlanNodeId) Iterables.getOnlyElement(stageExecutionPlan.getSplitSources().keySet());
        SplitSource source = (SplitSource) Iterables.getOnlyElement(stageExecutionPlan.getSplitSources().values());
        NodeSelector selector = nodeScheduler.createNodeSelector(source.getConnectorId());

        return SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler(
                sqlStageExecution,
                sourceNodeId,
                source,
                new DynamicSplitPlacementPolicy(selector, sqlStageExecution::getAllTasks),
                i);
    }

    /**
     * Builds a minimal stage plan: an inner join of a table scan (fed by the given split source)
     * with a GATHER remote source, wrapped in a source-distributed fragment.
     *
     * @param connectorSplitSource split source attached to the table scan node
     * @return a plan whose only split source is keyed by the table scan's plan node id
     */
    private static StageExecutionPlan createPlan(ConnectorSplitSource connectorSplitSource) {
        Symbol symbol = new Symbol("column");
        PlanNodeId tableScanNodeId = new PlanNodeId("plan_id");

        // Table scan over a testing table handle, exposing the single "column" symbol.
        TableScanNode tableScan = new TableScanNode(
                tableScanNodeId,
                new TableHandle(CONNECTOR_ID, new TestingMetadata.TestingTableHandle()),
                ImmutableList.of(symbol),
                ImmutableMap.of(symbol, new TestingMetadata.TestingColumnHandle("column")));

        // Remote side of the join; it contributes no output symbols.
        RemoteSourceNode remote = new RemoteSourceNode(
                new PlanNodeId("remote_id"),
                new PlanFragmentId("plan_fragment_id"),
                ImmutableList.of(),
                Optional.empty(),
                ExchangeNode.Type.GATHER);

        // The explicit <Symbol> witness is required: a bare ImmutableList.builder() used as a
        // chained receiver infers Builder<Object>, which does not yield a list of symbols.
        List<Symbol> joinOutputSymbols = ImmutableList.<Symbol>builder()
                .addAll(tableScan.getOutputSymbols())
                .addAll(remote.getOutputSymbols())
                .build();

        JoinNode join = new JoinNode(
                new PlanNodeId("join_id"),
                JoinNode.Type.INNER,
                tableScan,
                remote,
                ImmutableList.of(),
                joinOutputSymbols,
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty());

        PlanFragment fragment = new PlanFragment(
                new PlanFragmentId("plan_id"),
                join,
                ImmutableMap.of(symbol, VarcharType.VARCHAR),
                SystemPartitioningHandle.SOURCE_DISTRIBUTION,
                ImmutableList.of(tableScanNodeId),
                new PartitioningScheme(Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol)),
                StageExecutionStrategy.ungroupedExecution(),
                StatsAndCosts.empty());

        return new StageExecutionPlan(
                fragment,
                ImmutableMap.of(tableScanNodeId, new ConnectorAwareSplitSource(CONNECTOR_ID, TestingTransactionHandle.create(), connectorSplitSource)),
                ImmutableList.of());
    }

    /**
     * Creates a fixed split source holding {@code i} splits obtained from {@code supplier}.
     *
     * @param i number of splits to generate
     * @param supplier factory invoked once per split
     */
    private static ConnectorSplitSource createFixedSplitSource(int i, Supplier<ConnectorSplit> supplier) {
        ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
        int remaining = i;
        while (remaining > 0) {
            splits.add(supplier.get());
            remaining--;
        }
        return new FixedSplitSource(splits.build());
    }

    /**
     * Creates stage 0 of query "query" for the plan's fragment, backed by a mock remote task
     * factory, and registers the single partitioned output buffer {@code OUT}.
     *
     * @param stageExecutionPlan plan whose fragment the stage executes
     * @param nodeTaskMap task bookkeeping shared with the scheduler under test
     */
    private SqlStageExecution createSqlStageExecution(StageExecutionPlan stageExecutionPlan, NodeTaskMap nodeTaskMap) {
        StageId stageId = new StageId(new QueryId("query"), 0);
        SqlStageExecution stage = new SqlStageExecution(
                stageId,
                locationFactory.createStageLocation(stageId),
                stageExecutionPlan.getFragment(),
                new MockRemoteTaskFactory(queryExecutor, scheduledExecutor),
                SessionTestUtils.TEST_SESSION,
                true,
                nodeTaskMap,
                queryExecutor,
                new NoOpFailureDetector(),
                new SplitSchedulerStats());

        // One partitioned buffer (OUT -> partition 0), with no further buffer ids to come.
        OutputBuffers buffers = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.PARTITIONED)
                .withBuffer(OUT, 0)
                .withNoMoreBufferIds();
        stage.setOutputBuffers(buffers);
        return stage;
    }
}
