package com.facebook.presto.execution;

import com.facebook.presto.HashPagePartitionFunction;
import com.facebook.presto.OutputBuffers;
import com.facebook.presto.UnpartitionedPagePartitionFunction;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/execution/TestSharedBuffer.class */
public class TestSharedBuffer {
    private static final int DEFAULT_PARTITION = 0;
    private ScheduledExecutorService stateNotificationExecutor;
    private static final Duration NO_WAIT = new Duration(0.0d, TimeUnit.MILLISECONDS);
    private static final Duration MAX_WAIT = new Duration(1.0d, TimeUnit.SECONDS);
    private static final DataSize PAGE_SIZE = new DataSize(createPage(42).getSizeInBytes(), DataSize.Unit.BYTE);
    private static final TaskId TASK_ID = new TaskId("query", "stage", "task");
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of(BigintType.BIGINT);
    public static final TaskId FIRST = new TaskId("query", "stage", "first_task");
    public static final TaskId SECOND = new TaskId("query", "stage", "second_task");
    public static final TaskId QUEUE = new TaskId("query", "stage", "queue");
    public static final TaskId FOO = new TaskId("foo", "bar", "baz");

    private static Page createPage(int i) {
        return new Page(new Block[]{BlockAssertions.createLongsBlock(i)});
    }

    public static DataSize sizeOfPages(int i) {
        return new DataSize(PAGE_SIZE.toBytes() * i, DataSize.Unit.BYTE);
    }

    @BeforeClass
    public void setUp() throws Exception {
        this.stateNotificationExecutor = Executors.newScheduledThreadPool(5, Threads.daemonThreadsNamed("test-%s"));
    }

    @AfterClass
    public void tearDown() throws Exception {
        if (this.stateNotificationExecutor != null) {
            this.stateNotificationExecutor.shutdownNow();
            this.stateNotificationExecutor = null;
        }
    }

    @Test
    public void testInvalidConstructorArg() throws Exception {
        try {
            new SharedBuffer(TASK_ID, this.stateNotificationExecutor, new DataSize(0.0d, DataSize.Unit.BYTE));
            Assert.fail("Expected IllegalStateException");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testSimple() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        for (int i = DEFAULT_PARTITION; i < 3; i++) {
            addPage(sharedBuffer, createPage(i));
        }
        OutputBuffers withBuffer = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(withBuffer);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 3, DEFAULT_PARTITION, 3, 3, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), createPage(1), createPage(2)));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 3, DEFAULT_PARTITION, 3, 3, DEFAULT_PARTITION);
        sharedBuffer.get(FIRST, 3L, sizeOfPages(10)).cancel(true);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, DEFAULT_PARTITION, 3, 3, 3, DEFAULT_PARTITION);
        for (int i2 = 3; i2 < 10; i2++) {
            addPage(sharedBuffer, createPage(i2));
        }
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 7, 3, 10, 10, DEFAULT_PARTITION);
        ListenableFuture<?> enqueuePage = enqueuePage(sharedBuffer, createPage(10));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 7, 3, 10, 10, 1);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 3L, sizeOfPages(1), NO_WAIT), bufferResult(3L, createPage(3), new Page[DEFAULT_PARTITION]));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 7, 3, 10, 10, 1);
        Assert.assertFalse(enqueuePage.isDone());
        OutputBuffers withBuffer2 = withBuffer.withBuffer(SECOND, new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(withBuffer2);
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 10, DEFAULT_PARTITION, 10, 10, 1);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, SECOND, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), createPage(1), createPage(2), createPage(3), createPage(4), createPage(5), createPage(6), createPage(7), createPage(8), createPage(9)));
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 10, DEFAULT_PARTITION, 10, 10, 1);
        sharedBuffer.get(SECOND, 10L, sizeOfPages(10)).cancel(true);
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, DEFAULT_PARTITION, 10, 10, 10, 1);
        sharedBuffer.setOutputBuffers(withBuffer2.withNoMoreBufferIds());
        enqueuePage.get(1L, TimeUnit.SECONDS);
        addPage(sharedBuffer, createPage(11));
        addPage(sharedBuffer, createPage(12));
        ListenableFuture<?> enqueuePage2 = enqueuePage(sharedBuffer, createPage(13));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 10, 3, 10, 13, 1);
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 3, 10, 10, 13, 1);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 4L, sizeOfPages(1), NO_WAIT), bufferResult(4L, createPage(4), new Page[DEFAULT_PARTITION]));
        enqueuePage2.get(1L, TimeUnit.SECONDS);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 10, 4, 10, 14, DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 4, 10, 10, 14, DEFAULT_PARTITION);
        Assert.assertFalse(sharedBuffer.isFinished());
        sharedBuffer.setNoMorePages();
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 10, 4, 10, 14, DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 4, 10, 10, 14, DEFAULT_PARTITION);
        Assert.assertFalse(sharedBuffer.isFinished());
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 5L, sizeOfPages(1), NO_WAIT), bufferResult(5L, createPage(5), new Page[DEFAULT_PARTITION]));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 9, 5, 9, 14, DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 4, 10, 9, 14, DEFAULT_PARTITION);
        Assert.assertFalse(sharedBuffer.isFinished());
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 6L, sizeOfPages(10), NO_WAIT), bufferResult(6L, createPage(6), createPage(7), createPage(8), createPage(9), createPage(10), createPage(11), createPage(12), createPage(13)));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 8, 6, 8, 14, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, true));
        assertQueueClosed(sharedBuffer, FIRST, 14);
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 4, 10, 4, 14, DEFAULT_PARTITION);
        Assert.assertFalse(sharedBuffer.isFinished());
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, SECOND, 10L, sizeOfPages(10), NO_WAIT), bufferResult(10L, createPage(10), createPage(11), createPage(12), createPage(13)));
        assertQueueState(sharedBuffer, SECOND, DEFAULT_PARTITION, 4, 10, 4, 14, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, SECOND, 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, true));
        assertQueueClosed(sharedBuffer, FIRST, 14);
        assertQueueClosed(sharedBuffer, SECOND, 14);
        assertFinished(sharedBuffer);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, true));
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, SECOND, 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, true));
    }

    @Test
    public void testSharedBufferFull() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(2));
        addPage(sharedBuffer, createPage(1), DEFAULT_PARTITION);
        addPage(sharedBuffer, createPage(2), 1);
        enqueuePage(sharedBuffer, createPage(3), 1);
    }

    @Test
    public void testDeqeueueOnAcknowledgement() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(2));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new HashPagePartitionFunction(DEFAULT_PARTITION, 2, Ints.asList(new int[]{DEFAULT_PARTITION}), Optional.empty(), ImmutableList.of(BigintType.BIGINT))).withBuffer(SECOND, new HashPagePartitionFunction(1, 2, Ints.asList(new int[]{DEFAULT_PARTITION}), Optional.empty(), ImmutableList.of(BigintType.BIGINT))).withNoMoreBufferIds());
        addPage(sharedBuffer, createPage(1), DEFAULT_PARTITION);
        addPage(sharedBuffer, createPage(2), DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 2, DEFAULT_PARTITION, 2, 2, DEFAULT_PARTITION);
        ListenableFuture<?> enqueuePage = enqueuePage(sharedBuffer, createPage(3), 1);
        Assert.assertFalse(enqueuePage.isDone());
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 2, DEFAULT_PARTITION, 2, 2, DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, SECOND, 1, DEFAULT_PARTITION, DEFAULT_PARTITION, DEFAULT_PARTITION, DEFAULT_PARTITION, 1);
        sharedBuffer.get(FIRST, 2L, sizeOfPages(10)).cancel(true);
        Assert.assertTrue(enqueuePage.isDone());
        assertQueueState(sharedBuffer, SECOND, 1, 1, DEFAULT_PARTITION, 1, 1, DEFAULT_PARTITION);
    }

    @Test
    public void testSimplePartitioned() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(20));
        for (int i = DEFAULT_PARTITION; i < 3; i++) {
            addPage(sharedBuffer, createPage(i), DEFAULT_PARTITION);
            addPage(sharedBuffer, createPage(i), 1);
        }
        OutputBuffers withBuffer = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new HashPagePartitionFunction(DEFAULT_PARTITION, 2, Ints.asList(new int[]{DEFAULT_PARTITION}), Optional.empty(), ImmutableList.of(BigintType.BIGINT)));
        sharedBuffer.setOutputBuffers(withBuffer);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 3, DEFAULT_PARTITION, 3, 3, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), createPage(1), createPage(2)));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 3, DEFAULT_PARTITION, 3, 3, DEFAULT_PARTITION);
        sharedBuffer.get(FIRST, 3L, sizeOfPages(10)).cancel(true);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, DEFAULT_PARTITION, 3, 3, 3, DEFAULT_PARTITION);
        for (int i2 = 3; i2 < 10; i2++) {
            addPage(sharedBuffer, createPage(i2), DEFAULT_PARTITION);
            addPage(sharedBuffer, createPage(i2), 1);
        }
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 7, 3, 10, 10, DEFAULT_PARTITION);
        ListenableFuture<?> enqueuePage = enqueuePage(sharedBuffer, createPage(10), DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 7, 3, 10, 10, 1);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 3L, sizeOfPages(1), NO_WAIT), bufferResult(3L, createPage(3), new Page[DEFAULT_PARTITION]));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 7, 3, 10, 10, 1);
        Assert.assertFalse(enqueuePage.isDone());
        OutputBuffers withBuffer2 = withBuffer.withBuffer(SECOND, new HashPagePartitionFunction(1, 2, Ints.asList(new int[]{DEFAULT_PARTITION}), Optional.empty(), ImmutableList.of(BigintType.BIGINT)));
        sharedBuffer.setOutputBuffers(withBuffer2);
        assertQueueState(sharedBuffer, SECOND, 1, 10, DEFAULT_PARTITION, 10, 10, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, SECOND, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), createPage(1), createPage(2), createPage(3), createPage(4), createPage(5), createPage(6), createPage(7), createPage(8), createPage(9)));
        assertQueueState(sharedBuffer, SECOND, 1, 10, DEFAULT_PARTITION, 10, 10, DEFAULT_PARTITION);
        sharedBuffer.get(SECOND, 10L, sizeOfPages(10)).cancel(true);
        assertQueueState(sharedBuffer, SECOND, 1, DEFAULT_PARTITION, 10, 10, 10, DEFAULT_PARTITION);
        sharedBuffer.setOutputBuffers(withBuffer2.withNoMoreBufferIds());
        enqueuePage.get(1L, TimeUnit.SECONDS);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 8, 3, 8, 11, DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, SECOND, 1, DEFAULT_PARTITION, 10, DEFAULT_PARTITION, 10, DEFAULT_PARTITION);
        addPage(sharedBuffer, createPage(11), DEFAULT_PARTITION);
        addPage(sharedBuffer, createPage(12), DEFAULT_PARTITION);
        addPage(sharedBuffer, createPage(13), DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 11, 3, 11, 14, DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, SECOND, 1, DEFAULT_PARTITION, 10, DEFAULT_PARTITION, 10, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 4L, sizeOfPages(1), NO_WAIT), bufferResult(4L, createPage(4), new Page[DEFAULT_PARTITION]));
        enqueuePage.get(1L, TimeUnit.SECONDS);
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 10, 4, 10, 14, DEFAULT_PARTITION);
        assertQueueState(sharedBuffer, SECOND, 1, DEFAULT_PARTITION, 10, DEFAULT_PARTITION, 10, DEFAULT_PARTITION);
        Assert.assertFalse(sharedBuffer.isFinished());
        sharedBuffer.setNoMorePages();
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 10, 4, 10, 14, DEFAULT_PARTITION);
        assertQueueClosed(sharedBuffer, SECOND, 10);
        Assert.assertFalse(sharedBuffer.isFinished());
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 5L, sizeOfPages(1), NO_WAIT), bufferResult(5L, createPage(5), new Page[DEFAULT_PARTITION]));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 9, 5, 9, 14, DEFAULT_PARTITION);
        Assert.assertFalse(sharedBuffer.isFinished());
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 6L, sizeOfPages(10), NO_WAIT), bufferResult(6L, createPage(6), createPage(7), createPage(8), createPage(9), createPage(10), createPage(11), createPage(12), createPage(13)));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 8, 6, 8, 14, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, true));
        assertQueueClosed(sharedBuffer, FIRST, 14);
        assertFinished(sharedBuffer);
    }

    public static BufferResult getBufferResult(SharedBuffer sharedBuffer, TaskId taskId, long j, DataSize dataSize, Duration duration) {
        return getFuture(sharedBuffer.get(taskId, j, dataSize), duration);
    }

    public static BufferResult getFuture(ListenableFuture<BufferResult> listenableFuture, Duration duration) {
        try {
            return (BufferResult) listenableFuture.get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (ExecutionException e2) {
            throw Throwables.propagate(e2.getCause());
        } catch (TimeoutException e3) {
            throw Throwables.propagate(e3);
        }
    }

    @Test
    public void testDuplicateRequests() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        for (int i = DEFAULT_PARTITION; i < 3; i++) {
            addPage(sharedBuffer, createPage(i));
        }
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new UnpartitionedPagePartitionFunction()));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 3, DEFAULT_PARTITION, 3, 3, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), createPage(1), createPage(2)));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 3, DEFAULT_PARTITION, 3, 3, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), createPage(1), createPage(2)));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, 3, DEFAULT_PARTITION, 3, 3, DEFAULT_PARTITION);
        sharedBuffer.get(FIRST, 3L, sizeOfPages(10)).cancel(true);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(0L, false));
        assertQueueState(sharedBuffer, FIRST, DEFAULT_PARTITION, DEFAULT_PARTITION, 3, 3, 3, DEFAULT_PARTITION);
    }

    @Test
    public void testAddQueueAfterNoMoreQueues() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        Assert.assertFalse(sharedBuffer.isFinished());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        try {
            sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FOO, new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
            Assert.fail("Expected IllegalStateException from addQueue after noMoreQueues has been called");
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void testAddQueueAfterDestroy() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        Assert.assertFalse(sharedBuffer.isFinished());
        sharedBuffer.destroy();
        assertFinished(sharedBuffer);
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        assertFinished(sharedBuffer);
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        assertFinished(sharedBuffer);
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FOO, new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
    }

    @Test
    public void testGetBeforeCreate() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        Assert.assertFalse(sharedBuffer.isFinished());
        ListenableFuture listenableFuture = sharedBuffer.get(FIRST, 0L, sizeOfPages(1));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(sharedBuffer, createPage(33));
        Assert.assertFalse(listenableFuture.isDone());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new UnpartitionedPagePartitionFunction()));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(33), new Page[DEFAULT_PARTITION]));
    }

    @Test
    public void testAbortBeforeCreate() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        Assert.assertFalse(sharedBuffer.isFinished());
        ListenableFuture listenableFuture = sharedBuffer.get(FIRST, 0L, sizeOfPages(1));
        Assert.assertFalse(listenableFuture.isDone());
        sharedBuffer.abort(FIRST);
        addPage(sharedBuffer, createPage(33));
        Assert.assertFalse(listenableFuture.isDone());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new UnpartitionedPagePartitionFunction()));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), BufferResult.emptyResults(0L, true));
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 0L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(0L, true));
    }

    @Test
    public void testAddStateMachine() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        sharedBuffer.setNoMorePages();
        addPage(sharedBuffer, createPage(DEFAULT_PARTITION));
        addPage(sharedBuffer, createPage(DEFAULT_PARTITION));
        Assert.assertEquals(sharedBuffer.getInfo().getTotalPagesSent(), 0L);
        SharedBuffer sharedBuffer2 = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        sharedBuffer2.destroy();
        addPage(sharedBuffer2, createPage(DEFAULT_PARTITION));
        addPage(sharedBuffer2, createPage(DEFAULT_PARTITION));
        Assert.assertEquals(sharedBuffer2.getInfo().getTotalPagesSent(), 0L);
    }

    @Test
    public void testAbort() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        for (int i = DEFAULT_PARTITION; i < 10; i++) {
            addPage(sharedBuffer, createPage(i));
        }
        sharedBuffer.setNoMorePages();
        OutputBuffers withBuffer = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(withBuffer);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 0L, sizeOfPages(1), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), new Page[DEFAULT_PARTITION]));
        sharedBuffer.abort(FIRST);
        assertQueueClosed(sharedBuffer, FIRST, DEFAULT_PARTITION);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, FIRST, 1L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(1L, true));
        sharedBuffer.setOutputBuffers(withBuffer.withBuffer(SECOND, new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, SECOND, 0L, sizeOfPages(1), NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), new Page[DEFAULT_PARTITION]));
        sharedBuffer.abort(SECOND);
        assertQueueClosed(sharedBuffer, SECOND, DEFAULT_PARTITION);
        assertFinished(sharedBuffer);
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, SECOND, 1L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(0L, true));
    }

    @Test
    public void testFinishClosesEmptyQueues() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(10));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, new UnpartitionedPagePartitionFunction()).withBuffer(SECOND, new UnpartitionedPagePartitionFunction()));
        sharedBuffer.setNoMorePages();
        assertQueueClosed(sharedBuffer, FIRST, DEFAULT_PARTITION);
        assertQueueClosed(sharedBuffer, SECOND, DEFAULT_PARTITION);
    }

    @Test
    public void testAbortFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, new UnpartitionedPagePartitionFunction()));
        Assert.assertFalse(sharedBuffer.isFinished());
        ListenableFuture listenableFuture = sharedBuffer.get(QUEUE, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(sharedBuffer, createPage(DEFAULT_PARTITION));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), new Page[DEFAULT_PARTITION]));
        ListenableFuture listenableFuture2 = sharedBuffer.get(QUEUE, 1L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        sharedBuffer.abort(QUEUE);
        assertQueueClosed(sharedBuffer, QUEUE, 1);
        assertBufferResultEquals(TYPES, getFuture(listenableFuture2, NO_WAIT), BufferResult.emptyResults(1L, true));
    }

    @Test
    public void testFinishFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, new UnpartitionedPagePartitionFunction()));
        Assert.assertFalse(sharedBuffer.isFinished());
        ListenableFuture listenableFuture = sharedBuffer.get(QUEUE, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(sharedBuffer, createPage(DEFAULT_PARTITION));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), new Page[DEFAULT_PARTITION]));
        ListenableFuture listenableFuture2 = sharedBuffer.get(QUEUE, 1L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        sharedBuffer.setNoMorePages();
        assertQueueClosed(sharedBuffer, QUEUE, 1);
        assertBufferResultEquals(TYPES, getFuture(listenableFuture2, NO_WAIT), BufferResult.emptyResults(1L, true));
    }

    @Test
    public void testFinishFreesWriter() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        for (int i = DEFAULT_PARTITION; i < 5; i++) {
            addPage(sharedBuffer, createPage(i));
        }
        ListenableFuture<?> enqueuePage = enqueuePage(sharedBuffer, createPage(5));
        ListenableFuture<?> enqueuePage2 = enqueuePage(sharedBuffer, createPage(6));
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, QUEUE, 0L, sizeOfPages(1), MAX_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), new Page[DEFAULT_PARTITION]));
        sharedBuffer.get(QUEUE, 1L, sizeOfPages(1)).cancel(true);
        Assert.assertTrue(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        sharedBuffer.setNoMorePages();
        Assert.assertFalse(sharedBuffer.isFinished());
        Assert.assertTrue(enqueuePage2.isDone());
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, QUEUE, 1L, sizeOfPages(100), NO_WAIT), bufferResult(1L, createPage(1), createPage(2), createPage(3), createPage(4), createPage(5)));
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, QUEUE, 6L, sizeOfPages(100), NO_WAIT), BufferResult.emptyResults(6L, true));
        assertFinished(sharedBuffer);
    }

    @Test
    public void testDestroyFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        ListenableFuture listenableFuture = sharedBuffer.get(QUEUE, 0L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(sharedBuffer, createPage(DEFAULT_PARTITION));
        assertBufferResultEquals(TYPES, getFuture(listenableFuture, NO_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), new Page[DEFAULT_PARTITION]));
        ListenableFuture listenableFuture2 = sharedBuffer.get(QUEUE, 1L, sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        sharedBuffer.destroy();
        assertQueueClosed(sharedBuffer, QUEUE, 1);
        assertBufferResultEquals(TYPES, getFuture(listenableFuture2, NO_WAIT), BufferResult.emptyResults(1L, true));
    }

    @Test
    public void testDestroyFreesWriter() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.stateNotificationExecutor, sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        for (int i = DEFAULT_PARTITION; i < 5; i++) {
            addPage(sharedBuffer, createPage(i));
        }
        ListenableFuture<?> enqueuePage = enqueuePage(sharedBuffer, createPage(5));
        ListenableFuture<?> enqueuePage2 = enqueuePage(sharedBuffer, createPage(6));
        assertBufferResultEquals(TYPES, getBufferResult(sharedBuffer, QUEUE, 0L, sizeOfPages(1), MAX_WAIT), bufferResult(0L, createPage(DEFAULT_PARTITION), new Page[DEFAULT_PARTITION]));
        sharedBuffer.get(QUEUE, 1L, sizeOfPages(1)).cancel(true);
        Assert.assertTrue(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        sharedBuffer.destroy();
        assertFinished(sharedBuffer);
        Assert.assertTrue(enqueuePage2.isDone());
    }

    private static ListenableFuture<?> enqueuePage(SharedBuffer sharedBuffer, Page page) {
        return enqueuePage(sharedBuffer, page, DEFAULT_PARTITION);
    }

    private static ListenableFuture<?> enqueuePage(SharedBuffer sharedBuffer, Page page, int i) {
        ListenableFuture<?> enqueue = sharedBuffer.enqueue(i, page);
        Assert.assertFalse(enqueue.isDone());
        return enqueue;
    }

    private static void addPage(SharedBuffer sharedBuffer, Page page) {
        addPage(sharedBuffer, page, DEFAULT_PARTITION);
    }

    private static void addPage(SharedBuffer sharedBuffer, Page page, int i) {
        Assert.assertTrue(sharedBuffer.enqueue(i, page).isDone());
    }

    private static void assertQueueState(SharedBuffer sharedBuffer, TaskId taskId, int i, int i2, int i3, int i4, int i5, int i6) {
        Assert.assertEquals(getBufferInfo(sharedBuffer, taskId), new BufferInfo(taskId, false, i2, i3, new PageBufferInfo(i, i4, i6, sizeOfPages(i4).toBytes(), i5)));
    }

    private static void assertQueueClosed(SharedBuffer sharedBuffer, TaskId taskId, int i) {
        BufferInfo bufferInfo = getBufferInfo(sharedBuffer, taskId);
        Assert.assertEquals(bufferInfo.getBufferedPages(), DEFAULT_PARTITION);
        Assert.assertEquals(bufferInfo.getPagesSent(), i);
        Assert.assertEquals(bufferInfo.isFinished(), true);
    }

    private static BufferInfo getBufferInfo(SharedBuffer sharedBuffer, TaskId taskId) {
        for (BufferInfo bufferInfo : sharedBuffer.getInfo().getBuffers()) {
            if (bufferInfo.getBufferId().equals(taskId)) {
                return bufferInfo;
            }
        }
        return null;
    }

    private static void assertFinished(SharedBuffer sharedBuffer) throws Exception {
        Assert.assertTrue(sharedBuffer.isFinished());
        for (BufferInfo bufferInfo : sharedBuffer.getInfo().getBuffers()) {
            Assert.assertTrue(bufferInfo.isFinished());
            Assert.assertEquals(bufferInfo.getBufferedPages(), DEFAULT_PARTITION);
        }
    }

    private static void assertBufferResultEquals(List<? extends Type> list, BufferResult bufferResult, BufferResult bufferResult2) {
        Assert.assertEquals(bufferResult.getPages().size(), bufferResult2.getPages().size());
        Assert.assertEquals(bufferResult.getToken(), bufferResult2.getToken());
        for (int i = DEFAULT_PARTITION; i < bufferResult.getPages().size(); i++) {
            Page page = (Page) bufferResult.getPages().get(i);
            Page page2 = (Page) bufferResult2.getPages().get(i);
            Assert.assertEquals(page.getChannelCount(), page2.getChannelCount());
            PageAssertions.assertPageEquals(list, page, page2);
        }
        Assert.assertEquals(bufferResult.isBufferClosed(), bufferResult2.isBufferClosed());
    }

    public static BufferResult bufferResult(long j, Page page, Page... pageArr) {
        return new BufferResult(j, j + r0.size(), false, ImmutableList.builder().add(page).add(pageArr).build());
    }
}
