package cz.o2.proxima.storage.kafka;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.kafka.shaded.com.google.common.collect.Iterators;
import cz.o2.proxima.kafka.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeDescriptorBase;
import cz.o2.proxima.repository.ConfigRepository;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.AttributeWriterBase;
import cz.o2.proxima.storage.OnlineAttributeWriter;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.FirstPartitionPartitioner;
import cz.o2.proxima.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.storage.commitlog.ObserveHandle;
import cz.o2.proxima.storage.commitlog.Offset;
import cz.o2.proxima.storage.commitlog.Partitioner;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.storage.commitlog.RetryableBulkObserver;
import cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptor;
import cz.o2.proxima.storage.randomaccess.KeyValue;
import cz.o2.proxima.util.Pair;
import cz.o2.proxima.view.PartitionedCachedView;
import cz.o2.proxima.view.PartitionedLogObserver;
import cz.o2.proxima.view.PartitionedView;
import cz.seznam.euphoria.executor.local.LocalExecutor;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest.class */
public class LocalKafkaCommitLogDescriptorTest implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(LocalKafkaCommitLogDescriptorTest.class);
    private final transient Factory<ExecutorService> serviceFactory = () -> {
        return Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                th.printStackTrace(System.err);
            });
            return thread;
        });
    };
    private final transient Repository repo = ConfigRepository.Builder.ofTest(ConfigFactory.empty()).withExecutorFactory(this.serviceFactory).build();
    private final AttributeDescriptorBase<byte[]> attr = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("attr").setSchemeUri(new URI("bytes:///")).build();
    private final AttributeDescriptorBase<byte[]> attrWildcard = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("wildcard.*").setSchemeUri(new URI("bytes:///")).build();
    private final EntityDescriptor entity = EntityDescriptor.newBuilder().setName("entity").addAttribute(this.attr).addAttribute(this.attrWildcard).build();
    private final URI storageUri = new URI("kafka-test://dummy/topic");
    private LocalKafkaCommitLogDescriptor kafka;

    /* loaded from: input_file:cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest$FirstBytePartitioner.class */
    static final class FirstBytePartitioner implements Partitioner {
        FirstBytePartitioner() {
        }

        public int getPartitionId(StreamElement streamElement) {
            if (streamElement.isDelete()) {
                return 0;
            }
            return streamElement.getValue()[0];
        }
    }

    @Before
    public void setUp() {
        this.kafka = new LocalKafkaCommitLogDescriptor();
    }

    @Test(timeout = 10000)
    public void testSinglePartitionWriteAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        KafkaConsumer create = accessor.createConsumerFactory().create();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        ConsumerRecords poll = create.poll(1000L);
        Assert.assertEquals(1L, poll.count());
        Assert.assertEquals(1L, poll.partitions().size());
        TopicPartition topicPartition = (TopicPartition) Iterators.getOnlyElement(poll.partitions().iterator());
        Assert.assertEquals(0L, topicPartition.partition());
        Assert.assertEquals("topic", topicPartition.topic());
        int i = 0;
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            Assert.assertEquals("key#attr", consumerRecord.key());
            Assert.assertEquals("topic", consumerRecord.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) consumerRecord.value());
            i++;
        }
        Assert.assertEquals(1L, i);
    }

    @Test(timeout = 10000)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        KafkaConsumer create = accessor.createConsumerFactory().create();
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
            Assert.assertTrue(z2);
            Assert.assertNull(th2);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        ConsumerRecords poll = create.poll(1000L);
        Assert.assertEquals(2L, poll.count());
        Assert.assertEquals(2L, poll.partitions().size());
        TopicPartition topicPartition = (TopicPartition) poll.partitions().iterator().next();
        Assert.assertEquals(0L, topicPartition.partition());
        Assert.assertEquals(1L, ((TopicPartition) r0.next()).partition());
        Assert.assertEquals("topic", topicPartition.topic());
        int i = 0;
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            i++;
            Assert.assertEquals("key" + i + "#attr", consumerRecord.key());
            Assert.assertEquals("topic", consumerRecord.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) consumerRecord.value());
        }
        Assert.assertEquals(2L, i);
    }

    @Test
    public void testEmptyPoll() {
        Assert.assertTrue(this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(2)).createConsumerFactory().create().poll(100L).isEmpty());
    }

    @Test
    public void testWriteNull() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(2));
        OnlineAttributeWriter online = ((AttributeWriterBase) accessor.getWriter(context()).get()).online();
        KafkaConsumer create = accessor.createConsumerFactory().create();
        online.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), 1234567890000L, new byte[]{1}), (z, th) -> {
        });
        online.write(StreamElement.delete(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), 1234567890000L + 1000), (z2, th2) -> {
        });
        ConsumerRecords poll = create.poll(100L);
        Assert.assertEquals(2L, poll.count());
        int i = 0;
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            if (consumerRecord.timestamp() == 1234567890000L) {
                Assert.assertEquals(1L, ((byte[]) consumerRecord.value()).length);
                i++;
            } else if (consumerRecord.timestamp() == 1234567890000L + 1000) {
                Assert.assertNull(consumerRecord.value());
                i++;
            }
        }
        Assert.assertEquals(2L, i);
    }

    @Test(timeout = 10000)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunBeforeWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        KafkaConsumer create = accessor.createConsumerFactory().create();
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
        });
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
            Assert.assertTrue(z2);
            Assert.assertNull(th2);
        });
        ConsumerRecords poll = create.poll(1000L);
        Assert.assertEquals(2L, poll.count());
        Assert.assertEquals(2L, poll.partitions().size());
        TopicPartition topicPartition = (TopicPartition) poll.partitions().iterator().next();
        Assert.assertEquals(0L, topicPartition.partition());
        Assert.assertEquals(1L, ((TopicPartition) r0.next()).partition());
        Assert.assertEquals("topic", topicPartition.topic());
        int i = 0;
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            i++;
            Assert.assertEquals("key" + i + "#attr", consumerRecord.key());
            Assert.assertEquals("topic", consumerRecord.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) consumerRecord.value());
        }
        Assert.assertEquals(2L, i);
    }

    @Test(timeout = 10000)
    public void testTwoPartitionsTwoWritesAndTwoReads() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        KafkaConsumer create = accessor.createConsumerFactory().create();
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
        });
        ConsumerRecords poll = create.poll(1000L);
        Assert.assertEquals(1L, poll.count());
        Assert.assertEquals(1L, poll.partitions().size());
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
            Assert.assertTrue(z2);
            Assert.assertNull(th2);
        });
        ConsumerRecords poll2 = create.poll(1000L);
        Assert.assertEquals(1L, poll2.count());
        Assert.assertEquals(1L, poll2.partitions().size());
    }

    @Test(timeout = 10000)
    public void testTwoIdependentConsumers() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        KafkaConsumer[] kafkaConsumerArr = {accessor.createConsumerFactory().create("dummy1"), accessor.createConsumerFactory().create("dummy2")};
        CountDownLatch countDownLatch = new CountDownLatch(1);
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        for (KafkaConsumer kafkaConsumer : kafkaConsumerArr) {
            ConsumerRecords poll = kafkaConsumer.poll(1000L);
            Assert.assertEquals(1L, poll.count());
            Assert.assertEquals(1L, poll.partitions().size());
            TopicPartition topicPartition = (TopicPartition) Iterators.getOnlyElement(poll.partitions().iterator());
            Assert.assertEquals(0L, topicPartition.partition());
            Assert.assertEquals("topic", topicPartition.topic());
            int i = 0;
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                Assert.assertEquals("key#attr", consumerRecord.key());
                Assert.assertEquals("topic", consumerRecord.topic());
                Assert.assertArrayEquals(emptyValue(), (byte[]) consumerRecord.value());
                i++;
            }
            Assert.assertEquals(1L, i);
        }
    }

    @Test(timeout = 10000)
    public void testManualPartitionAssignment() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        KafkaConsumer create = accessor.createConsumerFactory().create(Arrays.asList(() -> {
            return 0;
        }));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
            Assert.assertTrue(z2);
            Assert.assertNull(th2);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        ConsumerRecords poll = create.poll(1000L);
        Assert.assertEquals(1L, poll.count());
        Assert.assertEquals(1L, poll.partitions().size());
        TopicPartition topicPartition = (TopicPartition) poll.partitions().iterator().next();
        Assert.assertEquals(0L, topicPartition.partition());
        Assert.assertEquals("topic", topicPartition.topic());
        int i = 0;
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            i++;
            Assert.assertEquals("key" + i + "#attr", consumerRecord.key());
            Assert.assertEquals("topic", consumerRecord.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) consumerRecord.value());
        }
        Assert.assertEquals(1L, i);
    }

    @Test(timeout = 10000)
    public void testPollAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
            Assert.assertTrue(z2);
            Assert.assertNull(th2);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertTrue(accessor.createConsumerFactory().create(Arrays.asList(() -> {
            return 0;
        })).poll(100L).isEmpty());
    }

    @Test(timeout = 10000)
    public void testPollWithSeek() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
            countDownLatch.countDown();
        });
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
            Assert.assertTrue(z2);
            Assert.assertNull(th2);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        accessor.createConsumerFactory().create(Arrays.asList(() -> {
            return 0;
        })).seek(new TopicPartition("topic", 0), 1L);
        Assert.assertEquals(1L, r0.poll(100L).count());
    }

    @Test
    public void testTwoPartitionsTwoConsumersRebalance() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        KafkaConsumer create = accessor.createConsumerFactory().create("consumer");
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
        });
        ConsumerRecords poll = create.poll(1000L);
        Assert.assertEquals(2L, create.assignment().size());
        Assert.assertEquals(1L, poll.count());
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
        });
        Assert.assertEquals(1L, create.poll(1000L).count());
        create.commitSync(new HashMap<TopicPartition, OffsetAndMetadata>() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.1
            {
                put(new TopicPartition("topic", 0), new OffsetAndMetadata(1L));
                put(new TopicPartition("topic", 1), new OffsetAndMetadata(1L));
            }
        });
        KafkaConsumer create2 = accessor.createConsumerFactory().create("consumer");
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z3, th3) -> {
        });
        Assert.assertEquals(1L, create2.poll(1000L).count());
        Assert.assertTrue(create.poll(1000L).isEmpty());
        Assert.assertEquals(1L, create.assignment().size());
        Assert.assertEquals(1L, create2.assignment().size());
    }

    @Test
    public void testSinglePartitionTwoConsumersRebalance() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        KafkaConsumer create = accessor.createConsumerFactory().create("consumer");
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
        });
        ConsumerRecords poll = create.poll(1000L);
        Assert.assertEquals(1L, create.assignment().size());
        Assert.assertEquals(1L, poll.count());
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
        });
        Assert.assertEquals(1L, create.poll(1000L).count());
        create.commitSync(new HashMap<TopicPartition, OffsetAndMetadata>() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.2
            {
                put(new TopicPartition("topic", 0), new OffsetAndMetadata(2L));
            }
        });
        KafkaConsumer create2 = accessor.createConsumerFactory().create("consumer");
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z3, th3) -> {
        });
        Assert.assertEquals(1L, create2.poll(1000L).count());
        Assert.assertTrue(create.poll(1000L).isEmpty());
        Assert.assertEquals(0L, create.assignment().size());
        Assert.assertEquals(1L, create2.assignment().size());
    }

    @Test(timeout = 10000)
    public void testPartitionedViewSinglePartition() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstPartitionPartitioner.class));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        PartitionedView partitionedView = (PartitionedView) accessor.getPartitionedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing partitioned view");
        });
        List partitions = partitionedView.getPartitions();
        Assert.assertEquals(3L, partitions.size());
        ArrayList newArrayList = Lists.newArrayList(partitions.subList(0, 1));
        final ArrayList arrayList = new ArrayList();
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        new LocalExecutor().submit(partitionedView.observePartitions(newArrayList, new PartitionedLogObserver<Void>() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.3
            public void onRepartition(Collection<Partition> collection) {
                arrayList.addAll(collection);
            }

            public boolean onNext(StreamElement streamElement, PartitionedLogObserver.ConfirmCallback confirmCallback, Partition partition, Consumer<Void> consumer) {
                Assert.assertEquals(0L, partition.getId());
                confirmCallback.confirm();
                try {
                    synchronousQueue.put(streamElement);
                    return false;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            public void onCompleted() {
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        }).getFlow());
        Thread.sleep(500L);
        StreamElement update = StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        CountDownLatch countDownLatch = new CountDownLatch(1);
        m4newWriter.write(update, (z, th) -> {
            Assert.assertTrue(z);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        StreamElement streamElement = (StreamElement) synchronousQueue.take();
        Assert.assertEquals(update.getKey(), streamElement.getKey());
        Assert.assertEquals(update.getAttribute(), streamElement.getAttribute());
        Assert.assertEquals(update.getAttributeDescriptor(), streamElement.getAttributeDescriptor());
        Assert.assertEquals(update.getEntityDescriptor(), streamElement.getEntityDescriptor());
        Assert.assertArrayEquals(update.getValue(), streamElement.getValue());
        Assert.assertEquals(1L, arrayList.size());
    }

    @Test(timeout = 10000)
    public void testPartitionedView() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        PartitionedView partitionedView = (PartitionedView) accessor.getPartitionedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing partitioned view");
        });
        final ArrayList arrayList = new ArrayList();
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        new LocalExecutor().submit(partitionedView.observe("test", new PartitionedLogObserver<Void>() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.4
            public void onRepartition(Collection<Partition> collection) {
                arrayList.addAll(collection);
            }

            public boolean onNext(StreamElement streamElement, PartitionedLogObserver.ConfirmCallback confirmCallback, Partition partition, Consumer<Void> consumer) {
                confirmCallback.confirm();
                try {
                    synchronousQueue.put(streamElement);
                    return false;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            public void onCompleted() {
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        }).getFlow());
        Thread.sleep(500L);
        StreamElement update = StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        CountDownLatch countDownLatch = new CountDownLatch(1);
        m4newWriter.write(update, (z, th) -> {
            Assert.assertTrue(z);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        StreamElement streamElement = (StreamElement) synchronousQueue.take();
        Assert.assertEquals(update.getKey(), streamElement.getKey());
        Assert.assertEquals(update.getAttribute(), streamElement.getAttribute());
        Assert.assertEquals(update.getAttributeDescriptor(), streamElement.getAttributeDescriptor());
        Assert.assertEquals(update.getEntityDescriptor(), streamElement.getEntityDescriptor());
        Assert.assertArrayEquals(update.getValue(), streamElement.getValue());
        Assert.assertEquals(3L, arrayList.size());
    }

    @Test(timeout = 10000)
    public void testBulkObserveWithException() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CommitLogReader commitLogReader = (CommitLogReader) accessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        final AtomicInteger atomicInteger = new AtomicInteger();
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        StreamElement update = StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        ObserveHandle observeBulk = commitLogReader.observeBulk("test", Position.NEWEST, new BulkLogObserver() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.5
            public boolean onNext(StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                atomicInteger.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }

            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            public boolean onError(Throwable th) {
                atomicReference.set(th);
                countDownLatch.countDown();
                throw new RuntimeException(th);
            }
        });
        m4newWriter.write(update, (z, th) -> {
            Assert.assertTrue(z);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertEquals("FAIL!", ((Throwable) atomicReference.get()).getMessage());
        Assert.assertEquals(1L, atomicInteger.get());
        Assert.assertEquals(3L, observeBulk.getCommittedOffsets().size());
        observeBulk.getCurrentOffsets().forEach(offset -> {
            Assert.assertEquals(0L, ((TopicOffset) offset).getOffset());
        });
    }

    @Test(timeout = 10000)
    public void testBulkObserveWithExceptionAndRetry() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CommitLogReader commitLogReader = (CommitLogReader) accessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger();
        StreamElement update = StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        new RetryableBulkObserver(3, "test", commitLogReader) { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.6
            protected void failure() {
                countDownLatch.countDown();
            }

            protected boolean onNextInternal(StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                atomicInteger.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }
        }.start();
        Executors.newCachedThreadPool().execute(() -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100L);
                    m4newWriter.write(update, (z, th) -> {
                        Assert.assertTrue(z);
                    });
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        countDownLatch.await();
        Assert.assertEquals(3L, atomicInteger.get());
    }

    @Test(timeout = 10000)
    public void testBulkObserveSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CommitLogReader commitLogReader = (CommitLogReader) accessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        final AtomicInteger atomicInteger = new AtomicInteger();
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicReference atomicReference2 = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        StreamElement update = StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        ObserveHandle observeBulk = commitLogReader.observeBulk("test", Position.NEWEST, new BulkLogObserver() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.7
            public void onRestart(List<Offset> list) {
                atomicInteger.incrementAndGet();
            }

            public boolean onNext(StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                atomicReference2.set(streamElement);
                offsetCommitter.confirm();
                countDownLatch.countDown();
                return true;
            }

            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            public boolean onError(Throwable th) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        m4newWriter.write(update, (z, th) -> {
            Assert.assertTrue(z);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertNull(atomicReference.get());
        Assert.assertEquals(1L, atomicInteger.get());
        Assert.assertArrayEquals(update.getValue(), ((StreamElement) atomicReference2.get()).getValue());
        Assert.assertEquals(3L, observeBulk.getCommittedOffsets().size());
        Assert.assertEquals(1L, ((Long) observeBulk.getCommittedOffsets().stream().collect(Collectors.summingLong(offset -> {
            return ((TopicOffset) offset).getOffset();
        }))).longValue());
    }

    @Test(timeout = 10000)
    public void testBulkObservePartitionsSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CommitLogReader commitLogReader = (CommitLogReader) accessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        final AtomicInteger atomicInteger = new AtomicInteger();
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicReference atomicReference2 = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        StreamElement update = StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        ObserveHandle observeBulkPartitions = commitLogReader.observeBulkPartitions(commitLogReader.getPartitions(), Position.NEWEST, new BulkLogObserver() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.8
            public void onRestart(List<Offset> list) {
                atomicInteger.incrementAndGet();
            }

            public boolean onNext(StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                atomicReference2.set(streamElement);
                offsetCommitter.confirm();
                countDownLatch.countDown();
                return true;
            }

            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            public boolean onError(Throwable th) {
                atomicReference.set(th);
                throw new RuntimeException(th);
            }
        });
        m4newWriter.write(update, (z, th) -> {
            Assert.assertTrue(z);
            countDownLatch.countDown();
        });
        countDownLatch.await();
        Assert.assertNull(atomicReference.get());
        Assert.assertEquals(1L, atomicInteger.get());
        Assert.assertArrayEquals(update.getValue(), ((StreamElement) atomicReference2.get()).getValue());
        Assert.assertEquals(3L, observeBulkPartitions.getCommittedOffsets().size());
        Assert.assertEquals(1L, ((Long) observeBulkPartitions.getCommittedOffsets().stream().collect(Collectors.summingLong(offset -> {
            return ((TopicOffset) offset).getOffset();
        }))).longValue());
    }

    @Test(timeout = 10000)
    public void testBulkObserveOffsets() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        CommitLogReader commitLogReader = (CommitLogReader) accessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        final ArrayList arrayList = new ArrayList();
        final AtomicReference atomicReference = new AtomicReference(new CountDownLatch(3));
        StreamElement update = StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        HashMap hashMap = new HashMap();
        BulkLogObserver bulkLogObserver = new BulkLogObserver() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.9
            public boolean onNext(StreamElement streamElement, BulkLogObserver.OffsetCommitter offsetCommitter) {
                arrayList.add((KafkaStreamElement) streamElement);
                offsetCommitter.confirm();
                ((CountDownLatch) atomicReference.get()).countDown();
                return false;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        ObserveHandle observeBulkPartitions = commitLogReader.observeBulkPartitions(commitLogReader.getPartitions(), Position.NEWEST, bulkLogObserver);
        for (int i = 0; i < 2; i++) {
            m4newWriter.write(update, (z, th) -> {
                Assert.assertTrue(z);
                ((CountDownLatch) atomicReference.get()).countDown();
            });
        }
        ((CountDownLatch) atomicReference.get()).await();
        atomicReference.set(new CountDownLatch(1));
        observeBulkPartitions.getCommittedOffsets().forEach(offset -> {
        });
        observeBulkPartitions.cancel();
        Assert.assertEquals(3L, hashMap.size());
        Assert.assertEquals(1L, ((Long) hashMap.values().stream().collect(Collectors.summingLong(offset2 -> {
            return ((TopicOffset) offset2).getOffset();
        }))).longValue());
        ObserveHandle observeBulkOffsets = commitLogReader.observeBulkOffsets(Lists.newArrayList(hashMap.values()), bulkLogObserver);
        ((CountDownLatch) atomicReference.get()).await();
        Assert.assertEquals(2L, arrayList.size());
        Assert.assertEquals(0L, ((KafkaStreamElement) arrayList.get(0)).getOffset());
        Assert.assertEquals(1L, ((KafkaStreamElement) arrayList.get(1)).getOffset());
        Assert.assertEquals(2L, ((Long) observeBulkOffsets.getCommittedOffsets().stream().collect(Collectors.summingLong(offset3 -> {
            return ((TopicOffset) offset3).getOffset();
        }))).longValue());
    }

    @Test(timeout = 10000)
    public void testCachedView() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) accessor.getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        AtomicReference atomicReference = new AtomicReference(new CountDownLatch(1));
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), (z, th) -> {
            Assert.assertTrue(z);
            ((CountDownLatch) atomicReference.get()).countDown();
        });
        ((CountDownLatch) atomicReference.get()).await();
        atomicReference.set(new CountDownLatch(1));
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        Assert.assertArrayEquals(new byte[]{1, 2}, (byte[]) ((KeyValue) partitionedCachedView.get("key", this.attr).get()).getValue());
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3}), (z2, th2) -> {
            Assert.assertTrue(z2);
            ((CountDownLatch) atomicReference.get()).countDown();
        });
        ((CountDownLatch) atomicReference.get()).await();
        TimeUnit.SECONDS.sleep(1L);
        Assert.assertArrayEquals(new byte[]{1, 2, 3}, (byte[]) ((KeyValue) partitionedCachedView.get("key", this.attr).get()).getValue());
    }

    @Test(timeout = 10000)
    public void testCachedViewReload() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) accessor.getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        AtomicReference atomicReference = new AtomicReference(new CountDownLatch(2));
        Arrays.asList(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), new byte[]{2, 3})).forEach(streamElement -> {
            m4newWriter.write(streamElement, (z, th) -> {
                Assert.assertTrue(z);
                ((CountDownLatch) atomicReference.get()).countDown();
            });
        });
        ((CountDownLatch) atomicReference.get()).await();
        atomicReference.set(new CountDownLatch(1));
        partitionedCachedView.assign((Collection) IntStream.range(1, 2).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        Assert.assertFalse(partitionedCachedView.get("key2", this.attr).isPresent());
        Assert.assertTrue(partitionedCachedView.get("key1", this.attr).isPresent());
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2, 3}), (z, th) -> {
            Assert.assertTrue(z);
            ((CountDownLatch) atomicReference.get()).countDown();
        });
        ((CountDownLatch) atomicReference.get()).await();
        TimeUnit.SECONDS.sleep(1L);
        partitionedCachedView.assign((Collection) IntStream.range(1, 3).mapToObj(i2 -> {
            return () -> {
                return i2;
            };
        }).collect(Collectors.toList()));
        Assert.assertTrue(partitionedCachedView.get("key2", this.attr).isPresent());
        Assert.assertTrue(partitionedCachedView.get("key1", this.attr).isPresent());
    }

    @Test(timeout = 10000)
    public void testCachedViewWrite() throws InterruptedException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        List asList = Arrays.asList(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), new byte[]{2, 3}));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        asList.forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        Assert.assertTrue(partitionedCachedView.get("key2", this.attr).isPresent());
        Assert.assertTrue(partitionedCachedView.get("key1", this.attr).isPresent());
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        Assert.assertTrue(partitionedCachedView.get("key2", this.attr).isPresent());
        Assert.assertTrue(partitionedCachedView.get("key1", this.attr).isPresent());
    }

    @Test(timeout = 10000)
    public void testCachedViewWriteAndDelete() throws InterruptedException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        List asList = Arrays.asList(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis - 1000, new byte[]{1, 2}), StreamElement.delete(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        asList.forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        Assert.assertFalse(partitionedCachedView.get("key1", this.attr).isPresent());
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        Assert.assertFalse(partitionedCachedView.get("key1", this.attr).isPresent());
    }

    @Test(timeout = 10000)
    public void testCachedViewWriteAndDeleteWildcard() throws InterruptedException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        Stream.of((Object[]) new StreamElement[]{StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis - 1000, new byte[]{1, 2}), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.2", currentTimeMillis - 500, new byte[]{1, 2}), StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", currentTimeMillis), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis + 500, new byte[]{2, 3}), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.3", currentTimeMillis - 500, new byte[]{3, 4})}).forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        Assert.assertTrue(partitionedCachedView.get("key1", "wildcard.1", this.attrWildcard, currentTimeMillis + 500).isPresent());
        Assert.assertFalse(partitionedCachedView.get("key1", "wildcard.2", this.attrWildcard, currentTimeMillis + 500).isPresent());
        Assert.assertFalse(partitionedCachedView.get("key1", "wildcard.3", this.attrWildcard, currentTimeMillis + 500).isPresent());
        Assert.assertArrayEquals(new byte[]{2, 3}, (byte[]) ((KeyValue) partitionedCachedView.get("key1", "wildcard.1", this.attrWildcard, currentTimeMillis + 500).get()).getValue());
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        Assert.assertTrue(partitionedCachedView.get("key1", "wildcard.1", this.attrWildcard, currentTimeMillis + 500).isPresent());
        Assert.assertFalse(partitionedCachedView.get("key1", "wildcard.2", this.attrWildcard, currentTimeMillis + 500).isPresent());
        Assert.assertFalse(partitionedCachedView.get("key1", "wildcard.3", this.attrWildcard, currentTimeMillis + 500).isPresent());
        Assert.assertArrayEquals(new byte[]{2, 3}, (byte[]) ((KeyValue) partitionedCachedView.get("key1", "wildcard.1", this.attrWildcard, currentTimeMillis + 500).get()).getValue());
    }

    @Test(timeout = 10000)
    public void testCachedViewWriteAndList() throws InterruptedException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        Stream.of((Object[]) new StreamElement[]{StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis - 1000, new byte[]{1, 2}), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis - 1000, new byte[]{1, 2}), StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", currentTimeMillis - 500), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.2", currentTimeMillis, new byte[]{1, 2}), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.3", currentTimeMillis - 499, new byte[]{3, 4})}).forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: ", z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        ArrayList arrayList = new ArrayList();
        AttributeDescriptorBase<byte[]> attributeDescriptorBase = this.attrWildcard;
        arrayList.getClass();
        partitionedCachedView.scanWildcard("key1", attributeDescriptorBase, (v1) -> {
            r3.add(v1);
        });
        Assert.assertEquals(2L, arrayList.size());
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        arrayList.clear();
        AttributeDescriptorBase<byte[]> attributeDescriptorBase2 = this.attrWildcard;
        arrayList.getClass();
        partitionedCachedView.scanWildcard("key1", attributeDescriptorBase2, (v1) -> {
            r3.add(v1);
        });
        Assert.assertEquals(2L, arrayList.size());
    }

    @Test(timeout = 10000)
    public void testCachedViewWriteAndListAll() throws InterruptedException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        Stream.of((Object[]) new StreamElement[]{StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis - 2000, new byte[]{0}), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis - 1000, new byte[]{1, 2}), StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", currentTimeMillis - 500), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.2", currentTimeMillis, new byte[]{1, 2}), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.3", currentTimeMillis - 499, new byte[]{3, 4})}).forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        ArrayList arrayList = new ArrayList();
        arrayList.getClass();
        partitionedCachedView.scanWildcardAll("key1", (v1) -> {
            r2.add(v1);
        });
        Assert.assertEquals(3L, arrayList.size());
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        arrayList.clear();
        arrayList.getClass();
        partitionedCachedView.scanWildcardAll("key1", (v1) -> {
            r2.add(v1);
        });
        Assert.assertEquals(3L, arrayList.size());
    }

    @Test(timeout = 10000)
    public void testCachedViewWritePreUpdate() throws InterruptedException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        List asList = Arrays.asList(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}), StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), new byte[]{2, 3}));
        CountDownLatch countDownLatch = new CountDownLatch(asList.size());
        asList.forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        AtomicInteger atomicInteger = new AtomicInteger();
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()), (streamElement2, pair) -> {
            atomicInteger.incrementAndGet();
        });
        Assert.assertEquals(2L, atomicInteger.get());
    }

    @Test(timeout = 10000)
    public void testCachedViewWritePreUpdateAndDeleteWildcard() throws InterruptedException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, KeyPartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        List asList = Arrays.asList(StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis, new byte[]{1, 2}), StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", currentTimeMillis + 1000), StreamElement.update(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.2", currentTimeMillis + 500, new byte[]{2, 3}));
        CountDownLatch countDownLatch = new CountDownLatch(asList.size());
        asList.forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Ex1ception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        AtomicInteger atomicInteger = new AtomicInteger();
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()), (streamElement2, pair) -> {
            atomicInteger.incrementAndGet();
        });
        Assert.assertEquals(3L, atomicInteger.get());
    }

    @Test
    public void testRewriteAndPrefetch() throws InterruptedException, IOException {
        PartitionedCachedView partitionedCachedView = (PartitionedCachedView) this.kafka.getAccessor(this.entity, this.storageUri, partitionsCfg(3, KeyPartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        List asList = Arrays.asList(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis, new byte[]{1, 2}), StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis, new byte[]{2, 3}));
        CountDownLatch countDownLatch = new CountDownLatch(asList.size());
        asList.forEach(streamElement -> {
            partitionedCachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        Assert.assertArrayEquals(new byte[]{2, 3}, (byte[]) ((KeyValue) partitionedCachedView.get("key1", this.attr).get()).getValue());
        partitionedCachedView.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis, new byte[]{3, 4}), (z, th) -> {
            Assert.assertTrue(z);
        });
        Assert.assertArrayEquals(new byte[]{3, 4}, (byte[]) ((KeyValue) partitionedCachedView.get("key1", this.attr).get()).getValue());
        partitionedCachedView.close();
        Assert.assertFalse(partitionedCachedView.get("key1", this.attr).isPresent());
        partitionedCachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i2 -> {
            return () -> {
                return i2;
            };
        }).collect(Collectors.toList()));
        Assert.assertArrayEquals(new byte[]{3, 4}, (byte[]) ((KeyValue) partitionedCachedView.get("key1", this.attr).get()).getValue());
    }

    @Test(timeout = 5000)
    public void testMaxBytesPerSec() throws InterruptedException {
        long nanoTime = System.nanoTime();
        testSequentialConsumption(3L);
        Assert.assertTrue(System.nanoTime() - nanoTime > 500000000);
    }

    @Test(timeout = 5000)
    public void testNoMaxBytesPerSec() throws InterruptedException {
        long nanoTime = System.nanoTime();
        testSequentialConsumption(Long.MAX_VALUE);
        Assert.assertTrue(System.nanoTime() - nanoTime < 500000000);
    }

    private void testSequentialConsumption(long j) throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.getAccessor(this.entity, this.storageUri, cfg(Pair.of("bytes-per-sec-max", Long.valueOf(j))));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m4newWriter = accessor.m4newWriter();
        final AtomicReference atomicReference = new AtomicReference(new CountDownLatch(1));
        ((CommitLogReader) accessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing log reader");
        })).observe("dummy", new LogObserver() { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.10
            public boolean onNext(StreamElement streamElement, LogObserver.OffsetCommitter offsetCommitter) {
                ((CountDownLatch) atomicReference.getAndSet(new CountDownLatch(1))).countDown();
                return true;
            }

            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        CountDownLatch countDownLatch = (CountDownLatch) atomicReference.get();
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
            Assert.assertTrue(z);
            Assert.assertNull(th);
        });
        countDownLatch.await();
        CountDownLatch countDownLatch2 = (CountDownLatch) atomicReference.get();
        m4newWriter.write(StreamElement.update(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z2, th2) -> {
            Assert.assertTrue(z2);
            Assert.assertNull(th2);
        });
        countDownLatch2.await();
    }

    private static Map<String, Object> partitionsCfg(int i) {
        return partitionsCfg(i, null);
    }

    private static Map<String, Object> partitionsCfg(int i, @Nullable Class<? extends Partitioner> cls) {
        Pair[] pairArr = new Pair[2];
        pairArr[0] = Pair.of(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS, String.valueOf(i));
        pairArr[1] = cls != null ? Pair.of("partitioner", cls.getName()) : null;
        return cfg(pairArr);
    }

    @SafeVarargs
    private static Map<String, Object> cfg(Pair<String, Object>... pairArr) {
        return (Map) Arrays.stream(pairArr).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getFirst();
        }, (v0) -> {
            return v0.getSecond();
        }));
    }

    private static byte[] emptyValue() {
        return new byte[0];
    }

    private Context context() {
        return new Context(this.serviceFactory) { // from class: cz.o2.proxima.storage.kafka.LocalKafkaCommitLogDescriptorTest.11
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1868602320:
                if (implMethodName.equals("lambda$testPollAfterWrite$89f1a89$1")) {
                    z = 7;
                    break;
                }
                break;
            case -1538853141:
                if (implMethodName.equals("lambda$testPollWithSeek$89f1a89$1")) {
                    z = 6;
                    break;
                }
                break;
            case -659134205:
                if (implMethodName.equals("lambda$testCachedViewWritePreUpdateAndDeleteWildcard$306fe2c4$1")) {
                    z = 12;
                    break;
                }
                break;
            case -418056700:
                if (implMethodName.equals("lambda$null$74d5a673$10")) {
                    z = 8;
                    break;
                }
                break;
            case -418056699:
                if (implMethodName.equals("lambda$null$74d5a673$11")) {
                    z = 9;
                    break;
                }
                break;
            case -418056698:
                if (implMethodName.equals("lambda$null$74d5a673$12")) {
                    z = 10;
                    break;
                }
                break;
            case 96417:
                if (implMethodName.equals("add")) {
                    z = false;
                    break;
                }
                break;
            case 218366049:
                if (implMethodName.equals("lambda$testManualPartitionAssignment$89f1a89$1")) {
                    z = 2;
                    break;
                }
                break;
            case 288116889:
                if (implMethodName.equals("lambda$testCachedViewWritePreUpdate$306fe2c4$1")) {
                    z = 11;
                    break;
                }
                break;
            case 1509115140:
                if (implMethodName.equals("lambda$new$bdaa087c$1")) {
                    z = 13;
                    break;
                }
                break;
            case 1649082284:
                if (implMethodName.equals("lambda$null$74d5a673$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1649082285:
                if (implMethodName.equals("lambda$null$74d5a673$2")) {
                    z = 5;
                    break;
                }
                break;
            case 1649082286:
                if (implMethodName.equals("lambda$null$74d5a673$3")) {
                    z = true;
                    break;
                }
                break;
            case 1649082287:
                if (implMethodName.equals("lambda$null$74d5a673$4")) {
                    z = 3;
                    break;
                }
                break;
            case 1649082288:
                if (implMethodName.equals("lambda$null$74d5a673$5")) {
                    z = 16;
                    break;
                }
                break;
            case 1649082289:
                if (implMethodName.equals("lambda$null$74d5a673$6")) {
                    z = 17;
                    break;
                }
                break;
            case 1649082290:
                if (implMethodName.equals("lambda$null$74d5a673$7")) {
                    z = 14;
                    break;
                }
                break;
            case 1649082291:
                if (implMethodName.equals("lambda$null$74d5a673$8")) {
                    z = 15;
                    break;
                }
                break;
            case 1649082292:
                if (implMethodName.equals("lambda$null$74d5a673$9")) {
                    z = 18;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list2 = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list3 = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list4 = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return () -> {
                        return 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue3 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue4 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue4;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return () -> {
                        return 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return () -> {
                        return 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue5 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue5;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue6 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue6;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue7 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue7;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/util/Pair;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (streamElement2, pair) -> {
                        atomicInteger.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/util/Pair;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (streamElement22, pair2) -> {
                        atomicInteger2.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/concurrent/ExecutorService;")) {
                    return () -> {
                        return Executors.newCachedThreadPool(runnable -> {
                            Thread thread = new Thread(runnable);
                            thread.setUncaughtExceptionHandler((thread2, th) -> {
                                th.printStackTrace(System.err);
                            });
                            return thread;
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue8 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue8;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue9 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue9;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue10 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue10;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue11 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue11;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/storage/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue12 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue12;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
