package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.TestingSession;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.tpch.TpchTable;
import io.airlift.units.Duration;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.intellij.lang.annotations.Language;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/facebook/presto/hive/TestHiveRecoverableExecution.class */
public class TestHiveRecoverableExecution {
    // Logger used to report how long the recoverable queries take.
    private static final Logger log = Logger.get(TestHiveRecoverableExecution.class);
    // Same value as the timeOut attribute on the @Test methods of this class
    // (TestNG interprets timeOut in milliseconds).
    private static final int TEST_TIMEOUT = 120000;
    // Value of the invocationCount attribute on the @Test methods of this class.
    private static final int INVOCATION_COUNT = 1;
    // Cluster shared by all tests; created in setUp() and closed in shutdown().
    private DistributedQueryRunner queryRunner;
    // Runs the query that is expected to survive worker failures in the background,
    // so the test thread can make workers unresponsive while the query is in flight.
    private ListeningExecutorService executor;

    /**
     * Starts the shared query runner and the background executor once for the whole class.
     *
     * @throws Exception if the query runner cannot be created
     */
    @BeforeClass
    public void setUp() throws Exception {
        // The runner is created first, so a failure here leaves no executor behind.
        queryRunner = createQueryRunner();
        executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    }

    /**
     * Builds the query runner used by every test in this class, loading the TPC-H
     * {@code orders} table and configuring short timeouts so that unresponsive
     * nodes are noticed quickly.
     *
     * @return a freshly created query runner
     * @throws Exception if the runner fails to start
     */
    private DistributedQueryRunner createQueryRunner() throws Exception {
        // NOTE(review): the role of each map (all nodes vs. coordinator only) is decided by
        // HiveQueryRunner.createQueryRunner, which is not visible here - confirm against it.
        Map<String, String> exchangeProperties = ImmutableMap.<String, String>builder()
                .put("exchange.max-error-duration", "5m")
                .put("exchange.http-client.request-timeout", "1s")
                .build();

        Map<String, String> failureHandlingProperties = ImmutableMap.<String, String>builder()
                .put("failure-detector.enabled", "true")
                .put("failure-detector.heartbeat-interval", "1s")
                .put("failure-detector.http-client.request-timeout", "500ms")
                .put("failure-detector.exponential-decay-seconds", "1")
                .put("failure-detector.threshold", "0.1")
                .put("max-failed-task-percentage", "0.6")
                .put("scheduler.http-client.request-timeout", "5s")
                .put("query.remote-task.max-error-duration", "1s")
                .build();

        // Explicit type witnesses replace the previous unchecked casts; a cast from
        // ImmutableList<TpchTable<Order>> to Iterable<TpchTable<?>> is not a legal conversion.
        return HiveQueryRunner.createQueryRunner(
                ImmutableList.<TpchTable<?>>of(TpchTable.ORDERS),
                exchangeProperties,
                failureHandlingProperties,
                Optional.<Path>empty());
    }

    /**
     * Releases the class-level resources. Runs even when setup or tests failed, so
     * either field may still be {@code null}.
     */
    @AfterClass(alwaysRun = true)
    public void shutdown() {
        if (executor != null) {
            executor.shutdownNow();
        }
        executor = null;

        if (queryRunner != null) {
            queryRunner.close();
        }
        queryRunner = null;
    }

    /**
     * Supplies the parameter combinations for the tests in this class.
     *
     * <p>Each row is {@code {writer count, optimized partition update serialization enabled}};
     * the values are forwarded to {@code createRecoverableSession(int, boolean)}.
     *
     * @return the rows {@code {1, true}}, {@code {2, false}} and {@code {2, true}}
     */
    @DataProvider(name = "testSettings")
    public static Object[][] testSettings() {
        // The first row's 1 is a writer count; it is deliberately not tied to INVOCATION_COUNT.
        return new Object[][] {
                {1, true},
                {2, false},
                {2, true},
        };
    }

    /**
     * Runs the recoverable grouped execution scenario for a CREATE TABLE AS statement
     * that writes a bucketed table from a three-way join of bucketed tables.
     *
     * @param i first value of a {@code testSettings} row, forwarded unchanged
     * @param z second value of a {@code testSettings} row, forwarded unchanged
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
    public void testCreateBucketedTable(int i, boolean z) throws Exception {
        List<String> setupQueries = ImmutableList.of(
                "CREATE TABLE create_bucketed_table_1\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n"
                        + "SELECT orderkey key1, comment value1 FROM orders",
                "CREATE TABLE create_bucketed_table_2\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\n"
                        + "SELECT orderkey key2, comment value2 FROM orders",
                "CREATE TABLE create_bucketed_table_3\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\n"
                        + "SELECT orderkey key3, comment value3 FROM orders");

        String queryWithoutFailure = "CREATE TABLE create_bucketed_table_success WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n"
                + "SELECT key1, value1, key2, value2, key3, value3\n"
                + "FROM create_bucketed_table_1\n"
                + "JOIN create_bucketed_table_2\n"
                + "ON key1 = key2\n"
                + "JOIN create_bucketed_table_3\n"
                + "ON key2 = key3";

        String queryWithFailure = "CREATE TABLE create_bucketed_table_failure WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n"
                + "SELECT key1, value1, key2, value2, key3, value3\n"
                + "FROM create_bucketed_table_1\n"
                + "JOIN create_bucketed_table_2\n"
                + "ON key1 = key2\n"
                + "JOIN create_bucketed_table_3\n"
                + "ON key2 = key3";

        int expectedUpdateCount = 15000;

        List<String> cleanupQueries = ImmutableList.of(
                "DROP TABLE IF EXISTS create_bucketed_table_1",
                "DROP TABLE IF EXISTS create_bucketed_table_2",
                "DROP TABLE IF EXISTS create_bucketed_table_3",
                "DROP TABLE IF EXISTS create_bucketed_table_success",
                "DROP TABLE IF EXISTS create_bucketed_table_failure");

        testRecoverableGroupedExecution(
                this.queryRunner,
                i,
                z,
                setupQueries,
                queryWithoutFailure,
                queryWithFailure,
                expectedUpdateCount,
                cleanupQueries);
    }

    /**
     * Runs the recoverable grouped execution scenario for an INSERT into a bucketed,
     * partitioned table fed by a three-way join of bucketed tables.
     *
     * @param i first value of a {@code testSettings} row, forwarded unchanged
     * @param z second value of a {@code testSettings} row, forwarded unchanged
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
    public void testInsertBucketedTable(int i, boolean z) throws Exception {
        List<String> setupQueries = ImmutableList.of(
                "CREATE TABLE insert_bucketed_table_1\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n"
                        + "SELECT orderkey key1, comment value1 FROM orders",
                "CREATE TABLE insert_bucketed_table_2\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\n"
                        + "SELECT orderkey key2, comment value2 FROM orders",
                "CREATE TABLE insert_bucketed_table_3\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\n"
                        + "SELECT orderkey key3, comment value3 FROM orders",
                "CREATE TABLE insert_bucketed_table_success (key BIGINT, value VARCHAR, partition_key VARCHAR)\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key'], partitioned_by = ARRAY['partition_key'])",
                "CREATE TABLE insert_bucketed_table_failure (key BIGINT, value VARCHAR, partition_key VARCHAR)\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key'], partitioned_by = ARRAY['partition_key'])");

        String queryWithoutFailure = "INSERT INTO insert_bucketed_table_success\n"
                + "SELECT key1, value1, 'foo'\n"
                + "FROM insert_bucketed_table_1\n"
                + "JOIN insert_bucketed_table_2\n"
                + "ON key1 = key2\n"
                + "JOIN insert_bucketed_table_3\n"
                + "ON key2 = key3";

        String queryWithFailure = "INSERT INTO insert_bucketed_table_failure\n"
                + "SELECT key1, value1, 'foo'\n"
                + "FROM insert_bucketed_table_1\n"
                + "JOIN insert_bucketed_table_2\n"
                + "ON key1 = key2\n"
                + "JOIN insert_bucketed_table_3\n"
                + "ON key2 = key3";

        int expectedUpdateCount = 15000;

        List<String> cleanupQueries = ImmutableList.of(
                "DROP TABLE IF EXISTS insert_bucketed_table_1",
                "DROP TABLE IF EXISTS insert_bucketed_table_2",
                "DROP TABLE IF EXISTS insert_bucketed_table_3",
                "DROP TABLE IF EXISTS insert_bucketed_table_success",
                "DROP TABLE IF EXISTS insert_bucketed_table_failure");

        testRecoverableGroupedExecution(
                this.queryRunner,
                i,
                z,
                setupQueries,
                queryWithoutFailure,
                queryWithFailure,
                expectedUpdateCount,
                cleanupQueries);
    }

    /**
     * Runs the recoverable grouped execution scenario for a CREATE TABLE AS statement
     * whose target table declares no bucketing, fed by a three-way join of bucketed tables.
     *
     * @param i first value of a {@code testSettings} row, forwarded unchanged
     * @param z second value of a {@code testSettings} row, forwarded unchanged
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
    public void testCreateUnbucketedTableWithGroupedExecution(int i, boolean z) throws Exception {
        List<String> setupQueries = ImmutableList.of(
                "CREATE TABLE create_unbucketed_table_with_grouped_execution_1\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n"
                        + "SELECT orderkey key1, comment value1 FROM orders",
                "CREATE TABLE create_unbucketed_table_with_grouped_execution_2\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\n"
                        + "SELECT orderkey key2, comment value2 FROM orders",
                "CREATE TABLE create_unbucketed_table_with_grouped_execution_3\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\n"
                        + "SELECT orderkey key3, comment value3 FROM orders");

        String queryWithoutFailure = "CREATE TABLE create_unbucketed_table_with_grouped_execution_success AS\n"
                + "SELECT key1, value1, key2, value2, key3, value3\n"
                + "FROM create_unbucketed_table_with_grouped_execution_1\n"
                + "JOIN create_unbucketed_table_with_grouped_execution_2\n"
                + "ON key1 = key2\n"
                + "JOIN create_unbucketed_table_with_grouped_execution_3\n"
                + "ON key2 = key3";

        String queryWithFailure = "CREATE TABLE create_unbucketed_table_with_grouped_execution_failure AS\n"
                + "SELECT key1, value1, key2, value2, key3, value3\n"
                + "FROM create_unbucketed_table_with_grouped_execution_1\n"
                + "JOIN create_unbucketed_table_with_grouped_execution_2\n"
                + "ON key1 = key2\n"
                + "JOIN create_unbucketed_table_with_grouped_execution_3\n"
                + "ON key2 = key3";

        int expectedUpdateCount = 15000;

        List<String> cleanupQueries = ImmutableList.of(
                "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_1",
                "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_2",
                "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_3",
                "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_success",
                "DROP TABLE IF EXISTS create_unbucketed_table_with_grouped_execution_failure");

        testRecoverableGroupedExecution(
                this.queryRunner,
                i,
                z,
                setupQueries,
                queryWithoutFailure,
                queryWithFailure,
                expectedUpdateCount,
                cleanupQueries);
    }

    /**
     * Runs the recoverable grouped execution scenario for an INSERT into a partitioned
     * table that declares no bucketing, fed by a three-way join of bucketed tables.
     *
     * @param i first value of a {@code testSettings} row, forwarded unchanged
     * @param z second value of a {@code testSettings} row, forwarded unchanged
     * @throws Exception if the scenario fails
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
    public void testInsertUnbucketedTableWithGroupedExecution(int i, boolean z) throws Exception {
        List<String> setupQueries = ImmutableList.of(
                "CREATE TABLE insert_unbucketed_table_with_grouped_execution_1\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) AS\n"
                        + "SELECT orderkey key1, comment value1 FROM orders",
                "CREATE TABLE insert_unbucketed_table_with_grouped_execution_2\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) AS\n"
                        + "SELECT orderkey key2, comment value2 FROM orders",
                "CREATE TABLE insert_unbucketed_table_with_grouped_execution_3\n"
                        + "WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) AS\n"
                        + "SELECT orderkey key3, comment value3 FROM orders",
                "CREATE TABLE insert_unbucketed_table_with_grouped_execution_success (key BIGINT, value VARCHAR, partition_key VARCHAR)\n"
                        + "WITH (partitioned_by = ARRAY['partition_key'])",
                "CREATE TABLE insert_unbucketed_table_with_grouped_execution_failure (key BIGINT, value VARCHAR, partition_key VARCHAR)\n"
                        + "WITH (partitioned_by = ARRAY['partition_key'])");

        String queryWithoutFailure = "INSERT INTO insert_unbucketed_table_with_grouped_execution_success\n"
                + "SELECT key1, value1, 'foo'\n"
                + "FROM insert_unbucketed_table_with_grouped_execution_1\n"
                + "JOIN insert_unbucketed_table_with_grouped_execution_2\n"
                + "ON key1 = key2\n"
                + "JOIN insert_unbucketed_table_with_grouped_execution_3\n"
                + "ON key2 = key3";

        String queryWithFailure = "INSERT INTO insert_unbucketed_table_with_grouped_execution_failure\n"
                + "SELECT key1, value1, 'foo'\n"
                + "FROM insert_unbucketed_table_with_grouped_execution_1\n"
                + "JOIN insert_unbucketed_table_with_grouped_execution_2\n"
                + "ON key1 = key2\n"
                + "JOIN insert_unbucketed_table_with_grouped_execution_3\n"
                + "ON key2 = key3";

        int expectedUpdateCount = 15000;

        List<String> cleanupQueries = ImmutableList.of(
                "DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_1",
                "DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_2",
                "DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_3",
                "DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_success",
                "DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_failure");

        testRecoverableGroupedExecution(
                this.queryRunner,
                i,
                z,
                setupQueries,
                queryWithoutFailure,
                queryWithFailure,
                expectedUpdateCount,
                cleanupQueries);
    }

    /**
     * Shared scenario behind every test in this class.
     *
     * <ol>
     *   <li>Waits for all nodes to be active and runs {@code list2} to drop leftovers.</li>
     *   <li>Runs the setup statements in {@code list}.</li>
     *   <li>Runs {@code str} with every node responding and checks its update count.</li>
     *   <li>Submits {@code str2} in the background, makes one randomly chosen worker stop
     *       responding immediately and a second one a second later, then checks that the
     *       query still reports the expected update count.</li>
     *   <li>Always makes all servers respond again, cancels outstanding queries and tasks,
     *       and runs {@code list2} once more.</li>
     * </ol>
     *
     * @param distributedQueryRunner cluster to run against
     * @param i writer count passed to {@code createRecoverableSession}
     * @param z optimized partition update serialization flag passed to {@code createRecoverableSession}
     * @param list setup statements executed before the measured queries
     * @param str query executed while all nodes are responding
     * @param str2 query executed while workers are made unresponsive
     * @param i2 update count both queries are expected to report
     * @param list2 cleanup statements executed before and after the scenario
     * @throws Exception if any query, assertion or wait fails
     */
    private void testRecoverableGroupedExecution(DistributedQueryRunner distributedQueryRunner, int i, boolean z, List<String> list, @Language("SQL") String str, @Language("SQL") String str2, int i2, List<String> list2) throws Exception {
        waitUntilAllNodesAreHealthy(distributedQueryRunner, new Duration(10.0d, TimeUnit.SECONDS));

        Session session = createRecoverableSession(i, z);
        OptionalLong expectedUpdateCount = OptionalLong.of(i2);

        // Start from a clean slate in case an earlier run left tables behind.
        for (String cleanupQuery : list2) {
            distributedQueryRunner.execute(session, cleanupQuery);
        }

        try {
            for (String setupQuery : list) {
                distributedQueryRunner.execute(session, setupQuery);
            }

            // Baseline: same shape of query, no induced failures.
            Stopwatch noRecoveryStopwatch = Stopwatch.createStarted();
            Assert.assertEquals(distributedQueryRunner.execute(session, str).getUpdateCount(), expectedUpdateCount);
            log.info("Query with no recovery took %sms", noRecoveryStopwatch.elapsed(TimeUnit.MILLISECONDS));

            cancelAllQueries(distributedQueryRunner);
            cancelAllTasks(distributedQueryRunner);

            // Run the second query in the background so workers can be disabled while it is in flight.
            Stopwatch recoveryStopwatch = Stopwatch.createStarted();
            ListenableFuture<MaterializedResult> result = this.executor.submit(() -> distributedQueryRunner.execute(session, str2));

            List<TestingPrestoServer> workers = distributedQueryRunner.getServers().stream()
                    .filter(server -> !server.isCoordinator())
                    .collect(Collectors.toList());
            Assert.assertTrue(workers.size() >= 2, "at least two workers are required, found " + workers.size());
            Collections.shuffle(workers);

            // First worker is disabled right away.
            workers.get(0).stopResponding();

            // Second worker is disabled after a delay. The index is a plain literal:
            // it has nothing to do with INVOCATION_COUNT.
            TestingPrestoServer secondWorker = workers.get(1);
            Thread.sleep(1000L);
            secondWorker.stopResponding();

            Assert.assertEquals(result.get(1000L, TimeUnit.SECONDS).getUpdateCount(), expectedUpdateCount);
            log.info("Query with recovery took %sms", recoveryStopwatch.elapsed(TimeUnit.MILLISECONDS));
        } finally {
            // Restore the cluster for the next test whether or not this one passed.
            distributedQueryRunner.getServers().forEach(TestingPrestoServer::startResponding);
            cancelAllQueries(distributedQueryRunner);
            cancelAllTasks(distributedQueryRunner);
            for (String cleanupQuery : list2) {
                distributedQueryRunner.execute(session, cleanupQuery);
            }
        }
    }

    /**
     * Polls the coordinator once per second until its active node count equals the
     * query runner's node count.
     *
     * @param distributedQueryRunner cluster whose coordinator is polled
     * @param duration maximum time to keep polling
     * @throws TimeoutException if the node counts still differ once {@code duration} has elapsed
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private static void waitUntilAllNodesAreHealthy(DistributedQueryRunner distributedQueryRunner, Duration duration) throws TimeoutException, InterruptedException {
        TestingPrestoServer coordinator = distributedQueryRunner.getCoordinator();

        // Measure elapsed time with the monotonic clock; wall-clock time can jump
        // (NTP adjustments, manual changes) and distort the timeout.
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(duration.toMillis());
        long startNanos = System.nanoTime();

        while (System.nanoTime() - startNanos < timeoutNanos) {
            if (coordinator.refreshNodes().getActiveNodes().size() == distributedQueryRunner.getNodeCount()) {
                return;
            }
            Thread.sleep(1000L);
        }
        throw new TimeoutException(String.format("one of the nodes is still missing after: %s", duration));
    }

    /**
     * Asks the coordinator's query manager to cancel every query the runner reports.
     *
     * @param distributedQueryRunner cluster whose queries are cancelled
     */
    private static void cancelAllQueries(DistributedQueryRunner distributedQueryRunner) {
        distributedQueryRunner.getQueries().forEach(queryInfo ->
                distributedQueryRunner.getCoordinator().getQueryManager().cancelQuery(queryInfo.getQueryId()));
    }

    /**
     * Cancels the tasks of every server in the cluster.
     *
     * @param distributedQueryRunner cluster whose servers are visited
     */
    private static void cancelAllTasks(DistributedQueryRunner distributedQueryRunner) {
        distributedQueryRunner.getServers().forEach(server -> cancelAllTasks(server));
    }

    /**
     * Cancels every task the given server's task manager currently reports.
     *
     * @param testingPrestoServer server whose tasks are cancelled
     */
    private static void cancelAllTasks(TestingPrestoServer testingPrestoServer) {
        testingPrestoServer.getTaskManager().getAllTaskInfo().forEach(task ->
                testingPrestoServer.getTaskManager().cancelTask(task.getTaskId()));
    }

    /**
     * Builds the session used by every query in this class: an identity with the
     * {@code admin} role selected for the Hive catalog, plus the system and catalog
     * session properties that turn on recoverable grouped execution.
     *
     * @param i value used for both {@code task_writer_count} and {@code task_partitioned_writer_count}
     * @param z value of the Hive catalog property {@code optimized_partition_update_serialization_enabled}
     * @return the configured session
     */
    private static Session createRecoverableSession(int i, boolean z) {
        // The role is always present, so the map is built directly rather than through
        // an Optional, which also removes the need for a raw Map cast.
        Map<String, SelectedRole> roles = ImmutableMap.of(
                HiveQueryRunner.HIVE_CATALOG,
                new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin")));

        Identity identity = new Identity(
                HiveQueryRunner.HIVE_CATALOG,
                Optional.empty(),
                roles,
                ImmutableMap.of(),
                ImmutableMap.of(),
                Optional.empty(),
                Optional.empty());

        String writerCount = Integer.toString(i);

        return TestingSession.testSessionBuilder()
                .setIdentity(identity)
                .setSystemProperty("colocated_join", "true")
                .setSystemProperty("grouped_execution", "true")
                .setSystemProperty("concurrent_lifespans_per_task", "1")
                .setSystemProperty("recoverable_grouped_execution", "true")
                .setSystemProperty("scale_writers", "false")
                .setSystemProperty("redistribute_writes", "false")
                .setSystemProperty("task_writer_count", writerCount)
                .setSystemProperty("task_partitioned_writer_count", writerCount)
                .setSystemProperty("partitioning_provider_catalog", HiveQueryRunner.HIVE_CATALOG)
                // Written as a literal: the value is a strategy name and should not be
                // derived from the unrelated role enum SelectedRole.Type.
                .setSystemProperty("exchange_materialization_strategy", "ALL")
                .setSystemProperty("hash_partition_count", "11")
                .setCatalogSessionProperty(HiveQueryRunner.HIVE_CATALOG, "virtual_bucket_count", "16")
                .setCatalogSessionProperty(HiveQueryRunner.HIVE_CATALOG, "optimized_partition_update_serialization_enabled", Boolean.toString(z))
                .setCatalog(HiveQueryRunner.HIVE_CATALOG)
                .setSchema(HiveQueryRunner.TPCH_BUCKETED_SCHEMA)
                .build();
    }
}
