package com.facebook.presto.spark.execution.nativeprocess;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.operator.PageBufferClient;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpTaskClient;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.base.Throwables;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.class */
public class HttpNativeExecutionTaskResultFetcher {
    private static final Logger log = Logger.get(HttpNativeExecutionTaskResultFetcher.class);
    private static final Duration FETCH_INTERVAL = new Duration(200.0d, TimeUnit.MILLISECONDS);
    private static final Duration POLL_TIMEOUT = new Duration(100.0d, TimeUnit.MILLISECONDS);
    private static final DataSize MAX_RESPONSE_SIZE = new DataSize(32.0d, DataSize.Unit.MEGABYTE);
    private static final DataSize MAX_BUFFER_SIZE = new DataSize(128.0d, DataSize.Unit.MEGABYTE);
    private final ScheduledExecutorService scheduler;
    private final PrestoSparkHttpTaskClient workerClient;
    private final Object taskHasResult;
    private ScheduledFuture<?> scheduledFuture;
    private long token;
    private final LinkedBlockingDeque<SerializedPage> pageBuffer = new LinkedBlockingDeque<>();
    private final AtomicReference<Throwable> lastException = new AtomicReference<>();
    private final AtomicLong bufferMemoryBytes = new AtomicLong();

    public HttpNativeExecutionTaskResultFetcher(ScheduledExecutorService scheduledExecutorService, PrestoSparkHttpTaskClient prestoSparkHttpTaskClient, Object obj) {
        this.scheduler = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService, "scheduler is null");
        this.workerClient = (PrestoSparkHttpTaskClient) Objects.requireNonNull(prestoSparkHttpTaskClient, "workerClient is null");
        this.taskHasResult = Objects.requireNonNull(obj, "taskHasResult is null");
    }

    public void start() {
        this.scheduledFuture = this.scheduler.scheduleAtFixedRate(this::doGetResults, 0L, (long) FETCH_INTERVAL.getValue(), FETCH_INTERVAL.getUnit());
    }

    public void stop(boolean z) {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
        }
        if (z && !this.pageBuffer.isEmpty()) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("TaskResultFetcher is closed with %s pages left in the buffer", Integer.valueOf(this.pageBuffer.size())));
        }
    }

    public Optional<SerializedPage> pollPage() throws InterruptedException {
        throwIfFailed();
        SerializedPage poll = this.pageBuffer.poll((long) POLL_TIMEOUT.getValue(), POLL_TIMEOUT.getUnit());
        if (poll == null) {
            return Optional.empty();
        }
        this.bufferMemoryBytes.addAndGet(-poll.getSizeInBytes());
        return Optional.of(poll);
    }

    public boolean hasPage() {
        throwIfFailed();
        return !this.pageBuffer.isEmpty();
    }

    private void throwIfFailed() {
        if (this.scheduledFuture == null || !this.scheduledFuture.isCancelled() || this.lastException.get() == null) {
            return;
        }
        Throwable th = this.lastException.get();
        Throwables.throwIfUnchecked(th);
        throw new RuntimeException(th);
    }

    private void doGetResults() {
        if (this.bufferMemoryBytes.longValue() >= MAX_BUFFER_SIZE.toBytes()) {
            return;
        }
        try {
            onSuccess((PageBufferClient.PagesResponse) MoreFutures.getFutureValue(this.workerClient.getResults(this.token, MAX_RESPONSE_SIZE)));
        } catch (Throwable th) {
            onFailure(th);
        }
    }

    private void onSuccess(PageBufferClient.PagesResponse pagesResponse) {
        List pages = pagesResponse.getPages();
        long j = 0;
        long j2 = 0;
        Iterator it = pages.iterator();
        while (it.hasNext()) {
            if (!PagesSerdeUtil.isChecksumValid((SerializedPage) it.next())) {
                throw new PrestoException(StandardErrorCode.SERIALIZED_PAGE_CHECKSUM_ERROR, String.format("Received corrupted serialized page from host %s", HostAddress.fromUri(this.workerClient.getLocation())));
            }
            j += r0.getSizeInBytes();
            j2 += r0.getPositionCount();
        }
        log.info("Received %s rows in %s pages from %s", new Object[]{Long.valueOf(j2), Integer.valueOf(pages.size()), this.workerClient.getTaskUri()});
        this.pageBuffer.addAll(pages);
        this.bufferMemoryBytes.addAndGet(j);
        long nextToken = pagesResponse.getNextToken();
        if (pages.size() > 0) {
            this.workerClient.acknowledgeResultsAsync(nextToken);
        }
        this.token = nextToken;
        if (pagesResponse.isClientComplete()) {
            this.workerClient.abortResultsAsync();
            this.scheduledFuture.cancel(false);
        }
        if (pages.isEmpty()) {
            return;
        }
        synchronized (this.taskHasResult) {
            this.taskHasResult.notifyAll();
        }
    }

    private void onFailure(Throwable th) {
        this.workerClient.abortResultsAsync();
        stop(false);
        this.lastException.set(th);
        synchronized (this.taskHasResult) {
            this.taskHasResult.notifyAll();
        }
    }
}
