package co.cask.tigon.data.transaction.queue;

import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.TransactionSystemClient;
import co.cask.tigon.api.common.Bytes;
import co.cask.tigon.data.queue.ConsumerConfig;
import co.cask.tigon.data.queue.DequeueResult;
import co.cask.tigon.data.queue.DequeueStrategy;
import co.cask.tigon.data.queue.QueueClientFactory;
import co.cask.tigon.data.queue.QueueConsumer;
import co.cask.tigon.data.queue.QueueEntry;
import co.cask.tigon.data.queue.QueueName;
import co.cask.tigon.data.queue.QueueProducer;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tigon/data/transaction/queue/QueueTest.class */
public abstract class QueueTest {
    // Logger shared by the test methods and the worker runnables they spawn.
    private static final Logger LOG = LoggerFactory.getLogger(QueueTest.class);
    // Base number of entries passed to enqueueDequeue() by the throughput-style tests.
    private static final int ROUNDS = 1000;
    // Per-test timeout (milliseconds) used in the @Test(timeout = ...) annotations below.
    private static final long TIMEOUT_MS = 120000;
    // Shared collaborators. They are not assigned anywhere in the visible part of this class;
    // presumably concrete subclasses initialize them before the tests run -- TODO confirm.
    protected static TransactionSystemClient txSystemClient;
    protected static QueueClientFactory queueClientFactory;
    protected static QueueAdmin queueAdmin;
    // Stopped in shutdownTx() when non-null.
    protected static TransactionManager transactionManager;
    protected static TransactionExecutorFactory executorFactory;

    /**
     * Stops the shared transaction manager once all tests of the class have run.
     * Does nothing when no transaction manager was set.
     */
    @AfterClass
    public static void shutdownTx() {
        if (transactionManager == null) {
            return;
        }
        transactionManager.stopAndWait();
    }

    /**
     * Creates a producer for a stream queue, passing a {@link QueueMetrics} implementation
     * whose callbacks do nothing, and asserts that a producer is returned. Currently ignored.
     */
    @Test
    @Ignore
    public void testCreateProducerWithMetricsEnsuresTableExists() throws Exception {
        final QueueName streamName = QueueName.fromStream("someStream");
        // Metrics sink that discards everything it is given.
        final QueueMetrics noOpMetrics = new QueueMetrics() {
            @Override
            public void emitEnqueue(int ignored) {
                // intentionally empty
            }

            @Override
            public void emitEnqueueBytes(int ignored) {
                // intentionally empty
            }
        };
        Assert.assertNotNull(queueClientFactory.createProducer(streamName, noOpMetrics));
    }

    /**
     * Enqueues one entry into a flowlet queue ("q42") and one into a stream queue ("s42"),
     * calls {@code queueAdmin.dropAll()}, and then asserts that the flowlet queue dequeues
     * empty while the stream queue still yields "s42". Currently ignored.
     */
    @Test
    @Ignore
    public void testDropAllQueues() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("myApp", "myFlow", "myFlowlet", "tDAQ");
        QueueName streamName = QueueName.fromStream("tDAQStream");

        // Declared as QueueProducer so that enqueue() resolves; the producers are cast to
        // TransactionAware only where the transaction executor needs them.
        final QueueProducer queueProducer = queueClientFactory.createProducer(queueName);
        final QueueProducer streamProducer = queueClientFactory.createProducer(streamName);
        executorFactory
            .createExecutor(Lists.newArrayList((TransactionAware) queueProducer, (TransactionAware) streamProducer))
            .execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    queueProducer.enqueue(new QueueEntry(Bytes.toBytes("q42")));
                    streamProducer.enqueue(new QueueEntry(Bytes.toBytes("s42")));
                }
            });

        queueAdmin.dropAll();

        final QueueConsumer queueConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 1);
        final QueueConsumer streamConsumer = queueClientFactory.createConsumer(
            streamName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 1);
        executorFactory
            .createExecutor(Lists.newArrayList((TransactionAware) queueConsumer, (TransactionAware) streamConsumer))
            .execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    // The flowlet queue is expected to be gone ...
                    Assert.assertTrue(queueConsumer.dequeue().isEmpty());
                    // ... while the stream entry is expected to still be there.
                    DequeueResult streamResult = streamConsumer.dequeue();
                    Assert.assertFalse(streamResult.isEmpty());
                    Iterator it = streamResult.iterator();
                    Assert.assertTrue(it.hasNext());
                    Assert.assertArrayEquals(Bytes.toBytes("s42"), (byte[]) it.next());
                }
            });
    }

    /**
     * Enqueues one entry into a flowlet queue ("q42") and one into a stream queue ("s42"),
     * then asserts that the stream dequeues empty while the flowlet queue still yields "q42".
     * Currently ignored.
     *
     * NOTE(review): no drop operation is invoked between the enqueue and the dequeue in this
     * method, yet the stream is asserted to be empty -- confirm whether a stream-admin call
     * is missing before re-enabling the test.
     */
    @Test
    @Ignore
    public void testDropAllStreams() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("myApp", "myFlow", "myFlowlet", "tDAS");
        QueueName streamName = QueueName.fromStream("tDASStream");

        // Declared as QueueProducer so that enqueue() resolves; cast to TransactionAware
        // only where the transaction executor needs them.
        final QueueProducer queueProducer = queueClientFactory.createProducer(queueName);
        final QueueProducer streamProducer = queueClientFactory.createProducer(streamName);
        executorFactory
            .createExecutor(Lists.newArrayList((TransactionAware) queueProducer, (TransactionAware) streamProducer))
            .execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    queueProducer.enqueue(new QueueEntry(Bytes.toBytes("q42")));
                    streamProducer.enqueue(new QueueEntry(Bytes.toBytes("s42")));
                }
            });

        final QueueConsumer queueConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 1);
        final QueueConsumer streamConsumer = queueClientFactory.createConsumer(
            streamName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 1);
        executorFactory
            .createExecutor(Lists.newArrayList((TransactionAware) queueConsumer, (TransactionAware) streamConsumer))
            .execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    Assert.assertTrue(streamConsumer.dequeue().isEmpty());
                    DequeueResult queueResult = queueConsumer.dequeue();
                    Assert.assertFalse(queueResult.isEmpty());
                    Iterator it = queueResult.iterator();
                    Assert.assertTrue(it.hasNext());
                    Assert.assertArrayEquals(Bytes.toBytes("q42"), (byte[]) it.next());
                }
            });
    }

    /**
     * Enqueues a single entry ("my_data") into a stream queue in one transaction and asserts,
     * in a second transaction, that a FIFO consumer dequeues exactly that one entry.
     * Currently ignored.
     */
    @Test
    @Ignore
    public void testStreamQueue() throws Exception {
        QueueName streamName = QueueName.fromStream("my_stream");

        // Declared as QueueProducer so that enqueue() resolves; cast to TransactionAware
        // only where the transaction executor needs it.
        final QueueProducer producer = queueClientFactory.createProducer(streamName);
        executorFactory
            .createExecutor(Lists.newArrayList((TransactionAware) producer))
            .execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    producer.enqueue(new QueueEntry(Bytes.toBytes("my_data")));
                }
            });

        final QueueConsumer consumer = queueClientFactory.createConsumer(
            streamName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 1);
        executorFactory
            .createExecutor(Lists.newArrayList((TransactionAware) consumer))
            .execute(new TransactionExecutor.Subroutine() {
                @Override
                public void apply() throws Exception {
                    DequeueResult result = consumer.dequeue();
                    Assert.assertTrue(result != null && !result.isEmpty());
                    Iterator it = result.iterator();
                    Assert.assertTrue(it.hasNext());
                    Assert.assertArrayEquals(Bytes.toBytes("my_data"), (byte[]) it.next());
                    // Exactly one entry was enqueued, so nothing else may follow.
                    Assert.assertFalse(it.hasNext());
                }
            });
    }

    /**
     * Runs the shared enqueue/dequeue scenario on a flowlet queue with a single FIFO consumer,
     * enqueue batches of one and dequeue batches of one.
     */
    @Test(timeout = TIMEOUT_MS)
    public void testSingleFifo() throws Exception {
        final QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "singlefifo");
        enqueueDequeue(queueName, ROUNDS, ROUNDS, 1, 1, DequeueStrategy.FIFO, 1);
    }

    /**
     * Runs the shared enqueue/dequeue scenario on a flowlet queue with three FIFO consumers,
     * enqueue batches of one and dequeue batches of one.
     */
    @Test(timeout = TIMEOUT_MS)
    public void testMultiFifo() throws Exception {
        final QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "multififo");
        enqueueDequeue(queueName, ROUNDS, ROUNDS, 1, 3, DequeueStrategy.FIFO, 1);
    }

    /**
     * Runs the shared enqueue/dequeue scenario on a flowlet queue with a single HASH consumer;
     * the first enqueue pass writes twice as many entries as the second.
     */
    @Test(timeout = TIMEOUT_MS)
    public void testSingleHash() throws Exception {
        final QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "singlehash");
        enqueueDequeue(queueName, 2 * ROUNDS, ROUNDS, 1, 1, DequeueStrategy.HASH, 1);
    }

    /**
     * Runs the shared enqueue/dequeue scenario on the stream queue "bingoBang" with three HASH
     * consumers. Currently ignored.
     */
    @Test(timeout = TIMEOUT_MS)
    @Ignore
    public void testMultiHash() throws Exception {
        final QueueName streamName = QueueName.fromStream("bingoBang");
        enqueueDequeue(streamName, 2 * ROUNDS, ROUNDS, 1, 3, DequeueStrategy.HASH, 1);
    }

    /**
     * Runs the shared enqueue/dequeue scenario on a flowlet queue with a single HASH consumer,
     * enqueuing in batches of 10 and dequeuing in batches of up to 50.
     */
    @Test(timeout = TIMEOUT_MS)
    public void testBatchHash() throws Exception {
        final QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "batchhash");
        enqueueDequeue(queueName, 2 * ROUNDS, ROUNDS, 10, 1, DequeueStrategy.HASH, 50);
    }

    /**
     * Verifies dequeue behaviour across aborted transactions for two consumer groups
     * (group 0 FIFO, group 1 HASH) reading the same five entries (values 0..4):
     * <ul>
     *   <li>an aborted dequeue is handed out again in the next transaction;</li>
     *   <li>an aborted dequeue whose result is reclaimed in a later, committed transaction
     *       is not handed out again (entry 2 is followed by entry 3);</li>
     *   <li>after both consumers have consumed everything and were closed, the queue is
     *       verified to be empty.</li>
     * </ul>
     */
    @Test(timeout = TIMEOUT_MS)
    public void testQueueAbortRetrySkip() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "queuefailure");
        configureGroups(queueName, ImmutableMap.of(0L, 1, 1L, 1));
        createEnqueueRunnable(queueName, 5, 1, null).run();

        // Declared as QueueConsumer so that dequeue() resolves; Closeable is only an optional
        // capability that is checked before closing.
        QueueConsumer fifoConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 2);
        QueueConsumer hashConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(1L, 0, 1, DequeueStrategy.HASH, "key"), 2);
        TransactionContext txContext = createTxContext(fifoConsumer, hashConsumer);

        // Dequeue entry 0, then abort.
        txContext.start();
        Assert.assertEquals(0, Bytes.toInt((byte[]) fifoConsumer.dequeue().iterator().next()));
        Assert.assertEquals(0, Bytes.toInt((byte[]) hashConsumer.dequeue().iterator().next()));
        txContext.abort();

        // Retry: entry 0 is handed out again and this time committed.
        txContext.start();
        Assert.assertEquals(0, Bytes.toInt((byte[]) fifoConsumer.dequeue().iterator().next()));
        Assert.assertEquals(0, Bytes.toInt((byte[]) hashConsumer.dequeue().iterator().next()));
        txContext.finish();

        txContext.start();
        Assert.assertEquals(1, Bytes.toInt((byte[]) fifoConsumer.dequeue().iterator().next()));
        Assert.assertEquals(1, Bytes.toInt((byte[]) hashConsumer.dequeue().iterator().next()));
        txContext.finish();

        // Dequeue entry 2 and abort, keeping the results so they can be reclaimed.
        txContext.start();
        DequeueResult fifoResult = fifoConsumer.dequeue();
        DequeueResult hashResult = hashConsumer.dequeue();
        Assert.assertEquals(2, Bytes.toInt((byte[]) fifoResult.iterator().next()));
        Assert.assertEquals(2, Bytes.toInt((byte[]) hashResult.iterator().next()));
        txContext.abort();

        // Skip: reclaim the aborted results in a committed transaction.
        txContext.start();
        fifoResult.reclaim();
        hashResult.reclaim();
        txContext.finish();

        // Entry 2 is not seen again; consumption continues with 3 and 4.
        txContext.start();
        Assert.assertEquals(3, Bytes.toInt((byte[]) fifoConsumer.dequeue().iterator().next()));
        Assert.assertEquals(3, Bytes.toInt((byte[]) hashConsumer.dequeue().iterator().next()));
        txContext.finish();

        txContext.start();
        Assert.assertEquals(4, Bytes.toInt((byte[]) fifoConsumer.dequeue().iterator().next()));
        Assert.assertEquals(4, Bytes.toInt((byte[]) hashConsumer.dequeue().iterator().next()));
        txContext.finish();

        // Close the consumers inside a transaction, as the original test does.
        txContext.start();
        if (fifoConsumer instanceof Closeable) {
            ((Closeable) fifoConsumer).close();
        }
        if (hashConsumer instanceof Closeable) {
            ((Closeable) hashConsumer).close();
        }
        txContext.finish();

        verifyQueueIsEmpty(queueName, 2, 1);
    }

    /**
     * Verifies that queue operations are rolled back when the enclosing transaction fails.
     *
     * <p>A third transaction participant is added whose {@code commitTx()} returns
     * {@code false} on its first call and alternates on every call after that, so every
     * other {@code finish()} is expected to throw {@link TransactionFailureException}:
     * <ol>
     *   <li>enqueue + failed finish, then abort;</li>
     *   <li>enqueue again; the consumer sees nothing inside the same transaction; finish
     *       succeeds;</li>
     *   <li>dequeue the entry + failed finish, then abort;</li>
     *   <li>the same entry is dequeued again and the finish succeeds.</li>
     * </ol>
     */
    @Test(timeout = TIMEOUT_MS)
    public void testRollback() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "queuerollback");
        QueueProducer producer = queueClientFactory.createProducer(queueName);
        QueueConsumer consumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 1);

        // Participant that refuses to commit on every other attempt, starting with the first.
        TransactionAware alternatingCommitter = new TransactionAware() {
            boolean canCommit = false;

            @Override
            public void startTx(Transaction transaction) {
            }

            @Override
            public Collection<byte[]> getTxChanges() {
                return ImmutableList.of();
            }

            @Override
            public boolean commitTx() throws Exception {
                // Return the current answer and flip it for the next call.
                boolean result = this.canCommit;
                this.canCommit = !this.canCommit;
                return result;
            }

            @Override
            public void postTxCommit() {
            }

            @Override
            public boolean rollbackTx() throws Exception {
                return true;
            }

            @Override
            public String getTransactionAwareName() {
                return "test";
            }
        };
        TransactionContext txContext = createTxContext(producer, consumer, alternatingCommitter);

        // 1. Enqueue, but the commit is refused.
        txContext.start();
        try {
            producer.enqueue(new QueueEntry(Bytes.toBytes(1)));
            txContext.finish();
            Assert.fail("finish() was expected to throw TransactionFailureException after the enqueue");
        } catch (TransactionFailureException e) {
            txContext.abort();
        }

        // 2. Enqueue again; nothing is visible to the consumer before the commit.
        txContext.start();
        producer.enqueue(new QueueEntry(Bytes.toBytes(1)));
        Assert.assertTrue(consumer.dequeue().isEmpty());
        txContext.finish();

        // 3. Dequeue the entry, but the commit is refused.
        txContext.start();
        try {
            Assert.assertEquals(1, Bytes.toInt((byte[]) consumer.dequeue().iterator().next()));
            txContext.finish();
            Assert.fail("finish() was expected to throw TransactionFailureException after the dequeue");
        } catch (TransactionFailureException e) {
            txContext.abort();
        }

        // 4. The entry is handed out again and the commit goes through.
        txContext.start();
        Assert.assertEquals(1, Bytes.toInt((byte[]) consumer.dequeue().iterator().next()));
        txContext.finish();
    }

    /** Runs the single-entry enqueue/dequeue scenario with the FIFO dequeue strategy. */
    @Test
    public void testOneFIFOEnqueueDequeue() throws Exception {
        final DequeueStrategy strategy = DequeueStrategy.FIFO;
        testOneEnqueueDequeue(strategy);
    }

    /** Runs the single-entry enqueue/dequeue scenario with the ROUND_ROBIN dequeue strategy. */
    @Test
    public void testOneRoundRobinEnqueueDequeue() throws Exception {
        final DequeueStrategy strategy = DequeueStrategy.ROUND_ROBIN;
        testOneEnqueueDequeue(strategy);
    }

    /** Runs the shared per-flow scenario in "clear" mode (queues are cleared, not dropped). */
    @Test
    public void testClearAllForFlow() throws Exception {
        final boolean doDrop = false;
        testClearOrDropAllForFlow(doDrop);
    }

    /** Runs the shared per-flow scenario in "drop" mode (queues are dropped, not cleared). */
    @Test
    public void testDropAllForFlow() throws Exception {
        final boolean doDrop = true;
        testClearOrDropAllForFlow(doDrop);
    }

    /**
     * Calls {@code clearAllForFlow} for flow "app"/"flow" right after {@code dropAll()};
     * the test passes when neither call throws.
     */
    @Test
    public void testClearAllForFlowWithNoQueues() throws Exception {
        final String app = "app";
        final String flow = "flow";
        queueAdmin.dropAll();
        queueAdmin.clearAllForFlow(app, flow);
    }

    /**
     * Calls {@code dropAllForFlow} for flow "app"/"flow" right after {@code dropAll()};
     * the test passes when neither call throws.
     */
    @Test
    public void testDropAllForFlowWithNoQueues() throws Exception {
        final String app = "app";
        final String flow = "flow";
        queueAdmin.dropAll();
        queueAdmin.dropAllForFlow(app, flow);
    }

    /**
     * Shared scenario for clearing or dropping all queues of one flow.
     *
     * <p>Three queues are populated with the values 0..9: two belong to "flow1" and one to
     * "flow2". Each queue has its first entry consumed, then all queues of "flow1" are either
     * dropped or cleared. Afterwards the "flow1" queues must dequeue empty, while the "flow2"
     * queue must continue with entry 1.
     *
     * @param z {@code true} to call {@code dropAllForFlow}, {@code false} to call
     *          {@code clearAllForFlow}
     */
    private void testClearOrDropAllForFlow(boolean z) throws Exception {
        final String app = z ? "tDAFF" : "tCAFF";
        final QueueName flow1QueueA = QueueName.fromFlowlet(app, "flow1", "flowlet1", "out1");
        final QueueName flow1QueueB = QueueName.fromFlowlet(app, "flow1", "flowlet2", "out2");
        final QueueName flow2Queue = QueueName.fromFlowlet(app, "flow2", "flowlet1", "out");
        for (QueueName name : new QueueName[] {flow1QueueA, flow1QueueB, flow2Queue}) {
            configureGroups(name, ImmutableMap.of(0L, 1, 1L, 1));
        }

        // Fill every queue with 0..9 inside a single transaction.
        final QueueProducer producerA = queueClientFactory.createProducer(flow1QueueA);
        final QueueProducer producerB = queueClientFactory.createProducer(flow1QueueB);
        final QueueProducer producerC = queueClientFactory.createProducer(flow2Queue);
        final QueueProducer[] producers = {producerA, producerB, producerC};
        final TransactionContext enqueueTx = createTxContext(producerA, producerB, producerC);
        enqueueTx.start();
        for (int value = 0; value < 10; value++) {
            for (QueueProducer producer : producers) {
                producer.enqueue(new QueueEntry(Bytes.toBytes(value)));
            }
        }
        enqueueTx.finish();

        // Consume the first entry (0) from every queue.
        final ConsumerConfig config = new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null);
        final QueueConsumer consumerA = queueClientFactory.createConsumer(flow1QueueA, config, 1);
        final QueueConsumer consumerB = queueClientFactory.createConsumer(flow1QueueB, config, 1);
        final QueueConsumer consumerC = queueClientFactory.createConsumer(flow2Queue, config, 1);
        final TransactionContext firstDequeueTx = createTxContext(consumerA, consumerB, consumerC);
        firstDequeueTx.start();
        for (QueueConsumer consumer : new QueueConsumer[] {consumerA, consumerB, consumerC}) {
            final DequeueResult first = consumer.dequeue(1);
            Assert.assertFalse(first.isEmpty());
            Assert.assertArrayEquals(Bytes.toBytes(0), (byte[]) first.iterator().next());
        }
        firstDequeueTx.finish();
        verifyConsumerConfigExists(flow1QueueA, flow1QueueB);

        // Drop or clear everything that belongs to flow1, then check which queues still exist.
        final String nameA = flow1QueueA.toString();
        final String nameB = flow1QueueB.toString();
        final String nameC = flow2Queue.toString();
        if (z) {
            queueAdmin.dropAllForFlow(app, "flow1");
            Assert.assertFalse(queueAdmin.exists(nameA));
            Assert.assertFalse(queueAdmin.exists(nameB));
            Assert.assertTrue(queueAdmin.exists(nameC));
        } else {
            queueAdmin.clearAllForFlow(app, "flow1");
            Assert.assertTrue(queueAdmin.exists(nameA));
            Assert.assertTrue(queueAdmin.exists(nameB));
            Assert.assertTrue(queueAdmin.exists(nameC));
        }
        verifyConsumerConfigIsDeleted(flow1QueueA, flow1QueueB);

        // Fresh consumers: flow1 queues are empty, the flow2 queue continues with entry 1.
        final QueueConsumer reopenedA = queueClientFactory.createConsumer(flow1QueueA, config, 1);
        final QueueConsumer reopenedB = queueClientFactory.createConsumer(flow1QueueB, config, 1);
        final QueueConsumer reopenedC = queueClientFactory.createConsumer(flow2Queue, config, 1);
        final TransactionContext secondDequeueTx = createTxContext(reopenedA, reopenedB, reopenedC);
        secondDequeueTx.start();
        for (QueueConsumer consumer : new QueueConsumer[] {reopenedA, reopenedB}) {
            Assert.assertTrue(consumer.dequeue(1).isEmpty());
        }
        final DequeueResult untouched = reopenedC.dequeue(1);
        Assert.assertFalse(untouched.isEmpty());
        Assert.assertArrayEquals(Bytes.toBytes(1), (byte[]) untouched.iterator().next());
        secondDequeueTx.finish();
    }

    /**
     * Hook invoked by testClearOrDropAllForFlow after the first dequeue round, with the two
     * "flow1" queue names. The implementation here is empty; presumably subclasses override
     * it to check that consumer configuration exists for the given queues -- TODO confirm.
     *
     * @param queueNameArr the queues to check
     */
    protected void verifyConsumerConfigExists(QueueName... queueNameArr) throws InterruptedException {
    }

    /**
     * Hook invoked by testClearOrDropAllForFlow after the flow's queues were cleared or
     * dropped, with the two "flow1" queue names. The implementation here is empty; presumably
     * subclasses override it to check that consumer configuration was removed for the given
     * queues -- TODO confirm.
     *
     * @param queueNameArr the queues to check
     */
    protected void verifyConsumerConfigIsDeleted(QueueName... queueNameArr) throws InterruptedException {
    }

    /**
     * Verifies that {@code queueAdmin.dropAll()} resets a queue: five entries (0..4) are
     * enqueued and the first one consumed, everything is dropped, a new consumer then sees an
     * empty queue, and an entry (5) enqueued afterwards by the original producer is delivered
     * to that new consumer.
     */
    @Test
    public void testReset() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "queueReset");
        configureGroups(queueName, ImmutableMap.of(0L, 1, 1L, 1));

        QueueProducer producer = queueClientFactory.createProducer(queueName);
        TransactionContext enqueueTx = createTxContext(producer);
        enqueueTx.start();
        for (int value = 0; value < 5; value++) {
            producer.enqueue(new QueueEntry(Bytes.toBytes(value)));
        }
        enqueueTx.finish();

        // The consumer must be held in a local: it is needed both to build the transaction
        // context and to perform the dequeue.
        QueueConsumer firstConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 2);
        TransactionContext firstDequeueTx = createTxContext(firstConsumer);
        firstDequeueTx.start();
        Assert.assertEquals(0, Bytes.toInt((byte[]) firstConsumer.dequeue().iterator().next()));
        firstDequeueTx.finish();

        queueAdmin.dropAll();

        // A consumer created after the drop sees nothing.
        QueueConsumer secondConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(1L, 0, 1, DequeueStrategy.FIFO, (String) null), 2);
        TransactionContext emptyDequeueTx = createTxContext(secondConsumer);
        emptyDequeueTx.start();
        Assert.assertTrue(secondConsumer.dequeue().isEmpty());
        emptyDequeueTx.finish();

        // The producer created before the drop is reused to enqueue one more entry ...
        TransactionContext reEnqueueTx = createTxContext(producer);
        reEnqueueTx.start();
        producer.enqueue(new QueueEntry(Bytes.toBytes(5)));
        reEnqueueTx.finish();

        // ... which the post-drop consumer must receive.
        TransactionContext finalDequeueTx = createTxContext(secondConsumer);
        finalDequeueTx.start();
        Assert.assertEquals(5, Bytes.toInt((byte[]) secondConsumer.dequeue().iterator().next()));
        finalDequeueTx.finish();
    }

    /**
     * Verifies that entries enqueued concurrently by several producers are all delivered.
     *
     * <p>Three producer threads each enqueue the values 0..49, one per transaction, sleeping
     * a producer-specific time inside each transaction. The test thread dequeues with a
     * single FIFO consumer and sums the values until the expected total is reached or 200
     * consecutive dequeues came back empty.
     */
    @Test
    public void testConcurrentEnqueue() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "concurrent");
        configureGroups(queueName, ImmutableMap.of(0L, 1));

        final int producerCount = 3;
        final int entriesPerProducer = 50;
        final int maxConsecutiveEmptyDequeues = 200;
        // Each producer contributes 0 + 1 + ... + 49 = 1225, so three producers give 3675.
        final int expectedSum = producerCount * (entriesPerProducer * (entriesPerProducer - 1) / 2);

        // One extra party for the test thread, so that all producers start together with it.
        final CyclicBarrier startBarrier = new CyclicBarrier(producerCount + 1);
        ExecutorService executor = Executors.newFixedThreadPool(producerCount);
        try {
            for (int i = 0; i < producerCount; i++) {
                final QueueProducer producer = queueClientFactory.createProducer(queueName);
                final int producerId = i + 1;
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            startBarrier.await();
                            for (int value = 0; value < entriesPerProducer; value++) {
                                TransactionContext txContext = QueueTest.this.createTxContext(producer);
                                txContext.start();
                                // Different sleep per producer so the transactions interleave.
                                TimeUnit.MILLISECONDS.sleep(producerId * 50);
                                producer.enqueue(new QueueEntry(Bytes.toBytes(value)));
                                txContext.finish();
                            }
                        } catch (Exception e) {
                            QueueTest.LOG.error(e.getMessage(), e);
                        } finally {
                            // Close exactly once, on both the success and the failure path.
                            if (producer instanceof Closeable) {
                                Closeables.closeQuietly((Closeable) producer);
                            }
                        }
                    }
                });
            }

            QueueConsumer consumer = queueClientFactory.createConsumer(
                queueName, new ConsumerConfig(0L, 0, 1, DequeueStrategy.FIFO, (String) null), 1);
            int sum = 0;
            try {
                startBarrier.await();
                int consecutiveEmptyDequeues = 0;
                while (sum != expectedSum && consecutiveEmptyDequeues < maxConsecutiveEmptyDequeues) {
                    TransactionContext txContext = createTxContext(consumer);
                    txContext.start();
                    DequeueResult result = consumer.dequeue();
                    if (result.isEmpty()) {
                        consecutiveEmptyDequeues++;
                        TimeUnit.MILLISECONDS.sleep(10L);
                    } else {
                        consecutiveEmptyDequeues = 0;
                        sum += Bytes.toInt((byte[]) result.iterator().next());
                    }
                    txContext.finish();
                }
            } finally {
                if (consumer instanceof Closeable) {
                    Closeables.closeQuietly((Closeable) consumer);
                }
            }
            Assert.assertEquals(expectedSum, sum);
        } finally {
            // Orderly shutdown: running producers finish, idle pool threads are released.
            executor.shutdown();
        }
    }

    /**
     * Verifies HASH partitioning across two consumer instances and that a consumer instance
     * can be replaced part-way through.
     *
     * <p>Ten entries with values 0..9 are enqueued, each hashed on "key" with its own value.
     * Instance 0 of 2 is expected to receive the even values 0, 2, 4, 6, 8. Instance 1 first
     * dequeues a batch of two (1, 3) and is closed; a newly created instance 1 is then
     * expected to continue with 5, 7, 9.
     */
    @Test
    public void testMultiStageConsumer() throws Exception {
        QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "multistage");
        configureGroups(queueName, ImmutableMap.of(0L, 2));

        // One transaction per entry.
        QueueProducer producer = queueClientFactory.createProducer(queueName);
        for (int value = 0; value < 10; value++) {
            TransactionContext txContext = createTxContext(producer);
            txContext.start();
            producer.enqueue(new QueueEntry("key", value, Bytes.toBytes(value)));
            txContext.finish();
        }

        // Instance 0: all even values, one per transaction.
        QueueConsumer evenConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 0, 2, DequeueStrategy.HASH, "key"), 1);
        for (int i = 0; i < 5; i++) {
            TransactionContext txContext = createTxContext(evenConsumer);
            txContext.start();
            DequeueResult result = evenConsumer.dequeue();
            Assert.assertTrue(!result.isEmpty());
            Assert.assertEquals(i * 2, Bytes.toInt((byte[]) result.iterator().next()));
            txContext.finish();
        }
        if (evenConsumer instanceof Closeable) {
            ((Closeable) evenConsumer).close();
        }

        // Instance 1, first incarnation: one batch holding the first two odd values.
        QueueConsumer oddConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 1, 2, DequeueStrategy.HASH, "key"), 1);
        TransactionContext batchTx = createTxContext(oddConsumer);
        batchTx.start();
        DequeueResult batch = oddConsumer.dequeue(2);
        Assert.assertEquals(2, batch.size());
        Iterator it = batch.iterator();
        for (int i = 0; i < 2; i++) {
            Assert.assertEquals((i * 2) + 1, Bytes.toInt((byte[]) it.next()));
        }
        batchTx.finish();
        if (oddConsumer instanceof Closeable) {
            ((Closeable) oddConsumer).close();
        }

        // Instance 1, second incarnation: continues with the remaining odd values.
        QueueConsumer resumedOddConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 1, 2, DequeueStrategy.HASH, "key"), 1);
        for (int i = 2; i < 5; i++) {
            TransactionContext txContext = createTxContext(resumedOddConsumer);
            txContext.start();
            DequeueResult result = resumedOddConsumer.dequeue();
            Assert.assertTrue(!result.isEmpty());
            Assert.assertEquals((i * 2) + 1, Bytes.toInt((byte[]) result.iterator().next()));
            txContext.finish();
        }
    }

    /**
     * Enqueues a single entry (value 55) and verifies that each of two consumer groups
     * dequeues it once, after which the queue is verified to be empty.
     *
     * <p>Between the two groups the producer and the first consumer are closed and
     * {@code forceEviction} is called for the queue.
     *
     * @param dequeueStrategy the strategy both consumers are configured with
     */
    private void testOneEnqueueDequeue(DequeueStrategy dequeueStrategy) throws Exception {
        QueueName queueName = QueueName.fromFlowlet("app", "flow", "flowlet", "queue1");
        configureGroups(queueName, ImmutableMap.of(0L, 1, 1L, 1));

        // Declared with their queue types so that enqueue()/dequeue() resolve; Closeable is
        // only an optional capability that is checked before closing.
        QueueProducer producer = queueClientFactory.createProducer(queueName);
        TransactionContext enqueueTx = createTxContext(producer);
        enqueueTx.start();
        producer.enqueue(new QueueEntry(Bytes.toBytes(55)));
        enqueueTx.finish();

        // Group 0 consumes the entry.
        QueueConsumer firstGroupConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(0L, 0, 1, dequeueStrategy, (String) null), 2);
        TransactionContext firstDequeueTx = createTxContext(firstGroupConsumer);
        firstDequeueTx.start();
        Assert.assertEquals(55, Bytes.toInt((byte[]) firstGroupConsumer.dequeue().iterator().next()));
        firstDequeueTx.finish();

        if (producer instanceof Closeable) {
            ((Closeable) producer).close();
        }
        if (firstGroupConsumer instanceof Closeable) {
            ((Closeable) firstGroupConsumer).close();
        }

        forceEviction(queueName);

        // Group 1 must still receive the entry.
        QueueConsumer secondGroupConsumer = queueClientFactory.createConsumer(
            queueName, new ConsumerConfig(1L, 0, 1, dequeueStrategy, (String) null), 2);
        TransactionContext secondDequeueTx = createTxContext(secondGroupConsumer);
        secondDequeueTx.start();
        Assert.assertEquals(55, Bytes.toInt((byte[]) secondGroupConsumer.dequeue().iterator().next()));
        secondDequeueTx.finish();
        if (secondGroupConsumer instanceof Closeable) {
            ((Closeable) secondGroupConsumer).close();
        }

        verifyQueueIsEmpty(queueName, 2, 1);
    }

    /**
     * Shared scenario: enqueue entries in two passes and let a group of concurrent consumers
     * dequeue all of them, checking the sum of the dequeued values.
     *
     * <p>The first pass is enqueued synchronously before any consumer starts. The second pass
     * runs on a pool thread concurrently with the consumers; a barrier releases that producer,
     * all consumers and the test thread together. Each consumer keeps dequeuing until the
     * shared sum reaches the expected total.
     *
     * @param queueName       queue (or stream) to use
     * @param i               number of entries enqueued up front; must be divisible by {@code i3}
     * @param i2              number of entries enqueued concurrently; must be divisible by {@code i3}
     * @param i3              enqueue batch size
     * @param i4              number of consumer instances in the single consumer group
     * @param dequeueStrategy dequeue strategy of the consumers
     * @param i5              maximum number of entries requested per dequeue call
     */
    private void enqueueDequeue(final QueueName queueName, int i, int i2, int i3, final int i4, final DequeueStrategy dequeueStrategy, final int i5) throws Exception {
        final int consumerCount = i4;
        final int dequeueBatchSize = i5;

        configureGroups(queueName, ImmutableMap.of(0L, Integer.valueOf(consumerCount)));
        Preconditions.checkArgument(i % i3 == 0, "Count must be divisible by enqueueBatchSize");
        Preconditions.checkArgument(i2 % i3 == 0, "Count must be divisible by enqueueBatchSize");

        // First pass: enqueue synchronously on the calling thread.
        createEnqueueRunnable(queueName, i, i3, null).run();

        // Expected sum of all values. Each pass presumably enqueues 0..count-1
        // (testQueueAbortRetrySkip dequeues 0..4 after enqueuing 5 entries) -- so the total is
        // n*(n-1)/2 per pass. Multiply before dividing so odd counts are not truncated, and
        // compute in long so large counts cannot overflow.
        final long expectedSum = ((long) i * (i - 1)) / 2 + ((long) i2 * (i2 - 1)) / 2;
        final AtomicLong valueSum = new AtomicLong();
        final CountDownLatch completeLatch = new CountDownLatch(consumerCount);

        // Parties: all consumers + the concurrent producer + the test thread.
        final CyclicBarrier startBarrier = new CyclicBarrier(consumerCount + 2);
        ExecutorService executor = Executors.newFixedThreadPool(consumerCount + 1);
        try {
            // Second pass: enqueue concurrently with the consumers.
            executor.submit(createEnqueueRunnable(queueName, i2, i3, startBarrier));

            for (int instance = 0; instance < consumerCount; instance++) {
                final int instanceId = instance;
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            startBarrier.await();
                            QueueTest.LOG.info("Consumer {} starts consuming {}", Integer.valueOf(instanceId), queueName.getSimpleName());
                            QueueConsumer consumer = QueueTest.queueClientFactory.createConsumer(
                                queueName, new ConsumerConfig(0L, instanceId, consumerCount, dequeueStrategy, "key"), 1);
                            try {
                                TransactionContext txContext = QueueTest.this.createTxContext(consumer);
                                Stopwatch stopwatch = new Stopwatch();
                                stopwatch.start();
                                int dequeued = 0;
                                // All consumers share valueSum; each stops once the total is reached.
                                while (valueSum.get() < expectedSum) {
                                    txContext.start();
                                    try {
                                        DequeueResult result = consumer.dequeue(dequeueBatchSize);
                                        txContext.finish();
                                        // Values are only counted after the transaction committed.
                                        if (!result.isEmpty()) {
                                            Iterator it = result.iterator();
                                            while (it.hasNext()) {
                                                valueSum.addAndGet(Bytes.toInt((byte[]) it.next()));
                                                dequeued++;
                                            }
                                        }
                                    } catch (TransactionFailureException e) {
                                        QueueTest.LOG.error("Operation error", e);
                                        txContext.abort();
                                        throw Throwables.propagate(e);
                                    }
                                }
                                long elapsedMillis = stopwatch.elapsedTime(TimeUnit.MILLISECONDS);
                                QueueTest.LOG.info("Dequeue {} entries in {} ms for {}", new Object[]{Integer.valueOf(dequeued), Long.valueOf(elapsedMillis), queueName.getSimpleName()});
                                QueueTest.LOG.info("Dequeue avg {} entries per seconds for {}", Double.valueOf((dequeued * 1000.0d) / elapsedMillis), queueName.getSimpleName());

                                // Close inside a transaction before signalling completion.
                                if (consumer instanceof Closeable) {
                                    txContext.start();
                                    ((Closeable) consumer).close();
                                    txContext.finish();
                                }
                                completeLatch.countDown();
                            } finally {
                                // Also closes on the failure path; on the success path this is
                                // a second close, as in the original flow.
                                if (consumer instanceof Closeable) {
                                    ((Closeable) consumer).close();
                                }
                            }
                        } catch (Exception e) {
                            QueueTest.LOG.error(e.getMessage(), e);
                        }
                    }
                });
            }

            startBarrier.await();
            completeLatch.await();
            TimeUnit.SECONDS.sleep(2L);
            Assert.assertEquals(expectedSum, valueSum.get());
            if (!queueName.isStream()) {
                verifyQueueIsEmpty(queueName, 1, consumerCount);
            }
        } finally {
            // Release the pool threads even when an assertion above fails.
            executor.shutdownNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a {@link TransactionContext} that is backed by the shared {@code txSystemClient} and
     * tracks every object passed in.
     *
     * @param objArr the transaction participants; each element is cast to {@link TransactionAware},
     *               so an element of any other type fails with a {@link ClassCastException}
     * @return a new transaction context over the given participants
     */
    public TransactionContext createTxContext(Object... objArr) {
        final TransactionAware[] participants = new TransactionAware[objArr.length];
        int slot = 0;
        for (Object candidate : objArr) {
            participants[slot] = (TransactionAware) candidate;
            slot++;
        }
        return new TransactionContext(txSystemClient, participants);
    }

    /**
     * Creates a task that enqueues integer entries into {@code queueName}, one transaction per batch.
     *
     * <p>The task writes {@code i / i2} full batches of {@code i2} entries each (integer division, so
     * a trailing partial batch is not written). Entry number {@code n} carries {@code Bytes.toBytes(n)}
     * as payload and is created as {@code new QueueEntry("key", sign * n, payload)}, where
     * {@code sign} alternates between -1 and +1 from one entry to the next, starting at -1.
     * NOTE(review): the first two constructor arguments are presumably the hash key and hash
     * value used for hash partitioning - confirm against {@code QueueEntry}.
     *
     * <p>Failures never escape {@code run()}: they are logged. A {@link TransactionFailureException}
     * additionally aborts the current transaction before the task stops.
     *
     * @param queueName     queue to write to
     * @param i             requested number of entries
     * @param i2            number of entries per batch (and per transaction)
     * @param cyclicBarrier optional barrier awaited before any work starts; may be {@code null}
     * @return the enqueue task
     */
    private Runnable createEnqueueRunnable(final QueueName queueName, final int i, final int i2, final CyclicBarrier cyclicBarrier) {
        return new Runnable() {
            @Override
            public void run() {
                final int totalEntries = i;
                final int batchSize = i2;
                try {
                    if (cyclicBarrier != null) {
                        cyclicBarrier.await();
                    }
                    // Declared with the producer type so that enqueue() resolves; Closeable is only
                    // an optional capability that is checked when cleaning up.
                    QueueProducer producer = QueueTest.queueClientFactory.createProducer(queueName);
                    try {
                        TransactionContext txContext = QueueTest.this.createTxContext(producer);
                        LOG.info("Start enqueue {} entries.", totalEntries);
                        Stopwatch stopwatch = new Stopwatch();
                        stopwatch.start();

                        int batches = totalEntries / batchSize;
                        ArrayList<QueueEntry> batch = Lists.newArrayListWithCapacity(batchSize);
                        // The sign keeps flipping across batch boundaries, it is not reset per batch.
                        int sign = -1;
                        for (int batchIndex = 0; batchIndex < batches; batchIndex++) {
                            txContext.start();
                            try {
                                batch.clear();
                                for (int offset = 0; offset < batchSize; offset++) {
                                    int value = (batchIndex * batchSize) + offset;
                                    batch.add(new QueueEntry("key", sign * value, Bytes.toBytes(value)));
                                    sign *= -1;
                                }
                                producer.enqueue(batch);
                                txContext.finish();
                            } catch (TransactionFailureException e) {
                                LOG.error("Operation error", e);
                                txContext.abort();
                                throw Throwables.propagate(e);
                            }
                        }

                        stopwatch.stop();
                        long elapsedMs = stopwatch.elapsedTime(TimeUnit.MILLISECONDS);
                        LOG.info("Enqueue {} entries in {} ms for {}", new Object[]{totalEntries, elapsedMs, queueName.getSimpleName()});
                        LOG.info("Enqueue avg {} entries per seconds for {}", (totalEntries * 1000.0d) / elapsedMs, queueName.getSimpleName());
                    } finally {
                        // Single close on every path; a failing close() is not retried.
                        if (producer instanceof Closeable) {
                            ((Closeable) producer).close();
                        }
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the executor that owns this thread.
                    Thread.currentThread().interrupt();
                    LOG.error(e.getMessage(), e);
                } catch (Exception e) {
                    LOG.error(e.getMessage(), e);
                }
            }
        };
    }

    /**
     * Hook for setting up consumer groups on a queue; this base implementation does nothing.
     *
     * <p>The method is {@code protected} on an abstract class, so concrete test classes may
     * override it.
     * NOTE(review): the map presumably goes from consumer group id to group size - confirm
     * against the callers and the overriding implementations.
     *
     * @param queueName the queue the configuration applies to
     * @param map       {@code Long}-keyed integer settings, see the note above
     * @throws Exception declared so that overriding implementations are free to fail
     */
    protected void configureGroups(QueueName queueName, Map<Long, Integer> map) throws Exception {
    }

    /**
     * Asserts that nothing can be dequeued from {@code queueName} any more.
     *
     * <p>Two checks are made:
     * <ol>
     *   <li>For every pair of {@code 0 <= a < i} and {@code 0 <= b < i2}, a FIFO consumer is created
     *       with {@code new ConsumerConfig(a, b, i2, FIFO, null)} and a single dequeue, run in its
     *       own transaction, must come back empty.</li>
     *   <li>After {@link #forceEviction(QueueName)}, a consumer created with
     *       {@code new ConsumerConfig(i + 1, 0, 1, FIFO, null)} - a first argument that step 1
     *       never used - must get an empty result as well. Anything it does receive is logged
     *       before the assertion fails.</li>
     * </ol>
     * NOTE(review): the first three {@code ConsumerConfig} arguments are presumably group id,
     * instance id and group size - confirm against {@code ConsumerConfig}.
     *
     * <p>Every consumer that implements {@link Closeable} is closed exactly once, and the
     * transaction of step 2 is always aborted, including when the assertion fails.
     *
     * @param queueName queue to inspect
     * @param i         upper bound (exclusive) of the first {@code ConsumerConfig} argument in step 1
     * @param i2        upper bound (exclusive) of the second argument; also passed as the third one
     * @throws Exception if a queue or transaction operation fails
     */
    protected void verifyQueueIsEmpty(QueueName queueName, int i, int i2) throws Exception {
        for (int groupIdx = 0; groupIdx < i; groupIdx++) {
            for (int instanceIdx = 0; instanceIdx < i2; instanceIdx++) {
                ConsumerConfig config = new ConsumerConfig(groupIdx, instanceIdx, i2, DequeueStrategy.FIFO, (String) null);
                QueueConsumer consumer = queueClientFactory.createConsumer(queueName, config, -1);
                try {
                    TransactionContext txContext = createTxContext(consumer);
                    try {
                        txContext.start();
                        Assert.assertTrue(consumer.dequeue().isEmpty());
                        txContext.finish();
                    } catch (TransactionFailureException e) {
                        txContext.abort();
                        throw Throwables.propagate(e);
                    }
                } finally {
                    // Single close on every path; a failing close() is not retried.
                    if (consumer instanceof Closeable) {
                        ((Closeable) consumer).close();
                    }
                }
            }
        }

        forceEviction(queueName);

        // Step 1 only used first arguments below i, so i + 1 has not been used for this check yet.
        ConsumerConfig freshConfig = new ConsumerConfig(i + 1, 0, 1, DequeueStrategy.FIFO, (String) null);
        QueueConsumer freshConsumer = queueClientFactory.createConsumer(queueName, freshConfig, -1);
        try {
            TransactionContext freshTxContext = createTxContext(freshConsumer);
            freshTxContext.start();
            try {
                DequeueResult result = freshConsumer.dequeue();
                if (!result.isEmpty()) {
                    StringBuilder values = new StringBuilder();
                    Iterator<?> entries = result.iterator();
                    while (entries.hasNext()) {
                        if (values.length() > 0) {
                            values.append(", ");
                        }
                        values.append(Bytes.toInt((byte[]) entries.next()));
                    }
                    LOG.info("Queue should be empty but returned result: {}, value = {}", result, values);
                }
                Assert.assertTrue("Entire queue should be evicted after test but dequeue succeeds.", result.isEmpty());
            } finally {
                // This transaction is read-only verification and is never committed; roll it back
                // even when the dequeue or the assertion above fails.
                freshTxContext.abort();
            }
        } finally {
            if (freshConsumer instanceof Closeable) {
                ((Closeable) freshConsumer).close();
            }
        }
    }

    /**
     * Hook invoked by {@code verifyQueueIsEmpty} after its per-consumer emptiness checks and before
     * its final check that a fresh consumer cannot dequeue anything; this base implementation
     * does nothing.
     *
     * <p>The method is {@code protected} on an abstract class, so concrete test classes may
     * override it.
     * NOTE(review): overrides presumably trigger eviction of consumed entries in the backing
     * store - confirm against the subclasses.
     *
     * @param queueName the queue the final emptiness check will be run against
     * @throws Exception declared so that overriding implementations are free to fail
     */
    protected void forceEviction(QueueName queueName) throws Exception {
    }
}
