package com.baidu.hugegraph.computer.core.sender;

import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.testutil.Assert;
import com.google.common.collect.ImmutableSet;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/sender/MultiQueueTest.class */
/**
 * Multi-threaded tests for {@link MultiQueue}: two producer threads put
 * messages into queue 0 and queue 1 while one consumer thread takes them.
 *
 * <p>All worker threads are run through {@link #runConcurrently(Task...)},
 * which bounds the waiting time and reports the first worker failure with
 * its original cause, so a failing worker can no longer hang the test run.
 */
public class MultiQueueTest {

    /** Upper bound, in milliseconds, a test waits for its worker threads. */
    private static final long WORKER_TIMEOUT_MS = 10_000L;

    /** Pause, in milliseconds, before each put in {@link #testTakeWithWait()}. */
    private static final long[] PUT_DELAYS_MS = {100L, 200L, 300L};

    /** A unit of work executed on a worker thread; it may throw. */
    @FunctionalInterface
    private interface Task {
        void run() throws Exception;
    }

    /**
     * Producers put one message per phase (partition ids 1/2, 3/4, 5/6); the
     * consumer opens each phase via a latch and must take both messages of
     * that phase, in either order, before opening the next one.
     */
    @Test
    public void testPutAndTake() throws InterruptedException {
        MultiQueue queue = new MultiQueue(2);
        CountDownLatch[] phases = newLatches(3);

        Task consumer = () -> {
            phases[0].countDown();
            assertTakesBoth(queue, 1, 2);
            phases[1].countDown();
            assertTakesBoth(queue, 3, 4);
            phases[2].countDown();
            assertTakesBoth(queue, 5, 6);
        };

        runConcurrently(phasedProducer(queue, 0, phases, 1, 3, 5),
                        phasedProducer(queue, 1, phases, 2, 4, 6),
                        consumer);
    }

    /**
     * Same scenario as {@link #testPutAndTake()}, except that in the second
     * phase the first taken message is pushed back with
     * {@code putAtFront} and both messages of the phase must then be taken
     * again.
     */
    @Test
    public void testPutAndTakeWithPutAtFront() throws InterruptedException {
        MultiQueue queue = new MultiQueue(2);
        CountDownLatch[] phases = newLatches(3);

        Task consumer = () -> {
            phases[0].countDown();
            assertTakesBoth(queue, 1, 2);

            phases[1].countDown();
            QueuedMessage requeued = queue.take();
            Assert.assertTrue(ImmutableSet.of(3, 4)
                                          .contains(requeued.partitionId()));
            // Odd partition ids are produced on queue 0, even ones on
            // queue 1, so the message goes back to the queue it came from.
            int sourceQueue = (requeued.partitionId() & 1) == 1 ? 0 : 1;
            queue.putAtFront(sourceQueue, requeued);
            assertTakesBoth(queue, 3, 4);

            phases[2].countDown();
            assertTakesBoth(queue, 5, 6);
        };

        runConcurrently(phasedProducer(queue, 0, phases, 1, 3, 5),
                        phasedProducer(queue, 1, phases, 2, 4, 6),
                        consumer);
    }

    /**
     * The consumer starts taking six messages immediately while each producer
     * sleeps before every put. Only successful completion is checked; the
     * order of the taken messages depends on timing and is not asserted.
     */
    @Test
    public void testTakeWithWait() throws InterruptedException {
        MultiQueue queue = new MultiQueue(2);

        Task consumer = () -> {
            for (int taken = 0; taken < 6; taken++) {
                queue.take();
            }
        };

        runConcurrently(delayedProducer(queue, 0, 1, 3, 5),
                        delayedProducer(queue, 1, 2, 4, 6),
                        consumer);
    }

    /** Creates {@code count} latches, each released by a single countDown. */
    private static CountDownLatch[] newLatches(int count) {
        CountDownLatch[] latches = new CountDownLatch[count];
        for (int i = 0; i < latches.length; i++) {
            latches[i] = new CountDownLatch(1);
        }
        return latches;
    }

    /** Builds a VERTEX message for the given partition with no payload. */
    private static QueuedMessage message(int partitionId) {
        // The explicit cast is kept from the original call sites so the
        // null argument is typed as a ByteBuffer.
        return new QueuedMessage(partitionId, MessageType.VERTEX,
                                 (ByteBuffer) null);
    }

    /**
     * Returns a task that, for each i, waits for {@code phases[i]} and then
     * puts a message with {@code partitionIds[i]} into queue {@code queueId}.
     */
    private static Task phasedProducer(MultiQueue queue, int queueId,
                                       CountDownLatch[] phases,
                                       int... partitionIds) {
        return () -> {
            for (int i = 0; i < partitionIds.length; i++) {
                phases[i].await();
                queue.put(queueId, message(partitionIds[i]));
            }
        };
    }

    /**
     * Returns a task that, for each i, sleeps {@code PUT_DELAYS_MS[i]} and
     * then puts a message with {@code partitionIds[i]} into queue
     * {@code queueId}.
     */
    private static Task delayedProducer(MultiQueue queue, int queueId,
                                        int... partitionIds) {
        return () -> {
            for (int i = 0; i < partitionIds.length; i++) {
                Thread.sleep(PUT_DELAYS_MS[i]);
                queue.put(queueId, message(partitionIds[i]));
            }
        };
    }

    /**
     * Takes two messages and asserts that their partition ids are exactly
     * {@code first} and {@code second}, in either order.
     */
    private static void assertTakesBoth(MultiQueue queue, int first,
                                        int second) throws Exception {
        int takenA = queue.take().partitionId();
        int takenB = queue.take().partitionId();
        Assert.assertTrue(ImmutableSet.of(first, second)
                                      .equals(ImmutableSet.of(takenA, takenB)));
    }

    /**
     * Runs every task on its own thread and waits for all of them.
     *
     * <p>The first throwable raised by any task is recorded and the other
     * workers are interrupted (best effort) so they do not keep waiting for a
     * peer that has already failed. Waiting is bounded by
     * {@link #WORKER_TIMEOUT_MS}; workers are daemon threads so a stuck one
     * cannot keep the JVM alive.
     *
     * @param tasks the tasks to run concurrently
     * @throws AssertionError if a task failed (with that failure as cause) or
     *         the workers did not finish in time
     * @throws InterruptedException if the calling thread is interrupted
     *         while waiting
     */
    private static void runConcurrently(Task... tasks)
                                        throws InterruptedException {
        // Single-element holder, guarded by its own monitor.
        Throwable[] firstFailure = new Throwable[1];
        Thread[] workers = new Thread[tasks.length];

        for (int i = 0; i < tasks.length; i++) {
            final Task task = tasks[i];
            workers[i] = new Thread(() -> {
                try {
                    task.run();
                } catch (Throwable th) {
                    synchronized (firstFailure) {
                        // Only the root failure is kept; failures caused by
                        // the interrupts issued below are ignored.
                        if (firstFailure[0] == null) {
                            firstFailure[0] = th;
                            for (Thread peer : workers) {
                                if (peer != Thread.currentThread()) {
                                    peer.interrupt();
                                }
                            }
                        }
                    }
                }
            }, "multi-queue-test-worker-" + i);
            workers[i].setDaemon(true);
        }

        for (Thread worker : workers) {
            worker.start();
        }

        long deadline = System.nanoTime() + WORKER_TIMEOUT_MS * 1_000_000L;
        boolean timedOut = false;
        for (Thread worker : workers) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            // join(0) would wait forever, so only join with time left.
            if (remainingMs > 0L) {
                worker.join(remainingMs);
            }
            timedOut |= worker.isAlive();
        }

        // Read the failure before interrupting stragglers, so an interrupt
        // issued here is never mistaken for the root cause.
        Throwable failure;
        synchronized (firstFailure) {
            failure = firstFailure[0];
        }
        if (timedOut) {
            for (Thread worker : workers) {
                worker.interrupt();
            }
        }

        if (failure != null) {
            throw new AssertionError("Worker thread failed: " + failure,
                                     failure);
        }
        if (timedOut) {
            throw new AssertionError("Worker threads did not finish within " +
                                     WORKER_TIMEOUT_MS + " ms");
        }
    }
}
