package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.StageState;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.TestingSession;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MoreCollectors;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.tpch.TpchTable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.intellij.lang.annotations.Language;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/facebook/presto/hive/TestHiveRecoverableGroupedExecution.class */
public class TestHiveRecoverableGroupedExecution {
    private static final Set<StageState> SPLIT_SCHEDULING_STARTED_STATES = ImmutableSet.of(StageState.SCHEDULING_SPLITS, StageState.SCHEDULED, StageState.RUNNING, StageState.FINISHED);
    private final Session recoverableSession;
    private final DistributedQueryRunnerSupplier distributedQueryRunnerSupplier;
    private ListeningExecutorService executor;

    @FunctionalInterface
    /* loaded from: input_file:com/facebook/presto/hive/TestHiveRecoverableGroupedExecution$DistributedQueryRunnerSupplier.class */
    public interface DistributedQueryRunnerSupplier {
        DistributedQueryRunner get() throws Exception;
    }

    public TestHiveRecoverableGroupedExecution() {
        this(() -> {
            return HiveQueryRunner.createQueryRunner(ImmutableList.of(TpchTable.ORDERS), ImmutableMap.of("query.remote-task.max-error-duration", "1s"), Optional.empty());
        }, createRecoverableSession(Optional.of(new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin")))));
    }

    protected TestHiveRecoverableGroupedExecution(DistributedQueryRunnerSupplier distributedQueryRunnerSupplier, Session session) {
        this.distributedQueryRunnerSupplier = (DistributedQueryRunnerSupplier) Objects.requireNonNull(distributedQueryRunnerSupplier, "distributedQueryRunnerSupplier is null");
        this.recoverableSession = (Session) Objects.requireNonNull(session, "recoverableSession is null");
    }

    @BeforeClass
    public void setUp() {
        this.executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    }

    @AfterClass(alwaysRun = true)
    public void shutdown() {
        this.executor.shutdownNow();
    }

    @Test(timeOut = 60000)
    public void testCreateBucketedTable() throws Exception {
        testRecoverableGroupedExecution(ImmutableList.of("CREATE TABLE test_table1\nWITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\nSELECT orderkey key1, comment value1 FROM orders", "CREATE TABLE test_table2\nWITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\nSELECT orderkey key2, comment value2 FROM orders", "CREATE TABLE test_table3\nWITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\nSELECT orderkey key3, comment value3 FROM orders"), "CREATE TABLE test_success WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\nSELECT key1, value1, key2, value2, key3, value3\nFROM test_table1\nJOIN test_table2\nON key1 = key2\nJOIN test_table3\nON key2 = key3", "CREATE TABLE test_failure WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\nSELECT key1, value1, key2, value2, key3, value3\nFROM test_table1\nJOIN test_table2\nON key1 = key2\nJOIN test_table3\nON key2 = key3", 15000, ImmutableList.of("DROP TABLE IF EXISTS test_table1", "DROP TABLE IF EXISTS test_table2", "DROP TABLE IF EXISTS test_table3", "DROP TABLE IF EXISTS test_success", "DROP TABLE IF EXISTS test_failure"));
    }

    @Test(timeOut = 60000)
    public void testInsertBucketedTable() throws Exception {
        testRecoverableGroupedExecution(ImmutableList.of("CREATE TABLE test_table1\nWITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\nSELECT orderkey key1, comment value1 FROM orders", "CREATE TABLE test_table2\nWITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\nSELECT orderkey key2, comment value2 FROM orders", "CREATE TABLE test_table3\nWITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\nSELECT orderkey key3, comment value3 FROM orders", "CREATE TABLE test_success (key BIGINT, value VARCHAR, partition_key VARCHAR)\nWITH (bucket_count = 13, bucketed_by = ARRAY['key'], partitioned_by = ARRAY['partition_key'])", "CREATE TABLE test_failure (key BIGINT, value VARCHAR, partition_key VARCHAR)\nWITH (bucket_count = 13, bucketed_by = ARRAY['key'], partitioned_by = ARRAY['partition_key'])"), "INSERT INTO test_success\nSELECT key1, value1, 'foo'\nFROM test_table1\nJOIN test_table2\nON key1 = key2\nJOIN test_table3\nON key2 = key3", "INSERT INTO test_failure\nSELECT key1, value1, 'foo'\nFROM test_table1\nJOIN test_table2\nON key1 = key2\nJOIN test_table3\nON key2 = key3", 15000, ImmutableList.of("DROP TABLE IF EXISTS test_table1", "DROP TABLE IF EXISTS test_table2", "DROP TABLE IF EXISTS test_table3", "DROP TABLE IF EXISTS test_success", "DROP TABLE IF EXISTS test_failure"));
    }

    private void testRecoverableGroupedExecution(List<String> list, @Language("SQL") String str, @Language("SQL") String str2, int i, List<String> list2) throws Exception {
        DistributedQueryRunner distributedQueryRunner = this.distributedQueryRunnerSupplier.get();
        Throwable th = null;
        try {
            try {
                Iterator<String> it = list.iterator();
                while (it.hasNext()) {
                    distributedQueryRunner.execute(this.recoverableSession, it.next());
                }
                Assert.assertEquals(distributedQueryRunner.execute(this.recoverableSession, str).getUpdateCount(), OptionalLong.of(i));
                ListenableFuture submit = this.executor.submit(() -> {
                    return distributedQueryRunner.execute(this.recoverableSession, str2);
                });
                while (!isSplitSchedulingStarted(distributedQueryRunner)) {
                    Thread.sleep(10L);
                }
                Thread.sleep(300L);
                Assert.assertTrue(getRunningQueryId(distributedQueryRunner.getQueries()).isPresent());
                ((TestingPrestoServer) distributedQueryRunner.getServers().stream().filter(testingPrestoServer -> {
                    return !testingPrestoServer.isCoordinator();
                }).findFirst().get()).close();
                Assert.assertEquals(((MaterializedResult) submit.get(30L, TimeUnit.SECONDS)).getUpdateCount(), OptionalLong.of(i));
                Iterator<String> it2 = list2.iterator();
                while (it2.hasNext()) {
                    distributedQueryRunner.execute(this.recoverableSession, it2.next());
                }
                if (distributedQueryRunner != null) {
                    if (0 == 0) {
                        distributedQueryRunner.close();
                        return;
                    }
                    try {
                        distributedQueryRunner.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (distributedQueryRunner != null) {
                    if (0 != 0) {
                        try {
                            distributedQueryRunner.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        distributedQueryRunner.close();
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            Iterator<String> it3 = list2.iterator();
            while (it3.hasNext()) {
                distributedQueryRunner.execute(this.recoverableSession, it3.next());
            }
            throw th5;
        }
    }

    private boolean isSplitSchedulingStarted(DistributedQueryRunner distributedQueryRunner) {
        Optional<QueryId> runningQueryId = getRunningQueryId(distributedQueryRunner.getQueries());
        if (!runningQueryId.isPresent()) {
            return false;
        }
        Stream map = ((StageInfo) distributedQueryRunner.getQueryInfo(runningQueryId.get()).getOutputStage().get()).getSubStages().stream().map((v0) -> {
            return v0.getState();
        });
        Set<StageState> set = SPLIT_SCHEDULING_STARTED_STATES;
        set.getClass();
        return map.allMatch((v1) -> {
            return r1.contains(v1);
        });
    }

    private static Optional<QueryId> getRunningQueryId(List<BasicQueryInfo> list) {
        return (Optional) list.stream().filter(basicQueryInfo -> {
            return basicQueryInfo.getState() == QueryState.RUNNING;
        }).map((v0) -> {
            return v0.getQueryId();
        }).collect(MoreCollectors.toOptional());
    }

    private static Session createRecoverableSession(Optional<SelectedRole> optional) {
        return TestingSession.testSessionBuilder().setIdentity(new Identity(HiveQueryRunner.HIVE_CATALOG, Optional.empty(), (Map) optional.map(selectedRole -> {
            return ImmutableMap.of(HiveQueryRunner.HIVE_CATALOG, selectedRole);
        }).orElse(ImmutableMap.of()))).setSystemProperty("colocated_join", "true").setSystemProperty("grouped_execution_for_aggregation", "true").setSystemProperty("dynamic_schedule_for_grouped_execution", "true").setSystemProperty("concurrent_lifespans_per_task", "1").setSystemProperty("recoverable_grouped_execution", "true").setCatalog(HiveQueryRunner.HIVE_CATALOG).setSchema(HiveQueryRunner.TPCH_BUCKETED_SCHEMA).build();
    }
}
