package cz.o2.proxima.direct.core.storage;

import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.scheme.SerializationException;
import cz.o2.proxima.core.storage.AccessType;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.StorageType;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.storage.commitlog.KeyAttributePartitioner;
import cz.o2.proxima.core.storage.commitlog.Partitioners;
import cz.o2.proxima.core.storage.commitlog.Position;
import cz.o2.proxima.core.time.WatermarkEstimator;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.core.util.Optionals;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.core.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.batch.BatchLogReader;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.LogObserverUtils;
import cz.o2.proxima.direct.core.commitlog.ObserveHandle;
import cz.o2.proxima.direct.core.commitlog.ObserveHandleUtils;
import cz.o2.proxima.direct.core.commitlog.Offset;
import cz.o2.proxima.direct.core.randomaccess.KeyValue;
import cz.o2.proxima.direct.core.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.core.randomaccess.RandomOffset;
import cz.o2.proxima.direct.core.storage.InMemStorage;
import cz.o2.proxima.direct.core.test.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import cz.o2.proxima.direct.core.test.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import cz.o2.proxima.direct.core.test.shaded.com.fasterxml.jackson.core.type.TypeReference;
import cz.o2.proxima.direct.core.test.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import cz.o2.proxima.direct.core.view.CachedView;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.collect.Iterables;
import cz.o2.proxima.typesafe.config.ConfigFactory;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:cz/o2/proxima/direct/core/storage/InMemStorageTest.class */
public class InMemStorageTest implements Serializable {
    /** Test repository built from the resolved {@code test-reference.conf} configuration. */
    final Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve(), new Repository.Validate[0]);
    /** Direct data operator obtained from {@link #repo}; the tests pass it to the accessors they create. */
    final DirectDataOperator direct = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
    /** Descriptor of the {@code dummy} entity looked up in {@link #repo}. */
    final EntityDescriptor entity = this.repo.getEntity("dummy");
    /** Descriptor of the {@code data} attribute of {@link #entity}. */
    final AttributeDescriptor<?> data = this.entity.getAttribute("data");
    /** Descriptor of the {@code wildcard.*} attribute of {@link #entity}. */
    final AttributeDescriptor<?> wildcard = this.entity.getAttribute("wildcard.*");

    /**
     * Verifies that an element written to the log reaches an observer of all partitions and that
     * the freshly created observe handle is reported as being at head.
     *
     * <p>The latch awaited at the end of the test is created inside {@code onRepartition}, so the
     * test depends on that callback having run before the final {@code await()}.
     *
     * @throws InterruptedException if the wait for the observed element is interrupted
     */
    @Test(timeout = 10000)
    public void testObservePartitions() throws InterruptedException {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        // Parameterized so the latch can be read back without unchecked casts.
        final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
        ObserveHandle handle = reader.observePartitions(reader.getPartitions(), new CommitLogObserver() {
            public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                Assert.assertEquals(1L, context.partitions().size());
                latch.set(new CountDownLatch(1));
            }

            public boolean onNext(StreamElement received, CommitLogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", received.getKey());
                context.confirm();
                latch.get().countDown();
                return false;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        Assert.assertTrue(ObserveHandleUtils.isAtHead(handle, reader));
        StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3});
        writer.online().write(update, (succ, exc) -> {
        });
        latch.get().await();
    }

    /**
     * Writes the same element through two accessors of one storage instance (URIs
     * {@code inmem://test1} and {@code inmem://test2}), then observes the first accessor's commit
     * log from {@code Position.OLDEST} until {@code onCompleted} and asserts that exactly one
     * element was delivered.
     *
     * @throws InterruptedException if the wait for completion is interrupted
     */
    @Test(timeout = 10000)
    public void testObservePartitionsWithSamePath() throws InterruptedException {
        InMemStorage storage = new InMemStorage();
        DataAccessor first = storage.createAccessor(direct, createFamilyDescriptor(URI.create("inmem://test1")));
        DataAccessor second = storage.createAccessor(direct, createFamilyDescriptor(URI.create("inmem://test2")));
        CommitLogReader reader = (CommitLogReader) Optionals.get(first.getCommitLogReader(direct.getContext()));
        AttributeWriterBase firstWriter = (AttributeWriterBase) Optionals.get(first.getWriter(direct.getContext()));
        final CountDownLatch completed = new CountDownLatch(1);
        StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3});
        firstWriter.online().write(update, (succ, exc) -> {
        });
        AttributeWriterBase secondWriter = (AttributeWriterBase) second.getWriter(direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer2"));
        secondWriter.online().write(update, (succ, exc) -> {
        });
        final AtomicInteger delivered = new AtomicInteger();
        reader.observePartitions(reader.getPartitions(), Position.OLDEST, true, new CommitLogObserver() {
            public void onCompleted() {
                completed.countDown();
            }

            public boolean onNext(StreamElement received, CommitLogObserver.OnNextContext context) {
                Assert.assertEquals("key", received.getKey());
                context.confirm();
                delivered.incrementAndGet();
                return true;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        completed.await();
        Assert.assertEquals(1L, delivered.get());
    }

    /**
     * Writes one element and asserts that a batch observation of the {@code data} attribute
     * delivers it from partition {@code 0} with key {@code "key"}.
     *
     * @throws InterruptedException if the wait for the observed element is interrupted
     */
    @Test(timeout = 10000)
    public void testObserveBatch() throws InterruptedException {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
        BatchLogReader reader = (BatchLogReader) Optionals.get(accessor.getBatchLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        final CountDownLatch seen = new CountDownLatch(1);
        StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3});
        writer.online().write(update, (succ, exc) -> {
        });
        BatchLogObserver observer = new BatchLogObserver() {
            public boolean onNext(StreamElement received, BatchLogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", received.getKey());
                seen.countDown();
                return false;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        reader.observe(reader.getPartitions(), Collections.singletonList(data), observer);
        seen.await();
    }

    /**
     * Writes the same element through two accessors of one storage instance (URIs
     * {@code inmem://test1} and {@code inmem://test2}), then batch-observes the first accessor
     * until {@code onCompleted} and asserts that exactly one element was delivered.
     *
     * @throws InterruptedException if the wait for completion is interrupted
     */
    @Test(timeout = 10000)
    public void testObserveBatchWithSamePath() throws InterruptedException {
        InMemStorage storage = new InMemStorage();
        DataAccessor first = storage.createAccessor(direct, createFamilyDescriptor(URI.create("inmem://test1")));
        DataAccessor second = storage.createAccessor(direct, createFamilyDescriptor(URI.create("inmem://test2")));
        BatchLogReader reader = (BatchLogReader) Optionals.get(first.getBatchLogReader(direct.getContext()));
        AttributeWriterBase firstWriter = (AttributeWriterBase) Optionals.get(first.getWriter(direct.getContext()));
        final CountDownLatch completed = new CountDownLatch(1);
        StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3});
        firstWriter.online().write(update, (succ, exc) -> {
        });
        AttributeWriterBase secondWriter = (AttributeWriterBase) second.getWriter(direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing writer2"));
        secondWriter.online().write(update, (succ, exc) -> {
        });
        final AtomicInteger delivered = new AtomicInteger();
        BatchLogObserver observer = new BatchLogObserver() {
            public void onCompleted() {
                completed.countDown();
            }

            public boolean onNext(StreamElement received, BatchLogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", received.getKey());
                delivered.incrementAndGet();
                return true;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        reader.observe(reader.getPartitions(), Collections.singletonList(data), observer);
        completed.await();
        Assert.assertEquals(1L, delivered.get());
    }

    /**
     * Asserts that an observer receives the element written while its handle is open and that a
     * write issued after {@code close()} on the handle is not delivered to it.
     */
    @Test(timeout = 10000)
    public void testObserveCancel() {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        // First payload byte of every element the observer has seen, in arrival order.
        final List<Byte> received = new ArrayList<>();
        ObserveHandle handle = reader.observePartitions(reader.getPartitions(), new CommitLogObserver() {
            public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                Assert.assertEquals(1L, context.partitions().size());
            }

            public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                Assert.assertEquals("key", element.getKey());
                context.confirm();
                received.add(element.getValue()[0]);
                return false;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });
        StreamElement beforeClose = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{1});
        writer.online().write(beforeClose, (succ, exc) -> {
        });
        Assert.assertEquals(1L, handle.getCurrentOffsets().size());
        Assert.assertEquals(Collections.singletonList((byte) 1), received);
        handle.close();
        StreamElement afterClose = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{2});
        writer.online().write(afterClose, (succ, exc) -> {
        });
        // The list must be unchanged: the second write happened after the handle was closed.
        Assert.assertEquals(Collections.singletonList((byte) 1), received);
    }

    /**
     * Observes the log, captures the handle's current offsets after one write, and then starts a
     * second observation from those offsets via {@code observeBulkOffsets}.
     *
     * <p>Asserted along the way: each handle reports exactly one offset with a positive watermark;
     * after the second write the observer has recorded the byte {@code 1} twice; and the consumed
     * key-attribute set of the second handle's offset is empty.
     *
     * @throws InterruptedException if waiting for the second observation to become ready is
     *     interrupted
     */
    @Test(timeout = 10000)
    public void testObserveOffsets() throws InterruptedException {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        // First payload byte of every element the observer has seen, in arrival order.
        final List<Byte> received = new ArrayList<>();
        // Note: this observer never confirms the elements it receives.
        CommitLogObserver observer = new CommitLogObserver() {
            public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                Assert.assertEquals(1L, context.partitions().size());
            }

            public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
                Assert.assertEquals(0L, context.getPartition().getId());
                received.add(element.getValue()[0]);
                return false;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
        ObserveHandle handle = reader.observePartitions(reader.getPartitions(), observer);
        StreamElement firstUpdate = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{1});
        writer.online().write(firstUpdate, (succ, exc) -> {
        });
        List<Offset> offsets = handle.getCurrentOffsets();
        Assert.assertEquals(1L, offsets.size());
        Assert.assertTrue(offsets.get(0).getWatermark() > 0);
        Assert.assertEquals(Collections.singletonList((byte) 1), received);
        handle.close();
        ObserveHandle resumed = reader.observeBulkOffsets(offsets, observer);
        resumed.waitUntilReady();
        List<Offset> resumedOffsets = resumed.getCurrentOffsets();
        Assert.assertEquals(1L, resumedOffsets.size());
        Assert.assertTrue("Expected positive watermark, got " + resumedOffsets.get(0).getWatermark(), resumedOffsets.get(0).getWatermark() > 0);
        StreamElement secondUpdate = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[]{2});
        writer.online().write(secondUpdate, (succ, exc) -> {
        });
        Assert.assertEquals(Arrays.asList((byte) 1, (byte) 1), received);
        Assert.assertEquals(0L, ((InMemStorage.ConsumedOffset) resumed.getCurrentOffsets().get(0)).getConsumedKeyAttr().size());
        resumed.close();
    }

    /**
     * Runs the shared {@code testFetchOffsets} scenario with an argument of {@code 1}.
     *
     * @throws InterruptedException if the shared scenario is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testFetchOffsetsSinglePartition() throws InterruptedException {
        final int partitionCount = 1;
        testFetchOffsets(partitionCount);
    }

    /**
     * Runs the shared {@code testFetchOffsets} scenario with an argument of {@code 3}.
     *
     * @throws InterruptedException if the shared scenario is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testFetchOffsetsMultiplePartitions() throws InterruptedException {
        final int partitionCount = 3;
        testFetchOffsets(partitionCount);
    }

    /**
     * Shared scenario for the fetch-offsets tests.
     *
     * <p>Writes {@code 2 * i} elements keyed {@code key1 .. key(2i)} and then checks two reads:
     *
     * <ol>
     *   <li>from the offsets fetched for {@code Position.OLDEST}: all {@code 2 * i} elements are
     *       read in write order, and the handle is not at head while the observer is blocked but
     *       is at head once the read has finished;
     *   <li>from the offsets fetched for {@code Position.NEWEST}: {@code i} elements are read,
     *       keyed {@code key(i+1) .. key(2i)}.
     * </ol>
     *
     * @param i value passed to {@code createFamilyDescriptor} and the expected number of fetched
     *     offsets (presumably the partition count; confirm against that helper)
     * @throws InterruptedException if waiting for either read to finish is interrupted
     */
    private void testFetchOffsets(int i) throws InterruptedException {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///test"), i));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        long now = System.currentTimeMillis();
        List<StreamElement> updates = new ArrayList<>();
        for (int n = 0; n < 2 * i; n++) {
            updates.add(StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key" + (n + 1), data.getName(), now + n, new byte[]{1, 2, 3}));
        }
        updates.forEach(update -> writer.online().write(update, (succ, exc) -> {
        }));

        Map<Partition, Offset> oldest = reader.fetchOffsets(Position.OLDEST, reader.getPartitions());
        Assert.assertEquals(i, oldest.size());
        List<StreamElement> elements = new ArrayList<>();
        CountDownLatch oldestFinished = new CountDownLatch(1);
        // Holds the observer inside its first callback so the "not at head" assertion below is
        // made before the read can progress.
        CountDownLatch gate = new CountDownLatch(1);
        ObserveHandle handle = reader.observeBulkOffsets(oldest.values(), true, LogObserverUtils.toList(elements, finished -> oldestFinished.countDown(), element -> {
            ExceptionUtils.ignoringInterrupted(gate::await);
            return true;
        }));
        Assert.assertFalse(ObserveHandleUtils.isAtHead(handle, reader));
        gate.countDown();
        oldestFinished.await();
        Assert.assertEquals(2 * i, elements.size());
        Assert.assertTrue(ObserveHandleUtils.isAtHead(handle, reader));
        List<String> allKeys = IntStream.range(0, 2 * i).mapToObj(k -> "key" + (k + 1)).collect(Collectors.toList());
        Assert.assertEquals(allKeys, elements.stream().map(StreamElement::getKey).collect(Collectors.toList()));

        elements.clear();
        Map<Partition, Offset> newest = reader.fetchOffsets(Position.NEWEST, reader.getPartitions());
        Assert.assertEquals(i, newest.size());
        CountDownLatch newestFinished = new CountDownLatch(1);
        reader.observeBulkOffsets(newest.values(), true, LogObserverUtils.toList(elements, (Consumer<Boolean>) finished -> newestFinished.countDown()));
        newestFinished.await();
        Assert.assertEquals(i, elements.size());
        List<String> newestKeys = IntStream.range(i, 2 * i).mapToObj(k -> "key" + (k + 1)).collect(Collectors.toList());
        Assert.assertEquals(newestKeys, elements.stream().map(StreamElement::getKey).collect(Collectors.toList()));
    }

    /**
     * Installs, for the test URI, a watermark estimator factory whose estimators always report
     * {@code Long.MAX_VALUE - InMemStorage.getBoundedOutOfOrderness()}, then asserts that a named
     * observer has its {@code onCompleted} invoked within one second.
     *
     * @throws InterruptedException if the wait for completion is interrupted
     */
    @Test
    public void testObserveWithEndOfTime() throws InterruptedException {
        URI uri = URI.create("inmem:///inmemstoragetest");
        InMemStorage storage = new InMemStorage();
        InMemStorage.setWatermarkEstimatorFactory(uri, (stamp, name, offset) -> new WatermarkEstimator() {
            {
                // The factory is expected to be handed a non-null offset.
                Preconditions.checkArgument(offset != null);
            }

            public long getWatermark() {
                return Long.MAX_VALUE - InMemStorage.getBoundedOutOfOrderness();
            }

            public void setMinWatermark(long minWatermark) {
            }
        });
        DataAccessor accessor = storage.createAccessor(direct, createFamilyDescriptor(uri));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(direct.getContext())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final CountDownLatch completed = new CountDownLatch(1);
        reader.observe("observer", new CommitLogObserver() {
            public void onCompleted() {
                completed.countDown();
            }

            public boolean onError(Throwable error) {
                return false;
            }

            public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
                return false;
            }
        });
        boolean completedInTime = completed.await(1L, TimeUnit.SECONDS);
        Assert.assertTrue(completedInTime);
    }

    /**
     * Registers one observer that throws from {@code onNext} and one that accepts every element,
     * writes 100 elements, and asserts that the failing observer's {@code onNext} ran exactly once
     * while the other observer received all 100 elements.
     *
     * @throws InterruptedException if the wait on either latch is interrupted
     */
    @Test(timeout = 1000)
    public void testObserveError() throws InterruptedException {
        final int elementCount = 100;
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        final CountDownLatch failed = new CountDownLatch(1);
        final CountDownLatch allDelivered = new CountDownLatch(elementCount);
        final AtomicInteger failingCalls = new AtomicInteger(0);
        CommitLogObserver failing = new CommitLogObserver() {
            public void onCompleted() {
                throw new UnsupportedOperationException("This should never happen.");
            }

            public boolean onError(Throwable error) {
                failed.countDown();
                return false;
            }

            public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
                failingCalls.incrementAndGet();
                throw new RuntimeException("Test exception.");
            }
        };
        CommitLogObserver succeeding = new CommitLogObserver() {
            public void onCompleted() {
                throw new UnsupportedOperationException("This should never happen.");
            }

            public boolean onError(Throwable error) {
                return false;
            }

            public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
                allDelivered.countDown();
                return true;
            }
        };
        reader.observe("failing-observer", failing);
        reader.observe("success-observer", succeeding);
        for (int n = 0; n < elementCount; n++) {
            StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key_" + n, data.getName(), System.currentTimeMillis(), new byte[]{2});
            writer.online().write(update, (succ, exc) -> {
            });
        }
        failed.await();
        allDelivered.await();
        Assert.assertEquals(1L, failingCalls.get());
    }

    /**
     * Observes the first two partitions with one observer and the third with another, writes 1000
     * elements, and asserts that elements were seen from three partitions and that the first
     * handle reports two current offsets.
     *
     * @throws InterruptedException if the wait for all elements is interrupted
     */
    @Test
    public void testObserveMultiplePartitions() throws InterruptedException {
        final int elementCount = 1000;
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///test"), 3));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        // Typed to match createObserver's parameter, so no unchecked conversion is involved.
        ConcurrentMap<Partition, Long> perPartition = new ConcurrentHashMap<>();
        CountDownLatch allDelivered = new CountDownLatch(elementCount);
        ObserveHandle firstTwo = reader.observePartitions(reader.getPartitions().subList(0, 2), createObserver(2, perPartition, allDelivered));
        reader.observePartitions(reader.getPartitions().subList(2, 3), createObserver(1, perPartition, allDelivered));
        for (int n = 0; n < elementCount; n++) {
            StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key_" + n, data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3});
            writer.online().write(update, CommitCallback.noop());
        }
        allDelivered.await();
        Assert.assertEquals(3L, perPartition.size());
        Assert.assertEquals(2L, firstTwo.getCurrentOffsets().size());
    }

    /**
     * Builds an observer that counts received elements per partition.
     *
     * <p>On every element it increments the counter of the element's partition in
     * {@code concurrentMap}, confirms the element, counts the latch down, and keeps going only
     * while the latch is still above zero. Errors are rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param i number of partitions the observer asserts in {@code onRepartition}
     * @param concurrentMap per-partition counters updated on every element
     * @param countDownLatch latch counted down once per received element
     * @return the counting observer
     */
    private static CommitLogObserver createObserver(final int i, final ConcurrentMap<Partition, Long> concurrentMap, final CountDownLatch countDownLatch) {
        return new CommitLogObserver() {
            public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                Assert.assertEquals(i, context.partitions().size());
            }

            public boolean onNext(StreamElement element, CommitLogObserver.OnNextContext context) {
                concurrentMap.merge(context.getPartition(), 1L, Long::sum);
                context.confirm();
                countDownLatch.countDown();
                final boolean moreExpected = countDownLatch.getCount() > 0;
                return moreExpected;
            }

            public boolean onError(Throwable error) {
                throw new RuntimeException(error);
            }
        };
    }

    /**
     * Observes only the first of three partitions while 999 elements are written, and asserts that
     * the number of elements observed on that partition equals the number the
     * {@link KeyAttributePartitioner} assigns to it.
     *
     * <p>The expected per-partition counts are computed independently of the storage, with
     * {@code Partitioners.getTruncatedPartitionId} over three partitions.
     *
     * @throws InterruptedException if the wait for the observed elements is interrupted
     */
    @Test
    public void testObserveSinglePartitionOutOfMultiplePartitions() throws InterruptedException {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///test"), 3));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        // Typed maps: Long::sum only type-checks as the merge function when the value type is known.
        final ConcurrentMap<Partition, Long> observed = new ConcurrentHashMap<>();
        final CountDownLatch latch = new CountDownLatch(333);
        List<Partition> consumed = reader.getPartitions().subList(0, 1);
        // The shared counting observer does exactly what this test needs for a single partition.
        ObserveHandle handle = reader.observePartitions(consumed, createObserver(1, observed, latch));
        KeyAttributePartitioner partitioner = new KeyAttributePartitioner();
        Map<Partition, Long> expected = new HashMap<>();
        for (int n = 0; n < 999; n++) {
            StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key_" + n, data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3});
            expected.merge(Partition.of(Partitioners.getTruncatedPartitionId(partitioner, update, 3)), 1L, Long::sum);
            writer.online().write(update, CommitCallback.noop());
        }
        Assert.assertEquals(3L, expected.size());
        latch.await();
        Assert.assertEquals(1L, observed.size());
        Assert.assertEquals(1L, handle.getCurrentOffsets().size());
        Partition only = Iterables.getOnlyElement(consumed);
        Assert.assertEquals(expected.get(only), observed.get(only));
    }

    /**
     * Asserts that an accessor created from a family descriptor built with the argument {@code 3}
     * exposes no random access reader.
     */
    @Test
    public void testRandomAccessReaderWithMultiplePartitions() {
        InMemStorage storage = new InMemStorage();
        DataAccessor accessor = storage.createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///test"), 3));
        boolean readerPresent = accessor.getRandomAccessReader(direct.getContext()).isPresent();
        Assert.assertFalse(readerPresent);
    }

    /**
     * Asserts that an accessor created from a family descriptor built with the argument {@code 3}
     * exposes no batch log reader.
     */
    @Test
    public void testBatchLogReaderWithMultiplePartitions() {
        InMemStorage storage = new InMemStorage();
        DataAccessor accessor = storage.createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///test"), 3));
        boolean readerPresent = accessor.getBatchLogReader(direct.getContext()).isPresent();
        Assert.assertFalse(readerPresent);
    }

    /**
     * Asserts that a cached view is available for a three-partition family, that it reports three
     * partitions, and that an element written through the view (after assigning all of its
     * partitions) can be read back by key.
     */
    @Test
    public void testCachedViewWithMultiplePartitions() {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///test"), 3));
        Optional<CachedView> maybeView = accessor.getCachedView(direct.getContext());
        Assert.assertTrue(maybeView.isPresent());
        CachedView view = maybeView.get();
        Assert.assertEquals(3L, view.getPartitions().size());
        view.assign(view.getPartitions());
        StreamElement update = StreamElement.upsert(entity, data, UUID.randomUUID().toString(), "key", data.getName(), System.currentTimeMillis(), new byte[0]);
        view.write(update, (succ, exc) -> Assert.assertTrue(succ));
        // The attribute descriptor is a wildcard type, so the value type of the result is unknown.
        Optional<? extends KeyValue<?>> stored = view.get("key", data);
        Assert.assertTrue(stored.isPresent());
        Assert.assertEquals("key", stored.get().getKey());
    }

    /**
     * Writes an element created with the sequential id {@code 1} and asserts that the element read
     * back from the commit log reports having a sequential id equal to {@code 1}.
     *
     * @throws InterruptedException if the wait for the observed element is interrupted
     */
    @Test(timeout = 10000)
    public void testReadWriteSequentialIds() throws InterruptedException {
        DataAccessor accessor = new InMemStorage().createAccessor(direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
        CommitLogReader reader = (CommitLogReader) Optionals.get(accessor.getCommitLogReader(direct.getContext()));
        AttributeWriterBase writer = (AttributeWriterBase) Optionals.get(accessor.getWriter(direct.getContext()));
        List<StreamElement> observed = new ArrayList<>();
        CountDownLatch seen = new CountDownLatch(1);
        reader.observe("test", LogObserverUtils.toList(observed, finished -> Assert.assertTrue(finished), element -> {
            seen.countDown();
            return false;
        }));
        StreamElement update = StreamElement.upsert(entity, data, 1L, "key", data.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3});
        writer.online().write(update, (succ, exc) -> {
        });
        seen.await();
        Assert.assertEquals(1L, observed.size());
        StreamElement first = observed.get(0);
        Assert.assertTrue(first.hasSequentialId());
        Assert.assertEquals(1L, first.getSequentialId());
    }

    /**
     * Serializes a {@code ConsumedOffset} to JSON and checks the three top-level fields of the
     * resulting document: {@code partition}, {@code offset} and {@code watermark}.
     *
     * @throws JsonProcessingException if the produced JSON cannot be parsed back into a map
     */
    @Test
    public void testConsumedOffsetExternalizerToJson() throws JsonProcessingException {
        InMemStorage.ConsumedOffset offset = new InMemStorage.ConsumedOffset(Partition.of(1), new HashSet<>(Arrays.asList("a", "b")), 1000L);
        String json = new InMemStorage.ConsumedOffsetExternalizer().toJson(offset);
        HashMap<String, Object> parsed = new ObjectMapper().readValue(json, new TypeReference<HashMap<String, Object>>() {
        });
        // Expected values are boxed ints on purpose: they are compared against the parsed map values via equals().
        Assert.assertEquals(1, parsed.get("partition"));
        Assert.assertEquals(Arrays.asList("a", "b"), parsed.get("offset"));
        Assert.assertEquals(1000, parsed.get("watermark"));
    }

    /**
     * Verifies the JSON round trip of {@code ConsumedOffsetExternalizer}: an offset serialized
     * with {@code toJson} and parsed back with {@code fromJson} must equal the original.
     */
    @Test
    public void testConsumedOffsetExternalizerFromJson() {
        InMemStorage.ConsumedOffsetExternalizer externalizer = new InMemStorage.ConsumedOffsetExternalizer();
        InMemStorage.ConsumedOffset expected = new InMemStorage.ConsumedOffset(Partition.of(1), new HashSet<>(Arrays.asList("a", "b")), 1000L);
        String json = externalizer.toJson(expected);
        Assert.assertEquals(expected, externalizer.fromJson(json));
    }

    /**
     * Verifies that parsing an empty string, which is not a valid JSON document, is reported as
     * a {@link SerializationException}.
     */
    @Test
    public void testOffsetExternalizerFromBytesWhenInvalidJson() {
        InMemStorage.ConsumedOffsetExternalizer externalizer = new InMemStorage.ConsumedOffsetExternalizer();
        // The invalid input is the empty string, written as a literal so the intent is visible at the call site.
        Assert.assertThrows(SerializationException.class, () -> externalizer.fromJson(""));
    }

    /**
     * Verifies the binary round trip of {@code ConsumedOffsetExternalizer}: an offset serialized
     * with {@code toBytes} and restored with {@code fromBytes} must equal the original.
     */
    @Test
    public void testConsumedOffsetExternalizerFromBytes() {
        InMemStorage.ConsumedOffsetExternalizer externalizer = new InMemStorage.ConsumedOffsetExternalizer();
        InMemStorage.ConsumedOffset expected = new InMemStorage.ConsumedOffset(Partition.of(1), new HashSet<>(Arrays.asList("a", "b")), 1000L);
        Assert.assertEquals(expected, externalizer.fromBytes(externalizer.toBytes(expected)));
    }

    /**
     * Feeds a single zero byte to {@code fromBytes} and expects the externalizer to reject it
     * with a {@link SerializationException}.
     */
    @Test
    public void testOffsetExternalizerFromBytesWhenInvalidBytes() {
        InMemStorage.ConsumedOffsetExternalizer externalizer = new InMemStorage.ConsumedOffsetExternalizer();
        byte[] garbage = new byte[]{0};
        Assert.assertThrows(SerializationException.class, () -> externalizer.fromBytes(garbage));
    }

    /**
     * Writes two wildcard attributes ({@code prefix} and {@code non-prefix}) under the same key and
     * then scans the wildcard starting from the offset fetched for {@code "prefi"} with a limit
     * of one. The scan is expected to return exactly the {@code prefix} attribute.
     *
     * @throws InterruptedException declared for compatibility with the existing signature
     */
    @Test(timeout = 10000)
    public void testScanWildcard() throws InterruptedException {
        DataAccessor accessor = new InMemStorage().createAccessor(this.direct, createFamilyDescriptor(URI.create("inmem:///inmemstoragetest")));
        RandomAccessReader reader = (RandomAccessReader) Optionals.get(accessor.getRandomAccessReader(this.direct.getContext()));
        OnlineAttributeWriter writer = ((AttributeWriterBase) Optionals.get(accessor.getWriter(this.direct.getContext()))).online();
        String key = "key";
        Stream.of("prefix", "non-prefix").forEach(suffix -> {
            writer.write(StreamElement.upsert(this.entity, this.wildcard, UUID.randomUUID().toString(), key, this.wildcard.toAttributePrefix() + suffix, System.currentTimeMillis(), new byte[]{1}), (succ, exc) -> {
            });
        });
        List<KeyValue<?>> scanned = new ArrayList<>();
        RandomOffset startOffset = reader.fetchOffset(RandomAccessReader.Listing.ATTRIBUTE, this.wildcard.toAttributePrefix() + "prefi");
        // Collect the scan results straight into the list.
        reader.scanWildcard(key, this.wildcard, startOffset, 1, scanned::add);
        Assert.assertEquals(1L, scanned.size());
        Assert.assertEquals(this.wildcard.toAttributePrefix() + "prefix", scanned.get(0).getAttribute());
    }

    /**
     * Builds the test attribute family for the given storage URI with a partition count of one.
     *
     * @param uri storage URI of the family
     * @return descriptor produced by the two-argument overload
     */
    private AttributeFamilyDescriptor createFamilyDescriptor(URI uri) {
        final int singlePartition = 1;
        return createFamilyDescriptor(uri, singlePartition);
    }

    /**
     * Builds the primary, commit-log attribute family named {@code "test"} for this test's entity.
     *
     * @param uri storage URI of the family
     * @param i requested number of partitions; the {@code "num-partitions"} option is written to
     *     the family configuration only when this value is greater than one
     * @return the assembled family descriptor
     */
    private AttributeFamilyDescriptor createFamilyDescriptor(URI uri, int i) {
        HashMap<String, Object> cfg = new HashMap<>();
        if (i > 1) {
            cfg.put("num-partitions", i);
        }
        return AttributeFamilyDescriptor.newBuilder()
                .setName("test")
                .setEntity(this.entity)
                .setType(StorageType.PRIMARY)
                .setAccess(AccessType.from("commit-log"))
                .setStorageUri(uri)
                .setCfg(cfg)
                .build();
    }

    /*
     * Recreates a lambda of this test class from its serialized form.
     *
     * The method works in two stages. First it maps the implementation method name stored in the
     * SerializedLambda to a selector (switching on the name's hashCode and confirming with
     * equals). Then, for the selected entry, it checks the recorded method kind, functional
     * interface class/method/signature and implementation class/signature, and on a full match
     * returns an equivalent lambda built from the captured arguments. Any input that does not
     * match one of the known entries ends in an IllegalArgumentException.
     *
     * NOTE(review): the method is marked synthetic and the body does not look like valid Java
     * (the selector `z` is declared boolean but is assigned -1 and 2..7, the second switch repeats
     * `case true:`, and `r0` is never declared). This looks like decompiler output of a
     * compiler-generated method - confirm, and consider dropping it from hand-maintained sources.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Stage 1: translate the implementation method name into a selector value.
        switch (implMethodName.hashCode()) {
            case -1907816219:
                if (implMethodName.equals("lambda$testFetchOffsets$b282f67f$1")) {
                    z = 7;
                    break;
                }
                break;
            case -1720544715:
                if (implMethodName.equals("lambda$testObserveWithEndOfTime$6fb7e9b9$1")) {
                    z = 5;
                    break;
                }
                break;
            case -1150636388:
                if (implMethodName.equals("lambda$testFetchOffsets$c8e18788$1")) {
                    z = 4;
                    break;
                }
                break;
            case -613126426:
                if (implMethodName.equals("lambda$testReadWriteSequentialIds$933e418c$1")) {
                    z = true;
                    break;
                }
                break;
            case -9280345:
                if (implMethodName.equals("lambda$testFetchOffsets$63dd2e61$1")) {
                    z = 2;
                    break;
                }
                break;
            case 96417:
                if (implMethodName.equals("add")) {
                    z = false;
                    break;
                }
                break;
            case 93223254:
                if (implMethodName.equals("await")) {
                    z = 6;
                    break;
                }
                break;
            case 2090945012:
                if (implMethodName.equals("assertTrue")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Stage 2: validate the serialized metadata for the selected entry and rebuild the lambda.
        // NOTE(review): the repeated `case true:` labels presumably stand for selectors 1..7 in order - confirm.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    // NOTE(review): `r0` is undeclared here; presumably it refers to `list` above - confirm.
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/UnaryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/storage/InMemStorageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Lcz/o2/proxima/core/storage/StreamElement;)Z")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return streamElement -> {
                        countDownLatch.countDown();
                        return false;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/storage/InMemStorageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Ljava/lang/Boolean;)V")) {
                    CountDownLatch countDownLatch2 = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return bool -> {
                        countDownLatch2.countDown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/junit/Assert") && serializedLambda.getImplMethodSignature().equals("(Z)V")) {
                    return (v0) -> {
                        Assert.assertTrue(v0);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/storage/InMemStorageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Ljava/lang/Boolean;)V")) {
                    CountDownLatch countDownLatch3 = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return bool2 -> {
                        countDownLatch3.countDown();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/storage/InMemStorage$WatermarkEstimatorFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(JLjava/lang/String;Lcz/o2/proxima/direct/core/storage/InMemStorage$ConsumedOffset;)Lcz/o2/proxima/core/time/WatermarkEstimator;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/storage/InMemStorageTest") && serializedLambda.getImplMethodSignature().equals("(JLjava/lang/String;Lcz/o2/proxima/direct/core/storage/InMemStorage$ConsumedOffset;)Lcz/o2/proxima/core/time/WatermarkEstimator;")) {
                    InMemStorageTest inMemStorageTest = (InMemStorageTest) serializedLambda.getCapturedArg(0);
                    // The rebuilt factory returns an estimator with a fixed watermark and a no-op setMinWatermark.
                    return (j, str, consumedOffset) -> {
                        return new WatermarkEstimator() { // from class: cz.o2.proxima.direct.core.storage.InMemStorageTest.7
                            {
                                Preconditions.checkArgument(consumedOffset != null);
                            }

                            public long getWatermark() {
                                return Long.MAX_VALUE - InMemStorage.getBoundedOutOfOrderness();
                            }

                            public void setMinWatermark(long j) {
                            }
                        };
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("java/util/concurrent/CountDownLatch") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    CountDownLatch countDownLatch4 = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return countDownLatch4::await;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/UnaryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/core/storage/InMemStorageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Lcz/o2/proxima/core/storage/StreamElement;)Z")) {
                    CountDownLatch countDownLatch5 = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return streamElement2 -> {
                        Objects.requireNonNull(countDownLatch5);
                        ExceptionUtils.ignoringInterrupted(countDownLatch5::await);
                        return true;
                    };
                }
                break;
        }
        // No known entry matched the serialized description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
