package com.facebook.presto.execution;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.UnpartitionedPagePartitionFunction;
import com.facebook.presto.block.Block;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.operator.Page;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/execution/TestSharedBuffer.class */
public class TestSharedBuffer {
    // Zero-length wait; passed as the wait argument of SharedBuffer.get() by the single-threaded tests.
    private static final Duration NO_WAIT = new Duration(0.0d, TimeUnit.MILLISECONDS);
    // One-second wait used by the background jobs when calling get(); waitForFinished() allows three times this.
    private static final Duration MAX_WAIT = new Duration(1.0d, TimeUnit.SECONDS);
    // Data size of one page as produced by createPage(); sizeOfPages() multiplies it by a page count.
    private static final DataSize PAGE_SIZE = createPage(42).getDataSize();
    // Task id handed to every SharedBuffer constructed in this class.
    private static final TaskId TASK_ID = new TaskId("query", "stage", "task");
    // The initial empty output-buffer set with withNoMoreBufferIds() applied.
    private static final OutputBuffers CLOSED_OUTPUT_BUFFERS = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds();
    // Executor passed to every SharedBuffer; created in setUp() and shut down in tearDown().
    private ExecutorService executor;

    /* loaded from: input_file:com/facebook/presto/execution/TestSharedBuffer$AddPagesJob.class */
    /**
     * Background writer used by the tests. It enqueues a fixed set of pages into a
     * {@link SharedBuffer} one at a time, waiting on each enqueue future before moving on.
     * A page is removed from {@code elements} only after its enqueue future completes, so
     * the size of {@code elements} is the number of pages not yet accepted.
     */
    private static class AddPagesJob implements Runnable {
        private final SharedBuffer sharedBuffer;
        private final ArrayBlockingQueue<Page> elements;
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch finished = new CountDownLatch(1);

        private AddPagesJob(SharedBuffer sharedBuffer, Page... pageArr) {
            this.sharedBuffer = sharedBuffer;
            elements = new ArrayBlockingQueue<>(pageArr.length);
            Collections.addAll(elements, pageArr);
        }

        /**
         * Asserts that the job is running, and still is 100 ms later, with exactly
         * {@code i} pages left to enqueue.
         */
        public void assertBlockedWithCount(int i) {
            assertRunning();
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(elements.size(), i);
            assertRunning();
        }

        /** Asserts that run() has been entered and has not exited yet. */
        private void assertRunning() {
            Assert.assertTrue(isStarted());
            Assert.assertTrue(!isFinished());
        }

        private boolean isFinished() {
            return finished.getCount() == 0;
        }

        private boolean isStarted() {
            return started.getCount() == 0;
        }

        /** Waits up to one second for run() to be entered and fails the test otherwise. */
        public void waitForStarted() throws InterruptedException {
            boolean startedInTime = started.await(1L, TimeUnit.SECONDS);
            Assert.assertTrue(startedInTime, "Job did not start with in 1 second");
        }

        /** Waits up to three times MAX_WAIT for run() to exit and fails the test otherwise. */
        public void waitForFinished() throws InterruptedException {
            long millis = MAX_WAIT.toMillis() * 3;
            boolean finishedInTime = finished.await(millis, TimeUnit.MILLISECONDS);
            Assert.assertTrue(finishedInTime, "Job did not finish with in " + millis + " ms");
        }

        /**
         * Enqueues the pending pages in order. The head page is only peeked until its
         * enqueue future has completed, then it is removed from the pending queue.
         */
        @Override
        public void run() {
            started.countDown();
            try {
                for (Page next = elements.peek(); next != null; next = elements.peek()) {
                    try {
                        sharedBuffer.enqueue(next).get();
                    } catch (InterruptedException interrupted) {
                        // restore the interrupt flag before surfacing the failure
                        Thread.currentThread().interrupt();
                        throw Throwables.propagate(interrupted);
                    } catch (ExecutionException failed) {
                        throw Throwables.propagate(failed);
                    }
                    Assert.assertNotNull(elements.poll());
                }
            } finally {
                finished.countDown();
            }
        }
    }

    /* loaded from: input_file:com/facebook/presto/execution/TestSharedBuffer$GetPagesJob.class */
    /**
     * Background reader used by the tests. It repeatedly calls
     * {@link SharedBuffer#get} on the buffer named {@code "queue"}, collects the returned
     * pages and acknowledges them, until {@code pagesToGet} pages have been collected.
     */
    private static class GetPagesJob implements Runnable {
        private final SharedBuffer sharedBuffer;
        private final int pagesToGet;
        private final int batchSize;
        // Next token to request; only written from run().
        private long sequenceId;
        private final AtomicReference<FailedQueryException> failedQueryException;
        private final CopyOnWriteArrayList<Page> elements;
        private final CountDownLatch started;
        private final CountDownLatch finished;

        /**
         * @param sharedBuffer buffer to read from
         * @param j token to start reading at
         * @param i total number of pages to collect before the job exits
         * @param i2 number of pages requested per get() call (converted via sizeOfPages)
         */
        private GetPagesJob(SharedBuffer sharedBuffer, long j, int i, int i2) {
            this.failedQueryException = new AtomicReference<>();
            this.elements = new CopyOnWriteArrayList<>();
            this.started = new CountDownLatch(1);
            this.finished = new CountDownLatch(1);
            this.sharedBuffer = sharedBuffer;
            this.sequenceId = j;
            this.pagesToGet = i;
            this.batchSize = i2;
        }

        /** Returns a snapshot of the pages collected so far. */
        public List<Page> getElements() {
            return ImmutableList.copyOf(this.elements);
        }

        /**
         * Asserts that the job is running, and still is 100 ms later, having collected
         * exactly {@code i} pages.
         */
        public void assertBlockedWithCount(int i) {
            Assert.assertTrue(isStarted());
            Assert.assertTrue(!isFinished());
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            Assert.assertEquals(this.elements.size(), i);
            Assert.assertTrue(isStarted());
            Assert.assertTrue(!isFinished());
        }

        private boolean isFinished() {
            return this.finished.getCount() == 0;
        }

        private boolean isStarted() {
            return this.started.getCount() == 0;
        }

        /** Waits up to one second for run() to be entered and fails the test otherwise. */
        public void waitForStarted() throws InterruptedException {
            Assert.assertTrue(this.started.await(1L, TimeUnit.SECONDS), "Job did not start with in 1 second");
        }

        /** Waits up to three times MAX_WAIT for run() to exit and fails the test otherwise. */
        public void waitForFinished() throws InterruptedException {
            long millis = TestSharedBuffer.MAX_WAIT.toMillis() * 3;
            Assert.assertTrue(this.finished.await(millis, TimeUnit.MILLISECONDS), "Job did not finish with in " + millis + " ms");
        }

        /**
         * Reads batches until enough pages were collected. The {@code finished} latch is
         * released exactly once, when this method exits (normally or exceptionally), so
         * {@link #isFinished()} stays false while the loop is still running.
         */
        @Override // java.lang.Runnable
        public void run() {
            this.started.countDown();
            try {
                while (this.elements.size() < this.pagesToGet) {
                    try {
                        BufferResult bufferResult = this.sharedBuffer.get("queue", this.sequenceId, TestSharedBuffer.sizeOfPages(this.batchSize), TestSharedBuffer.MAX_WAIT);
                        // an empty result fails the assertion, which ends the job
                        Assert.assertTrue(!bufferResult.isEmpty());
                        this.elements.addAll(bufferResult.getPages());
                        // next request starts right after the pages just received
                        this.sequenceId = bufferResult.getToken() + bufferResult.getPages().size();
                        this.sharedBuffer.acknowledge("queue", this.sequenceId);
                    } catch (FailedQueryException e) {
                        // record the failure and stop instead of polling a failed buffer again
                        this.failedQueryException.set(e);
                        break;
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        throw Throwables.propagate(e2);
                    }
                }
            } finally {
                this.finished.countDown();
            }
        }
    }

    /** Builds a single-channel page whose only block is the longs block created from {@code i}. */
    private static Page createPage(int i) {
        Block longsBlock = BlockAssertions.createLongsBlock(i);
        return new Page(new Block[]{longsBlock});
    }

    /** Returns the data size of {@code i} pages, i.e. {@code i} times PAGE_SIZE expressed in bytes. */
    public static DataSize sizeOfPages(int i) {
        DataSize.Unit unit = DataSize.Unit.BYTE;
        return new DataSize(PAGE_SIZE.toBytes() * i, unit);
    }

    /** Creates the cached thread pool that is handed to the SharedBuffer instances under test. */
    @BeforeClass
    public void setUp() throws Exception {
        executor = Executors.newCachedThreadPool();
    }

    /** Shuts the shared executor down, if one was created, and clears the field. */
    @AfterClass
    public void tearDown() throws Exception {
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
        executor = null;
    }

    /** A SharedBuffer constructed with a zero-byte maximum size must be rejected. */
    @Test
    public void testInvalidConstructorArg() throws Exception {
        try {
            new SharedBuffer(TASK_ID, this.executor, new DataSize(0.0d, DataSize.Unit.BYTE), CLOSED_OUTPUT_BUFFERS);
            // the message names the exception type this test actually catches
            Assert.fail("Expected IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // expected: the constructor rejected the zero-size argument
        }
    }

    /**
     * Walks one SharedBuffer through a full scenario: pages added before any output buffer
     * is declared, a first and then a second output buffer attached, enqueues whose futures
     * are not immediately done, finish(), and finally draining both output buffers.
     */
    @Test
    public void testSimple() throws Exception {
        SharedBuffer buffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS);

        // pages 0..2 go in before any output buffer exists
        for (int pageNumber = 0; pageNumber < 3; pageNumber++) {
            addPage(buffer, createPage(pageNumber));
        }

        // declare "first" and read the three pages from it
        OutputBuffers firstOnly = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer("first", new UnpartitionedPagePartitionFunction());
        buffer.setOutputBuffers(firstOnly);
        assertQueueState(buffer, "first", 3, 0);
        assertBufferResultEquals(
                buffer.get("first", 0L, sizeOfPages(10), NO_WAIT),
                bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertQueueState(buffer, "first", 3, 0);

        // asking for token 3 yields nothing and moves the state from (3, 0) to (0, 3)
        assertBufferResultEquals(buffer.get("first", 3L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(3L, false));
        assertQueueState(buffer, "first", 0, 3);

        // pages 3..9 are accepted immediately; the future for page 10 is not done
        for (int pageNumber = 3; pageNumber < 10; pageNumber++) {
            addPage(buffer, createPage(pageNumber));
        }
        assertQueueState(buffer, "first", 7, 3);
        ListenableFuture<?> pendingPage10 = enqueuePage(buffer, createPage(10));
        assertBufferResultEquals(buffer.get("first", 3L, sizeOfPages(1), NO_WAIT), bufferResult(3L, createPage(3)));
        assertQueueState(buffer, "first", 7, 3);
        Assert.assertFalse(pendingPage10.isDone());

        // declare "second" and read pages 0..9 from it
        OutputBuffers firstAndSecond = firstOnly.withBuffer("second", new UnpartitionedPagePartitionFunction());
        buffer.setOutputBuffers(firstAndSecond);
        assertQueueState(buffer, "second", 10, 0);
        assertBufferResultEquals(
                buffer.get("second", 0L, sizeOfPages(10), NO_WAIT),
                bufferResult(0L, createPage(0), createPage(1), createPage(2), createPage(3), createPage(4), createPage(5), createPage(6), createPage(7), createPage(8), createPage(9)));
        assertQueueState(buffer, "second", 10, 0);
        assertBufferResultEquals(buffer.get("second", 10L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(10L, false));
        assertQueueState(buffer, "second", 0, 10);

        // once no more buffers can be added, the pending enqueue of page 10 completes
        buffer.setOutputBuffers(firstAndSecond.withNoMoreBufferIds());
        pendingPage10.get(1L, TimeUnit.SECONDS);
        addPage(buffer, createPage(11));
        addPage(buffer, createPage(12));
        ListenableFuture<?> pendingPage13 = enqueuePage(buffer, createPage(13));
        assertQueueState(buffer, "first", 10, 3);
        assertQueueState(buffer, "second", 3, 10);

        // reading one more page from "first" lets the enqueue of page 13 complete
        assertBufferResultEquals(buffer.get("first", 4L, sizeOfPages(1), NO_WAIT), bufferResult(4L, createPage(4)));
        pendingPage13.get(1L, TimeUnit.SECONDS);
        assertQueueState(buffer, "first", 10, 4);
        assertQueueState(buffer, "second", 4, 10);
        Assert.assertFalse(buffer.isFinished());

        // finish() leaves both queue states unchanged and the buffer unfinished
        buffer.finish();
        assertQueueState(buffer, "first", 10, 4);
        assertQueueState(buffer, "second", 4, 10);
        Assert.assertFalse(buffer.isFinished());

        // drain "first"
        assertBufferResultEquals(buffer.get("first", 5L, sizeOfPages(1), NO_WAIT), bufferResult(5L, createPage(5)));
        assertQueueState(buffer, "first", 9, 5);
        assertQueueState(buffer, "second", 4, 10);
        Assert.assertFalse(buffer.isFinished());
        assertBufferResultEquals(
                buffer.get("first", 6L, sizeOfPages(10), NO_WAIT),
                bufferResult(6L, createPage(6), createPage(7), createPage(8), createPage(9), createPage(10), createPage(11), createPage(12), createPage(13)));
        assertQueueState(buffer, "first", 8, 6);
        assertBufferResultEquals(buffer.get("first", 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, false));
        assertQueueClosed(buffer, "first", 14);
        assertQueueState(buffer, "second", 4, 10);
        Assert.assertFalse(buffer.isFinished());

        // drain "second"; afterwards the whole buffer is finished
        assertBufferResultEquals(
                buffer.get("second", 10L, sizeOfPages(10), NO_WAIT),
                bufferResult(10L, createPage(10), createPage(11), createPage(12), createPage(13)));
        assertQueueState(buffer, "second", 4, 10);
        assertBufferResultEquals(buffer.get("second", 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, false));
        assertQueueClosed(buffer, "first", 14);
        assertQueueClosed(buffer, "second", 14);
        assertFinished(buffer);

        // further reads report an empty, closed result
        assertBufferResultEquals(buffer.get("first", 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, true));
        assertBufferResultEquals(buffer.get("second", 14L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(14L, true));
    }

    /**
     * Requesting the same token twice returns the same pages and leaves the queue state
     * unchanged; after an explicit acknowledge the same request returns an empty result.
     */
    @Test
    public void testDuplicateRequests() throws Exception {
        OutputBuffers initialBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
        SharedBuffer buffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), initialBuffers);
        for (int pageNumber = 0; pageNumber < 3; pageNumber++) {
            addPage(buffer, createPage(pageNumber));
        }
        buffer.setOutputBuffers(initialBuffers.withBuffer("first", new UnpartitionedPagePartitionFunction()));
        assertQueueState(buffer, "first", 3, 0);

        // first request
        assertBufferResultEquals(
                buffer.get("first", 0L, sizeOfPages(10), NO_WAIT),
                bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertQueueState(buffer, "first", 3, 0);

        // identical second request
        assertBufferResultEquals(
                buffer.get("first", 0L, sizeOfPages(10), NO_WAIT),
                bufferResult(0L, createPage(0), createPage(1), createPage(2)));
        assertQueueState(buffer, "first", 3, 0);

        // after acknowledging up to token 3 the old request yields an empty result at token 3
        buffer.acknowledge("first", 3L);
        assertBufferResultEquals(buffer.get("first", 0L, sizeOfPages(10), NO_WAIT), BufferResult.emptyResults(3L, false));
        assertQueueState(buffer, "first", 0, 3);
    }

    /**
     * Setting the same "no more buffers" configuration repeatedly is accepted, but adding
     * a new buffer afterwards must fail with IllegalStateException.
     */
    @Test
    public void testAddQueueAfterNoMoreQueues() throws Exception {
        OutputBuffers initialBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
        SharedBuffer buffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), initialBuffers);
        Assert.assertFalse(buffer.isFinished());

        // applying the closed configuration three times in a row never finishes the buffer
        OutputBuffers closedBuffers = initialBuffers.withNoMoreBufferIds();
        for (int attempt = 0; attempt < 3; attempt++) {
            buffer.setOutputBuffers(closedBuffers);
            Assert.assertFalse(buffer.isFinished());
        }

        OutputBuffers withExtraBuffer = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS
                .withBuffer("foo", new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds();
        try {
            buffer.setOutputBuffers(withExtraBuffer);
            Assert.fail("Expected IllegalStateException from addQueue after noMoreQueues has been called");
        } catch (IllegalStateException expected) {
            // expected
        }
    }

    /**
     * After destroy() the buffer is finished, and later setOutputBuffers() calls, including
     * one that names a new buffer, are accepted without throwing.
     */
    @Test
    public void testAddQueueAfterDestroy() throws Exception {
        SharedBuffer buffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS);
        Assert.assertFalse(buffer.isFinished());

        buffer.destroy();
        assertFinished(buffer);

        // the closed, empty configuration can be applied twice; the buffer stays finished
        for (int attempt = 0; attempt < 2; attempt++) {
            buffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
            assertFinished(buffer);
        }

        // a configuration that adds "foo" must not throw either
        OutputBuffers withExtraBuffer = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS
                .withBuffer("foo", new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds();
        buffer.setOutputBuffers(withExtraBuffer);
    }

    /**
     * In every phase (fresh, after finish(), after the buffer set is closed, after
     * destroy()), get() on an undeclared buffer id throws NoSuchBufferException while
     * abort() on the same id does not throw.
     */
    @Test
    public void testOperationsOnUnknownQueues() throws Exception {
        OutputBuffers initialBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
        SharedBuffer buffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), initialBuffers);
        Assert.assertFalse(buffer.isFinished());

        // fresh buffer
        assertGetOnUnknownBufferFails(buffer);
        buffer.abort("unknown");

        // after finish()
        buffer.finish();
        assertGetOnUnknownBufferFails(buffer);
        buffer.abort("unknown");

        // after the set of output buffers has been closed
        buffer.setOutputBuffers(initialBuffers.withNoMoreBufferIds());
        assertGetOnUnknownBufferFails(buffer);
        buffer.abort("unknown");

        // after destroy()
        buffer.destroy();
        assertGetOnUnknownBufferFails(buffer);
        buffer.abort("unknown");
    }

    /** Asserts that get() on the buffer id "unknown" throws NoSuchBufferException. */
    private void assertGetOnUnknownBufferFails(SharedBuffer buffer) throws Exception {
        try {
            buffer.get("unknown", 0L, sizeOfPages(1), NO_WAIT);
            Assert.fail("Expected NoSuchBufferException from operation on unknown queue");
        } catch (NoSuchBufferException expected) {
            // expected
        }
    }

    /**
     * Pages enqueued after finish() or after destroy() complete immediately and are not
     * counted in the buffer's pages-added statistic.
     */
    @Test
    public void testAddStateMachine() throws Exception {
        // enqueue after finish()
        SharedBuffer finishedBuffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS);
        finishedBuffer.finish();
        for (int attempt = 0; attempt < 2; attempt++) {
            addPage(finishedBuffer, createPage(0));
        }
        Assert.assertEquals(finishedBuffer.getInfo().getPagesAdded(), 0L);

        // enqueue after destroy()
        SharedBuffer destroyedBuffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS);
        destroyedBuffer.destroy();
        for (int attempt = 0; attempt < 2; attempt++) {
            addPage(destroyedBuffer, createPage(0));
        }
        Assert.assertEquals(destroyedBuffer.getInfo().getPagesAdded(), 0L);
    }

    /**
     * Aborting an output buffer closes it; once every declared buffer has been aborted and
     * no more buffers can be added, the shared buffer is finished.
     */
    @Test
    public void testAbort() throws Exception {
        OutputBuffers initialBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
        SharedBuffer buffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), initialBuffers);
        for (int pageNumber = 0; pageNumber < 10; pageNumber++) {
            addPage(buffer, createPage(pageNumber));
        }
        buffer.finish();

        // read one page from "first", then abort it
        OutputBuffers firstOnly = initialBuffers.withBuffer("first", new UnpartitionedPagePartitionFunction());
        buffer.setOutputBuffers(firstOnly);
        assertBufferResultEquals(buffer.get("first", 0L, sizeOfPages(1), NO_WAIT), bufferResult(0L, createPage(0)));
        buffer.abort("first");
        assertQueueClosed(buffer, "first", 0);
        assertBufferResultEquals(buffer.get("first", 1L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(1L, true));

        // same for "second", this time with the buffer set closed
        OutputBuffers firstAndSecondClosed = firstOnly
                .withBuffer("second", new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds();
        buffer.setOutputBuffers(firstAndSecondClosed);
        assertBufferResultEquals(buffer.get("second", 0L, sizeOfPages(1), NO_WAIT), bufferResult(0L, createPage(0)));
        buffer.abort("second");
        assertQueueClosed(buffer, "second", 0);
        assertFinished(buffer);
        assertBufferResultEquals(buffer.get("second", 1L, sizeOfPages(1), NO_WAIT), BufferResult.emptyResults(0L, true));
    }

    /** finish() on a buffer that never received a page closes every declared output buffer. */
    @Test
    public void testFinishClosesEmptyQueues() throws Exception {
        OutputBuffers twoBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS
                .withBuffer("first", new UnpartitionedPagePartitionFunction())
                .withBuffer("second", new UnpartitionedPagePartitionFunction());
        SharedBuffer buffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(10), twoBuffers);

        buffer.finish();

        assertQueueClosed(buffer, "first", 0);
        assertQueueClosed(buffer, "second", 0);
    }

    /**
     * A reader job that is still waiting for its second page must terminate once the
     * output buffer it reads from is aborted.
     */
    @Test
    public void testAbortFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(5), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer("queue", new UnpartitionedPagePartitionFunction()));
        Assert.assertFalse(sharedBuffer.isFinished());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            // reader wants two pages, one per request
            GetPagesJob getPagesJob = new GetPagesJob(sharedBuffer, 0L, 2, 1);
            newCachedThreadPool.submit(getPagesJob);
            getPagesJob.waitForStarted();
            getPagesJob.assertBlockedWithCount(0);
            addPage(sharedBuffer, createPage(0));
            getPagesJob.assertBlockedWithCount(1);
            sharedBuffer.abort("queue");
            assertQueueClosed(sharedBuffer, "queue", 1);
            getPagesJob.waitForFinished();
            Assert.assertEquals(getPagesJob.getElements().size(), 1);
        } finally {
            // always release the job thread, even when an assertion above fails
            newCachedThreadPool.shutdownNow();
        }
    }

    /**
     * A reader job that is still waiting for its second page must terminate once the
     * shared buffer is finished.
     */
    @Test
    public void testFinishFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(5), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer("queue", new UnpartitionedPagePartitionFunction()));
        Assert.assertFalse(sharedBuffer.isFinished());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            // reader wants two pages, one per request
            GetPagesJob getPagesJob = new GetPagesJob(sharedBuffer, 0L, 2, 1);
            newCachedThreadPool.submit(getPagesJob);
            getPagesJob.waitForStarted();
            getPagesJob.assertBlockedWithCount(0);
            addPage(sharedBuffer, createPage(0));
            getPagesJob.assertBlockedWithCount(1);
            sharedBuffer.finish();
            assertQueueClosed(sharedBuffer, "queue", 1);
            getPagesJob.waitForFinished();
            Assert.assertEquals(getPagesJob.getElements().size(), 1);
        } finally {
            // always release the job thread, even when an assertion above fails
            newCachedThreadPool.shutdownNow();
        }
    }

    /**
     * A writer job with a page still pending must terminate once the shared buffer is
     * finished; the pages accepted so far remain readable afterwards.
     */
    @Test
    public void testFinishFreesWriter() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(5), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer("queue", new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            // add as many pages as the buffer was sized for
            for (int i = 0; i < 5; i++) {
                addPage(sharedBuffer, createPage(i));
            }
            AddPagesJob addPagesJob = new AddPagesJob(sharedBuffer, createPage(2), createPage(3));
            newCachedThreadPool.submit(addPagesJob);
            addPagesJob.waitForStarted();
            addPagesJob.assertBlockedWithCount(2);
            // reading and acknowledging one page lets exactly one pending page through
            Assert.assertEquals(sharedBuffer.get("queue", 0L, sizeOfPages(1), MAX_WAIT).size(), 1);
            sharedBuffer.acknowledge("queue", 1L);
            addPagesJob.assertBlockedWithCount(1);
            sharedBuffer.finish();
            Assert.assertFalse(sharedBuffer.isFinished());
            addPagesJob.waitForFinished();
            Assert.assertEquals(sharedBuffer.get("queue", 1L, sizeOfPages(100), MAX_WAIT).size(), 5);
            sharedBuffer.acknowledge("queue", 5L);
            assertFinished(sharedBuffer);
        } finally {
            // always release the job thread, even when an assertion above fails
            newCachedThreadPool.shutdownNow();
        }
    }

    /**
     * A reader job that is still waiting for its second page must terminate once the
     * shared buffer is destroyed.
     */
    @Test
    public void testDestroyFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(5), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer("queue", new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            // reader wants two pages, one per request
            GetPagesJob getPagesJob = new GetPagesJob(sharedBuffer, 0L, 2, 1);
            newCachedThreadPool.submit(getPagesJob);
            getPagesJob.waitForStarted();
            getPagesJob.assertBlockedWithCount(0);
            addPage(sharedBuffer, createPage(0));
            getPagesJob.assertBlockedWithCount(1);
            sharedBuffer.destroy();
            assertQueueClosed(sharedBuffer, "queue", 1);
            getPagesJob.waitForFinished();
            Assert.assertEquals(getPagesJob.getElements().size(), 1);
        } finally {
            // always release the job thread, even when an assertion above fails
            newCachedThreadPool.shutdownNow();
        }
    }

    /**
     * A writer job with a page still pending must terminate once the shared buffer is
     * destroyed, and the buffer must report finished right after destroy().
     */
    @Test
    public void testDestroyFreesWriter() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, this.executor, sizeOfPages(5), OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer("queue", new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse(sharedBuffer.isFinished());
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            // add as many pages as the buffer was sized for
            for (int i = 0; i < 5; i++) {
                addPage(sharedBuffer, createPage(i));
            }
            AddPagesJob addPagesJob = new AddPagesJob(sharedBuffer, createPage(2), createPage(3));
            newCachedThreadPool.submit(addPagesJob);
            addPagesJob.waitForStarted();
            addPagesJob.assertBlockedWithCount(2);
            // reading and acknowledging one page lets exactly one pending page through
            Assert.assertEquals(sharedBuffer.get("queue", 0L, sizeOfPages(1), MAX_WAIT).size(), 1);
            sharedBuffer.acknowledge("queue", 1L);
            addPagesJob.assertBlockedWithCount(1);
            sharedBuffer.destroy();
            assertFinished(sharedBuffer);
            addPagesJob.waitForFinished();
        } finally {
            // always release the job thread, even when an assertion above fails
            newCachedThreadPool.shutdownNow();
        }
    }

    /**
     * Enqueues {@code page} and asserts that the returned future is not yet done.
     *
     * @return the still-pending enqueue future
     */
    private ListenableFuture<?> enqueuePage(SharedBuffer sharedBuffer, Page page) {
        ListenableFuture<?> pending = sharedBuffer.enqueue(page);
        Assert.assertFalse(pending.isDone());
        return pending;
    }

    /** Enqueues {@code page} and asserts that the returned future is already done. */
    private void addPage(SharedBuffer sharedBuffer, Page page) {
        ListenableFuture<?> enqueued = sharedBuffer.enqueue(page);
        Assert.assertTrue(enqueued.isDone());
    }

    /**
     * Asserts that the info reported for buffer {@code str} equals a non-finished
     * {@code BufferInfo} built from {@code i} and {@code i2}.
     */
    private void assertQueueState(SharedBuffer sharedBuffer, String str, int i, int i2) {
        BufferInfo actual = getBufferInfo(sharedBuffer, str);
        BufferInfo expected = new BufferInfo(str, false, i, i2);
        Assert.assertEquals(actual, expected);
    }

    /**
     * Asserts that the info reported for buffer {@code str} equals a finished
     * {@code BufferInfo} built from 0 and {@code i}.
     */
    private void assertQueueClosed(SharedBuffer sharedBuffer, String str, int i) {
        BufferInfo actual = getBufferInfo(sharedBuffer, str);
        BufferInfo expected = new BufferInfo(str, true, 0, i);
        Assert.assertEquals(actual, expected);
    }

    /**
     * Looks up the info entry whose buffer id equals {@code str}.
     *
     * @return the first matching entry, or {@code null} when the buffer reports none
     */
    private BufferInfo getBufferInfo(SharedBuffer sharedBuffer, String str) {
        BufferInfo match = null;
        for (BufferInfo candidate : sharedBuffer.getInfo().getBuffers()) {
            if (candidate.getBufferId().equals(str)) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    /**
     * Asserts that the shared buffer is finished and that every output buffer it reports
     * is finished with zero buffered pages.
     */
    private void assertFinished(SharedBuffer sharedBuffer) throws Exception {
        Assert.assertTrue(sharedBuffer.isFinished());
        for (BufferInfo outputBuffer : sharedBuffer.getInfo().getBuffers()) {
            boolean outputFinished = outputBuffer.isFinished();
            Assert.assertTrue(outputFinished);
            Assert.assertEquals(outputBuffer.getBufferedPages(), 0);
        }
    }

    /**
     * Asserts that two buffer results match: same page count, same token, pairwise equal
     * blocks in every channel of every page, and the same buffer-closed flag.
     * The first argument is treated as the actual value, the second as the expected one.
     */
    private void assertBufferResultEquals(BufferResult bufferResult, BufferResult bufferResult2) {
        Assert.assertEquals(bufferResult.getPages().size(), bufferResult2.getPages().size());
        Assert.assertEquals(bufferResult.getToken(), bufferResult2.getToken());

        int pageIndex = 0;
        while (pageIndex < bufferResult.getPages().size()) {
            Page actualPage = (Page) bufferResult.getPages().get(pageIndex);
            Page expectedPage = (Page) bufferResult2.getPages().get(pageIndex);
            Assert.assertEquals(actualPage.getChannelCount(), expectedPage.getChannelCount());

            // compare the pages channel by channel
            int channel = 0;
            while (channel < actualPage.getChannelCount()) {
                BlockAssertions.assertBlockEquals(actualPage.getBlock(channel), expectedPage.getBlock(channel));
                channel++;
            }
            pageIndex++;
        }

        Assert.assertEquals(bufferResult.isBufferClosed(), bufferResult2.isBufferClosed());
    }

    /**
     * Builds the expected, non-closed result for a read that starts at token {@code j}
     * and returns {@code page} followed by {@code pageArr}.
     *
     * @param j token of the first page in the result
     * @param page first page of the result
     * @param pageArr remaining pages, in order (may be empty)
     * @return a result whose second token argument is {@code j} plus the number of pages
     */
    public static BufferResult bufferResult(long j, Page page, Page... pageArr) {
        // build the list first so its size can be used for the second token argument
        List<Page> pages = ImmutableList.<Page>builder().add(page).add(pageArr).build();
        return new BufferResult(j, j + pages.size(), false, pages);
    }
}
