package com.facebook.presto.server.remotetask;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.SessionTestUtils;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.TaskTestUtils;
import com.facebook.presto.execution.TestSqlTaskManager;
import com.facebook.presto.metadata.HandleJsonModule;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.metadata.PrestoNode;
import com.facebook.presto.server.HttpRemoteTaskFactory;
import com.facebook.presto.server.TaskUpdateRequest;
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.testing.TestingHandleResolver;
import com.facebook.presto.type.TypeDeserializer;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.configuration.ConfigBinder;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.jaxrs.JsonMapper;
import io.airlift.jaxrs.testing.JaxrsTestingHttpProcessor;
import io.airlift.json.JsonBinder;
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecBinder;
import io.airlift.json.JsonModule;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/server/remotetask/TestHttpRemoteTask.class */
public class TestHttpRemoteTask {
    // Length of the quiet period (no request reaching the fake task resource) that the
    // monitor started by runTest() waits for before it reports that activity has stopped.
    private static final Duration IDLE_TIMEOUT = new Duration(3.0d, TimeUnit.SECONDS);
    // Upper bound on how long runTest() waits for that quiet period; once the monitor sees
    // that the fail deadline would arrive before the idle deadline, it fails the test.
    private static final Duration FAIL_TIMEOUT = new Duration(20.0d, TimeUnit.SECONDS);
    // Task manager settings derived from IDLE_TIMEOUT (3000 ms): the status refresh max wait
    // is 1/100 of it (30 ms) and the info update interval is 1/10 of it (300 ms).
    private static final TaskManagerConfig TASK_MANAGER_CONFIG = new TaskManagerConfig().setStatusRefreshMaxWait(new Duration(IDLE_TIMEOUT.roundTo(TimeUnit.MILLISECONDS) / 100, TimeUnit.MILLISECONDS)).setInfoUpdateInterval(new Duration(IDLE_TIMEOUT.roundTo(TimeUnit.MILLISECONDS) / 10, TimeUnit.MILLISECONDS));
    // HTTP tracing flag; its value matches what createHttpRemoteTaskFactory passes to
    // setTrace(...) on the testing HTTP processor (tracing off).
    private static final boolean TRACE_HTTP = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/server/remotetask/TestHttpRemoteTask$TestCase.class */
    /**
     * Failure scenarios that {@link TestingTaskResource} can simulate and that
     * {@code runTest} knows how to verify.
     */
    public enum TestCase {
        // On the 10th status build the resource switches to a new task instance id and
        // resets its version to 0; runTest expects REMOTE_TASK_MISMATCH.
        TASK_MISMATCH,
        // Same as TASK_MISMATCH, but the resource's initial version is raised to 1000000
        // in setInitialTaskInfo; runTest expects REMOTE_TASK_MISMATCH.
        TASK_MISMATCH_WHEN_VERSION_IS_HIGH,
        // From the 10th status build onwards the resource throws
        // RejectedExecutionException; runTest expects REMOTE_TASK_ERROR.
        REJECTED_EXECUTION
    }

    /**
     * In-memory JAX-RS resource that answers the task endpoints under {@code /task/{nodeId}}
     * with synthetic {@link TaskInfo} / {@link TaskStatus} objects.
     *
     * <p>Every endpoint stores the current {@code System.nanoTime()} in the shared
     * {@code lastActivityNanos} counter so the test can tell when requests stop arriving.
     * The configured {@link TestCase} decides how the generated status misbehaves once it
     * has been built ten times. {@link #setInitialTaskInfo(TaskInfo)} must be called before
     * any endpoint is hit, because the responses are derived from that template.
     */
    @Path("/task/{nodeId}")
    public static class TestingTaskResource {
        private static final String INITIAL_TASK_INSTANCE_ID = "task-instance-id";
        private static final String NEW_TASK_INSTANCE_ID = "task-instance-id-x";

        private final AtomicLong lastActivityNanos;
        private final TestCase testCase;

        // Template captured by setInitialTaskInfo(); responses copy most fields from it.
        private TaskInfo initialTaskInfo;
        private TaskStatus initialTaskStatus;
        // Version of the most recently built status; incremented for every status built.
        private long version;
        private TaskState taskState;
        private String taskInstanceId = INITIAL_TASK_INSTANCE_ID;
        // Number of statuses built so far (task info responses count as well).
        private long statusFetchCounter;

        /**
         * @param atomicLong counter that receives the nano-time of every incoming request
         * @param testCase scenario this resource simulates
         * @throws NullPointerException if either argument is null
         */
        public TestingTaskResource(AtomicLong atomicLong, TestCase testCase) {
            this.lastActivityNanos = Objects.requireNonNull(atomicLong, "lastActivityNanos is null");
            this.testCase = Objects.requireNonNull(testCase, "testCase is null");
        }

        /** Returns a freshly built task info; the request arguments are not inspected. */
        @GET
        @Path("{taskId}")
        @Produces({"application/json"})
        public synchronized TaskInfo getTaskInfo(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Presto-Current-State") TaskState taskState, @HeaderParam("X-Presto-Max-Wait") Duration duration, @Context UriInfo uriInfo) {
            recordActivity();
            return buildTaskInfo();
        }

        /** Ignores the update request payload and returns a freshly built task info. */
        @POST
        @Path("{taskId}")
        @Consumes({"application/json"})
        @Produces({"application/json"})
        public synchronized TaskInfo createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo) {
            recordActivity();
            return buildTaskInfo();
        }

        /**
         * Waits on this resource's monitor for the requested max wait, then returns a
         * freshly built status. Nothing in this class notifies the monitor.
         *
         * @param duration max wait taken from the {@code X-Presto-Max-Wait} header
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @GET
        @Path("{taskId}/status")
        @Produces({"application/json"})
        public synchronized TaskStatus getTaskStatus(@PathParam("taskId") TaskId taskId, @HeaderParam("X-Presto-Current-State") TaskState taskState, @HeaderParam("X-Presto-Max-Wait") Duration duration, @Context UriInfo uriInfo) throws InterruptedException {
            recordActivity();
            long maxWaitMillis = duration.roundTo(TimeUnit.MILLISECONDS);
            wait(maxWaitMillis);
            return buildTaskStatus();
        }

        /**
         * Moves the simulated task to a terminal state and returns the resulting task info.
         *
         * @param z the {@code abort} query parameter (default {@code true}): {@code true}
         *          yields {@link TaskState#ABORTED}, {@code false} yields
         *          {@link TaskState#CANCELED}
         */
        @DELETE
        @Path("{taskId}")
        @Produces({"application/json"})
        public synchronized TaskInfo deleteTask(@PathParam("taskId") TaskId taskId, @QueryParam("abort") @DefaultValue("true") boolean z, @Context UriInfo uriInfo) {
            recordActivity();
            if (z) {
                this.taskState = TaskState.ABORTED;
            } else {
                this.taskState = TaskState.CANCELED;
            }
            return buildTaskInfo();
        }

        /**
         * Captures the template from which all later responses are derived and seeds the
         * state and version from its status.
         *
         * @param taskInfo task info whose status, state and version seed this resource
         * @throws UnsupportedOperationException if the configured test case is unknown
         */
        public void setInitialTaskInfo(TaskInfo taskInfo) {
            this.initialTaskInfo = taskInfo;
            TaskStatus seedStatus = taskInfo.getTaskStatus();
            this.initialTaskStatus = seedStatus;
            this.taskState = seedStatus.getState();
            this.version = seedStatus.getVersion();

            if (this.testCase == TestCase.TASK_MISMATCH_WHEN_VERSION_IS_HIGH) {
                // Start from a large version so the later reset to 0 is a big step backwards.
                this.version = 1000000L;
            } else if (this.testCase != TestCase.TASK_MISMATCH && this.testCase != TestCase.REJECTED_EXECUTION) {
                throw new UnsupportedOperationException();
            }
        }

        /** Stamps the shared activity counter with the current nano-time. */
        private void recordActivity() {
            this.lastActivityNanos.set(System.nanoTime());
        }

        /** Builds a task info from a new status plus the remaining fields of the template. */
        private TaskInfo buildTaskInfo() {
            TaskInfo template = this.initialTaskInfo;
            // The status is built first, exactly as when it was the first constructor argument.
            TaskStatus status = buildTaskStatus();
            return new TaskInfo(
                    status,
                    template.getLastHeartbeat(),
                    template.getOutputBuffers(),
                    template.getNoMoreSplits(),
                    template.getStats(),
                    template.isNeedsPlan(),
                    template.isComplete());
        }

        /**
         * Builds the next status, applying the scenario-specific misbehaviour when the
         * build counter reaches ten, and bumps the version by one.
         *
         * @throws RejectedExecutionException from the 10th build on for REJECTED_EXECUTION
         * @throws UnsupportedOperationException if the configured test case is unknown
         */
        private TaskStatus buildTaskStatus() {
            this.statusFetchCounter++;
            switch (this.testCase) {
                case TASK_MISMATCH:
                case TASK_MISMATCH_WHEN_VERSION_IS_HIGH:
                    if (this.statusFetchCounter == 10) {
                        // Pretend to be a different task instance whose version starts over.
                        this.taskInstanceId = NEW_TASK_INSTANCE_ID;
                        this.version = 0L;
                    }
                    break;
                case REJECTED_EXECUTION:
                    if (this.statusFetchCounter >= 10) {
                        throw new RejectedExecutionException();
                    }
                    break;
                default:
                    throw new UnsupportedOperationException();
            }

            TaskStatus template = this.initialTaskStatus;
            TaskId taskId = template.getTaskId();
            String instanceId = this.taskInstanceId;
            long nextVersion = ++this.version;
            return new TaskStatus(
                    taskId,
                    instanceId,
                    nextVersion,
                    this.taskState,
                    template.getSelf(),
                    template.getFailures(),
                    template.getQueuedPartitionedDrivers(),
                    template.getRunningPartitionedDrivers(),
                    template.getMemoryReservation());
        }
    }

    /**
     * Runs the {@link TestCase#TASK_MISMATCH} scenario through {@code runTest}.
     * TestNG fails the test if it takes longer than 30000 ms.
     */
    @Test(timeOut = 30000)
    public void testRemoteTaskMismatch() throws Exception {
        runTest(TestCase.TASK_MISMATCH);
    }

    /**
     * Runs the {@link TestCase#TASK_MISMATCH_WHEN_VERSION_IS_HIGH} scenario through
     * {@code runTest}. TestNG fails the test if it takes longer than 30000 ms.
     *
     * <p>NOTE(review): despite the "RejectedExecution" in the method name, this exercises
     * the task-mismatch scenario (runTest expects REMOTE_TASK_MISMATCH for it).
     */
    @Test(timeOut = 30000)
    public void testRejectedExecutionWhenVersionIsHigh() throws Exception {
        runTest(TestCase.TASK_MISMATCH_WHEN_VERSION_IS_HIGH);
    }

    /**
     * Runs the {@link TestCase#REJECTED_EXECUTION} scenario through {@code runTest}.
     * TestNG fails the test if it takes longer than 30000 ms.
     */
    @Test(timeOut = 30000)
    public void testRejectedExecution() throws Exception {
        runTest(TestCase.REJECTED_EXECUTION);
    }

    /**
     * Drives one failure scenario end to end: creates a remote task backed by a
     * {@link TestingTaskResource}, starts it, waits until the resource has received no
     * request for {@code IDLE_TIMEOUT}, and then checks that the task ended in a done state
     * with the single failure expected for the scenario.
     *
     * <p>The factory is stopped in a {@code finally} block so that it is also shut down
     * when task creation, start-up or the idle wait fails.
     *
     * @param testCase scenario to simulate and verify
     * @throws Exception if setup fails or the idle wait completes exceptionally, e.g.
     *         because activity did not stop within {@code FAIL_TIMEOUT}
     */
    private void runTest(TestCase testCase) throws Exception {
        AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime());
        TestingTaskResource testingTaskResource = new TestingTaskResource(lastActivityNanos, testCase);
        HttpRemoteTaskFactory httpRemoteTaskFactory = createHttpRemoteTaskFactory(testingTaskResource);

        RemoteTask remoteTask;
        try {
            remoteTask = httpRemoteTaskFactory.createRemoteTask(SessionTestUtils.TEST_SESSION, new TaskId("test", 1, 2), new PrestoNode("node-id", URI.create("http://fake.invalid/"), new NodeVersion("version"), false), TaskTestUtils.PLAN_FRAGMENT, ImmutableMultimap.of(), OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), new NodeTaskMap.PartitionedSplitCountTracker(i -> {
            }), true);
            // The resource derives every response from the task's initial info, so seed it
            // before the task starts issuing requests.
            testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo());
            remoteTask.start();

            CompletableFuture<Void> idleTimeoutFuture = new CompletableFuture<>();
            asyncRun(IDLE_TIMEOUT.roundTo(TimeUnit.MILLISECONDS), FAIL_TIMEOUT.roundTo(TimeUnit.MILLISECONDS), lastActivityNanos, () -> {
                idleTimeoutFuture.complete(null);
            }, (message, cause) -> {
                idleTimeoutFuture.completeExceptionally(new AssertionError(message, cause));
            });
            idleTimeoutFuture.get();
        } finally {
            httpRemoteTaskFactory.stop();
        }

        Assert.assertTrue(remoteTask.getTaskStatus().getState().isDone(), String.format("TaskStatus is not in a done state: %s", remoteTask.getTaskStatus()));
        Assert.assertTrue(remoteTask.getTaskInfo().getTaskStatus().getState().isDone(), String.format("TaskInfo is not in a done state: %s", remoteTask.getTaskInfo()));

        // Exactly one failure is expected; getOnlyElement throws otherwise.
        ErrorCode errorCode = ((ExecutionFailureInfo) Iterables.getOnlyElement(remoteTask.getTaskStatus().getFailures())).getErrorCode();
        switch (testCase) {
            case TASK_MISMATCH:
            case TASK_MISMATCH_WHEN_VERSION_IS_HIGH:
                com.facebook.presto.testing.assertions.Assert.assertEquals(errorCode, StandardErrorCode.REMOTE_TASK_MISMATCH.toErrorCode());
                break;
            case REJECTED_EXECUTION:
                com.facebook.presto.testing.assertions.Assert.assertEquals(errorCode, StandardErrorCode.REMOTE_TASK_ERROR.toErrorCode());
                break;
            default:
                throw new UnsupportedOperationException();
        }
    }

    /**
     * Bootstraps a small Guice injector and returns an {@link HttpRemoteTaskFactory} whose
     * HTTP client is a {@link TestingHttpClient} that dispatches requests in-process to the
     * given resource instead of going over the network.
     *
     * @param testingTaskResource JAX-RS resource that will answer the task requests
     * @return factory wired to the testing HTTP client and {@code TASK_MANAGER_CONFIG}
     * @throws Exception if the injector cannot be initialized
     */
    private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(final TestingTaskResource testingTaskResource) throws Exception {
        Injector injector = new Bootstrap(new Module[]{new JsonModule(), new HandleJsonModule(), new Module() {
            @Override
            public void configure(Binder binder) {
                binder.bind(JsonMapper.class);
                ConfigBinder.configBinder(binder).bindConfig(FeaturesConfig.class);
                binder.bind(TypeRegistry.class).in(Scopes.SINGLETON);
                binder.bind(TypeManager.class).to(TypeRegistry.class).in(Scopes.SINGLETON);
                JsonBinder.jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
                Multibinder.newSetBinder(binder, Type.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskStatus.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskInfo.class);
                JsonCodecBinder.jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
            }

            @Provides
            private HttpRemoteTaskFactory createHttpRemoteTaskFactory(JsonMapper jsonMapper, JsonCodec<TaskStatus> taskStatusCodec, JsonCodec<TaskInfo> taskInfoCodec, JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec) {
                // The resource is the captured method parameter: this anonymous module lives in a
                // static method, so there is no enclosing TestingTaskResource instance to refer to.
                return new HttpRemoteTaskFactory(new QueryManagerConfig(), TASK_MANAGER_CONFIG, new TestingHttpClient(new JaxrsTestingHttpProcessor(URI.create("http://fake.invalid/"), new Object[]{testingTaskResource, jsonMapper}).setTrace(TRACE_HTTP)), new TestSqlTaskManager.MockLocationFactory(), taskStatusCodec, taskInfoCodec, taskUpdateRequestCodec, new RemoteTaskStats());
            }
        }}).strictConfig().doNotInitializeLogging().quiet().initialize();
        // Register a handle resolver under the connector name "test".
        injector.getInstance(HandleResolver.class).addConnectorName("test", new TestingHandleResolver());
        return injector.getInstance(HttpRemoteTaskFactory.class);
    }

    /**
     * Starts a monitor thread that waits for activity to go quiet.
     *
     * <p>The thread repeatedly compares the time since the last recorded activity with the
     * idle timeout and the time since it started with the fail timeout. It invokes exactly
     * one of the two callbacks and then exits.
     *
     * @param j idle timeout in milliseconds; {@code runnable} runs once more than this much
     *          time has passed since the last recorded activity
     * @param j2 fail timeout in milliseconds, measured from the start of the monitor thread;
     *           {@code biConsumer} is invoked as soon as the fail deadline would be reached
     *           before the idle deadline
     * @param atomicLong holder of the {@code System.nanoTime()} value of the last activity
     * @param runnable callback for the "went idle" outcome
     * @param biConsumer callback for the failure outcome; receives a message and either
     *                   {@code null} or the {@link InterruptedException} that stopped the monitor
     */
    private static void asyncRun(long j, long j2, AtomicLong atomicLong, Runnable runnable, BiConsumer<String, Throwable> biConsumer) {
        Runnable monitor = () -> {
            final long startNanos = System.nanoTime();
            try {
                for (;;) {
                    long millisSinceLastActivity = (System.nanoTime() - atomicLong.get()) / 1000000;
                    long millisSinceStart = (System.nanoTime() - startNanos) / 1000000;
                    long millisToIdleTarget = j - millisSinceLastActivity;
                    long millisToFailTarget = j2 - millisSinceStart;

                    // Waiting for the idle deadline would overshoot the fail deadline: give up now.
                    if (millisToFailTarget < millisToIdleTarget) {
                        biConsumer.accept(String.format("Activity doesn't stop after %sms", j2), null);
                        return;
                    }
                    if (millisToIdleTarget < 0) {
                        runnable.run();
                        return;
                    }
                    // Sleep until the earliest moment the idle deadline could be met, then re-check,
                    // since new activity may have pushed the deadline out in the meantime.
                    Thread.sleep(millisToIdleTarget);
                }
            } catch (InterruptedException e) {
                biConsumer.accept("Idle/fail timeout monitor thread interrupted", e);
            }
        };
        new Thread(monitor).start();
    }
}
