package cz.o2.proxima.repository;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.storage.AttributeWriterBase;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.storage.commitlog.RetryableLogObserver;
import cz.o2.proxima.util.Optionals;
import cz.seznam.euphoria.executor.local.LocalExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/repository/CommitLogReaderTest.class */
public class CommitLogReaderTest {
    private transient LocalExecutor executor;
    private transient CommitLogReader reader;
    private transient AttributeWriterBase writer;
    private final transient Repository repo = Repository.of(ConfigFactory.load().withFallback(ConfigFactory.load("test-reference.conf")).resolve());
    private final transient EntityDescriptor entity = (EntityDescriptor) Optionals.get(this.repo.findEntity("event"));
    private final transient AttributeDescriptor<?> attr = (AttributeDescriptor) Optionals.get(this.entity.findAttribute("data"));

    @Before
    public void setUp() {
        this.executor = new LocalExecutor();
        AttributeFamilyDescriptor attributeFamilyDescriptor = (AttributeFamilyDescriptor) this.repo.getAllFamilies().filter(attributeFamilyDescriptor2 -> {
            return attributeFamilyDescriptor2.getName().equals("event-storage-stream");
        }).findAny().get();
        this.reader = (CommitLogReader) attributeFamilyDescriptor.getCommitLogReader().get();
        this.writer = (AttributeWriterBase) attributeFamilyDescriptor.getWriter().get();
    }

    @After
    public void tearDown() {
        this.executor.abort();
    }

    @Test(timeout = 10000)
    public void testObserveSimple() throws InterruptedException {
        final ArrayList arrayList = new ArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.reader.observe("test", new LogObserver() { // from class: cz.o2.proxima.repository.CommitLogReaderTest.1
            public boolean onNext(StreamElement streamElement, LogObserver.OffsetCommitter offsetCommitter) {
                arrayList.add(streamElement);
                countDownLatch.countDown();
                offsetCommitter.confirm();
                return true;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        this.writer.online().write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), (z, th) -> {
        });
        countDownLatch.await();
        Assert.assertEquals(1L, arrayList.size());
    }

    @Test(timeout = 10000)
    public void testObserveWithError() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicReference atomicReference = new AtomicReference();
        this.reader.observe("test", new LogObserver() { // from class: cz.o2.proxima.repository.CommitLogReaderTest.2
            public boolean onNext(StreamElement streamElement, LogObserver.OffsetCommitter offsetCommitter) {
                throw new RuntimeException("fail");
            }

            public boolean onError(Throwable th) {
                atomicReference.set(th);
                countDownLatch.countDown();
                return false;
            }
        });
        this.writer.online().write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), (z, th) -> {
        });
        countDownLatch.await();
        Assert.assertNotNull(atomicReference.get());
    }

    @Test(timeout = 10000)
    public void testObserveWithRetry() throws InterruptedException {
        final ArrayList arrayList = new ArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger();
        new RetryableLogObserver(2, "test", this.reader) { // from class: cz.o2.proxima.repository.CommitLogReaderTest.3
            protected boolean onNextInternal(StreamElement streamElement, LogObserver.OffsetCommitter offsetCommitter) {
                if (atomicInteger.incrementAndGet() == 0) {
                    throw new RuntimeException("fail");
                }
                arrayList.add(streamElement);
                countDownLatch.countDown();
                return true;
            }

            protected void failure() {
            }
        }.start();
        this.writer.online().write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), (z, th) -> {
        });
        countDownLatch.await();
        Assert.assertEquals(1L, arrayList.size());
    }

    @Test(timeout = 10000)
    public void testBulkObserve() throws InterruptedException {
        final ArrayList arrayList = new ArrayList();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        this.reader.observeBulk("test", new BulkLogObserver() { // from class: cz.o2.proxima.repository.CommitLogReaderTest.4
            public boolean onNext(StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                arrayList.add(streamElement);
                countDownLatch.countDown();
                if (arrayList.size() != 2) {
                    return true;
                }
                offsetCommitter.confirm();
                return true;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        for (int i = 0; i < 2; i++) {
            this.writer.online().write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), (z, th) -> {
            });
        }
        countDownLatch.await();
        Assert.assertEquals(2L, arrayList.size());
    }
}
