package com.facebook.presto.execution.buffer;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.memory.context.MemoryReservationHandler;
import com.facebook.presto.memory.context.SimpleLocalMemoryContext;
import com.facebook.presto.spi.page.PagesSerde;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/execution/buffer/TestBroadcastOutputBuffer.class */
public class TestBroadcastOutputBuffer {
    // Task instance id passed to BufferResult.emptyResults when building expected empty results.
    private static final String TASK_INSTANCE_ID = "task-instance-id";
    // Scheduled executor created in setUp() and shut down in tearDown().
    private ScheduledExecutorService stateNotificationExecutor;
    // Serde obtained from TestingPagesSerdeFactory.testingPagesSerde().
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();
    // Type list (a single BIGINT) handed to BufferTestUtils.assertBufferResultEquals.
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of(BigintType.BIGINT);
    // Output buffer ids 0, 1 and 2 used by the tests below.
    private static final OutputBuffers.OutputBufferId FIRST = new OutputBuffers.OutputBufferId(0);
    private static final OutputBuffers.OutputBufferId SECOND = new OutputBuffers.OutputBufferId(1);
    private static final OutputBuffers.OutputBufferId THIRD = new OutputBuffers.OutputBufferId(2);

    /* loaded from: input_file:com/facebook/presto/execution/buffer/TestBroadcastOutputBuffer$MockMemoryReservationHandler.class */
    /**
     * Test double for {@link MemoryReservationHandler} whose {@code reserveMemory} result is a
     * caller-supplied future that can be swapped at any time, and whose {@code tryReserveMemory}
     * always succeeds.
     */
    private static class MockMemoryReservationHandler implements MemoryReservationHandler {
        // Future returned by every reserveMemory call; replaceable through updateBlockedFuture.
        private ListenableFuture<?> blockedFuture;

        /**
         * @param listenableFuture future returned from {@link #reserveMemory}; must not be null
         * @throws NullPointerException if {@code listenableFuture} is null
         */
        public MockMemoryReservationHandler(ListenableFuture<?> listenableFuture) {
            this.blockedFuture = Objects.requireNonNull(listenableFuture, "blockedFuture is null");
        }

        /**
         * Ignores all arguments and returns the currently configured future.
         */
        public ListenableFuture<?> reserveMemory(String str, long j, boolean z) {
            return this.blockedFuture;
        }

        /**
         * Ignores all arguments and always reports success.
         */
        public boolean tryReserveMemory(String str, long j, boolean z) {
            return true;
        }

        /**
         * Replaces the future returned by subsequent {@link #reserveMemory} calls.
         *
         * @param listenableFuture the new future; must not be null
         * @throws NullPointerException if {@code listenableFuture} is null
         */
        public void updateBlockedFuture(ListenableFuture<?> listenableFuture) {
            this.blockedFuture = Objects.requireNonNull(listenableFuture, "blockedFuture is null");
        }
    }

    /**
     * Creates the scheduled executor used by the tests: a pool of five threads built with the
     * thread factory returned by {@code Threads.daemonThreadsNamed("test-%s")}.
     */
    @BeforeClass
    public void setUp() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5, Threads.daemonThreadsNamed("test-%s"));
        this.stateNotificationExecutor = executor;
    }

    /**
     * Shuts down the executor created in {@link #setUp()} (if any) and clears the field.
     */
    @AfterClass(alwaysRun = true)
    public void tearDown() {
        ScheduledExecutorService executor = this.stateNotificationExecutor;
        if (executor == null) {
            // Nothing was created, so there is nothing to stop.
            return;
        }
        executor.shutdownNow();
        this.stateNotificationExecutor = null;
    }

    /**
     * Verifies that building a broadcast buffer with a zero-byte {@link DataSize} throws
     * {@link IllegalArgumentException}, both when a buffer id is declared and when none is.
     */
    @Test
    public void testInvalidConstructorArg() {
        try {
            createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withNoMoreBufferIds(), new DataSize(0.0d, DataSize.Unit.BYTE));
            // The message now names the exception type that is actually caught below.
            Assert.fail("Expected IllegalArgumentException");
        } catch (IllegalArgumentException ignored) {
            // expected: construction must be rejected
        }
        try {
            createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), new DataSize(0.0d, DataSize.Unit.BYTE));
            Assert.fail("Expected IllegalArgumentException");
        } catch (IllegalArgumentException ignored) {
            // expected: construction must be rejected
        }
    }

    /**
     * End-to-end walk through a broadcast buffer sized for ten pages: pages are added before and
     * after output buffers are declared, two readers (FIRST and SECOND) consume at different
     * paces, blocked writers are released, and finally both readers are aborted so the buffer
     * finishes.
     */
    @Test
    public void testSimple() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), BufferTestUtils.sizeOfPages(10));

        // Add pages 0..2 before any output buffer id has been declared.
        for (int pageId = 0; pageId < 3; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }

        // Declare FIRST and read the three pages; reading alone does not change the queue state.
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        buffer.setOutputBuffers(firstOnly);
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 0);

        // Requesting from 3 (and cancelling the returned future) moves the state to (0, 3).
        buffer.get(FIRST, 3L, BufferTestUtils.sizeOfPages(10)).cancel(true);
        BufferTestUtils.assertQueueState(buffer, FIRST, 0, 3);

        // Add pages 3..9.
        for (int pageId = 3; pageId < 10; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        BufferTestUtils.assertQueueState(buffer, FIRST, 7, 3);

        // Page 10 is enqueued but its future is not yet done.
        ListenableFuture<?> blockedWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(10));
        Assert.assertFalse(blockedWrite.isDone());
        BufferTestUtils.assertQueueState(buffer, FIRST, 8, 3);

        // Reading a single page from FIRST leaves the writer blocked.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 3L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(3L, BufferTestUtils.createPage(3), new Page[0]));
        BufferTestUtils.assertQueueState(buffer, FIRST, 8, 3);
        Assert.assertFalse(blockedWrite.isDone());

        // Declare SECOND; it sees all eleven pages added so far.
        OutputBuffers firstAndSecond = firstOnly.withBuffer(SECOND, 0);
        buffer.setOutputBuffers(firstAndSecond);
        BufferTestUtils.assertQueueState(buffer, SECOND, 11, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, SECOND, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2), BufferTestUtils.createPage(3), BufferTestUtils.createPage(4), BufferTestUtils.createPage(5), BufferTestUtils.createPage(6), BufferTestUtils.createPage(7), BufferTestUtils.createPage(8), BufferTestUtils.createPage(9)));
        BufferTestUtils.assertQueueState(buffer, SECOND, 11, 0);
        buffer.get(SECOND, 10L, BufferTestUtils.sizeOfPages(10)).cancel(true);
        BufferTestUtils.assertQueueState(buffer, SECOND, 1, 10);

        // Declaring that no more buffer ids will arrive releases the blocked writer.
        buffer.setOutputBuffers(firstAndSecond.withNoMoreBufferIds());
        BufferTestUtils.assertQueueState(buffer, FIRST, 8, 3);
        BufferTestUtils.assertQueueState(buffer, SECOND, 1, 10);
        BufferTestUtils.assertFutureIsDone(blockedWrite);

        // Add pages 11 and 12, then enqueue page 13, whose future is not yet done.
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(11));
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(12));
        ListenableFuture<?> secondBlockedWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(13));
        Assert.assertFalse(secondBlockedWrite.isDone());
        BufferTestUtils.assertQueueState(buffer, FIRST, 11, 3);
        BufferTestUtils.assertQueueState(buffer, SECOND, 4, 10);

        // FIRST advancing to 4 completes the second writer future.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 4L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(4L, BufferTestUtils.createPage(4), new Page[0]));
        BufferTestUtils.assertFutureIsDone(secondBlockedWrite);
        BufferTestUtils.assertQueueState(buffer, FIRST, 10, 4);
        BufferTestUtils.assertQueueState(buffer, SECOND, 4, 10);
        Assert.assertFalse(buffer.isFinished());

        // No more pages: queue states are unchanged and the buffer is still not finished.
        buffer.setNoMorePages();
        BufferTestUtils.assertQueueState(buffer, FIRST, 10, 4);
        BufferTestUtils.assertQueueState(buffer, SECOND, 4, 10);
        Assert.assertFalse(buffer.isFinished());

        // Read one more page from FIRST.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 5L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(5L, BufferTestUtils.createPage(5), new Page[0]));
        BufferTestUtils.assertQueueState(buffer, FIRST, 9, 5);
        BufferTestUtils.assertQueueState(buffer, SECOND, 4, 10);
        Assert.assertFalse(buffer.isFinished());

        // Read the remaining pages 6..13 from FIRST.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 6L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(6L, BufferTestUtils.createPage(6), BufferTestUtils.createPage(7), BufferTestUtils.createPage(8), BufferTestUtils.createPage(9), BufferTestUtils.createPage(10), BufferTestUtils.createPage(11), BufferTestUtils.createPage(12), BufferTestUtils.createPage(13)));
        BufferTestUtils.assertQueueState(buffer, FIRST, 8, 6);

        // Reading at 14 yields an empty result flagged true; then abort FIRST.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 14L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 14L, true));
        buffer.abort(FIRST);
        BufferTestUtils.assertQueueClosed(buffer, FIRST, 14);
        BufferTestUtils.assertQueueState(buffer, SECOND, 4, 10);
        Assert.assertFalse(buffer.isFinished());

        // Drain SECOND (pages 10..13), then abort it.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, SECOND, 10L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(10L, BufferTestUtils.createPage(10), BufferTestUtils.createPage(11), BufferTestUtils.createPage(12), BufferTestUtils.createPage(13)));
        BufferTestUtils.assertQueueState(buffer, SECOND, 4, 10);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, SECOND, 14L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 14L, true));
        buffer.abort(SECOND);

        // Both queues are closed and the buffer is finished; further reads return empty results.
        BufferTestUtils.assertQueueClosed(buffer, FIRST, 14);
        BufferTestUtils.assertQueueClosed(buffer, SECOND, 14);
        BufferTestUtils.assertFinished(buffer);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 14L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 14L, true));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, SECOND, 14L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 14L, true));
    }

    /**
     * Exercises explicit acknowledgement on FIRST: acknowledging 2 and then 3 advances the queue
     * state, acknowledging 4 (beyond what has been added) must be rejected with
     * {@link IllegalArgumentException} carrying the message "Invalid sequence id", and a later
     * cancelled read at 3 leaves the newly added pages untouched.
     */
    @Test
    public void testAcknowledge() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), BufferTestUtils.sizeOfPages(10));

        // Add pages 0..2, then declare FIRST.
        for (int pageId = 0; pageId < 3; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        buffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0));
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));

        // Acknowledge in two steps.
        BufferTestUtils.acknowledgeBufferResult(buffer, FIRST, 2L);
        BufferTestUtils.assertQueueState(buffer, FIRST, 1, 2);
        BufferTestUtils.acknowledgeBufferResult(buffer, FIRST, 3L);
        BufferTestUtils.assertQueueState(buffer, FIRST, 0, 3);

        // Acknowledging past the last page must throw; without the fail() the test would
        // silently pass if no exception were raised.
        try {
            BufferTestUtils.acknowledgeBufferResult(buffer, FIRST, 4L);
            Assert.fail("Expected IllegalArgumentException for an invalid sequence id");
        } catch (IllegalArgumentException e) {
            Assert.assertEquals(e.getMessage(), "Invalid sequence id");
        }

        // Add pages 3..5; a cancelled read at 3 does not change the queue state.
        for (int pageId = 3; pageId < 6; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 3);
        buffer.get(FIRST, 3L, BufferTestUtils.sizeOfPages(1)).cancel(true);
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 3);
    }

    /**
     * Adds pages 1 and 2 to a buffer sized for two pages with no output buffers declared, then
     * enqueues page 3 through {@code BufferTestUtils.enqueuePage}.
     */
    @Test
    public void testSharedBufferFull() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), BufferTestUtils.sizeOfPages(2));
        for (int pageId = 1; pageId <= 2; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        // NOTE(review): the returned future is not inspected here; presumably enqueuePage
        // performs its own check — confirm in BufferTestUtils.
        BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(3));
    }

    /**
     * Reads the same range from FIRST twice and expects identical results, then moves the reader
     * to 3 and expects a repeated request at 0 to return an empty result.
     */
    @Test
    public void testDuplicateRequests() {
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstOnly.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));

        // Add pages 0..2.
        for (int pageId = 0; pageId < 3; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 0);

        // First read from 0.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 0);

        // Identical second read returns the same pages and leaves the state unchanged.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 0);

        // After a request at 3, asking for 0 again yields an empty, non-final result.
        buffer.get(FIRST, 3L, BufferTestUtils.sizeOfPages(10)).cancel(true);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, false));
        BufferTestUtils.assertQueueState(buffer, FIRST, 0, 3);
    }

    /**
     * Verifies that, once the buffer was created with a final set of buffer ids (FIRST only),
     * supplying a set that additionally contains SECOND throws {@link IllegalArgumentException}.
     */
    @Test
    public void testAddQueueAfterCreation() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(buffer.isFinished());
        try {
            buffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withBuffer(SECOND, 0).withNoMoreBufferIds());
            // The message now names the exception type that is actually caught below.
            Assert.fail("Expected IllegalArgumentException from addQueue after noMoreQueues has been called");
        } catch (IllegalArgumentException ignored) {
            // expected: the new buffer id must be rejected
        }
    }

    /**
     * After {@code setNoMorePages()}, adding two pages leaves the reported total pages sent at 0.
     */
    @Test
    public void testAddAfterFinish() {
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstOnly.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        buffer.setNoMorePages();
        // Two attempts to add page 0 after the buffer has been told no more pages will come.
        for (int attempt = 0; attempt < 2; attempt++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(0));
        }
        Assert.assertEquals(buffer.getInfo().getTotalPagesSent(), 0L);
    }

    /**
     * Setting an empty, final set of output buffers finishes the buffer, and repeating the same
     * call keeps it finished.
     */
    @Test
    public void testAddQueueAfterNoMoreQueues() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(buffer.isFinished());
        // The same final, empty buffer set is applied three times in a row.
        for (int attempt = 0; attempt < 3; attempt++) {
            buffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withNoMoreBufferIds());
            Assert.assertTrue(buffer.isFinished());
        }
    }

    /**
     * After {@code destroy()}, adding two pages leaves the reported total pages sent at 0.
     */
    @Test
    public void testAddAfterDestroy() {
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstOnly.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        buffer.destroy();
        // Two attempts to add page 0 to the destroyed buffer.
        for (int attempt = 0; attempt < 2; attempt++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(0));
        }
        Assert.assertEquals(buffer.getInfo().getTotalPagesSent(), 0L);
    }

    /**
     * A read on FIRST issued before FIRST has been declared stays pending until a page is added,
     * at which point it completes with that page.
     */
    @Test
    public void testGetBeforeCreate() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(buffer.isFinished());

        // NOTE(review): raw type kept from the original; presumably ListenableFuture<BufferResult> — confirm.
        ListenableFuture pendingRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(1));
        Assert.assertFalse(pendingRead.isDone());

        // Adding page 33 completes the pending read.
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(33));
        Assert.assertTrue(pendingRead.isDone());
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(pendingRead, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(33), new Page[0]));
    }

    /**
     * Uses FIRST without ever declaring it, then sets a final buffer set that omits it; the test
     * expects an {@link IllegalStateException} whose message matches the configured pattern.
     */
    @Test(expectedExceptions = {IllegalStateException.class}, expectedExceptionsMessageRegExp = ".*does not contain.*\\[0]")
    public void testSetFinalBuffersWihtoutDeclaringUsedBuffer() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(buffer.isFinished());

        // Read on the undeclared FIRST id stays pending.
        ListenableFuture pendingRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(1));
        Assert.assertFalse(pendingRead.isDone());

        // Add page 33 and signal that no more pages will follow; the read completes with it.
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(33));
        buffer.setNoMorePages();
        Assert.assertTrue(pendingRead.isDone());
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(pendingRead, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(33), new Page[0]));

        // Reading at 1 gives the empty result flagged true; then abort FIRST.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 1L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, true));
        buffer.abort(FIRST);

        // Final buffer set without FIRST: this call is expected to throw.
        buffer.setOutputBuffers(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withNoMoreBufferIds());
    }

    /**
     * With a final buffer set containing only FIRST, a read on SECOND is expected to throw
     * {@link IllegalStateException} with the message "No more buffers already set".
     */
    @Test(expectedExceptions = {IllegalStateException.class}, expectedExceptionsMessageRegExp = "No more buffers already set")
    public void testUseUndeclaredBufferAfterFinalBuffersSet() {
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstOnly.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(buffer.isFinished());
        // SECOND was never declared; this call is expected to throw.
        buffer.get(SECOND, 0L, BufferTestUtils.sizeOfPages(1));
    }

    /**
     * A pending read on the undeclared FIRST id completes when FIRST is aborted, and a later
     * read at 0 returns an empty result flagged true.
     */
    @Test
    public void testAbortBeforeCreate() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST), BufferTestUtils.sizeOfPages(2));
        Assert.assertFalse(buffer.isFinished());

        // Read before FIRST is declared stays pending.
        ListenableFuture pendingRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(1));
        Assert.assertFalse(pendingRead.isDone());

        // Abort completes the pending read.
        buffer.abort(FIRST);
        Assert.assertTrue(pendingRead.isDone());
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    /**
     * Adds pages 1 and 2 to a buffer sized for two pages with FIRST and SECOND declared, then
     * enqueues page 3 through {@code BufferTestUtils.enqueuePage}.
     */
    @Test
    public void testFullBufferBlocksWriter() {
        OutputBuffers firstAndSecond = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withBuffer(SECOND, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstAndSecond.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(2));
        for (int pageId = 1; pageId <= 2; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        // NOTE(review): the returned future is not inspected here; presumably enqueuePage
        // performs its own check — confirm in BufferTestUtils.
        BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(3));
    }

    /**
     * With two readers on a buffer sized for two pages, a blocked writer stays blocked after only
     * FIRST advances to 2 and is released once SECOND advances to 2 as well.
     */
    @Test
    public void testAcknowledgementFreesWriters() {
        OutputBuffers firstAndSecond = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withBuffer(SECOND, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstAndSecond.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(2));

        // Add pages 1 and 2.
        for (int pageId = 1; pageId <= 2; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        BufferTestUtils.assertQueueState(buffer, FIRST, 2, 0);

        // Page 3 is enqueued but its future is not done.
        ListenableFuture<?> blockedWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(3));
        Assert.assertFalse(blockedWrite.isDone());
        BufferTestUtils.assertQueueState(buffer, FIRST, 3, 0);
        BufferTestUtils.assertQueueState(buffer, SECOND, 3, 0);

        // FIRST moving to 2 is not enough to release the writer.
        buffer.get(FIRST, 2L, BufferTestUtils.sizeOfPages(10)).cancel(true);
        Assert.assertFalse(blockedWrite.isDone());

        // Once SECOND also moves to 2, the writer future completes.
        buffer.get(SECOND, 2L, BufferTestUtils.sizeOfPages(10)).cancel(true);
        BufferTestUtils.assertFutureIsDone(blockedWrite);
        BufferTestUtils.assertQueueState(buffer, SECOND, 1, 2);
    }

    /**
     * Aborts FIRST and then SECOND after each has read one page; each aborted queue is closed,
     * later reads on it return empty results, and the buffer finishes after the second abort.
     */
    @Test
    public void testAbort() {
        OutputBuffers firstAndSecond = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withBuffer(SECOND, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstAndSecond.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));

        // Add pages 0..9 and signal that no more pages will follow.
        for (int pageId = 0; pageId < 10; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }
        buffer.setNoMorePages();

        // FIRST reads one page and is aborted.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        buffer.abort(FIRST);
        BufferTestUtils.assertQueueClosed(buffer, FIRST, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 1L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));

        // SECOND reads one page and is aborted; the buffer is then finished.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, SECOND, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        buffer.abort(SECOND);
        BufferTestUtils.assertQueueClosed(buffer, SECOND, 0);
        BufferTestUtils.assertFinished(buffer);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, SECOND, 1L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    /**
     * With no pages ever added, both queues report state (0, 0) after {@code setNoMorePages()}
     * and are closed once each has been aborted.
     */
    @Test
    public void testFinishClosesEmptyQueues() {
        OutputBuffers firstAndSecond = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withBuffer(SECOND, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstAndSecond.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));

        buffer.setNoMorePages();
        BufferTestUtils.assertQueueState(buffer, FIRST, 0, 0);
        BufferTestUtils.assertQueueState(buffer, SECOND, 0, 0);

        // Abort both readers, then check that both queues are closed.
        buffer.abort(FIRST);
        buffer.abort(SECOND);
        BufferTestUtils.assertQueueClosed(buffer, FIRST, 0);
        BufferTestUtils.assertQueueClosed(buffer, SECOND, 0);
    }

    /**
     * A read on FIRST that is pending at sequence 1 completes with an empty, non-final result
     * when FIRST is aborted.
     */
    @Test
    public void testAbortFreesReader() {
        OutputBuffers firstAndSecond = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0).withBuffer(SECOND, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstAndSecond.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // First read is pending until page 0 is added.
        ListenableFuture firstRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(firstRead.isDone());
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(0));
        Assert.assertTrue(firstRead.isDone());
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(firstRead, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));

        // Second read at 1 is pending; aborting FIRST completes it.
        ListenableFuture secondRead = buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(secondRead.isDone());
        buffer.abort(FIRST);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(secondRead, BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
        BufferTestUtils.assertQueueClosed(buffer, FIRST, 1);
    }

    /**
     * A read on FIRST that is pending at sequence 1 completes with an empty result flagged true
     * once {@code setNoMorePages()} is called.
     */
    @Test
    public void testFinishFreesReader() {
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstOnly.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // First read is pending until page 0 is added.
        ListenableFuture firstRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(firstRead.isDone());
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(firstRead, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));

        // Second read at 1 is pending; signalling no more pages completes it.
        ListenableFuture secondRead = buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(secondRead.isDone());
        buffer.setNoMorePages();
        BufferTestUtils.assertQueueState(buffer, FIRST, 0, 1);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(secondRead, BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, true));
    }

    /**
     * Two writers blocked on a full buffer are released by {@code setNoMorePages()}; their pages
     * remain readable, and the buffer finishes only after FIRST is aborted.
     */
    @Test
    public void testFinishFreesWriter() {
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstOnly.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // Add pages 0..4 to a buffer sized for five pages.
        for (int pageId = 0; pageId < 5; pageId++) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(pageId));
        }

        // Enqueue pages 5 and 6.
        ListenableFuture<?> firstWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(5));
        ListenableFuture<?> secondWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(6));

        // Read page 0 and move FIRST to 1; both writers remain blocked.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.MAX_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(1)).cancel(true);
        Assert.assertFalse(firstWrite.isDone());
        Assert.assertFalse(secondWrite.isDone());

        // Signalling no more pages releases the writers without finishing the buffer.
        buffer.setNoMorePages();
        Assert.assertFalse(buffer.isFinished());
        BufferTestUtils.assertFutureIsDone(firstWrite);
        BufferTestUtils.assertFutureIsDone(secondWrite);

        // Pages 1..6 are still readable; reading at 7 gives the empty result flagged true.
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 1L, BufferTestUtils.sizeOfPages(100), BufferTestUtils.NO_WAIT), bufferResult(1L, BufferTestUtils.createPage(1), BufferTestUtils.createPage(2), BufferTestUtils.createPage(3), BufferTestUtils.createPage(4), BufferTestUtils.createPage(5), BufferTestUtils.createPage(6)));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getBufferResult(buffer, FIRST, 7L, BufferTestUtils.sizeOfPages(100), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 7L, true));

        // Aborting the only reader finishes the buffer.
        buffer.abort(FIRST);
        BufferTestUtils.assertFinished(buffer);
    }

    /**
     * A read on FIRST that is pending at sequence 1 completes with an empty, non-final result
     * when the buffer is destroyed.
     */
    @Test
    public void testDestroyFreesReader() {
        OutputBuffers firstOnly = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST).withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(firstOnly.withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // First read is pending until page 0 is added.
        ListenableFuture firstRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(firstRead.isDone());
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(firstRead, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));

        // Second read at 1 is pending; destroying the buffer closes the queue and completes it.
        ListenableFuture secondRead = buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(secondRead.isDone());
        buffer.destroy();
        BufferTestUtils.assertQueueClosed(buffer, FIRST, 1);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(secondRead, BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
    }

    /**
     * Checks that destroying the buffer completes page-enqueue futures that were still pending.
     *
     * <p>Five pages are added to a buffer created with {@code sizeOfPages(5)}, two more are
     * enqueued, and one page is read back. Both enqueue futures are asserted not done before
     * {@code destroy()} and done after it; the buffer itself is asserted finished.
     */
    @Test
    public void testDestroyFreesWriter() {
        OutputBuffers singleClient = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST)
                .withBuffer(FIRST, 0)
                .withNoMoreBufferIds();
        BroadcastOutputBuffer buffer = createBroadcastBuffer(singleClient, BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // Add pages 0..4 (presumably filling the buffer, which was sized for five pages).
        int nextPage = 0;
        while (nextPage < 5) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(nextPage));
            nextPage++;
        }

        // Two additional pages are enqueued; their futures are tracked through the rest of the test.
        ListenableFuture<?> firstPendingWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(5));
        ListenableFuture<?> secondPendingWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(6));

        // Read a single page back.
        BufferTestUtils.assertBufferResultEquals(
                TYPES,
                BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.MAX_WAIT),
                bufferResult(0L, BufferTestUtils.createPage(0)));

        // Issue a read starting at 1 and cancel it right away; only the request itself matters here.
        ListenableFuture<?> cancelledRead = buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(1));
        cancelledRead.cancel(true);

        Assert.assertFalse(firstPendingWrite.isDone());
        Assert.assertFalse(secondPendingWrite.isDone());

        buffer.destroy();
        BufferTestUtils.assertFinished(buffer);
        BufferTestUtils.assertFutureIsDone(firstPendingWrite);
        BufferTestUtils.assertFutureIsDone(secondPendingWrite);
    }

    /**
     * Checks that {@code fail()} does not complete pending reads.
     *
     * <p>After one page has been added and read, a follow-up read starting at 1 is left
     * outstanding. Once {@code fail()} has been called, both that outstanding read and a freshly
     * issued read for the same position are asserted to be not done.
     */
    @Test
    public void testFailDoesNotFreeReader() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(
                OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST)
                        .withBuffer(FIRST, 0)
                        .withNoMoreBufferIds(),
                BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // Nothing has been added yet, so the read is expected to still be pending.
        ListenableFuture<BufferResult> firstRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(firstRead.isDone());

        // Adding one page should complete the pending read with exactly that page.
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(
                TYPES,
                BufferTestUtils.getFuture(firstRead, BufferTestUtils.NO_WAIT),
                bufferResult(0L, BufferTestUtils.createPage(0)));

        ListenableFuture<BufferResult> secondRead = buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(secondRead.isDone());

        buffer.fail();

        // Neither the read issued before fail() nor a new one issued after it may complete.
        Assert.assertFalse(secondRead.isDone());
        Assert.assertFalse(buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10)).isDone());
    }

    /**
     * Checks that {@code fail()} completes pending page-enqueue futures without finishing the buffer.
     *
     * <p>Five pages are added to a buffer created with {@code sizeOfPages(5)}, two more are
     * enqueued, and one page is read back. Both enqueue futures are asserted not done before
     * {@code fail()} and done after it, while {@code isFinished()} is asserted to stay false.
     */
    @Test
    public void testFailFreesWriter() {
        OutputBuffers singleClient = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST)
                .withBuffer(FIRST, 0)
                .withNoMoreBufferIds();
        BroadcastOutputBuffer buffer = createBroadcastBuffer(singleClient, BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // Add pages 0..4 (presumably filling the buffer, which was sized for five pages).
        int nextPage = 0;
        while (nextPage < 5) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(nextPage));
            nextPage++;
        }

        // Two additional pages are enqueued; their futures are tracked through the rest of the test.
        ListenableFuture<?> firstPendingWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(5));
        ListenableFuture<?> secondPendingWrite = BufferTestUtils.enqueuePage(buffer, BufferTestUtils.createPage(6));

        // Read a single page back.
        BufferTestUtils.assertBufferResultEquals(
                TYPES,
                BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.MAX_WAIT),
                bufferResult(0L, BufferTestUtils.createPage(0)));

        // Issue a read starting at 1 and cancel it right away; only the request itself matters here.
        ListenableFuture<?> cancelledRead = buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(1));
        cancelledRead.cancel(true);

        Assert.assertFalse(firstPendingWrite.isDone());
        Assert.assertFalse(secondPendingWrite.isDone());

        buffer.fail();
        Assert.assertFalse(buffer.isFinished());
        BufferTestUtils.assertFutureIsDone(firstPendingWrite);
        BufferTestUtils.assertFutureIsDone(secondPendingWrite);
    }

    /**
     * Checks that output buffers can still be reconfigured after {@code fail()} and that reads
     * issued afterwards stay pending.
     *
     * <p>The buffer starts with only {@code FIRST} registered (and without
     * {@code withNoMoreBufferIds()}). After one page is read and {@code fail()} is called,
     * {@code SECOND} is added and then the buffer id set is closed; after each of those two
     * configuration changes, reads on both {@code FIRST} and {@code SECOND} are asserted not done.
     */
    @Test
    public void testAddBufferAfterFail() {
        OutputBuffers initialBuffers = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST)
                .withBuffer(FIRST, 0);
        BroadcastOutputBuffer buffer = createBroadcastBuffer(initialBuffers, BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // Nothing has been added yet, so the read is expected to still be pending.
        ListenableFuture<BufferResult> firstRead = buffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(firstRead.isDone());

        // Adding one page should complete the pending read with exactly that page.
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(
                TYPES,
                BufferTestUtils.getFuture(firstRead, BufferTestUtils.NO_WAIT),
                bufferResult(0L, BufferTestUtils.createPage(0)));

        buffer.fail();

        // Register a second client buffer after the failure.
        OutputBuffers withSecondBuffer = initialBuffers.withBuffer(SECOND, 0);
        buffer.setOutputBuffers(withSecondBuffer);
        Assert.assertFalse(buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10)).isDone());
        Assert.assertFalse(buffer.get(SECOND, 0L, BufferTestUtils.sizeOfPages(10)).isDone());

        // Closing the set of buffer ids must not release the readers either.
        buffer.setOutputBuffers(withSecondBuffer.withNoMoreBufferIds());
        Assert.assertFalse(buffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10)).isDone());
        Assert.assertFalse(buffer.get(SECOND, 0L, BufferTestUtils.sizeOfPages(10)).isDone());
    }

    /**
     * Checks that the buffer is reported finished only after its single client buffer is aborted.
     *
     * <p>Five pages are added, {@code setNoMorePages()} is called, and all five pages are read back
     * in one request. {@code isFinished()} is asserted false at that point and true only after
     * {@code abort(FIRST)}.
     */
    @Test
    public void testBufferCompletion() {
        BroadcastOutputBuffer buffer = createBroadcastBuffer(
                OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST)
                        .withBuffer(FIRST, 0)
                        .withNoMoreBufferIds(),
                BufferTestUtils.sizeOfPages(5));
        Assert.assertFalse(buffer.isFinished());

        // Add five pages, remembering each one so the expected result can be built from them.
        ArrayList<Page> pages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Page page = BufferTestUtils.createPage(i);
            BufferTestUtils.addPage(buffer, page);
            pages.add(page);
        }
        buffer.setNoMorePages();

        // Read all five pages back in a single request.
        BufferTestUtils.assertBufferResultEquals(
                TYPES,
                BufferTestUtils.getBufferResult(buffer, FIRST, 0L, BufferTestUtils.sizeOfPages(5), BufferTestUtils.MAX_WAIT),
                BufferTestUtils.createBufferResult(TASK_INSTANCE_ID, 0L, pages));

        // Having read every page is not sufficient on its own for the buffer to finish.
        Assert.assertFalse(buffer.isFinished());

        // Aborting the only client buffer is what lets the buffer finish.
        buffer.abort(FIRST);
        Assert.assertTrue(buffer.isFinished());
    }

    /**
     * Exercises the buffer's memory manager when the memory reservation handler hands out a
     * future that is initially incomplete.
     *
     * <p>The buffer is sized to twice the retained size of one serialized page. A page is
     * enqueued while the handler's future is still incomplete; the future is then completed,
     * {@code onMemoryAvailable()} is invoked, and the buffer-blocked future is asserted done.
     * One more page is then added and a further one enqueued.
     */
    @Test
    public void testSharedBufferBlocking() {
        // Future handed to the mock reservation handler; incomplete until set(null) below.
        SettableFuture<?> blockedFuture = SettableFuture.create();
        MockMemoryReservationHandler reservationHandler = new MockMemoryReservationHandler(blockedFuture);
        AggregatedMemoryContext memoryContext = AggregatedMemoryContext.newRootAggregatedMemoryContext(reservationHandler, 0L);

        Page page = BufferTestUtils.createPage(1);
        long pageSize = PAGES_SERDE.serialize(page).getRetainedSizeInBytes();

        // Size the buffer at exactly two serialized pages.
        BroadcastOutputBuffer buffer = createBroadcastBuffer(
                OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST),
                new DataSize(pageSize * 2, DataSize.Unit.BYTE),
                memoryContext,
                MoreExecutors.directExecutor());
        OutputBufferMemoryManager memoryManager = buffer.getMemoryManager();

        // Enqueued while the handler's future is still incomplete.
        BufferTestUtils.enqueuePage(buffer, page);

        // Complete the handler's future and notify the memory manager.
        blockedFuture.set(null);
        memoryManager.onMemoryAvailable();
        Assert.assertTrue(memoryManager.getBufferBlockedFuture().isDone(), "buffer shouldn't be blocked");

        BufferTestUtils.addPage(buffer, page);
        BufferTestUtils.enqueuePage(buffer, page);
    }

    /**
     * Exercises the buffer-blocked future when the handler's future completes while the buffer's
     * own memory usage is still above its configured size.
     *
     * <p>The buffer is sized to twice the retained size of one serialized page and two pages are
     * added. The handler is then switched to a new incomplete future and memory usage is bumped
     * by one byte. Completing that future and calling {@code onMemoryAvailable()} must leave the
     * buffer-blocked future not done; only after memory usage is reduced by
     * {@code 2 * pageSize + 1} bytes is it asserted done.
     */
    @Test
    public void testSharedBufferBlocking2() {
        // The handler starts out with an already-completed future.
        SettableFuture<?> initialFuture = SettableFuture.create();
        initialFuture.set(null);
        MockMemoryReservationHandler reservationHandler = new MockMemoryReservationHandler(initialFuture);
        AggregatedMemoryContext memoryContext = AggregatedMemoryContext.newRootAggregatedMemoryContext(reservationHandler, 0L);

        Page page = BufferTestUtils.createPage(1);
        long pageSize = PAGES_SERDE.serialize(page).getRetainedSizeInBytes();

        // Size the buffer at exactly two serialized pages.
        BroadcastOutputBuffer buffer = createBroadcastBuffer(
                OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST),
                new DataSize(pageSize * 2, DataSize.Unit.BYTE),
                memoryContext,
                MoreExecutors.directExecutor());
        OutputBufferMemoryManager memoryManager = buffer.getMemoryManager();

        BufferTestUtils.addPage(buffer, page);
        BufferTestUtils.addPage(buffer, page);

        // Swap in a fresh, incomplete future and push memory usage up by one byte.
        SettableFuture<?> secondFuture = SettableFuture.create();
        reservationHandler.updateBlockedFuture(secondFuture);
        memoryManager.updateMemoryUsage(1L);

        // Completing the handler's future alone is not expected to unblock the buffer.
        secondFuture.set(null);
        memoryManager.onMemoryAvailable();
        Assert.assertFalse(memoryManager.getBufferBlockedFuture().isDone(), "buffer should be blocked");

        // Release the two pages plus the extra byte added above.
        memoryManager.updateMemoryUsage((-pageSize * 2) - 1);
        Assert.assertTrue(memoryManager.getBufferBlockedFuture().isDone(), "buffer shouldn't be blocked");

        BufferTestUtils.addPage(buffer, page);
        BufferTestUtils.addPage(buffer, page);
        BufferTestUtils.enqueuePage(buffer, page);
    }

    /**
     * Variant of the shared-buffer blocking scenario with {@code setNoBlockOnFull()} enabled on
     * the memory manager.
     *
     * <p>The buffer is sized to twice the retained size of one serialized page. After a page is
     * enqueued, the handler's future is completed, {@code onMemoryAvailable()} is invoked, and
     * the buffer-blocked future is asserted done. Two more pages are then added with
     * {@code addPage}, taking the total to three pages on a two-page buffer.
     */
    @Test
    public void testSharedBufferBlockingNoBlockOnFull() {
        // Future handed to the mock reservation handler; incomplete until set(null) below.
        SettableFuture<?> blockedFuture = SettableFuture.create();
        MockMemoryReservationHandler reservationHandler = new MockMemoryReservationHandler(blockedFuture);
        AggregatedMemoryContext memoryContext = AggregatedMemoryContext.newRootAggregatedMemoryContext(reservationHandler, 0L);

        Page page = BufferTestUtils.createPage(1);
        long pageSize = PAGES_SERDE.serialize(page).getRetainedSizeInBytes();

        // Size the buffer at exactly two serialized pages.
        BroadcastOutputBuffer buffer = createBroadcastBuffer(
                OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST),
                new DataSize(pageSize * 2, DataSize.Unit.BYTE),
                memoryContext,
                MoreExecutors.directExecutor());
        OutputBufferMemoryManager memoryManager = buffer.getMemoryManager();
        memoryManager.setNoBlockOnFull();

        // Enqueued while the handler's future is still incomplete.
        BufferTestUtils.enqueuePage(buffer, page);

        // Complete the handler's future and notify the memory manager.
        blockedFuture.set(null);
        memoryManager.onMemoryAvailable();
        Assert.assertTrue(memoryManager.getBufferBlockedFuture().isDone(), "buffer shouldn't be blocked");

        BufferTestUtils.addPage(buffer, page);
        BufferTestUtils.addPage(buffer, page);
    }

    /**
     * Creates a {@link BroadcastOutputBuffer} that draws its local memory context from the given
     * aggregated context and uses the given executor.
     *
     * <p>The buffer's state machine starts in {@code BufferState.OPEN} and is notified on this
     * test's {@code stateNotificationExecutor}. A no-op lifespan completion callback is registered
     * before the buffer is returned.
     *
     * @param outputBuffers output buffer configuration applied through {@code setOutputBuffers}
     * @param dataSize size passed to the {@code BroadcastOutputBuffer} constructor (presumably the
     *     maximum buffer size; confirm against that constructor)
     * @param aggregatedMemoryContext context from which a local memory context tagged
     *     {@code "test"} is created on demand
     * @param executor executor passed to the {@code BroadcastOutputBuffer} constructor
     * @return the newly created and configured buffer
     */
    private BroadcastOutputBuffer createBroadcastBuffer(OutputBuffers outputBuffers, DataSize dataSize, AggregatedMemoryContext aggregatedMemoryContext, Executor executor) {
        BroadcastOutputBuffer buffer = new BroadcastOutputBuffer(
                TASK_INSTANCE_ID,
                new StateMachine<>("bufferState", this.stateNotificationExecutor, BufferState.OPEN, BufferState.TERMINAL_BUFFER_STATES),
                dataSize,
                () -> aggregatedMemoryContext.newLocalMemoryContext("test"),
                executor);
        buffer.setOutputBuffers(outputBuffers);
        // The tests do not observe lifespan completion, so the callback intentionally does nothing.
        buffer.registerLifespanCompletionCallback(lifespan -> {
        });
        return buffer;
    }

    /**
     * Checks that the buffer is reported finished only once every registered client buffer has
     * been aborted.
     *
     * <p>Three client buffers are registered and five pages are added. {@code isFinished()} is
     * asserted false after aborting {@code FIRST} and after aborting {@code SECOND}, and true
     * after aborting {@code THIRD}.
     */
    @Test
    public void testBufferFinishesWhenClientBuffersDestroyed() {
        OutputBuffers threeClients = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST)
                .withBuffer(FIRST, 0)
                .withBuffer(SECOND, 0)
                .withBuffer(THIRD, 0)
                .withNoMoreBufferIds();
        BroadcastOutputBuffer buffer = createBroadcastBuffer(threeClients, BufferTestUtils.sizeOfPages(5));

        // Add pages 0..4.
        int nextPage = 0;
        while (nextPage < 5) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(nextPage));
            nextPage++;
        }

        // Aborting the first two clients must leave the buffer unfinished.
        for (OutputBuffers.OutputBufferId clientId : new OutputBuffers.OutputBufferId[] {FIRST, SECOND}) {
            buffer.abort(clientId);
            Assert.assertFalse(buffer.isFinished());
        }

        // Aborting the last remaining client finishes the buffer.
        buffer.abort(THIRD);
        Assert.assertTrue(buffer.isFinished());
    }

    /**
     * Checks that {@code forceFreeMemory()} drops the buffered bytes to zero and that a page
     * added afterwards does not raise them again.
     *
     * @throws Throwable declared by the test signature; nothing in the body throws it explicitly
     */
    @Test
    public void testForceFreeMemory() throws Throwable {
        OutputBuffers singleClient = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.BROADCAST)
                .withBuffer(FIRST, 0)
                .withNoMoreBufferIds();
        BroadcastOutputBuffer buffer = createBroadcastBuffer(singleClient, BufferTestUtils.sizeOfPages(5));

        // Add the same one-value page three times so the memory manager has something buffered.
        int added = 0;
        while (added < 3) {
            BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(1), 0);
            added++;
        }

        OutputBufferMemoryManager memoryManager = buffer.getMemoryManager();
        long bufferedBeforeFree = memoryManager.getBufferedBytes();
        Assert.assertTrue(bufferedBeforeFree > 0);

        buffer.forceFreeMemory();
        long bufferedAfterFree = memoryManager.getBufferedBytes();
        Assert.assertEquals(bufferedAfterFree, 0L);

        // A page added after forceFreeMemory() is expected to leave the buffered byte count at zero.
        BufferTestUtils.addPage(buffer, BufferTestUtils.createPage(1));
        long bufferedAfterLateAdd = memoryManager.getBufferedBytes();
        Assert.assertEquals(bufferedAfterLateAdd, 0L);
    }

    /**
     * Creates a {@link BroadcastOutputBuffer} backed by a fresh simple memory context and this
     * test's {@code stateNotificationExecutor}.
     *
     * <p>The buffer's state machine starts in {@code BufferState.OPEN}. Each time the buffer asks
     * for a memory context it receives a new {@link SimpleLocalMemoryContext} tagged
     * {@code "test"}. A no-op lifespan completion callback is registered before the buffer is
     * returned.
     *
     * @param outputBuffers output buffer configuration applied through {@code setOutputBuffers}
     * @param dataSize size passed to the {@code BroadcastOutputBuffer} constructor (presumably the
     *     maximum buffer size; confirm against that constructor)
     * @return the newly created and configured buffer
     */
    private BroadcastOutputBuffer createBroadcastBuffer(OutputBuffers outputBuffers, DataSize dataSize) {
        BroadcastOutputBuffer buffer = new BroadcastOutputBuffer(
                TASK_INSTANCE_ID,
                new StateMachine<>("bufferState", this.stateNotificationExecutor, BufferState.OPEN, BufferState.TERMINAL_BUFFER_STATES),
                dataSize,
                () -> new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"),
                this.stateNotificationExecutor);
        buffer.setOutputBuffers(outputBuffers);
        // The tests do not observe lifespan completion, so the callback intentionally does nothing.
        buffer.registerLifespanCompletionCallback(lifespan -> {
        });
        return buffer;
    }

    /**
     * Builds the expected {@link BufferResult} for this test's task instance from one or more pages.
     *
     * @param j value forwarded as the second argument of {@code BufferTestUtils.createBufferResult}
     *     (presumably the token / sequence id of the result; confirm against that helper)
     * @param page first page of the result
     * @param pageArr any further pages, appended after {@code page} in the given order
     * @return the result created by {@code BufferTestUtils.createBufferResult}
     */
    private static BufferResult bufferResult(long j, Page page, Page... pageArr) {
        // The explicit <Page> witness is required: in a call chain the builder's element type
        // would otherwise be inferred as Object, producing an ImmutableList<Object>.
        return BufferTestUtils.createBufferResult(TASK_INSTANCE_ID, j, ImmutableList.<Page>builder().add(page).add(pageArr).build());
    }
}
