package com.facebook.presto.operator;

import com.facebook.presto.execution.TaskId;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.serde.PagesSerde;
import com.facebook.presto.split.RemoteSplit;
import com.facebook.presto.sql.analyzer.Session;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.tuple.TupleInfo;
import com.facebook.presto.util.Threads;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Iterables;
import io.airlift.http.client.AsyncHttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.units.DataSize;
import java.io.Closeable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/operator/TestExchangeOperator.class */
public class TestExchangeOperator {
    // Schema shared by every split and page in these tests: a single VARBINARY channel.
    private static final List<TupleInfo> TUPLE_INFOS = ImmutableList.of(TupleInfo.SINGLE_VARBINARY);
    // The one page instance every fake task serves; received pages are compared against it.
    // NOTE(review): the arguments 10 and 100 are presumably the page length and the sequence
    // start value — confirm against SequencePageBuilder.createSequencePage.
    private static final Page PAGE = SequencePageBuilder.createSequencePage(TUPLE_INFOS, 10, 100);
    // Task identifiers; each is used as the key into taskBuffers and matches the first path
    // segment of the corresponding split URI used by the tests.
    private static final String TASK_1_ID = "task1";
    private static final String TASK_2_ID = "task2";
    private static final String TASK_3_ID = "task3";
    // Per-task page buffers, created lazily on first lookup by the cache loader set up in setUp().
    private LoadingCache<String, TaskBuffer> taskBuffers;
    // Thread pool handed to the testing HTTP client, the task context and every exchange client.
    private ExecutorService executor;
    // In-process HTTP client whose requests are answered by HttpClientHandler.
    private AsyncHttpClient httpClient;
    // Driver context passed to the operator factory when creating the operator under test.
    private DriverContext driverContext;
    // Creates a fresh ExchangeClient on each call; passed to the ExchangeOperatorFactory.
    private Supplier<ExchangeClient> exchangeClientSupplier;

    /* loaded from: input_file:com/facebook/presto/operator/TestExchangeOperator$HttpClientHandler.class */
    private class HttpClientHandler implements Function<Request, Response> {
        private HttpClientHandler() {
        }

        public Response apply(Request request) {
            ImmutableList copyOf = ImmutableList.copyOf(Splitter.on("/").omitEmptyStrings().split(request.getUri().getPath()));
            Assert.assertEquals(copyOf.size(), 2);
            String str = (String) copyOf.get(0);
            int parseInt = Integer.parseInt((String) copyOf.get(1));
            ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder();
            builder.put("X-Presto-Page-Sequence-Id", String.valueOf(parseInt));
            TaskBuffer taskBuffer = (TaskBuffer) TestExchangeOperator.this.taskBuffers.getUnchecked(str);
            Page page = taskBuffer.getPage(parseInt);
            if (page == null) {
                return taskBuffer.isClosed() ? new TestingResponse(HttpStatus.GONE, builder.build(), new byte[0]) : new TestingResponse(HttpStatus.NO_CONTENT, builder.build(), new byte[0]);
            }
            builder.put("Content-Type", "application/X-presto-pages");
            DynamicSliceOutput dynamicSliceOutput = new DynamicSliceOutput(256);
            PagesSerde.writePages(dynamicSliceOutput, new Page[]{page});
            return new TestingResponse(HttpStatus.OK, builder.build(), dynamicSliceOutput.slice().getInput());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/facebook/presto/operator/TestExchangeOperator$TaskBuffer.class */
    /**
     * In-memory stand-in for a remote task's output buffer.
     *
     * <p>Pages are appended by the test and fetched by index. Instances are written by the test
     * thread and read by whichever thread serves the simulated HTTP requests, so every accessor
     * is {@code synchronized} to make appended pages and the closed flag reliably visible to
     * readers.
     */
    public static class TaskBuffer implements Closeable {
        private final List<Page> buffer = new ArrayList<Page>();
        private boolean closed;

        private TaskBuffer() {
        }

        /**
         * Appends {@code pageCount} copies of the shared test page and optionally closes the buffer.
         * Pages are appended before the closed flag is set.
         *
         * @param pageCount number of pages to append
         * @param close whether to mark the buffer closed afterwards
         */
        public synchronized void addPages(int pageCount, boolean close) {
            addPages(Collections.nCopies(pageCount, TestExchangeOperator.PAGE));
            if (close) {
                this.closed = true;
            }
        }

        /**
         * Appends the given pages to the end of the buffer.
         *
         * @param pages pages to append, in iteration order
         */
        public synchronized void addPages(Iterable<Page> pages) {
            Iterables.addAll(this.buffer, pages);
        }

        /**
         * Returns the page at the given position.
         *
         * @param pageSequenceId zero-based index of the requested page
         * @return the page, or {@code null} if no page has been added at that index yet
         */
        public synchronized Page getPage(int pageSequenceId) {
            if (pageSequenceId >= this.buffer.size()) {
                return null;
            }
            return this.buffer.get(pageSequenceId);
        }

        /**
         * @return {@code true} once the buffer has been marked closed
         */
        public synchronized boolean isClosed() {
            return this.closed;
        }

        /** Marks the buffer closed; pages already added remain readable. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            this.closed = true;
        }
    }

    /**
     * Builds a fresh fixture before every test method: the lazily populated task buffer cache,
     * the executor, the in-process HTTP client backed by {@link HttpClientHandler}, the driver
     * context, and the supplier that creates exchange clients on demand.
     */
    @BeforeMethod
    public void setUp() throws Exception {
        // A buffer is created the first time a task id is looked up, whether by a test or by
        // the HTTP handler.
        this.taskBuffers = CacheBuilder.newBuilder().build(new CacheLoader<String, TaskBuffer>() {
            @Override
            public TaskBuffer load(String taskId) throws Exception {
                return new TaskBuffer();
            }
        });
        this.executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test-%s"));
        this.httpClient = new TestingHttpClient(new HttpClientHandler(), this.executor);
        this.driverContext = new TaskContext(new TaskId("query", "stage", "task"), this.executor, new Session("user", "source", "catalog", "schema", "address", "agent")).addPipelineContext(true, true).addDriverContext();
        this.exchangeClientSupplier = new Supplier<ExchangeClient>() {
            // Must be named get() to implement Supplier; a differently named method leaves the
            // interface method unimplemented.
            // NOTE(review): the 32 MB / 10 MB / 3 arguments are presumably buffer and response
            // size limits plus a concurrency setting — confirm against ExchangeClient.
            @Override
            public ExchangeClient get() {
                return new ExchangeClient(new DataSize(32.0d, DataSize.Unit.MEGABYTE), new DataSize(10.0d, DataSize.Unit.MEGABYTE), 3, TestExchangeOperator.this.httpClient, TestExchangeOperator.this.executor);
            }
        };
    }

    /**
     * Releases the fixture after every test method.
     *
     * <p>The executor is shut down in a {@code finally} block so its threads are stopped even if
     * closing the HTTP client throws. Both resources are null-checked so that a fixture that
     * was never fully initialised does not turn clean-up into a {@link NullPointerException}.
     */
    @AfterMethod
    public void tearDown() throws Exception {
        this.taskBuffers = null;
        try {
            if (this.httpClient != null) {
                this.httpClient.close();
            }
        } finally {
            this.httpClient = null;
            if (this.executor != null) {
                this.executor.shutdownNow();
                this.executor = null;
            }
        }
    }

    /**
     * Three tasks each publish ten pages and close straight away; the operator is expected to
     * deliver all thirty pages and then report finished.
     */
    @Test
    public void testSimple() throws Exception {
        ExchangeOperator.ExchangeOperatorFactory factory = new ExchangeOperator.ExchangeOperatorFactory(0, new PlanNodeId("test"), this.exchangeClientSupplier, TUPLE_INFOS);
        SourceOperator operator = factory.createOperator(this.driverContext);

        // Register one split per task, then declare the split set complete.
        for (String location : ImmutableList.of("http://localhost/task1", "http://localhost/task2", "http://localhost/task3")) {
            operator.addSplit(new RemoteSplit(URI.create(location), TUPLE_INFOS));
        }
        operator.noMoreSplits();

        // Fill and close every task buffer.
        for (String taskId : ImmutableList.of(TASK_1_ID, TASK_2_ID, TASK_3_ID)) {
            this.taskBuffers.getUnchecked(taskId).addPages(10, true);
        }

        waitForPages(operator, 30);
        waitForFinished(operator);
    }

    /**
     * Verifies that the operator does not finish while task buffers are still open: after the
     * first page of each task is consumed the operator must be idle but unfinished, and only
     * after the remaining pages arrive and the buffers close does it finish.
     */
    @Test
    public void testWaitForClose() throws Exception {
        ExchangeOperator.ExchangeOperatorFactory factory = new ExchangeOperator.ExchangeOperatorFactory(0, new PlanNodeId("test"), this.exchangeClientSupplier, TUPLE_INFOS);
        SourceOperator operator = factory.createOperator(this.driverContext);
        List<String> taskIds = ImmutableList.of(TASK_1_ID, TASK_2_ID, TASK_3_ID);

        for (String location : ImmutableList.of("http://localhost/task1", "http://localhost/task2", "http://localhost/task3")) {
            operator.addSplit(new RemoteSplit(URI.create(location), TUPLE_INFOS));
        }
        operator.noMoreSplits();

        // First round: one page per task, buffers stay open.
        for (String taskId : taskIds) {
            this.taskBuffers.getUnchecked(taskId).addPages(1, false);
        }
        waitForPages(operator, 3);

        // Nothing more to read, yet the operator must still be waiting on the open buffers.
        Assert.assertEquals(operator.isFinished(), false);
        Assert.assertEquals(operator.needsInput(), false);
        Assert.assertEquals(operator.getOutput(), (Object) null);

        // Second round: two more pages per task, then close.
        for (String taskId : taskIds) {
            this.taskBuffers.getUnchecked(taskId).addPages(2, true);
        }
        waitForPages(operator, 6);
        waitForFinished(operator);
    }

    /**
     * Verifies that the operator stays unfinished until {@code noMoreSplits()} is called, even
     * when the only known task has already delivered its page and closed.
     */
    @Test
    public void testWaitForNoMoreSplits() throws Exception {
        ExchangeOperator.ExchangeOperatorFactory factory = new ExchangeOperator.ExchangeOperatorFactory(0, new PlanNodeId("test"), this.exchangeClientSupplier, TUPLE_INFOS);
        SourceOperator operator = factory.createOperator(this.driverContext);

        // First task: a single page, buffer closed.
        RemoteSplit firstSplit = new RemoteSplit(URI.create("http://localhost/task1"), TUPLE_INFOS);
        operator.addSplit(firstSplit);
        this.taskBuffers.getUnchecked(TASK_1_ID).addPages(1, true);
        waitForPages(operator, 1);

        // More splits may still arrive, so the operator cannot be finished yet.
        Assert.assertEquals(operator.isFinished(), false);
        Assert.assertEquals(operator.needsInput(), false);
        Assert.assertEquals(operator.getOutput(), (Object) null);

        // Second task arrives and the split set is sealed.
        RemoteSplit secondSplit = new RemoteSplit(URI.create("http://localhost/task2"), TUPLE_INFOS);
        operator.addSplit(secondSplit);
        operator.noMoreSplits();
        this.taskBuffers.getUnchecked(TASK_2_ID).addPages(2, true);

        waitForPages(operator, 2);
        waitForFinished(operator);
    }

    /**
     * Verifies that calling {@code finish()} ends the operator even though every task buffer
     * is still open.
     */
    @Test
    public void testFinish() throws Exception {
        ExchangeOperator.ExchangeOperatorFactory factory = new ExchangeOperator.ExchangeOperatorFactory(0, new PlanNodeId("test"), this.exchangeClientSupplier, TUPLE_INFOS);
        SourceOperator operator = factory.createOperator(this.driverContext);

        for (String location : ImmutableList.of("http://localhost/task1", "http://localhost/task2", "http://localhost/task3")) {
            operator.addSplit(new RemoteSplit(URI.create(location), TUPLE_INFOS));
        }
        operator.noMoreSplits();

        // One page per task; none of the buffers is closed.
        for (String taskId : ImmutableList.of(TASK_1_ID, TASK_2_ID, TASK_3_ID)) {
            this.taskBuffers.getUnchecked(taskId).addPages(1, false);
        }
        waitForPages(operator, 3);

        Assert.assertEquals(operator.isFinished(), false);
        Assert.assertEquals(operator.needsInput(), false);
        Assert.assertEquals(operator.getOutput(), (Object) null);

        // Explicit finish must terminate the operator without waiting for the buffers to close.
        operator.finish();
        waitForFinished(operator);
    }

    /**
     * Polls the operator until it has produced {@code expectedPageCount} pages, it reports
     * finished, or ten seconds have elapsed, then verifies the outcome.
     *
     * <p>After polling stops the method waits briefly and asserts that no further output is
     * available, that exactly {@code expectedPageCount} pages were collected, and that every
     * collected page equals the shared test page.
     *
     * @param operator operator to drain
     * @param expectedPageCount exact number of pages the operator must produce
     * @return the collected pages, in the order they were produced
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private List<Page> waitForPages(Operator operator, int expectedPageCount) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        List<Page> pages = new ArrayList<Page>();
        while (pages.size() < expectedPageCount && System.nanoTime() < deadline) {
            Assert.assertEquals(operator.needsInput(), false);
            if (operator.isFinished()) {
                break;
            }
            Page output = operator.getOutput();
            if (output != null) {
                pages.add(output);
            } else {
                // Nothing available yet; back off briefly before polling again.
                Thread.sleep(10L);
            }
        }
        // Give any unexpected extra page a moment to show up before asserting there is none.
        Thread.sleep(10L);
        Assert.assertEquals(operator.needsInput(), false);
        Assert.assertNull(operator.getOutput());
        Assert.assertEquals(pages.size(), expectedPageCount);
        for (Page page : pages) {
            PageAssertions.assertPageEquals(page, PAGE);
        }
        return pages;
    }

    /**
     * Polls the operator for up to ten seconds until it reports finished, asserting on every
     * poll that it neither wants input nor produces output, and finally asserts that it is
     * finished and still idle.
     *
     * @param operator operator expected to reach the finished state
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void waitForFinished(Operator operator) throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        boolean finished = false;
        while (!finished && System.nanoTime() < deadline) {
            Assert.assertEquals(operator.needsInput(), false);
            Assert.assertNull(operator.getOutput());
            finished = operator.isFinished();
            if (!finished) {
                Thread.sleep(10L);
            }
        }

        // Final verdict: finished, and still neither consuming nor producing.
        Assert.assertEquals(operator.isFinished(), true);
        Assert.assertEquals(operator.needsInput(), false);
        Assert.assertNull(operator.getOutput());
    }
}
