package com.facebook.presto.spark.execution.http;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.http.client.HeaderName;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.airlift.http.client.ResponseHandlerUtils;
import com.facebook.airlift.http.client.StaticBodyGenerator;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.Session;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.operator.HttpRpcShuffleClient;
import com.facebook.presto.operator.PageBufferClient;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.server.RequestHelpers;
import com.facebook.presto.server.SimpleHttpResponseCallback;
import com.facebook.presto.server.SimpleHttpResponseHandler;
import com.facebook.presto.server.SimpleHttpResponseHandlerStats;
import com.facebook.presto.server.TaskUpdateRequest;
import com.facebook.presto.server.smile.AdaptingJsonResponseHandler;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:com/facebook/presto/spark/execution/http/PrestoSparkHttpTaskClient.class */
public class PrestoSparkHttpTaskClient {
    // Path segment appended to the base location when building the task URI (see createTaskUri).
    private static final String TASK_URI = "/v1/task/";
    // Client used to issue every asynchronous HTTP request made by this class.
    private final HttpClient httpClient;
    // Base URI supplied at construction; also handed to the error trackers and response handlers.
    private final URI location;
    // location + TASK_URI + task id; all request URIs are derived from this.
    private final URI taskUri;
    // Decodes TaskInfo responses returned by getTaskInfo() and updateTask().
    private final JsonCodec<TaskInfo> taskInfoCodec;
    // Serializes the plan fragment embedded in the task update request body.
    private final JsonCodec<PlanFragment> planFragmentCodec;
    // Serializes the BatchTaskUpdateRequest that forms the updateTask() POST body.
    private final JsonCodec<BatchTaskUpdateRequest> taskUpdateRequestCodec;
    // Sent as the "X-Presto-Max-Wait" header value by getTaskInfo().
    private final Duration infoRefreshMaxWait;
    // Runs the request-dispatch step and the completion callbacks.
    private final Executor executor;
    // Passed to each RequestErrorTracker created by this class.
    private final ScheduledExecutorService scheduledExecutorService;
    // Passed to each RequestErrorTracker created by this class.
    // NOTE(review): presumably the error budget after which retries stop — confirm against RequestErrorTracker.
    private final Duration remoteTaskMaxErrorDuration;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/spark/execution/http/PrestoSparkHttpTaskClient$BytesResponse.class */
    /**
     * {@link BaseResponse} implementation that exposes the raw, undecoded body of an HTTP
     * response together with its status line and headers.
     *
     * <p>The headers are copied into an immutable multimap at construction time. The body
     * array is stored and returned as-is (no defensive copy), so callers must not mutate it.
     */
    public static class BytesResponse implements BaseResponse<byte[]> {
        private final int statusCode;
        private final String statusMessage;
        private final ListMultimap<HeaderName, String> headers;
        private final byte[] bytes;

        /**
         * @param i HTTP status code
         * @param str HTTP status message; must not be null
         * @param listMultimap response headers; must not be null, copied on construction
         * @param bArr response body; must not be null (use an empty array for "no body")
         * @throws NullPointerException if any reference argument is null
         */
        public BytesResponse(int i, String str, ListMultimap<HeaderName, String> listMultimap, byte[] bArr) {
            this.statusCode = i;
            this.statusMessage = Objects.requireNonNull(str, "statusMessage is null");
            this.headers = ImmutableListMultimap.copyOf(Objects.requireNonNull(listMultimap, "headers is null"));
            // Fail fast here instead of with an anonymous NPE later in getResponseSize().
            this.bytes = Objects.requireNonNull(bArr, "bytes is null");
        }

        /** Returns the HTTP status code given at construction. */
        public int getStatusCode() {
            return this.statusCode;
        }

        /** Returns the HTTP status message given at construction. */
        public String getStatusMessage() {
            return this.statusMessage;
        }

        /**
         * Returns the first value of the named header.
         *
         * @param str header name
         * @return the first value, or {@code null} when the header is absent
         */
        public String getHeader(String str) {
            List<String> values = getHeaders(str);
            return values.isEmpty() ? null : values.get(0);
        }

        /**
         * Returns every value of the named header.
         *
         * @param str header name
         * @return the values stored for that header; empty when the header is absent
         */
        public List<String> getHeaders(String str) {
            return this.headers.get(HeaderName.of(str));
        }

        /** Returns the immutable copy of all response headers. */
        public ListMultimap<HeaderName, String> getHeaders() {
            return this.headers;
        }

        /** Always {@code true}: a body array is always present, even if empty. */
        public boolean hasValue() {
            return true;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        // NOTE(review): per the decompiler remark above this was originally named getValue();
        // the name is kept as found so existing references stay valid — confirm against BaseResponse.
        /** Returns the response body (same array as {@link #getResponseBytes()}). */
        public byte[] m20getValue() {
            return this.bytes;
        }

        /** Returns the length of the response body in bytes. */
        public int getResponseSize() {
            return this.bytes.length;
        }

        /** Returns the response body without copying. */
        public byte[] getResponseBytes() {
            return this.bytes;
        }

        /** Always {@code null}: this response type never carries a decoding exception. */
        public Exception getException() {
            return null;
        }
    }

    /* loaded from: input_file:com/facebook/presto/spark/execution/http/PrestoSparkHttpTaskClient$BytesResponseHandler.class */
    /**
     * Response handler that buffers the whole response body into a byte array and wraps it,
     * with the status line and headers, in a {@link BytesResponse}.
     */
    private static class BytesResponseHandler implements ResponseHandler<BaseResponse<byte[]>, RuntimeException> {
        private BytesResponseHandler() {
        }

        /* renamed from: handleException, reason: merged with bridge method [inline-methods] */
        /** Never returns normally: rethrows whatever {@code ResponseHandlerUtils.propagate} produces. */
        public BaseResponse<byte[]> m22handleException(Request request, Exception exc) {
            throw ResponseHandlerUtils.propagate(request, exc);
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        /** Captures status, headers and the fully-read body of {@code response}. */
        public BaseResponse<byte[]> m21handle(Request request, Response response) {
            int status = response.getStatusCode();
            String message = response.getStatusMessage();
            ListMultimap<HeaderName, String> responseHeaders = response.getHeaders();
            byte[] body = readResponseBytes(response);
            return new BytesResponse(status, message, responseHeaders, body);
        }

        /**
         * Reads the entire response body.
         *
         * @return the body bytes, or an empty array when the response has no input stream
         * @throws RuntimeException wrapping the {@link IOException} if reading fails
         */
        private static byte[] readResponseBytes(Response response) {
            try {
                InputStream body = response.getInputStream();
                if (body == null) {
                    return new byte[0];
                }
                return ByteStreams.toByteArray(body);
            } catch (IOException e) {
                throw new RuntimeException("Error reading response from server", e);
            }
        }
    }

    /**
     * Creates a client bound to a single task at the given location.
     *
     * @param httpClient client used for all requests
     * @param taskId id of the task; appended to the location to form the task URI
     * @param uri base location the task URI is built from
     * @param jsonCodec codec for {@link TaskInfo} responses
     * @param jsonCodec2 codec used to serialize the {@link PlanFragment} in task updates
     * @param jsonCodec3 codec used to serialize the task update request body
     * @param duration value sent as the max-wait header when fetching task info
     * @param executor executor that runs request dispatch and completion callbacks
     * @param scheduledExecutorService scheduler handed to the request error trackers
     * @param duration2 max error duration handed to the request error trackers
     * @throws NullPointerException if any argument is null
     */
    public PrestoSparkHttpTaskClient(HttpClient httpClient, TaskId taskId, URI uri, JsonCodec<TaskInfo> jsonCodec, JsonCodec<PlanFragment> jsonCodec2, JsonCodec<BatchTaskUpdateRequest> jsonCodec3, Duration duration, Executor executor, ScheduledExecutorService scheduledExecutorService, Duration duration2) {
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.location = Objects.requireNonNull(uri, "location is null");
        this.taskInfoCodec = Objects.requireNonNull(jsonCodec, "taskInfoCodec is null");
        this.planFragmentCodec = Objects.requireNonNull(jsonCodec2, "planFragmentCodec is null");
        this.taskUpdateRequestCodec = Objects.requireNonNull(jsonCodec3, "taskUpdateRequestCodec is null");
        // taskId is validated here, at its first use, so a null produces a descriptive message
        // while the order in which the other arguments are checked stays as before.
        this.taskUri = createTaskUri(this.location, Objects.requireNonNull(taskId, "taskId is null"));
        this.infoRefreshMaxWait = Objects.requireNonNull(duration, "infoRefreshMaxWait is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.scheduledExecutorService = Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService is null");
        this.remoteTaskMaxErrorDuration = Objects.requireNonNull(duration2, "remoteTaskMaxErrorDuration is null");
    }

    /**
     * Starts an asynchronous fetch of task results, retrying on failure under a fresh
     * {@link RequestErrorTracker}.
     *
     * @param j value appended as the last path segment of the results URI
     *          (presumably the page sequence token — confirm with callers)
     * @param dataSize value sent in the {@code X-Presto-Max-Size} request header
     * @return future completed with the pages response, or failed once retries give up
     */
    public ListenableFuture<PageBufferClient.PagesResponse> getResults(long j, DataSize dataSize) {
        // NOTE(review): the job description below says "sending update request" although this
        // method fetches results; text is kept verbatim — confirm whether it is intentional.
        RequestErrorTracker errorTracker = new RequestErrorTracker(
                "NativeExecution",
                this.location,
                StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR,
                "getResults encountered too many errors talking to native process",
                this.remoteTaskMaxErrorDuration,
                this.scheduledExecutorService,
                "sending update request to native process");
        SettableFuture<PageBufferClient.PagesResponse> pages = SettableFuture.create();
        Request resultsRequest = prepareGetResultsRequest(j, dataSize);
        scheduleGetResultsRequest(resultsRequest, errorTracker, pages);
        return pages;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Issues one attempt of the results request and wires its outcome into {@code settableFuture}.
     *
     * <p>The request is sent only after the tracker grants a permit. On success the tracker is
     * notified and the future is completed. A {@link PrestoException} fails the future
     * immediately; any other failure is reported to the tracker and the request is scheduled
     * again. If reporting or re-scheduling throws, that throwable fails the future.
     *
     * @param request request to execute (re-used unchanged for every retry)
     * @param requestErrorTracker tracker governing permits and failure accounting
     * @param settableFuture future to complete with the final outcome
     */
    public void scheduleGetResultsRequest(final Request request, final RequestErrorTracker requestErrorTracker, final SettableFuture<PageBufferClient.PagesResponse> settableFuture) {
        ListenableFuture<PageBufferClient.PagesResponse> attempt = Futures.transformAsync(
                requestErrorTracker.acquireRequestPermit(),
                ignored -> {
                    requestErrorTracker.startRequest();
                    return this.httpClient.executeAsync(request, new HttpRpcShuffleClient.PageResponseHandler());
                },
                this.executor);

        FutureCallback<PageBufferClient.PagesResponse> completion = new FutureCallback<PageBufferClient.PagesResponse>() {
            @Override
            public void onSuccess(PageBufferClient.PagesResponse pagesResponse) {
                requestErrorTracker.requestSucceeded();
                settableFuture.set(pagesResponse);
            }

            @Override
            public void onFailure(Throwable th) {
                if (!(th instanceof PrestoException)) {
                    try {
                        requestErrorTracker.requestFailed(th);
                        scheduleGetResultsRequest(request, requestErrorTracker, settableFuture);
                    } catch (Throwable retryFailure) {
                        settableFuture.setException(retryFailure);
                    }
                    return;
                }
                // PrestoException is treated as non-retryable.
                settableFuture.setException(th);
            }
        };

        Futures.addCallback(attempt, completion, this.executor);
    }

    /**
     * Builds the GET request for {@code <taskUri>/results/0/<j>} carrying the maximum
     * response size in the {@code X-Presto-Max-Size} header.
     */
    private Request prepareGetResultsRequest(long j, DataSize dataSize) {
        Request.Builder get = Request.Builder.prepareGet().setHeader("X-Presto-Max-Size", dataSize.toString());
        URI resultsUri = HttpUriBuilder.uriBuilderFrom(this.taskUri)
                .appendPath("/results/0")
                .appendPath(Long.toString(j))
                .build();
        return get.setUri(resultsUri).build();
    }

    /**
     * Sends a GET to {@code <taskUri>/results/0/<j>/acknowledge} with retries.
     * The resulting future is intentionally discarded, so the caller is never told
     * whether the acknowledgement eventually succeeded.
     *
     * @param j value placed in the path segment before {@code acknowledge}
     */
    public void acknowledgeResultsAsync(long j) {
        Request.Builder get = Request.Builder.prepareGet();
        URI acknowledgeUri = HttpUriBuilder.uriBuilderFrom(this.taskUri)
                .appendPath("/results/0")
                .appendPath(Long.toString(j))
                .appendPath("acknowledge")
                .build();
        Request acknowledge = get.setUri(acknowledgeUri).build();
        executeWithRetries("acknowledgeResults", "acknowledge task results are received", acknowledge, new BytesResponseHandler());
    }

    /**
     * Sends a DELETE to {@code <taskUri>/results/0} with retries.
     *
     * @return future that completes with {@code null} once the request has succeeded,
     *         or fails with the error that ended the retries
     */
    public ListenableFuture<Void> abortResultsAsync() {
        Request.Builder delete = Request.Builder.prepareDelete();
        URI resultsUri = HttpUriBuilder.uriBuilderFrom(this.taskUri).appendPath("/results/0").build();
        Request abort = delete.setUri(resultsUri).build();
        ListenableFuture<byte[]> pending = executeWithRetries("abortResults", "abort task results", abort, new BytesResponseHandler());
        return asVoidFuture(pending);
    }

    /**
     * Adapts any future to a {@code ListenableFuture<Void>} by discarding its value.
     * The mapping runs on the direct executor; a failure of the input is passed through.
     */
    private static ListenableFuture<Void> asVoidFuture(ListenableFuture<?> listenableFuture) {
        return Futures.transform(listenableFuture, ignored -> null, MoreExecutors.directExecutor());
    }

    /**
     * Fetches the current {@link TaskInfo} from the task URI, retrying on failure, and
     * waits for the outcome via {@code MoreFutures.getFutureValue}.
     *
     * <p>The request carries {@code infoRefreshMaxWait} in the {@code X-Presto-Max-Wait} header.
     *
     * @return the decoded task info
     */
    public TaskInfo getTaskInfo() {
        Request infoRequest = RequestHelpers.setContentTypeHeaders(false, Request.Builder.prepareGet())
                .setHeader("X-Presto-Max-Wait", this.infoRefreshMaxWait.toString())
                .setUri(this.taskUri)
                .build();
        ListenableFuture<TaskInfo> pending = executeWithRetries(
                "getTaskInfo",
                "get remote task info",
                infoRequest,
                AdaptingJsonResponseHandler.createAdaptingJsonResponseHandler(this.taskInfoCodec));
        return MoreFutures.getFutureValue(pending);
    }

    /**
     * Creates or updates the remote task by POSTing a serialized {@code BatchTaskUpdateRequest}
     * to {@code <taskUri>/batch}, retrying on failure, and waits for the resulting
     * {@link TaskInfo}.
     *
     * @param list task sources included in the update
     * @param planFragment plan fragment, serialized with the plan fragment codec
     * @param tableWriteInfo table write info included in the update
     * @param optional passed through as the second argument of {@code BatchTaskUpdateRequest}
     * @param optional2 passed through as the third argument of {@code BatchTaskUpdateRequest}
     * @param session session whose representation and extra credentials are sent
     * @param outputBuffers output buffer configuration included in the update
     * @return the task info decoded from the response
     */
    public TaskInfo updateTask(List<TaskSource> list, PlanFragment planFragment, TableWriteInfo tableWriteInfo, Optional<String> optional, Optional<String> optional2, Session session, OutputBuffers outputBuffers) {
        // NOTE(review): the 'false' flag presumably selects the non-binary content type —
        // confirm against RequestHelpers.setContentTypeHeaders.
        Request.Builder post = RequestHelpers.setContentTypeHeaders(false, Request.Builder.preparePost());
        URI batchUri = HttpUriBuilder.uriBuilderFrom(this.taskUri).appendPath("batch").build();
        post = post.setUri(batchUri);

        TaskUpdateRequest update = new TaskUpdateRequest(
                session.toSessionRepresentation(),
                session.getIdentity().getExtraCredentials(),
                Optional.of(planFragment.toBytes(this.planFragmentCodec)),
                list,
                outputBuffers,
                Optional.of(tableWriteInfo));
        BatchTaskUpdateRequest batchUpdate = new BatchTaskUpdateRequest(update, optional, optional2);
        byte[] payload = this.taskUpdateRequestCodec.toBytes(batchUpdate);
        Request updateRequest = post.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(payload)).build();

        ListenableFuture<TaskInfo> pending = executeWithRetries(
                "updateTask",
                "create or update remote task",
                updateRequest,
                AdaptingJsonResponseHandler.createAdaptingJsonResponseHandler(this.taskInfoCodec));
        return MoreFutures.getFutureValue(pending);
    }

    /** Returns the base location URI this client was constructed with. */
    public URI getLocation() {
        return location;
    }

    /** Returns the task URI derived from the location and task id at construction. */
    public URI getTaskUri() {
        return taskUri;
    }

    /**
     * Builds the task URI by appending {@code TASK_URI} and then the task id's string form
     * to the given base URI.
     */
    private URI createTaskUri(URI uri, TaskId taskId) {
        HttpUriBuilder taskRoot = HttpUriBuilder.uriBuilderFrom(uri).appendPath(TASK_URI);
        return taskRoot.appendPath(taskId.toString()).build();
    }

    /**
     * Executes {@code request} asynchronously, retrying failed attempts under a newly created
     * {@link RequestErrorTracker}.
     *
     * @param str operation name; prefixed to the tracker's "too many errors" message
     * @param str2 job description handed to the tracker
     * @param request request to execute
     * @param responseHandler handler that decodes the HTTP response
     * @return future completed with the decoded value, or failed with the terminal error
     */
    private <T> ListenableFuture<T> executeWithRetries(String str, String str2, Request request, ResponseHandler<BaseResponse<T>, RuntimeException> responseHandler) {
        String tooManyErrorsMessage = str + " encountered too many errors talking to native process";
        RequestErrorTracker errorTracker = new RequestErrorTracker(
                "NativeExecution",
                this.location,
                StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR,
                tooManyErrorsMessage,
                this.remoteTaskMaxErrorDuration,
                this.scheduledExecutorService,
                str2);
        SettableFuture<T> outcome = SettableFuture.create();
        scheduleRequest(request, responseHandler, errorTracker, outcome);
        return outcome;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Issues one attempt of {@code request} and routes its outcome into {@code settableFuture}
     * through a {@link SimpleHttpResponseHandler}.
     *
     * <p>The request is sent only after the tracker grants a permit. {@code success} completes
     * the future. {@code failed} fails it immediately for a {@link PrestoException}; otherwise
     * the failure is reported to the tracker and the request is scheduled again, and anything
     * thrown while doing so fails the future. {@code fatal} always fails the future.
     *
     * @param request request to execute (re-used unchanged for every retry)
     * @param responseHandler handler that decodes the HTTP response
     * @param requestErrorTracker tracker governing permits and failure accounting
     * @param settableFuture future to complete with the final outcome
     */
    public <T> void scheduleRequest(final Request request, final ResponseHandler<BaseResponse<T>, RuntimeException> responseHandler, final RequestErrorTracker requestErrorTracker, final SettableFuture<T> settableFuture) {
        ListenableFuture<BaseResponse<T>> attempt = Futures.transformAsync(
                requestErrorTracker.acquireRequestPermit(),
                ignored -> {
                    requestErrorTracker.startRequest();
                    return this.httpClient.executeAsync(request, responseHandler);
                },
                this.executor);

        SimpleHttpResponseCallback<T> completion = new SimpleHttpResponseCallback<T>() {
            public void success(T t) {
                settableFuture.set(t);
            }

            public void failed(Throwable th) {
                if (!(th instanceof PrestoException)) {
                    try {
                        requestErrorTracker.requestFailed(th);
                        scheduleRequest(request, responseHandler, requestErrorTracker, settableFuture);
                    } catch (Throwable retryFailure) {
                        settableFuture.setException(retryFailure);
                    }
                    return;
                }
                // PrestoException is treated as non-retryable.
                settableFuture.setException(th);
            }

            public void fatal(Throwable th) {
                settableFuture.setException(th);
            }
        };

        Futures.addCallback(
                attempt,
                new SimpleHttpResponseHandler<>(completion, this.location, new SimpleHttpResponseHandlerStats(), StandardErrorCode.REMOTE_TASK_ERROR),
                this.executor);
    }
}
