package com.facebook.presto.execution.buffer;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/execution/buffer/TestArbitraryOutputBuffer.class */
public class TestArbitraryOutputBuffer {
    private static final String TASK_INSTANCE_ID = "task-instance-id";
    private ScheduledExecutorService stateNotificationExecutor;
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();
    private static final Duration NO_WAIT = new Duration(0.0d, TimeUnit.MILLISECONDS);
    private static final Duration MAX_WAIT = new Duration(1.0d, TimeUnit.SECONDS);
    private static final DataSize BUFFERED_PAGE_SIZE = new DataSize(PAGES_SERDE.serialize(createPage(42)).getRetainedSizeInBytes(), DataSize.Unit.BYTE);
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of(BigintType.BIGINT);
    private static final OutputBuffers.OutputBufferId FIRST = new OutputBuffers.OutputBufferId(0);
    private static final OutputBuffers.OutputBufferId SECOND = new OutputBuffers.OutputBufferId(1);

    @BeforeClass
    public void setUp() throws Exception {
        this.stateNotificationExecutor = Executors.newScheduledThreadPool(5, Threads.daemonThreadsNamed("test-%s"));
    }

    @AfterClass(alwaysRun = true)
    public void tearDown() throws Exception {
        if (this.stateNotificationExecutor != null) {
            this.stateNotificationExecutor.shutdownNow();
            this.stateNotificationExecutor = null;
        }
    }

    @Test
    public void testInvalidConstructorArg() throws Exception {
        try {
            createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), new DataSize(0.0d, DataSize.Unit.BYTE));
            Assert.fail("Expected IllegalStateException");
        } catch (IllegalArgumentException e) {
        }
        try {
            createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), new DataSize(0.0d, DataSize.Unit.BYTE));
            Assert.fail("Expected IllegalStateException");
        } catch (IllegalArgumentException e2) {
        }
    }

    @Test
    public void testSimple() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        for (int i = 0; i < 3; i++) {
            addPage(createArbitraryBuffer, createPage(i));
        }
        OutputBuffers withBuffer = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer);
        assertQueueState(createArbitraryBuffer, 3, FIRST, 0, 0);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 3, 0);
        createArbitraryBuffer.get(FIRST, 3L, sizeOfPages(1)).cancel(true);
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 3);
        for (int i2 = 3; i2 < 13; i2++) {
            addPage(createArbitraryBuffer, createPage(i2));
        }
        assertQueueState(createArbitraryBuffer, 9, FIRST, 1, 3);
        ListenableFuture<?> enqueuePage = enqueuePage(createArbitraryBuffer, createPage(13));
        Assert.assertFalse(enqueuePage.isDone());
        assertQueueState(createArbitraryBuffer, 10, FIRST, 1, 3);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 3L, sizeOfPages(1), NO_WAIT), bufferResult(3L, createPage(3), new Page[0]));
        assertQueueState(createArbitraryBuffer, 10, FIRST, 1, 3);
        Assert.assertFalse(enqueuePage.isDone());
        OutputBuffers withBuffer2 = withBuffer.withBuffer(SECOND, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer2);
        assertQueueState(createArbitraryBuffer, 10, SECOND, 0, 0);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(4), createPage(5), createPage(6), createPage(7), createPage(8), createPage(9), createPage(10), createPage(11), createPage(12), createPage(13)));
        assertQueueState(createArbitraryBuffer, 0, SECOND, 10, 0);
        createArbitraryBuffer.get(SECOND, 10L, sizeOfPages(10)).cancel(true);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 10);
        createArbitraryBuffer.setOutputBuffers(withBuffer2.withNoMoreBufferIds());
        assertQueueState(createArbitraryBuffer, 0, FIRST, 1, 3);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 10);
        assertFutureIsDone(enqueuePage);
        addPage(createArbitraryBuffer, createPage(14));
        addPage(createArbitraryBuffer, createPage(15));
        addPage(createArbitraryBuffer, createPage(16));
        assertQueueState(createArbitraryBuffer, 2, FIRST, 1, 3);
        assertQueueState(createArbitraryBuffer, 2, SECOND, 1, 10);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 10L, sizeOfPages(1), NO_WAIT), bufferResult(10L, createPage(14), new Page[0]));
        assertQueueState(createArbitraryBuffer, 2, FIRST, 1, 3);
        assertQueueState(createArbitraryBuffer, 2, SECOND, 1, 10);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 4L, sizeOfPages(10), NO_WAIT), bufferResult(4L, createPage(15), createPage(16)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 2, 4);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.setNoMorePages();
        assertQueueState(createArbitraryBuffer, 0, FIRST, 2, 4);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 6L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 6L, true));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 6);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.abort(FIRST);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 6);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 11L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 11L, true));
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 11);
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.abort(SECOND);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 6);
        assertQueueClosed(createArbitraryBuffer, 0, SECOND, 11);
        assertFinished(createArbitraryBuffer);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 6L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 6L, true));
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 11L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 11L, true));
    }

    @Test
    public void testBufferFull() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(2));
        addPage(createArbitraryBuffer, createPage(1));
        addPage(createArbitraryBuffer, createPage(2));
        enqueuePage(createArbitraryBuffer, createPage(3));
    }

    @Test
    public void testDuplicateRequests() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), sizeOfPages(10));
        for (int i = 0; i < 3; i++) {
            addPage(createArbitraryBuffer, createPage(i));
        }
        assertQueueState(createArbitraryBuffer, 3, FIRST, 0, 0);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 3, 0);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 3, 0);
        createArbitraryBuffer.get(FIRST, 3L, sizeOfPages(10)).cancel(true);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, false));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 3);
    }

    @Test
    public void testAddQueueAfterCreation() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), sizeOfPages(10));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        try {
            createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withBuffer(SECOND, 0).withNoMoreBufferIds());
            Assert.fail("Expected IllegalStateException from addQueue after noMoreQueues has been called");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testAddAfterFinish() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), sizeOfPages(10));
        createArbitraryBuffer.setNoMorePages();
        addPage(createArbitraryBuffer, createPage(0));
        addPage(createArbitraryBuffer, createPage(1));
        Assert.assertEquals(createArbitraryBuffer.getInfo().getTotalPagesSent(), 0L);
    }

    @Test
    public void testAddQueueAfterNoMoreQueues() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        try {
            createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
            Assert.fail("Expected IllegalStateException from addQueue after noMoreQueues has been called");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testAddAfterDestroy() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), sizeOfPages(10));
        createArbitraryBuffer.destroy();
        addPage(createArbitraryBuffer, createPage(0));
        addPage(createArbitraryBuffer, createPage(1));
        Assert.assertEquals(createArbitraryBuffer.getInfo().getTotalPagesSent(), 0L);
    }

    @Test
    public void testGetBeforeCreate() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(1));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, createPage(33));
        Assert.assertTrue(listenableFuture.isDone());
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(33), new Page[0]));
    }

    @Test(expectedExceptions = {IllegalStateException.class}, expectedExceptionsMessageRegExp = "No more buffers already set")
    public void testUseUndeclaredBufferAfterFinalBuffersSet() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), sizeOfPages(10));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.get(SECOND, 0L, sizeOfPages(1));
    }

    @Test
    public void testAbortBeforeCreate() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(1));
        Assert.assertFalse(listenableFuture.isDone());
        createArbitraryBuffer.abort(FIRST);
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, false));
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
        addPage(createArbitraryBuffer, createPage(33));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    @Test
    public void testFullBufferBlocksWriter() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withBuffer(SECOND, 0).withNoMoreBufferIds(), sizeOfPages(2));
        addPage(createArbitraryBuffer, createPage(1));
        addPage(createArbitraryBuffer, createPage(2));
        enqueuePage(createArbitraryBuffer, createPage(3));
    }

    @Test
    public void testAbort() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        for (int i = 0; i < 10; i++) {
            addPage(createArbitraryBuffer, createPage(i));
        }
        createArbitraryBuffer.setNoMorePages();
        OutputBuffers withBuffer = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(1), NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        createArbitraryBuffer.abort(FIRST);
        assertQueueClosed(createArbitraryBuffer, 9, FIRST, 0);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 1L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
        createArbitraryBuffer.setOutputBuffers(withBuffer.withBuffer(SECOND, 0).withNoMoreBufferIds());
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 0L, sizeOfPages(1), NO_WAIT), bufferResult(0L, createPage(1), new Page[0]));
        createArbitraryBuffer.abort(SECOND);
        assertQueueClosed(createArbitraryBuffer, 0, SECOND, 0);
        assertFinished(createArbitraryBuffer);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 1L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    @Test
    public void testFinishClosesEmptyQueues() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withBuffer(SECOND, 0).withNoMoreBufferIds(), sizeOfPages(10));
        createArbitraryBuffer.setNoMorePages();
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 0);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 0);
        createArbitraryBuffer.abort(FIRST);
        createArbitraryBuffer.abort(SECOND);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 0);
        assertQueueClosed(createArbitraryBuffer, 0, SECOND, 0);
    }

    @Test
    public void testAbortFreesReader() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, createPage(0));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        createArbitraryBuffer.abort(FIRST);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 1);
        assertBufferResultEquals(TYPES, getFuture(listenableFuture2, NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
    }

    @Test
    public void testFinishFreesReader() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, createPage(0));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 1);
        createArbitraryBuffer.abort(FIRST);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 1);
        assertBufferResultEquals(TYPES, getFuture(listenableFuture2, NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
    }

    @Test
    public void testFinishFreesWriter() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        for (int i = 0; i < 5; i++) {
            addPage(createArbitraryBuffer, createPage(i));
        }
        ListenableFuture<?> enqueuePage = enqueuePage(createArbitraryBuffer, createPage(5));
        ListenableFuture<?> enqueuePage2 = enqueuePage(createArbitraryBuffer, createPage(6));
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(1), MAX_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(100)).cancel(true);
        Assert.assertFalse(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        createArbitraryBuffer.setNoMorePages();
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        assertFutureIsDone(enqueuePage);
        assertFutureIsDone(enqueuePage2);
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 1L, sizeOfPages(100), NO_WAIT), bufferResult(1L, createPage(1), createPage(2), createPage(3), createPage(4), createPage(5), createPage(6)));
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 7L, sizeOfPages(100), NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 7L, true));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.abort(FIRST);
        assertFinished(createArbitraryBuffer);
    }

    @Test
    public void testDestroyFreesReader() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, createPage(0));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        createArbitraryBuffer.destroy();
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 1);
        assertBufferResultEquals(TYPES, getFuture(listenableFuture2, NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
    }

    @Test
    public void testDestroyFreesWriter() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        for (int i = 0; i < 5; i++) {
            addPage(createArbitraryBuffer, createPage(i));
        }
        ListenableFuture<?> enqueuePage = enqueuePage(createArbitraryBuffer, createPage(5));
        ListenableFuture<?> enqueuePage2 = enqueuePage(createArbitraryBuffer, createPage(6));
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(1), MAX_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(1)).cancel(true);
        Assert.assertFalse(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        createArbitraryBuffer.destroy();
        assertFinished(createArbitraryBuffer);
        assertFutureIsDone(enqueuePage);
        assertFutureIsDone(enqueuePage2);
    }

    @Test
    public void testFailDoesNotFreeReader() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), sizeOfPages(5));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, createPage(0));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        createArbitraryBuffer.fail();
        Assert.assertFalse(listenableFuture2.isDone());
        Assert.assertFalse(createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(10)).isDone());
    }

    @Test
    public void testFailFreesWriter() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), sizeOfPages(5));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        for (int i = 0; i < 5; i++) {
            addPage(createArbitraryBuffer, createPage(i));
        }
        ListenableFuture<?> enqueuePage = enqueuePage(createArbitraryBuffer, createPage(5));
        ListenableFuture<?> enqueuePage2 = enqueuePage(createArbitraryBuffer, createPage(6));
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(1), MAX_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(1)).cancel(true);
        Assert.assertFalse(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        createArbitraryBuffer.fail();
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        assertFutureIsDone(enqueuePage);
        assertFutureIsDone(enqueuePage2);
    }

    @Test
    public void testAddBufferAfterFail() throws Exception {
        OutputBuffers withBuffer = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0);
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(withBuffer, sizeOfPages(5));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, createPage(0));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(0), new Page[0]));
        createArbitraryBuffer.fail();
        OutputBuffers withBuffer2 = withBuffer.withBuffer(SECOND, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer2);
        Assert.assertFalse(createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(10)).isDone());
        Assert.assertFalse(createArbitraryBuffer.get(SECOND, 0L, sizeOfPages(10)).isDone());
        createArbitraryBuffer.setOutputBuffers(withBuffer2.withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.get(FIRST, 1L, sizeOfPages(10)).isDone());
        Assert.assertFalse(createArbitraryBuffer.get(SECOND, 0L, sizeOfPages(10)).isDone());
    }

    @Test
    public void testBufferCompletion() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Page createPage = createPage(i);
            addPage(createArbitraryBuffer, createPage);
            arrayList.add(createPage);
        }
        createArbitraryBuffer.setNoMorePages();
        assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, sizeOfPages(5), MAX_WAIT), bufferResult(0L, arrayList));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        createArbitraryBuffer.abort(FIRST);
        Assert.assertTrue(createArbitraryBuffer.isFinished());
    }

    @Test
    public void testNoMorePagesFreesReader() throws Exception {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY), sizeOfPages(10));
        createArbitraryBuffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        Assert.assertFalse(createArbitraryBuffer.isFinished());
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        createArbitraryBuffer.setNoMorePages();
        Assert.assertTrue(listenableFuture.isDone());
        Assert.assertTrue(createArbitraryBuffer.get(FIRST, 0L, sizeOfPages(10)).isDone());
    }

    private static BufferResult getBufferResult(OutputBuffer outputBuffer, OutputBuffers.OutputBufferId outputBufferId, long j, DataSize dataSize, Duration duration) {
        return getFuture(outputBuffer.get(outputBufferId, j, dataSize), duration);
    }

    private static BufferResult getFuture(ListenableFuture<BufferResult> listenableFuture, Duration duration) {
        return (BufferResult) MoreFutures.tryGetFutureValue(listenableFuture, (int) duration.toMillis(), TimeUnit.MILLISECONDS).get();
    }

    private static ListenableFuture<?> enqueuePage(OutputBuffer outputBuffer, Page page) {
        ListenableFuture<?> enqueue = outputBuffer.enqueue(ImmutableList.of(PAGES_SERDE.serialize(page)));
        Assert.assertFalse(enqueue.isDone());
        return enqueue;
    }

    private static void addPage(OutputBuffer outputBuffer, Page page) {
        Assert.assertTrue(outputBuffer.enqueue(ImmutableList.of(PAGES_SERDE.serialize(page))).isDone(), "Expected add page to not block");
    }

    private static void assertQueueState(OutputBuffer outputBuffer, int i, OutputBuffers.OutputBufferId outputBufferId, int i2, int i3) {
        OutputBufferInfo info = outputBuffer.getInfo();
        Assert.assertEquals(info.getTotalBufferedPages() - info.getBuffers().stream().mapToInt((v0) -> {
            return v0.getBufferedPages();
        }).sum(), i, "unassignedPages");
        Assert.assertEquals((BufferInfo) info.getBuffers().stream().filter(bufferInfo -> {
            return bufferInfo.getBufferId().equals(outputBufferId);
        }).findAny().orElse(null), new BufferInfo(outputBufferId, false, i2, i3, new PageBufferInfo(outputBufferId.getId(), i2, sizeOfPages(i2).toBytes(), i2 + i3, i2 + i3)));
    }

    private static void assertQueueClosed(OutputBuffer outputBuffer, int i, OutputBuffers.OutputBufferId outputBufferId, int i2) {
        OutputBufferInfo info = outputBuffer.getInfo();
        Assert.assertEquals(info.getTotalBufferedPages() - info.getBuffers().stream().mapToInt((v0) -> {
            return v0.getBufferedPages();
        }).sum(), i, "unassignedPages");
        BufferInfo bufferInfo = (BufferInfo) info.getBuffers().stream().filter(bufferInfo2 -> {
            return bufferInfo2.getBufferId().equals(outputBufferId);
        }).findAny().orElse(null);
        Assert.assertEquals(bufferInfo.getBufferedPages(), 0);
        Assert.assertEquals(bufferInfo.getPagesSent(), i2);
        Assert.assertEquals(bufferInfo.isFinished(), true);
    }

    private ArbitraryOutputBuffer createArbitraryBuffer(OutputBuffers outputBuffers, DataSize dataSize) {
        ArbitraryOutputBuffer arbitraryOutputBuffer = new ArbitraryOutputBuffer(TASK_INSTANCE_ID, new StateMachine("bufferState", this.stateNotificationExecutor, BufferState.OPEN, BufferState.TERMINAL_BUFFER_STATES), dataSize, j -> {
        }, this.stateNotificationExecutor);
        arbitraryOutputBuffer.setOutputBuffers(outputBuffers);
        return arbitraryOutputBuffer;
    }

    private static void assertFinished(OutputBuffer outputBuffer) throws Exception {
        Assert.assertTrue(outputBuffer.isFinished());
        for (BufferInfo bufferInfo : outputBuffer.getInfo().getBuffers()) {
            Assert.assertTrue(bufferInfo.isFinished());
            Assert.assertEquals(bufferInfo.getBufferedPages(), 0);
        }
    }

    private static void assertBufferResultEquals(List<? extends Type> list, BufferResult bufferResult, BufferResult bufferResult2) {
        Assert.assertEquals(bufferResult.getSerializedPages().size(), bufferResult2.getSerializedPages().size(), "page count");
        Assert.assertEquals(bufferResult.getToken(), bufferResult2.getToken(), "token");
        for (int i = 0; i < bufferResult.getSerializedPages().size(); i++) {
            Page deserialize = PAGES_SERDE.deserialize((SerializedPage) bufferResult.getSerializedPages().get(i));
            Page deserialize2 = PAGES_SERDE.deserialize((SerializedPage) bufferResult2.getSerializedPages().get(i));
            Assert.assertEquals(deserialize.getChannelCount(), deserialize2.getChannelCount());
            PageAssertions.assertPageEquals(list, deserialize, deserialize2);
        }
        Assert.assertEquals(bufferResult.isBufferComplete(), bufferResult2.isBufferComplete(), "buffer complete");
    }

    private static void assertFutureIsDone(Future<?> future) {
        MoreFutures.tryGetFutureValue(future, 5, TimeUnit.SECONDS);
        Assert.assertTrue(future.isDone());
    }

    private static BufferResult bufferResult(long j, Page page, Page... pageArr) {
        return bufferResult(j, ImmutableList.builder().add(page).add(pageArr).build());
    }

    private static BufferResult bufferResult(long j, List<Page> list) {
        Preconditions.checkArgument(!list.isEmpty(), "pages is empty");
        long size = j + list.size();
        Stream<Page> stream = list.stream();
        PagesSerde pagesSerde = PAGES_SERDE;
        pagesSerde.getClass();
        return new BufferResult(TASK_INSTANCE_ID, j, size, false, (List) stream.map(pagesSerde::serialize).collect(Collectors.toList()));
    }

    private static Page createPage(int i) {
        return new Page(new Block[]{BlockAssertions.createLongsBlock(i)});
    }

    private static DataSize sizeOfPages(int i) {
        return new DataSize(BUFFERED_PAGE_SIZE.toBytes() * i, DataSize.Unit.BYTE);
    }
}
