package com.facebook.presto.operator.exchange;

import com.facebook.presto.SequencePageBuilder;
import com.facebook.presto.operator.InterpretedHashGenerator;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.operator.exchange.LocalExchange;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import java.util.List;
import java.util.Optional;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/facebook/presto/operator/exchange/TestLocalExchange.class */
public class TestLocalExchange {
    // Column layout shared by every exchange and page in this test: a single BIGINT channel.
    // Must stay declared before RETAINED_PAGE_SIZE, whose initializer calls createPage(), which reads TYPES.
    private static final List<Type> TYPES = ImmutableList.of(BigintType.BIGINT);
    // Retained size, in bytes, of one page built by createPage(); retainedSizeOfPages() multiplies it
    // to produce byte-count expectations. The tests compare it against pages created with other
    // arguments (0..3), so they assume the retained size does not depend on the argument (42 here).
    private static final DataSize RETAINED_PAGE_SIZE = new DataSize(createPage(42).getRetainedSizeInBytes(), DataSize.Unit.BYTE);

    /**
     * Drives a SINGLE_DISTRIBUTION exchange with one sink and one source.
     *
     * <p>Checks that the exchange exposes exactly one buffer, that pages are removed in the order
     * they were added, that the exchange's buffered byte total tracks the number of buffered pages,
     * and that the source only reports finished once the sink has finished and every page has been
     * removed.
     */
    @Test
    public void testGatherSingleWriter() {
        // Buffer limit sized to 99 pages so the sink is never blocked in this scenario.
        DataSize maxBufferedBytes = new DataSize(retainedSizeOfPages(99), DataSize.Unit.BYTE);
        LocalExchange exchange = new LocalExchange(SystemPartitioningHandle.SINGLE_DISTRIBUTION, 8, TYPES, ImmutableList.of(), Optional.empty(), maxBufferedBytes);
        Assert.assertEquals(exchange.getBufferCount(), 1);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
        LocalExchangeSource source = exchange.getSource(0);
        assertSource(source, 0);

        LocalExchangeSink sink = sinkFactory.createSink();
        sinkFactory.close();
        sinkFactory.noMoreSinkFactories();
        assertSinkCanWrite(sink);
        assertSource(source, 0);

        // A read future obtained while the buffer is empty must complete once a page is added.
        ListenableFuture<?> readFuture = source.waitForReading();
        Assert.assertFalse(readFuture.isDone());
        sink.addPage(createPage(0));
        Assert.assertTrue(readFuture.isDone());
        assertExchangeTotalBufferedBytes(exchange, 1);
        assertSource(source, 1);

        sink.addPage(createPage(1));
        assertSource(source, 2);
        assertExchangeTotalBufferedBytes(exchange, 2);

        // Pages come back out in insertion order.
        assertRemovePage(source, createPage(0));
        assertSource(source, 1);
        assertExchangeTotalBufferedBytes(exchange, 1);

        assertRemovePage(source, createPage(1));
        assertSource(source, 0);
        assertExchangeTotalBufferedBytes(exchange, 0);

        sink.addPage(createPage(2));
        sink.addPage(createPage(3));
        assertSource(source, 2);
        assertExchangeTotalBufferedBytes(exchange, 2);

        // Finishing the sink must not finish the source while pages are still buffered.
        sink.finish();
        assertSinkFinished(sink);
        assertSource(source, 2);

        assertRemovePage(source, createPage(2));
        assertSource(source, 1);
        assertSinkFinished(sink);
        assertExchangeTotalBufferedBytes(exchange, 1);

        // Removing the last page after the sink finished completes the source.
        assertRemovePage(source, createPage(3));
        assertSourceFinished(source);
        assertExchangeTotalBufferedBytes(exchange, 0);
    }

    /**
     * Drives a FIXED_BROADCAST_DISTRIBUTION exchange with two sinks and two sources.
     *
     * <p>Every page added through either sink must show up in both sources. The assertions also pin
     * that the exchange's buffered byte total equals the retained size of the pages held by the
     * fuller source (a page shared by both sources is counted once), and that a source finishes only
     * after all sinks have finished and that source has been drained.
     */
    @Test
    public void testBroadcast() {
        LocalExchange exchange = new LocalExchange(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty());
        Assert.assertEquals(exchange.getBufferCount(), 2);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
        LocalExchangeSink sinkA = sinkFactory.createSink();
        assertSinkCanWrite(sinkA);
        LocalExchangeSink sinkB = sinkFactory.createSink();
        assertSinkCanWrite(sinkB);
        sinkFactory.close();
        sinkFactory.noMoreSinkFactories();

        LocalExchangeSource sourceA = exchange.getSource(0);
        assertSource(sourceA, 0);
        LocalExchangeSource sourceB = exchange.getSource(1);
        assertSource(sourceB, 0);

        // First page from sink A reaches both sources.
        sinkA.addPage(createPage(0));
        assertSource(sourceA, 1);
        assertSource(sourceB, 1);
        assertExchangeTotalBufferedBytes(exchange, 1);

        // Second page from sink A.
        sinkA.addPage(createPage(0));
        assertSource(sourceA, 2);
        assertSource(sourceB, 2);
        assertExchangeTotalBufferedBytes(exchange, 2);

        // Draining source A alone leaves the total unchanged: source B still holds both pages.
        assertRemovePage(sourceA, createPage(0));
        assertSource(sourceA, 1);
        assertSource(sourceB, 2);
        assertExchangeTotalBufferedBytes(exchange, 2);

        assertRemovePage(sourceA, createPage(0));
        assertSource(sourceA, 0);
        assertSource(sourceB, 2);
        assertExchangeTotalBufferedBytes(exchange, 2);

        // Sink A finishes; sink B is still open, so the sources stay unfinished.
        sinkA.finish();
        assertSinkFinished(sinkA);
        assertExchangeTotalBufferedBytes(exchange, 2);

        sinkB.addPage(createPage(0));
        assertSource(sourceA, 1);
        assertSource(sourceB, 3);
        assertExchangeTotalBufferedBytes(exchange, 3);

        sinkB.finish();
        assertSinkFinished(sinkB);
        assertSource(sourceA, 1);
        assertSource(sourceB, 3);
        assertExchangeTotalBufferedBytes(exchange, 3);

        // With all sinks finished, source A finishes as soon as it is empty.
        assertRemovePage(sourceA, createPage(0));
        assertSourceFinished(sourceA);
        assertSource(sourceB, 3);
        assertExchangeTotalBufferedBytes(exchange, 3);

        assertRemovePage(sourceB, createPage(0));
        assertRemovePage(sourceB, createPage(0));
        assertSourceFinished(sourceA);
        assertSource(sourceB, 1);
        assertExchangeTotalBufferedBytes(exchange, 1);

        assertRemovePage(sourceB, createPage(0));
        assertSourceFinished(sourceA);
        assertSourceFinished(sourceB);
        assertExchangeTotalBufferedBytes(exchange, 0);
    }

    /**
     * Drives a FIXED_RANDOM_DISTRIBUTION exchange with one sink and two sources.
     *
     * <p>After each of 100 added pages, the page and byte counts summed over both sources must match
     * the number of pages added so far, and so must the exchange's byte total. At the end each source
     * is expected to hold at least one page.
     */
    @Test
    public void testRandom() {
        LocalExchange exchange = new LocalExchange(SystemPartitioningHandle.FIXED_RANDOM_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty());
        Assert.assertEquals(exchange.getBufferCount(), 2);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
        LocalExchangeSink sink = sinkFactory.createSink();
        assertSinkCanWrite(sink);
        sinkFactory.close();
        sinkFactory.noMoreSinkFactories();

        LocalExchangeSource sourceA = exchange.getSource(0);
        assertSource(sourceA, 0);
        LocalExchangeSource sourceB = exchange.getSource(1);
        assertSource(sourceB, 0);

        // pagesAdded is the running total after the addPage call at the top of each iteration.
        for (int pagesAdded = 1; pagesAdded <= 100; pagesAdded++) {
            sink.addPage(createPage(0));
            assertExchangeTotalBufferedBytes(exchange, pagesAdded);

            LocalExchangeBufferInfo infoA = sourceA.getBufferInfo();
            LocalExchangeBufferInfo infoB = sourceB.getBufferInfo();
            Assert.assertEquals(infoA.getBufferedBytes() + infoB.getBufferedBytes(), retainedSizeOfPages(pagesAdded));
            Assert.assertEquals(infoA.getBufferedPages() + infoB.getBufferedPages(), pagesAdded);
        }

        // Neither source may have been left out entirely.
        Assert.assertTrue(sourceA.getBufferInfo().getBufferedPages() > 0);
        Assert.assertTrue(sourceB.getBufferInfo().getBufferedPages() > 0);
        assertExchangeTotalBufferedBytes(exchange, 100);
    }

    /**
     * Drives a FIXED_HASH_DISTRIBUTION exchange (hashing on channel 0) with one sink and two sources.
     *
     * <p>Each added page is expected to produce one page in each source, and every position of a
     * page removed from source N must hash to partition N. Buffered bytes are only checked with a
     * lower bound while pages are buffered, and must be exactly zero once everything is drained.
     */
    @Test
    public void testPartition() {
        LocalExchange exchange = new LocalExchange(SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, 2, TYPES, ImmutableList.of(0), Optional.empty());
        Assert.assertEquals(exchange.getBufferCount(), 2);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
        LocalExchangeSink sink = sinkFactory.createSink();
        assertSinkCanWrite(sink);
        sinkFactory.close();
        sinkFactory.noMoreSinkFactories();

        LocalExchangeSource sourceA = exchange.getSource(0);
        assertSource(sourceA, 0);
        LocalExchangeSource sourceB = exchange.getSource(1);
        assertSource(sourceB, 0);

        // One input page yields a page in each partition.
        sink.addPage(createPage(0));
        assertSource(sourceA, 1);
        assertSource(sourceB, 1);
        Assert.assertTrue(exchange.getBufferedBytes() >= retainedSizeOfPages(1));

        sink.addPage(createPage(0));
        assertSource(sourceA, 2);
        assertSource(sourceB, 2);
        Assert.assertTrue(exchange.getBufferedBytes() >= retainedSizeOfPages(2));

        // Drain partition 0.
        assertPartitionedRemovePage(sourceA, 0, 2);
        assertSource(sourceA, 1);
        assertSource(sourceB, 2);

        assertPartitionedRemovePage(sourceA, 0, 2);
        assertSource(sourceA, 0);
        assertSource(sourceB, 2);

        // Once the only sink finishes, the already-empty source A is finished immediately.
        sink.finish();
        assertSinkFinished(sink);
        assertSourceFinished(sourceA);
        assertSource(sourceB, 2);

        // Drain partition 1.
        assertPartitionedRemovePage(sourceB, 1, 2);
        assertSourceFinished(sourceA);
        assertSource(sourceB, 1);

        assertPartitionedRemovePage(sourceB, 1, 2);
        assertSourceFinished(sourceA);
        assertSourceFinished(sourceB);
        assertExchangeTotalBufferedBytes(exchange, 0);
    }

    /**
     * Checks that sinks of a broadcast exchange are finished once every source has finished, even
     * though no page was ever written.
     *
     * <p>While only one of the two sources has finished, both sinks must still be writable; after
     * the second source finishes, both sinks must report finished.
     */
    @Test
    public void writeUnblockWhenAllReadersFinish() {
        // Uses the shared TYPES constant: the assert* helpers compare getTypes() against TYPES,
        // so the exchange must be built from the same type list.
        LocalExchange exchange = new LocalExchange(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty());
        Assert.assertEquals(exchange.getBufferCount(), 2);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
        LocalExchangeSink sinkA = sinkFactory.createSink();
        assertSinkCanWrite(sinkA);
        LocalExchangeSink sinkB = sinkFactory.createSink();
        assertSinkCanWrite(sinkB);
        sinkFactory.close();
        sinkFactory.noMoreSinkFactories();

        LocalExchangeSource sourceA = exchange.getSource(0);
        assertSource(sourceA, 0);
        LocalExchangeSource sourceB = exchange.getSource(1);
        assertSource(sourceB, 0);

        // One reader gone: writers are unaffected.
        sourceA.finish();
        assertSourceFinished(sourceA);
        assertSinkCanWrite(sinkA);
        assertSinkCanWrite(sinkB);

        // All readers gone: writers are finished.
        sourceB.finish();
        assertSourceFinished(sourceB);
        assertSinkFinished(sinkA);
        assertSinkFinished(sinkB);
    }

    /**
     * Checks that sinks blocked on a full broadcast exchange are released once every source has
     * finished and drained its buffered page.
     *
     * <p>The exchange is created with a 1-byte buffer limit, so a single page blocks both sinks.
     * Finishing a source does not discard its buffered page: the source still reports one page and
     * is only finished after that page is removed. The write futures captured while blocked must be
     * done once both sources are finished and the exchange is empty.
     */
    @Test
    public void writeUnblockWhenAllReadersFinishAndPagesConsumed() {
        DataSize oneByteLimit = new DataSize(1.0, DataSize.Unit.BYTE);
        LocalExchange exchange = new LocalExchange(SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION, 2, TYPES, ImmutableList.of(), Optional.empty(), oneByteLimit);
        Assert.assertEquals(exchange.getBufferCount(), 2);
        assertExchangeTotalBufferedBytes(exchange, 0);

        LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
        LocalExchangeSink sinkA = sinkFactory.createSink();
        assertSinkCanWrite(sinkA);
        LocalExchangeSink sinkB = sinkFactory.createSink();
        assertSinkCanWrite(sinkB);
        sinkFactory.close();
        sinkFactory.noMoreSinkFactories();

        LocalExchangeSource sourceA = exchange.getSource(0);
        assertSource(sourceA, 0);
        LocalExchangeSource sourceB = exchange.getSource(1);
        assertSource(sourceB, 0);

        // A single page exceeds the limit and blocks both sinks.
        sinkA.addPage(createPage(0));
        ListenableFuture<?> sinkABlocked = assertSinkWriteBlocked(sinkA);
        ListenableFuture<?> sinkBBlocked = assertSinkWriteBlocked(sinkB);

        assertSource(sourceA, 1);
        assertSource(sourceB, 1);
        assertExchangeTotalBufferedBytes(exchange, 1);

        // Finish and drain source A; source B still holds the page, so the sinks stay blocked.
        sourceA.finish();
        assertSource(sourceA, 1);
        assertRemovePage(sourceA, createPage(0));
        assertSourceFinished(sourceA);
        assertExchangeTotalBufferedBytes(exchange, 1);

        assertSource(sourceB, 1);
        assertSinkWriteBlocked(sinkA);
        assertSinkWriteBlocked(sinkB);

        // Finish and drain source B; nothing is buffered any more.
        sourceB.finish();
        assertSource(sourceB, 1);
        assertRemovePage(sourceB, createPage(0));
        assertSourceFinished(sourceB);
        assertExchangeTotalBufferedBytes(exchange, 0);

        // The futures handed out while blocked are now complete and the sinks are finished.
        Assert.assertTrue(sinkABlocked.isDone());
        Assert.assertTrue(sinkBBlocked.isDone());
        assertSinkFinished(sinkA);
        assertSinkFinished(sinkB);
    }

    /**
     * Asserts that an unfinished source currently buffers exactly {@code i} pages.
     *
     * <p>With pages buffered, the source must be immediately readable and report a positive byte
     * count. With none buffered, it must not be readable, {@code removePage()} must return
     * {@code null} without changing that, and the reported byte count must be zero.
     *
     * @param localExchangeSource the source under inspection
     * @param i the expected number of buffered pages
     */
    private static void assertSource(LocalExchangeSource localExchangeSource, int i) {
        Assert.assertEquals(localExchangeSource.getTypes(), TYPES);
        LocalExchangeBufferInfo info = localExchangeSource.getBufferInfo();
        Assert.assertEquals(info.getBufferedPages(), i);
        Assert.assertFalse(localExchangeSource.isFinished());

        if (i == 0) {
            // Empty buffer: nothing to read, and polling for a page must not alter the state.
            Assert.assertFalse(localExchangeSource.waitForReading().isDone());
            Assert.assertNull(localExchangeSource.removePage());
            Assert.assertFalse(localExchangeSource.waitForReading().isDone());
            Assert.assertFalse(localExchangeSource.isFinished());
            Assert.assertEquals(info.getBufferedBytes(), 0L);
        } else {
            Assert.assertTrue(localExchangeSource.waitForReading().isDone());
            Assert.assertTrue(info.getBufferedBytes() > 0);
        }
    }

    /**
     * Asserts that a source is finished: it buffers no pages or bytes, its read future is done, and
     * {@code removePage()} returns {@code null} while leaving the source finished.
     *
     * @param localExchangeSource the source expected to be finished
     */
    private static void assertSourceFinished(LocalExchangeSource localExchangeSource) {
        Assert.assertEquals(localExchangeSource.getTypes(), TYPES);
        Assert.assertTrue(localExchangeSource.isFinished());

        LocalExchangeBufferInfo info = localExchangeSource.getBufferInfo();
        Assert.assertEquals(info.getBufferedPages(), 0);
        Assert.assertEquals(info.getBufferedBytes(), 0L);

        // Polling a finished source must be harmless: still readable-done, still finished.
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Assert.assertNull(localExchangeSource.removePage());
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Assert.assertTrue(localExchangeSource.isFinished());
    }

    /**
     * Removes the next page from a readable source and asserts it matches the expected page in
     * channel count and content.
     *
     * @param localExchangeSource the source to read from; must have a page available
     * @param page the page the removed one is compared against
     */
    private static void assertRemovePage(LocalExchangeSource localExchangeSource, Page page) {
        Assert.assertEquals(localExchangeSource.getTypes(), TYPES);
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());

        Page actual = localExchangeSource.removePage();
        Assert.assertNotNull(actual);
        Assert.assertEquals(actual.getChannelCount(), page.getChannelCount());
        PageAssertions.assertPageEquals(TYPES, actual, page);
    }

    /**
     * Removes the next page from a readable source and asserts that every position in it is
     * assigned to the expected partition by a {@code LocalPartitionGenerator} hashing channel 0.
     *
     * @param localExchangeSource the source to read from; must have a page available
     * @param i the partition every position of the removed page must map to
     * @param i2 the partition count handed to the partition generator
     */
    private static void assertPartitionedRemovePage(LocalExchangeSource localExchangeSource, int i, int i2) {
        Assert.assertEquals(localExchangeSource.getTypes(), TYPES);
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());

        Page actual = localExchangeSource.removePage();
        Assert.assertNotNull(actual);

        InterpretedHashGenerator hashGenerator = new InterpretedHashGenerator(TYPES, new int[]{0});
        LocalPartitionGenerator partitioner = new LocalPartitionGenerator(hashGenerator, i2);
        for (int position = 0; position < actual.getPositionCount(); position++) {
            Assert.assertEquals(partitioner.getPartition(position, actual), i);
        }
    }

    /**
     * Asserts that a sink is open and not blocked: it is not finished and its write future is done.
     *
     * @param localExchangeSink the sink expected to accept pages right now
     */
    private static void assertSinkCanWrite(LocalExchangeSink localExchangeSink) {
        Assert.assertEquals(localExchangeSink.getTypes(), TYPES);
        Assert.assertFalse(localExchangeSink.isFinished());
        ListenableFuture<?> writable = localExchangeSink.waitForWriting();
        Assert.assertTrue(writable.isDone());
    }

    /**
     * Asserts that a sink is open but blocked: it is not finished and its write future is not done.
     *
     * @param localExchangeSink the sink expected to be blocked
     * @return the pending write future, so the caller can later check that it completes
     */
    private static ListenableFuture<?> assertSinkWriteBlocked(LocalExchangeSink localExchangeSink) {
        Assert.assertEquals(localExchangeSink.getTypes(), TYPES);
        Assert.assertFalse(localExchangeSink.isFinished());

        ListenableFuture<?> blocked = localExchangeSink.waitForWriting();
        Assert.assertFalse(blocked.isDone());
        return blocked;
    }

    /**
     * Asserts that a sink is finished and unblocked, and that adding a page to it afterwards
     * neither throws nor changes that state.
     *
     * @param localExchangeSink the sink expected to be finished
     */
    private static void assertSinkFinished(LocalExchangeSink localExchangeSink) {
        Assert.assertEquals(localExchangeSink.getTypes(), TYPES);
        Assert.assertTrue(localExchangeSink.isFinished());
        Assert.assertTrue(localExchangeSink.waitForWriting().isDone());

        // Writing to a finished sink: the state must be the same after the call.
        Page extraPage = createPage(0);
        localExchangeSink.addPage(extraPage);
        Assert.assertTrue(localExchangeSink.isFinished());
        Assert.assertTrue(localExchangeSink.waitForWriting().isDone());
    }

    /**
     * Asserts that the exchange's total buffered bytes equal the retained size of {@code i} pages.
     *
     * @param localExchange the exchange under inspection
     * @param i the number of pages whose combined retained size is expected
     */
    private static void assertExchangeTotalBufferedBytes(LocalExchange localExchange, int i) {
        long actualBytes = localExchange.getBufferedBytes();
        long expectedBytes = retainedSizeOfPages(i);
        Assert.assertEquals(actualBytes, expectedBytes);
    }

    /**
     * Asserts that the exchange buffers the equivalent of {@code i} pages, measured as the
     * retained size of {@code i} pages against the exchange's buffered byte total.
     *
     * <p>The previous version passed the {@code LocalExchange} object itself as the actual value
     * and compared it against a boxed {@code Long}, which would fail whenever the helper was
     * called. It now compares the exchange's buffered bytes.
     *
     * @param localExchange the exchange under inspection
     * @param i the number of pages expected to be buffered
     */
    private static void assertExchangeTotalBufferedPages(LocalExchange localExchange, int i) {
        Assert.assertEquals(localExchange.getBufferedBytes(), retainedSizeOfPages(i));
    }

    /**
     * Builds a test page with the shared {@code TYPES} layout via
     * {@code SequencePageBuilder.createSequencePage}.
     *
     * @param i forwarded as the last argument of {@code createSequencePage}; presumably the
     *     initial value of the generated sequence (confirm against SequencePageBuilder)
     * @return the generated page
     */
    private static Page createPage(int i) {
        // 100 is presumably the number of positions per page; confirm against SequencePageBuilder.
        Page sequencePage = SequencePageBuilder.createSequencePage(TYPES, 100, i);
        return sequencePage;
    }

    /**
     * Returns the combined retained size, in bytes, of {@code i} test pages.
     *
     * @param i the number of pages
     * @return {@code i} times the retained size of a single page ({@code RETAINED_PAGE_SIZE})
     */
    public static long retainedSizeOfPages(int i) {
        long bytesPerPage = RETAINED_PAGE_SIZE.toBytes();
        return bytesPerPage * i;
    }
}
