package com.questdb.net.ha;

import com.questdb.net.ha.bridge.JournalEventBridge;
import com.questdb.net.ha.bridge.JournalEventHandler;
import com.questdb.net.ha.bridge.JournalEventProcessor;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/questdb/net/ha/JournalEventBridgeTest.class */
public class JournalEventBridgeTest {

    /* loaded from: input_file:com/questdb/net/ha/JournalEventBridgeTest$Handler.class */
    private class Handler implements JournalEventHandler {
        private final int index;
        private int counter;

        private Handler(int i) {
            this.index = i;
        }

        public int getCounter() {
            return this.counter;
        }

        public void handle(int i) {
            if (i == this.index) {
                this.counter++;
            }
        }
    }

    @Test
    public void testStartStop() throws Exception {
        JournalEventBridge journalEventBridge = new JournalEventBridge(2L, TimeUnit.SECONDS);
        for (int i = 0; i < 10000; i++) {
            journalEventBridge.publish(10, System.currentTimeMillis());
        }
    }

    @Test
    public void testTwoPublishersThreeConsumers() throws Exception {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        JournalEventBridge journalEventBridge = new JournalEventBridge(50L, TimeUnit.MILLISECONDS);
        Future[] futureArr = new Future[2];
        Handler[] handlerArr = new Handler[3];
        CyclicBarrier cyclicBarrier = new CyclicBarrier(futureArr.length + handlerArr.length);
        CountDownLatch countDownLatch = new CountDownLatch(futureArr.length + handlerArr.length);
        for (int i = 0; i < futureArr.length; i++) {
            int i2 = i;
            futureArr[i] = newCachedThreadPool.submit(() -> {
                int i3 = 0;
                try {
                    try {
                        cyclicBarrier.await();
                        for (int i4 = 0; i4 < 1000; i4++) {
                            journalEventBridge.publish(i2, System.nanoTime());
                            i3++;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        countDownLatch.countDown();
                    }
                    return Integer.valueOf(i3);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        for (int i3 = 0; i3 < handlerArr.length; i3++) {
            JournalEventProcessor journalEventProcessor = new JournalEventProcessor(journalEventBridge);
            Handler handler = new Handler(i3);
            handlerArr[i3] = handler;
            newCachedThreadPool.submit(() -> {
                try {
                    try {
                        cyclicBarrier.await();
                        do {
                        } while (journalEventProcessor.process(handler, true));
                        countDownLatch.countDown();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                        countDownLatch.countDown();
                    }
                } catch (Throwable th) {
                    countDownLatch.countDown();
                    throw th;
                }
            });
        }
        countDownLatch.await();
        for (Future future : futureArr) {
            Assert.assertEquals(1000, future.get());
        }
        Assert.assertEquals(1000L, handlerArr[0].getCounter());
        Assert.assertEquals(1000L, handlerArr[1].getCounter());
        Assert.assertEquals(0L, handlerArr[2].getCounter());
    }
}
