package com.facebook.presto.execution.buffer;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.execution.buffer.ClientBuffer;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/execution/buffer/TestClientBuffer.class */
public class TestClientBuffer {
    private static final String TASK_INSTANCE_ID = "task-instance-id";
    private static final String INVALID_SEQUENCE_ID = "Invalid sequence id";
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();
    private static final Duration NO_WAIT = new Duration(0.0d, TimeUnit.MILLISECONDS);
    private static final DataSize BUFFERED_PAGE_SIZE = new DataSize(PAGES_SERDE.serialize(createPage(42)).getRetainedSizeInBytes(), DataSize.Unit.BYTE);
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of(BigintType.BIGINT);
    private static final OutputBuffers.OutputBufferId BUFFER_ID = new OutputBuffers.OutputBufferId(33);

    @ThreadSafe
    /* loaded from: input_file:com/facebook/presto/execution/buffer/TestClientBuffer$TestingPagesSupplier.class */
    private static class TestingPagesSupplier implements ClientBuffer.PagesSupplier {

        @GuardedBy("this")
        private final Deque<SerializedPageReference> buffer;

        @GuardedBy("this")
        private boolean noMorePages;

        private TestingPagesSupplier() {
            this.buffer = new ArrayDeque();
        }

        public synchronized boolean mayHaveMorePages() {
            return (this.noMorePages && this.buffer.isEmpty()) ? false : true;
        }

        public synchronized void setNoMorePages() {
            this.noMorePages = true;
        }

        public synchronized int getBufferedPages() {
            return this.buffer.size();
        }

        public synchronized void addPage(Page page) {
            Objects.requireNonNull(page, "page is null");
            Preconditions.checkState(!this.noMorePages);
            this.buffer.add(new SerializedPageReference(TestClientBuffer.PAGES_SERDE.serialize(page), 1, () -> {
            }));
        }

        public synchronized List<SerializedPageReference> getPages(DataSize dataSize) {
            long bytes = dataSize.toBytes();
            ArrayList arrayList = new ArrayList();
            long j = 0;
            while (true) {
                SerializedPageReference peek = this.buffer.peek();
                if (peek != null) {
                    j += peek.getRetainedSizeInBytes();
                    if (!arrayList.isEmpty() && j > bytes) {
                        break;
                    }
                    Preconditions.checkState(this.buffer.poll() == peek, "Buffer corrupted");
                    arrayList.add(peek);
                } else {
                    break;
                }
            }
            return ImmutableList.copyOf(arrayList);
        }
    }

    @Test
    public void testSimplePushBuffer() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        for (int i = 0; i < 3; i++) {
            addPage(clientBuffer, createPage(i));
        }
        assertBufferInfo(clientBuffer, 3, 0);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertBufferInfo(clientBuffer, 3, 0);
        clientBuffer.getPages(3L, sizeOfPages(10)).cancel(true);
        assertBufferInfo(clientBuffer, 0, 3);
        for (int i2 = 3; i2 < 6; i2++) {
            addPage(clientBuffer, createPage(i2));
        }
        assertBufferInfo(clientBuffer, 3, 3);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 3L, sizeOfPages(1), NO_WAIT), bufferResult(3L, createPage(3), new Page[0]));
        assertBufferInfo(clientBuffer, 3, 3);
        clientBuffer.setNoMorePages();
        assertBufferInfo(clientBuffer, 3, 3);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 4L, sizeOfPages(1), NO_WAIT), bufferResult(4L, createPage(4), new Page[0]));
        assertBufferInfo(clientBuffer, 2, 4);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 5L, sizeOfPages(30), NO_WAIT), bufferResult(5L, createPage(5), new Page[0]));
        assertBufferInfo(clientBuffer, 1, 5);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 6L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 6L, true));
        assertBufferInfo(clientBuffer, 0, 6);
        clientBuffer.destroy();
        assertBufferDestroyed(clientBuffer, 6);
    }

    @Test
    public void testSimplePullBuffer() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        TestingPagesSupplier testingPagesSupplier = new TestingPagesSupplier();
        for (int i = 0; i < 3; i++) {
            testingPagesSupplier.addPage(createPage(i));
        }
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 3);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, testingPagesSupplier, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 0);
        assertBufferInfo(clientBuffer, 3, 0);
        ListenableFuture pages = clientBuffer.getPages(3L, sizeOfPages(1));
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 0);
        assertBufferInfo(clientBuffer, 0, 3);
        Assert.assertFalse(pages.isDone());
        for (int i2 = 3; i2 < 6; i2++) {
            testingPagesSupplier.addPage(createPage(i2));
        }
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 3);
        clientBuffer.loadPagesIfNecessary(testingPagesSupplier);
        assertBufferResultEquals(TYPES, getFuture(pages, NO_WAIT), bufferResult(3L, createPage(3), new Page[0]));
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 2);
        assertBufferInfo(clientBuffer, 1, 3);
        testingPagesSupplier.setNoMorePages();
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 2);
        assertBufferInfo(clientBuffer, 1, 3);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, testingPagesSupplier, 4L, sizeOfPages(1), NO_WAIT), bufferResult(4L, createPage(4), new Page[0]));
        assertBufferInfo(clientBuffer, 1, 4);
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 1);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, testingPagesSupplier, 5L, sizeOfPages(30), NO_WAIT), bufferResult(5L, createPage(5), new Page[0]));
        assertBufferInfo(clientBuffer, 1, 5);
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 0);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, testingPagesSupplier, 6L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 6L, true));
        assertBufferInfo(clientBuffer, 0, 6);
        Assert.assertEquals(testingPagesSupplier.getBufferedPages(), 0);
        clientBuffer.destroy();
        assertBufferDestroyed(clientBuffer, 6);
    }

    @Test
    public void testDuplicateRequests() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        for (int i = 0; i < 3; i++) {
            addPage(clientBuffer, createPage(i));
        }
        assertBufferInfo(clientBuffer, 3, 0);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertBufferInfo(clientBuffer, 3, 0);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertBufferInfo(clientBuffer, 3, 0);
        clientBuffer.getPages(3L, sizeOfPages(10)).cancel(true);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 0L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, false));
        assertBufferInfo(clientBuffer, 0, 3);
    }

    @Test
    public void testAddAfterNoMorePages() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        clientBuffer.setNoMorePages();
        addPage(clientBuffer, createPage(0));
        addPage(clientBuffer, createPage(0));
        assertBufferInfo(clientBuffer, 0, 0);
    }

    @Test
    public void testAddAfterDestroy() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        clientBuffer.destroy();
        addPage(clientBuffer, createPage(0));
        addPage(clientBuffer, createPage(0));
        assertBufferDestroyed(clientBuffer, 0);
    }

    @Test
    public void testDestroy() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        for (int i = 0; i < 5; i++) {
            addPage(clientBuffer, createPage(i));
        }
        clientBuffer.setNoMorePages();
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 0L, sizeOfPages(1), NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        clientBuffer.destroy();
        assertBufferDestroyed(clientBuffer, 0);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 1L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    @Test
    public void testNoMorePagesFreesReader() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        ListenableFuture pages = clientBuffer.getPages(0L, sizeOfPages(10));
        Assert.assertFalse(pages.isDone());
        addPage(clientBuffer, createPage(0));
        assertBufferResultEquals(TYPES, getFuture(pages, NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        ListenableFuture pages2 = clientBuffer.getPages(1L, sizeOfPages(10));
        Assert.assertFalse(pages2.isDone());
        clientBuffer.setNoMorePages();
        assertBufferInfo(clientBuffer, 0, 1);
        assertBufferResultEquals(TYPES, getFuture(pages2, NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, true));
    }

    @Test
    public void testDestroyFreesReader() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        ListenableFuture pages = clientBuffer.getPages(0L, sizeOfPages(10));
        Assert.assertFalse(pages.isDone());
        addPage(clientBuffer, createPage(0));
        Assert.assertTrue(pages.isDone());
        assertBufferResultEquals(TYPES, getFuture(pages, NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        ListenableFuture pages2 = clientBuffer.getPages(1L, sizeOfPages(10));
        Assert.assertFalse(pages2.isDone());
        clientBuffer.destroy();
        assertBufferResultEquals(TYPES, getFuture(pages2, NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
        assertBufferDestroyed(clientBuffer, 1);
    }

    @Test
    public void testInvalidTokenFails() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        addPage(clientBuffer, createPage(0));
        addPage(clientBuffer, createPage(1));
        clientBuffer.getPages(1L, sizeOfPages(10)).cancel(true);
        assertBufferInfo(clientBuffer, 1, 1);
        assertInvalidSequenceId(clientBuffer, -1);
        assertBufferInfo(clientBuffer, 1, 1);
        assertInvalidSequenceId(clientBuffer, 10);
        assertBufferInfo(clientBuffer, 1, 1);
    }

    @Test
    public void testReferenceCount() throws Exception {
        ClientBuffer clientBuffer = new ClientBuffer(TASK_INSTANCE_ID, BUFFER_ID);
        AtomicBoolean addPage = addPage(clientBuffer, createPage(0));
        AtomicBoolean addPage2 = addPage(clientBuffer, createPage(1));
        Assert.assertTrue(addPage.get());
        Assert.assertTrue(addPage2.get());
        assertBufferInfo(clientBuffer, 2, 0);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 0L, sizeOfPages(0), NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        Assert.assertTrue(addPage.get());
        Assert.assertTrue(addPage2.get());
        assertBufferInfo(clientBuffer, 2, 0);
        assertBufferResultEquals(TYPES, getBufferResult(clientBuffer, 1L, sizeOfPages(1), NO_WAIT), bufferResult(1L, createPage(1), new Page[0]));
        Assert.assertFalse(addPage.get());
        Assert.assertTrue(addPage2.get());
        assertBufferInfo(clientBuffer, 1, 1);
        clientBuffer.destroy();
        Assert.assertFalse(addPage.get());
        Assert.assertFalse(addPage2.get());
        assertBufferDestroyed(clientBuffer, 1);
    }

    private static void assertInvalidSequenceId(ClientBuffer clientBuffer, int i) {
        try {
            clientBuffer.getPages(i, sizeOfPages(10));
            Assert.fail("Expected Invalid sequence id");
        } catch (IllegalArgumentException e) {
            Assert.assertEquals(e.getMessage(), INVALID_SEQUENCE_ID);
        }
    }

    private static BufferResult getBufferResult(ClientBuffer clientBuffer, long j, DataSize dataSize, Duration duration) {
        return getFuture(clientBuffer.getPages(j, dataSize), duration);
    }

    private static BufferResult getBufferResult(ClientBuffer clientBuffer, ClientBuffer.PagesSupplier pagesSupplier, long j, DataSize dataSize, Duration duration) {
        return getFuture(clientBuffer.getPages(j, dataSize, Optional.of(pagesSupplier)), duration);
    }

    private static BufferResult getFuture(ListenableFuture<BufferResult> listenableFuture, Duration duration) {
        return (BufferResult) MoreFutures.tryGetFutureValue(listenableFuture, (int) duration.toMillis(), TimeUnit.MILLISECONDS).get();
    }

    private static AtomicBoolean addPage(ClientBuffer clientBuffer, Page page) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        SerializedPageReference serializedPageReference = new SerializedPageReference(PAGES_SERDE.serialize(page), 1, () -> {
            atomicBoolean.set(false);
        });
        clientBuffer.enqueuePages(ImmutableList.of(serializedPageReference));
        serializedPageReference.dereferencePage();
        return atomicBoolean;
    }

    private static void assertBufferInfo(ClientBuffer clientBuffer, int i, int i2) {
        Assert.assertEquals(clientBuffer.getInfo(), new BufferInfo(BUFFER_ID, false, i, i2, new PageBufferInfo(BUFFER_ID.getId(), i, sizeOfPages(i).toBytes(), i + i2, i + i2)));
        Assert.assertEquals(clientBuffer.isDestroyed(), false);
    }

    private static void assertBufferDestroyed(ClientBuffer clientBuffer, int i) {
        BufferInfo info = clientBuffer.getInfo();
        Assert.assertEquals(info.getBufferedPages(), 0);
        Assert.assertEquals(info.getPagesSent(), i);
        Assert.assertEquals(info.isFinished(), true);
        Assert.assertEquals(clientBuffer.isDestroyed(), true);
    }

    private static void assertBufferResultEquals(List<? extends Type> list, BufferResult bufferResult, BufferResult bufferResult2) {
        Assert.assertEquals(bufferResult.getSerializedPages().size(), bufferResult2.getSerializedPages().size());
        Assert.assertEquals(bufferResult.getToken(), bufferResult2.getToken());
        for (int i = 0; i < bufferResult.getSerializedPages().size(); i++) {
            Page deserialize = PAGES_SERDE.deserialize((SerializedPage) bufferResult.getSerializedPages().get(i));
            Page deserialize2 = PAGES_SERDE.deserialize((SerializedPage) bufferResult2.getSerializedPages().get(i));
            Assert.assertEquals(deserialize.getChannelCount(), deserialize2.getChannelCount());
            PageAssertions.assertPageEquals(list, deserialize, deserialize2);
        }
        Assert.assertEquals(bufferResult.isBufferComplete(), bufferResult2.isBufferComplete());
    }

    private static BufferResult bufferResult(long j, Page page, Page... pageArr) {
        return bufferResult(j, ImmutableList.builder().add(page).add(pageArr).build());
    }

    private static BufferResult bufferResult(long j, List<Page> list) {
        Preconditions.checkArgument(!list.isEmpty(), "pages is empty");
        long size = j + list.size();
        Stream<Page> stream = list.stream();
        PagesSerde pagesSerde = PAGES_SERDE;
        pagesSerde.getClass();
        return new BufferResult(TASK_INSTANCE_ID, j, size, false, (List) stream.map(pagesSerde::serialize).collect(Collectors.toList()));
    }

    private static Page createPage(int i) {
        return new Page(new Block[]{BlockAssertions.createLongsBlock(i)});
    }

    private static DataSize sizeOfPages(int i) {
        return new DataSize(BUFFERED_PAGE_SIZE.toBytes() * i, DataSize.Unit.BYTE);
    }
}
