package cz.o2.proxima.direct.kafka;

import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.RetryableLogObserver;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.kafka.LocalKafkaCommitLogDescriptor;
import cz.o2.proxima.direct.randomaccess.KeyValue;
import cz.o2.proxima.direct.view.CachedView;
import cz.o2.proxima.functional.Consumer;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterators;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeDescriptorBase;
import cz.o2.proxima.repository.ConfigRepository;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.storage.commitlog.Partitioner;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimatorFactory;
import cz.o2.proxima.time.WatermarkIdlePolicy;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import cz.o2.proxima.util.Pair;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest.class */
public class LocalKafkaCommitLogDescriptorTest implements Serializable {
    /** Logger bound to this test class. */
    private static final Logger log = LoggerFactory.getLogger(LocalKafkaCommitLogDescriptorTest.class);
    /**
     * Factory of cached thread pools. Every thread it creates prints the stack trace of an
     * uncaught exception to {@code System.err}. Declared {@code transient}.
     */
    private final transient Factory<ExecutorService> serviceFactory = () -> {
        return Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                th.printStackTrace(System.err);
            });
            return thread;
        });
    };
    /** Test repository built from an empty configuration. Declared {@code transient}. */
    private final transient Repository repo = ConfigRepository.Builder.ofTest(ConfigFactory.empty()).build();
    /** Direct operator obtained from {@link #repo}; the modifier installs {@link #serviceFactory} as its executor factory. */
    private final DirectDataOperator direct = this.repo.getOrCreateOperator(DirectDataOperator.class, new Consumer[]{directDataOperator -> {
        directDataOperator.withExecutorFactory(this.serviceFactory);
    }});
    /** Attribute "attr" of entity "entity", declared with the {@code bytes:///} scheme. */
    private final AttributeDescriptorBase<byte[]> attr = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("attr").setSchemeUri(new URI("bytes:///")).build();
    /** Attribute "wildcard.*" of entity "entity", declared with the {@code bytes:///} scheme. */
    private final AttributeDescriptorBase<byte[]> attrWildcard = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("wildcard.*").setSchemeUri(new URI("bytes:///")).build();
    /** Attribute "strAttr" of entity "entity", declared with the {@code string:///} scheme. */
    private final AttributeDescriptorBase<String> strAttr = AttributeDescriptor.newBuilder(this.repo).setEntity("entity").setName("strAttr").setSchemeUri(new URI("string:///")).build();
    /** Entity "entity" holding the three attributes declared above. */
    private final EntityDescriptor entity = EntityDescriptor.newBuilder().setName("entity").addAttribute(this.attr).addAttribute(this.attrWildcard).addAttribute(this.strAttr).build();
    /** Storage URI handed to every accessor created by the tests; the tests assert records arrive on topic "topic". */
    private final URI storageUri = new URI("kafka-test://dummy/topic");
    /** Descriptor under test; a fresh instance is assigned in {@code setUp()} before each test. */
    private LocalKafkaCommitLogDescriptor kafka;

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest$FirstBytePartitioner.class */
    public static final class FirstBytePartitioner implements Partitioner {
        public int getPartitionId(StreamElement streamElement) {
            if (streamElement.isDelete()) {
                return 0;
            }
            return streamElement.getValue()[0];
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest$FixedWatermarkEstimatorFactory.class */
    public static final class FixedWatermarkEstimatorFactory implements WatermarkEstimatorFactory {
        public static final long FIXED_WATERMARK = 333;

        public WatermarkEstimator create(Map<String, Object> map, WatermarkIdlePolicyFactory watermarkIdlePolicyFactory) {
            return new WatermarkEstimator() { // from class: cz.o2.proxima.direct.kafka.LocalKafkaCommitLogDescriptorTest.FixedWatermarkEstimatorFactory.1
                public long getWatermark() {
                    return 333L;
                }

                public void setMinWatermark(long j) {
                }
            };
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest$FixedWatermarkIdlePolicyFactory.class */
    public static final class FixedWatermarkIdlePolicyFactory implements WatermarkIdlePolicyFactory {
        public static final long FIXED_IDLE_WATERMARK = 555;

        public WatermarkIdlePolicy create(Map<String, Object> map) {
            return () -> {
                return 555L;
            };
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -458497283:
                    if (implMethodName.equals("lambda$create$591916a5$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/time/WatermarkIdlePolicy") && serializedLambda.getFunctionalInterfaceMethodName().equals("getIdleWatermark") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()J") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest$FixedWatermarkIdlePolicyFactory") && serializedLambda.getImplMethodSignature().equals("()J")) {
                        return () -> {
                            return 555L;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest$SingleAttrSerializer.class */
    public static final class SingleAttrSerializer implements ElementSerializer<String, String> {
        private AttributeDescriptor<?> attrDesc;

        public void setup(EntityDescriptor entityDescriptor) {
            this.attrDesc = (AttributeDescriptor) entityDescriptor.findAttribute("strAttr").orElseThrow(() -> {
                return new IllegalStateException("Missing attribute 'strAttr'");
            });
        }

        @Nullable
        public StreamElement read(ConsumerRecord<String, String> consumerRecord, EntityDescriptor entityDescriptor) {
            return StreamElement.upsert(entityDescriptor, this.attrDesc, this.attrDesc.getName(), UUID.randomUUID().toString(), (String) consumerRecord.key(), consumerRecord.timestamp(), ((String) consumerRecord.value()).getBytes());
        }

        public Pair<String, String> write(StreamElement streamElement) {
            return Pair.of(streamElement.getKey(), (String) streamElement.getParsed().get());
        }

        public Serde<String> keySerde() {
            return Serdes.String();
        }

        public Serde<String> valueSerde() {
            return Serdes.String();
        }
    }

    /** Creates a fresh descriptor before each test. */
    @Before
    public void setUp() {
        kafka = new LocalKafkaCommitLogDescriptor();
    }

    /**
     * Writes one element into a single-partition topic, waits for the commit callback and then
     * checks that the consumer polls exactly that record from partition 0 of topic "topic".
     */
    @Test(timeout = 10000)
    public void testSinglePartitionWriteAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();
        CountDownLatch written = new CountDownLatch(1);
        StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(element, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        written.await();

        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, polled.count());
        Assert.assertEquals(1L, polled.partitions().size());
        TopicPartition partition = Iterators.getOnlyElement(polled.partitions().iterator());
        Assert.assertEquals(0L, partition.partition());
        Assert.assertEquals("topic", partition.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            Assert.assertEquals("key#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
            seen++;
        }
        Assert.assertEquals(1L, seen);
    }

    /**
     * Writes two elements with keys "key1" and "key2" into a two-partition topic, waits for both
     * commit callbacks and then checks that a single poll returns both records, spread over
     * partitions 0 and 1 of topic "topic".
     */
    @Test(timeout = 10000)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CountDownLatch written = new CountDownLatch(2);
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        written.await();

        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(2L, polled.count());
        Assert.assertEquals(2L, polled.partitions().size());
        // Keep the iterator in a named local so that both partitions can be inspected in turn.
        Iterator<TopicPartition> partitions = polled.partitions().iterator();
        TopicPartition first = partitions.next();
        Assert.assertEquals(0L, first.partition());
        TopicPartition second = partitions.next();
        Assert.assertEquals(1L, second.partition());
        Assert.assertEquals("topic", first.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            seen++;
            // Records are expected in write order: key1 first, then key2.
            Assert.assertEquals("key" + seen + "#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
        }
        Assert.assertEquals(2L, seen);
    }

    /** Polling a two-partition topic that nothing was written to must return no records. */
    @Test
    public void testEmptyPoll() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(2));
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();
        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertTrue(polled.isEmpty());
    }

    /**
     * Writes an upsert followed by a delete of the same key and checks that the consumer sees two
     * records: the upsert with a one-byte value and the delete with a {@code null} value.
     */
    @Test
    public void testWriteNull() {
        final long stamp = 1234567890000L;
        final long deleteStamp = stamp + 1000;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(2));
        OnlineAttributeWriter writer = ((AttributeWriterBase) accessor.getWriter(context()).get()).online();
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), stamp, new byte[]{1}), (succ, exc) -> {
        });
        writer.write(StreamElement.delete(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), deleteStamp), (succ, exc) -> {
        });

        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertEquals(2L, polled.count());
        int matched = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            // Records are told apart by their timestamps.
            if (rec.timestamp() == stamp) {
                Assert.assertEquals(1L, ((byte[]) rec.value()).length);
                matched++;
            } else if (rec.timestamp() == deleteStamp) {
                Assert.assertNull(rec.value());
                matched++;
            }
        }
        Assert.assertEquals(2L, matched);
    }

    /**
     * Writes two elements with keys "key1" and "key2" into a two-partition topic without waiting
     * on a latch, and checks that a single poll returns both records, spread over partitions 0
     * and 1 of topic "topic".
     */
    @Test(timeout = 10000)
    public void testTwoPartitionsTwoWritesAndConsumeBySingleConsumerRunBeforeWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
        });
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
        });

        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(2L, polled.count());
        Assert.assertEquals(2L, polled.partitions().size());
        // Keep the iterator in a named local so that both partitions can be inspected in turn.
        Iterator<TopicPartition> partitions = polled.partitions().iterator();
        TopicPartition first = partitions.next();
        Assert.assertEquals(0L, first.partition());
        TopicPartition second = partitions.next();
        Assert.assertEquals(1L, second.partition());
        Assert.assertEquals("topic", first.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            seen++;
            // Records are expected in write order: key1 first, then key2.
            Assert.assertEquals("key" + seen + "#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
        }
        Assert.assertEquals(2L, seen);
    }

    /**
     * Alternates write and poll twice on a two-partition topic; each poll must return exactly the
     * one record written just before it, coming from a single partition.
     */
    @Test(timeout = 10000)
    public void testTwoPartitionsTwoWritesAndTwoReads() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create();

        StreamElement first = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(first, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
        });
        ConsumerRecords<?, ?> firstPoll = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, firstPoll.count());
        Assert.assertEquals(1L, firstPoll.partitions().size());

        StreamElement second = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(second, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
        });
        ConsumerRecords<?, ?> secondPoll = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, secondPoll.count());
        Assert.assertEquals(1L, secondPoll.partitions().size());
    }

    /**
     * Creates two consumers named "dummy1" and "dummy2", writes a single element and checks that
     * each consumer independently polls that one record from partition 0 of topic "topic".
     */
    @Test(timeout = 10000)
    public void testTwoIdependentConsumers() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        KafkaConsumer<?, ?>[] consumers = {
            accessor.createConsumerFactory().create("dummy1"),
            accessor.createConsumerFactory().create("dummy2")
        };
        CountDownLatch written = new CountDownLatch(1);
        StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), emptyValue());
        writer.write(element, (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        written.await();

        for (KafkaConsumer<?, ?> consumer : consumers) {
            ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(1000L));
            Assert.assertEquals(1L, polled.count());
            Assert.assertEquals(1L, polled.partitions().size());
            TopicPartition partition = Iterators.getOnlyElement(polled.partitions().iterator());
            Assert.assertEquals(0L, partition.partition());
            Assert.assertEquals("topic", partition.topic());

            int seen = 0;
            for (ConsumerRecord<?, ?> rec : polled) {
                Assert.assertEquals("key#attr", rec.key());
                Assert.assertEquals("topic", rec.topic());
                Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
                seen++;
            }
            Assert.assertEquals(1L, seen);
        }
    }

    /**
     * Creates a consumer manually assigned to partition 0 of a two-partition topic, writes
     * elements with keys "key1" and "key2" and checks that the consumer polls only one record,
     * the one keyed "key1", from partition 0.
     */
    @Test(timeout = 10000)
    public void testManualPartitionAssignment() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create(Arrays.asList(() -> 0));
        CountDownLatch written = new CountDownLatch(2);
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        written.await();

        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, polled.count());
        Assert.assertEquals(1L, polled.partitions().size());
        TopicPartition partition = polled.partitions().iterator().next();
        Assert.assertEquals(0L, partition.partition());
        Assert.assertEquals("topic", partition.topic());

        int seen = 0;
        for (ConsumerRecord<?, ?> rec : polled) {
            seen++;
            Assert.assertEquals("key" + seen + "#attr", rec.key());
            Assert.assertEquals("topic", rec.topic());
            Assert.assertArrayEquals(emptyValue(), (byte[]) rec.value());
        }
        Assert.assertEquals(1L, seen);
    }

    /**
     * Writes two elements, waits for both commit callbacks and only then creates a consumer
     * assigned to partition 0; that consumer's first poll must be empty.
     */
    @Test(timeout = 10000)
    public void testPollAfterWrite() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CountDownLatch written = new CountDownLatch(2);
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        written.await();

        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create(Arrays.asList(() -> 0));
        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertTrue(polled.isEmpty());
    }

    /**
     * Writes two elements, then creates a consumer assigned to partition 0, seeks it to offset 1
     * of topic "topic" and checks that the following poll returns exactly one record.
     */
    @Test(timeout = 10000)
    public void testPollWithSeek() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CountDownLatch written = new CountDownLatch(2);
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
            Assert.assertTrue(succ);
            Assert.assertNull(exc);
            written.countDown();
        });
        written.await();

        // The consumer must be held in a local: the same instance is sought and then polled.
        KafkaConsumer<?, ?> consumer = accessor.createConsumerFactory().create(Arrays.asList(() -> 0));
        consumer.seek(new TopicPartition("topic", 0), 1L);
        ConsumerRecords<?, ?> polled = consumer.poll(Duration.ofMillis(100L));
        Assert.assertEquals(1L, polled.count());
    }

    /**
     * Starts one consumer named "consumer" on a two-partition topic and checks it is assigned both
     * partitions. After it commits offset 1 for both partitions, a second consumer is created
     * under the same name; following the next polls each consumer must hold one partition, the
     * second consumer must receive the newly written record and the first one must poll nothing.
     */
    @Test
    public void testTwoPartitionsTwoConsumersRebalance() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(2));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        KafkaConsumer<?, ?> firstConsumer = accessor.createConsumerFactory().create("consumer");
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
        });
        ConsumerRecords<?, ?> polled = firstConsumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(2L, firstConsumer.assignment().size());
        Assert.assertEquals(1L, polled.count());
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
        });
        Assert.assertEquals(1L, firstConsumer.poll(Duration.ofMillis(1000L)).count());

        // A plain map replaces an anonymous HashMap subclass, which would capture this test instance.
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(1L));
        committed.put(new TopicPartition("topic", 1), new OffsetAndMetadata(1L));
        firstConsumer.commitSync(committed);

        KafkaConsumer<?, ?> secondConsumer = accessor.createConsumerFactory().create("consumer");
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
        });
        Assert.assertEquals(1L, secondConsumer.poll(Duration.ofMillis(1000L)).count());
        Assert.assertTrue(firstConsumer.poll(Duration.ofMillis(1000L)).isEmpty());
        Assert.assertEquals(1L, firstConsumer.assignment().size());
        Assert.assertEquals(1L, secondConsumer.assignment().size());
    }

    /**
     * Starts one consumer named "consumer" on a single-partition topic and checks it is assigned
     * that partition. After it commits offset 2, a second consumer is created under the same
     * name; following the next polls the second consumer must own the partition and receive the
     * newly written record, while the first one must poll nothing and hold no partition.
     */
    @Test
    public void testSinglePartitionTwoConsumersRebalance() {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(1));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        KafkaConsumer<?, ?> firstConsumer = accessor.createConsumerFactory().create("consumer");
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
        });
        ConsumerRecords<?, ?> polled = firstConsumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, firstConsumer.assignment().size());
        Assert.assertEquals(1L, polled.count());
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
        });
        Assert.assertEquals(1L, firstConsumer.poll(Duration.ofMillis(1000L)).count());

        // A plain map replaces an anonymous HashMap subclass, which would capture this test instance.
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("topic", 0), new OffsetAndMetadata(2L));
        firstConsumer.commitSync(committed);

        KafkaConsumer<?, ?> secondConsumer = accessor.createConsumerFactory().create("consumer");
        writer.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (succ, exc) -> {
        });
        Assert.assertEquals(1L, secondConsumer.poll(Duration.ofMillis(1000L)).count());
        Assert.assertTrue(firstConsumer.poll(Duration.ofMillis(1000L)).isEmpty());
        Assert.assertEquals(0L, firstConsumer.assignment().size());
        Assert.assertEquals(1L, secondConsumer.assignment().size());
    }

    /**
     * Observes a three-partition commit log from the newest position, writes one element and
     * waits until both the write callback and the observer have fired. Afterwards the handle must
     * report three committed offsets, each at most 1 and summing up to 1.
     */
    @Test(timeout = 10000)
    public void testObserveSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement element = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        LogObserver observer = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                onNextContext.confirm();
                done.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        ObserveHandle handle = reader.observe("test", Position.NEWEST, observer);
        writer.write(element, (succ, exc) -> {
            Assert.assertTrue(succ);
            done.countDown();
        });
        done.await();

        Assert.assertEquals(3L, handle.getCommittedOffsets().size());
        long offsetSum = 0L;
        for (Object committed : handle.getCommittedOffsets()) {
            TopicOffset topicOffset = (TopicOffset) committed;
            Assert.assertTrue(topicOffset.getOffset() <= 1);
            offsetSum += topicOffset.getOffset();
        }
        Assert.assertEquals(1L, offsetSum);
    }

    /**
     * Observes a three-partition commit log from the newest position, writes 100 elements with
     * increasing stamps and waits until the observer has seen all of them. The last watermark
     * read from the observer context must be positive and less than ten times the start time.
     */
    @Test(timeout = 10000)
    public void testObserveMovesWatermark() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        long now = System.currentTimeMillis();
        final int numElements = 100;
        // Explicit type arguments make the lambda parameter an Integer, so intValue() resolves.
        UnaryFunction<Integer, StreamElement> elementAt = pos -> {
            return StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key" + pos, this.attr.getName(), now + pos.intValue(), new byte[]{1, 2});
        };
        final AtomicLong watermark = new AtomicLong();
        final CountDownLatch observed = new CountDownLatch(numElements);
        reader.observe("test", Position.NEWEST, new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                watermark.set(onNextContext.getWatermark());
                observed.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        }).waitUntilReady();
        for (int i = 0; i < numElements; i++) {
            writer.write(elementAt.apply(i), (succ, exc) -> {
            });
        }
        observed.await();
        Assert.assertTrue(watermark.get() > 0);
        Assert.assertTrue(watermark.get() < now * 10);
    }

    /**
     * Writes the same element twice, two seconds apart, and checks the watermark seen by the observer.
     *
     * <p>The accessor is created with {@code poll.allowed-empty-before-watermark-move} set to
     * {@code "1000"}. After both elements were delivered, the last watermark recorded in
     * {@code onNext} must be positive and below ten times the start time of the test.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     */
    @Test(timeout = 10000)
    public void testEmptyPollMovesWatermark() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                entity,
                storageUri,
                and(partitionsCfg(3), cfg(Pair.of("poll.allowed-empty-before-watermark-move", "1000"))));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        // The element is stamped two seconds ahead of the captured start time.
        StreamElement update = StreamElement.upsert(
                entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), now + 2000, new byte[]{1, 2});
        final AtomicLong watermark = new AtomicLong();
        final CountDownLatch bothDelivered = new CountDownLatch(2);
        LogObserver observer = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                watermark.set(onNextContext.getWatermark());
                bothDelivered.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        ObserveHandle handle = reader.observe("test", Position.NEWEST, observer);
        handle.waitUntilReady();

        writer.write(update, (ok, err) -> {
        });
        // Leave a two second gap between the two writes.
        TimeUnit.SECONDS.sleep(2L);
        writer.write(update, (ok, err) -> {
        });

        bothDelivered.await();
        Assert.assertTrue(watermark.get() > 0);
        Assert.assertTrue(watermark.get() < now * 10);
    }

    /**
     * Checks the watermark reported through {@code onIdle} when nothing is ever written.
     *
     * <p>No element is written in this test. The observer records the watermark of every
     * {@code onIdle} call; once thirty such calls arrived, the last recorded value must be
     * positive and below ten times the start time of the test.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     */
    @Test(timeout = 10000)
    public void testEmptyPollWithNoDataMovesWatermark() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                entity,
                storageUri,
                and(partitionsCfg(3), cfg(Pair.of("poll.allowed-empty-before-watermark-move", "1000"))));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        final AtomicLong watermark = new AtomicLong();
        final CountDownLatch idleCalls = new CountDownLatch(30);
        LogObserver observer = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }

            @Override
            public void onIdle(LogObserver.OnIdleContext onIdleContext) {
                // Only idle callbacks drive this test; keep the latest watermark they carry.
                watermark.set(onIdleContext.getWatermark());
                idleCalls.countDown();
            }
        };
        ObserveHandle handle = reader.observe("test", Position.NEWEST, observer);
        handle.waitUntilReady();
        TimeUnit.SECONDS.sleep(2L);
        idleCalls.await();
        Assert.assertTrue(watermark.get() > 0);
        Assert.assertTrue(watermark.get() < now * 10);
    }

    /**
     * Waits for thirty {@code onIdle} callbacks on an empty topic and checks the last watermark.
     *
     * <p>The last watermark recorded must be positive and below ten times the start time of the test.
     *
     * <p>NOTE(review): the body performs the same steps as
     * {@code testEmptyPollWithNoDataMovesWatermark}; nothing visible here slows polling down.
     * Confirm whether a different configuration was intended.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     */
    @Test(timeout = 10000)
    public void testSlowPollMovesWatermarkSlowly() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                entity,
                storageUri,
                and(partitionsCfg(3), cfg(Pair.of("poll.allowed-empty-before-watermark-move", "1000"))));
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long startedAt = System.currentTimeMillis();
        final AtomicLong lastWatermark = new AtomicLong();
        final CountDownLatch remainingIdles = new CountDownLatch(30);
        LogObserver idleTracker = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }

            @Override
            public void onIdle(LogObserver.OnIdleContext onIdleContext) {
                lastWatermark.set(onIdleContext.getWatermark());
                remainingIdles.countDown();
            }
        };
        ObserveHandle handle = reader.observe("test", Position.NEWEST, idleTracker);
        handle.waitUntilReady();
        TimeUnit.SECONDS.sleep(2L);
        remainingIdles.await();
        long observed = lastWatermark.get();
        Assert.assertTrue(observed > 0);
        Assert.assertTrue(lastWatermark.get() < startedAt * 10);
    }

    /**
     * Runs the N-consumer watermark scenario with four consumers on a three-partition topic.
     *
     * <p>The scenario runs once before any data is written and once more after a single element
     * was written through {@code writeData}.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testPollFromMoreConsumersThanPartitionsMovesWatermark() throws InterruptedException {
        final int consumers = 4;
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                entity,
                storageUri,
                and(
                        partitionsCfg(3),
                        cfg(
                                Pair.of("poll.allowed-empty-before-watermark-move", "1000"),
                                Pair.of("assignment-timeout-ms", "1"))));
        // First pass: no data yet.
        testPollFromNConsumersMovesWatermarkWithNoWrite(accessor, consumers);
        writeData(accessor);
        // Second pass: after the write.
        testPollFromNConsumersMovesWatermark(accessor, consumers);
    }

    /**
     * Runs the N-consumer watermark scenario with four hundred consumers on a three-partition topic.
     *
     * <p>Same sequence as the four-consumer variant: one pass without data, one write, one pass
     * after the write. The JUnit timeout is raised to 100 seconds.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 100000)
    public void testPollFromManyMoreConsumersThanPartitionsMovesWatermark() throws InterruptedException {
        final int consumers = 400;
        LocalKafkaCommitLogDescriptor.Accessor accessor = kafka.createAccessor(
                direct,
                entity,
                storageUri,
                and(
                        partitionsCfg(3),
                        cfg(
                                Pair.of("poll.allowed-empty-before-watermark-move", "1000"),
                                Pair.of("assignment-timeout-ms", "1"))));
        testPollFromNConsumersMovesWatermarkWithNoWrite(accessor, consumers);
        writeData(accessor);
        testPollFromNConsumersMovesWatermark(accessor, consumers);
    }

    /**
     * Writes one upsert stamped {@code 1500000000000L} through a fresh writer of the given accessor.
     *
     * <p>The commit callback is a no-op, so the outcome of the write is not checked here.
     *
     * <p>NOTE(review): the literal {@code "key"} and the random UUID are passed in the opposite
     * order to the other {@code StreamElement.upsert} calls in this class. Confirm that is intended.
     *
     * @param accessor accessor used to create the writer
     */
    void writeData(LocalKafkaCommitLogDescriptor.Accessor accessor) {
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        StreamElement element = StreamElement.upsert(
                this.entity,
                this.attr,
                "key",
                UUID.randomUUID().toString(),
                this.attr.getName(),
                1500000000000L,
                new byte[]{1});
        writer.write(element, (ok, err) -> {
        });
    }

    /**
     * Runs the N-consumer scenario and requires every consumer to report a positive watermark.
     *
     * @param accessor accessor providing the commit log reader
     * @param i number of consumers to start
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    void testPollFromNConsumersMovesWatermark(LocalKafkaCommitLogDescriptor.Accessor accessor, int i) throws InterruptedException {
        final boolean requirePositiveWatermark = true;
        testPollFromNConsumersMovesWatermark(accessor, i, requirePositiveWatermark);
    }

    /**
     * Runs the N-consumer scenario without requiring the watermark to be positive.
     *
     * @param accessor accessor providing the commit log reader
     * @param i number of consumers to start
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    void testPollFromNConsumersMovesWatermarkWithNoWrite(LocalKafkaCommitLogDescriptor.Accessor accessor, int i) throws InterruptedException {
        final boolean requirePositiveWatermark = false;
        testPollFromNConsumersMovesWatermark(accessor, i, requirePositiveWatermark);
    }

    /**
     * Starts {@code i} observers under one consumer name and waits until each reported a watermark.
     *
     * <p>Every observer tracks the maximum watermark it received through {@code onIdle}. Idle
     * callbacks are ignored until all {@code i} observers have been started. An observer is counted
     * once: immediately when {@code z} is {@code false}, or as soon as its maximum watermark is
     * positive when {@code z} is {@code true}. Finally the minimum over all observers is checked
     * against the same bounds.
     *
     * @param accessor accessor providing the commit log reader
     * @param i number of observers to start
     * @param z whether a strictly positive watermark is required from every observer
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    void testPollFromNConsumersMovesWatermark(LocalKafkaCommitLogDescriptor.Accessor accessor, final int i, final boolean z) throws InterruptedException {
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        final CountDownLatch allMoved = new CountDownLatch(i);
        // Observers already counted on the latch, so each one counts exactly once.
        final Set<LogObserver> counted = Collections.synchronizedSet(new HashSet<>());
        // Highest watermark seen so far per observer.
        final ConcurrentHashMap<LogObserver, Long> observerWatermarks = new ConcurrentHashMap<>();
        final AtomicInteger startedObservers = new AtomicInteger();
        for (int started = 0; started < i; started++) {
            reader.observe("test-" + z, Position.OLDEST, new LogObserver() {
                @Override
                public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                    LocalKafkaCommitLogDescriptorTest.log.debug(
                            "Received element {} on watermark {}", streamElement, onNextContext.getWatermark());
                    return true;
                }

                @Override
                public boolean onError(Throwable th) {
                    throw new RuntimeException(th);
                }

                @Override
                public void onIdle(LogObserver.OnIdleContext onIdleContext) {
                    // Ignore idle callbacks until every observer has been started.
                    if (startedObservers.get() != i) {
                        return;
                    }
                    long highest = observerWatermarks.compute(
                            this,
                            (observer, current) -> Math.max(
                                    MoreObjects.firstNonNull(current, Long.MIN_VALUE),
                                    onIdleContext.getWatermark()));
                    if ((!z || highest > 0) && counted.add(this)) {
                        allMoved.countDown();
                    }
                }
            }).waitUntilReady();
            startedObservers.incrementAndGet();
        }
        allMoved.await();
        Assert.assertEquals(i, observerWatermarks.size());
        long minWatermark = observerWatermarks.values().stream().min(Long::compare).orElse(Long.MIN_VALUE);
        Assert.assertTrue(!z || minWatermark > 0);
        Assert.assertTrue(
                "Watermark should not be too far, got " + minWatermark + " calculated from " + observerWatermarks,
                minWatermark < now * 10);
    }

    /**
     * Verifies that a single confirmation at the end of a bulk observation commits every offset.
     *
     * <p>One hundred elements are written first, then read from the oldest position with the
     * {@code true} flag passed to {@code observeBulk}. The observer confirms only the hundredth
     * element. After {@code onCompleted} the committed offsets must sum to one hundred, both on the
     * observe handle and when read back from the reader's consumer.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testObserveBulkCommitsCorrectly() throws InterruptedException {
        final int numElements = 100;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
                this.direct,
                this.entity,
                this.storageUri,
                cfg(Pair.of("assignment-timeout-ms", 1L), Pair.of(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS, 3)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        // The concrete reader type is needed below to reach the underlying consumer.
        LocalKafkaCommitLogDescriptor.LocalKafkaLogReader reader =
                (LocalKafkaCommitLogDescriptor.LocalKafkaLogReader) accessor.getCommitLogReader(context())
                        .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        long now = System.currentTimeMillis();
        for (int i = 0; i < numElements; i++) {
            StreamElement update = StreamElement.upsert(
                    this.entity,
                    this.attr,
                    UUID.randomUUID().toString(),
                    "key-" + i,
                    this.attr.getName(),
                    now + 2000,
                    new byte[]{1, 2});
            writer.write(update, (ok, err) -> {
            });
        }
        final CountDownLatch completed = new CountDownLatch(1);
        ObserveHandle handle = reader.observeBulk("test", Position.OLDEST, true, new LogObserver() {
            int processed = 0;

            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                // Confirm once, on the last expected element, to commit the whole batch.
                if (++this.processed == numElements) {
                    onNextContext.confirm();
                }
                return true;
            }

            @Override
            public void onCompleted() {
                completed.countDown();
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        completed.await();

        long committedByHandle = handle.getCommittedOffsets().stream()
                .mapToLong(offset -> ((TopicOffset) offset).getOffset())
                .sum();
        Assert.assertEquals(numElements, committedByHandle);

        KafkaConsumer<Object, Object> consumer = reader.getConsumer();
        String topic = accessor.getTopic();
        // Keep the set typed so the result of committed(...) keeps its value type.
        Set<TopicPartition> partitions = handle.getCommittedOffsets().stream()
                .map(offset -> new TopicPartition(topic, offset.getPartition().getId()))
                .collect(Collectors.toSet());
        long committedInKafka = consumer.committed(partitions).values().stream()
                .mapToLong(OffsetAndMetadata::offset)
                .sum();
        Assert.assertEquals(numElements, committedInKafka);
    }

    /**
     * Checks the committed offsets after one of two observers sharing a name fails.
     *
     * <p>The shared observer confirms its first element immediately, throws on its third call and
     * parks every other context. When the call counter reaches the number of written elements, all
     * parked contexts are confirmed. After the latch is released, the offsets recorded in
     * {@code accessor.committedOffsets} must sum to three.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 100000)
    public void testOnlineObserveWithRebalanceResetsOffsetCommitter() throws InterruptedException {
        // Used for the latch, the confirmation trigger and the number of writes.
        final int numElements = 5;
        LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
                this.direct,
                this.entity,
                this.storageUri,
                cfg(Pair.of(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS, 3), Pair.of("kafka.max.poll.records", 1)));
        final CountDownLatch latch = new CountDownLatch(numElements);
        final AtomicInteger consumed = new AtomicInteger();
        // Typed so that confirm() resolves on the parked contexts.
        final List<LogObserver.OnNextContext> unconfirmed = Collections.synchronizedList(new ArrayList<>());
        LogObserver observer = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                switch (consumed.getAndIncrement()) {
                    case 0:
                        onNextContext.confirm();
                        break;
                    case 2:
                        // The third call fails on purpose; it neither confirms nor counts down.
                        throw new RuntimeException("Failing first consumer!");
                    default:
                        unconfirmed.add(onNextContext);
                        break;
                }
                if (consumed.get() == numElements) {
                    unconfirmed.forEach(LogObserver.OnNextContext::confirm);
                }
                latch.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public boolean onError(Throwable th) {
                return true;
            }
        };
        testOnlineObserveWithRebalanceResetsOffsetCommitterWithObserver(observer, accessor, numElements);
        latch.await();
        Assert.assertEquals(
                "Invalid committed offests: " + accessor.committedOffsets,
                3L,
                accessor.committedOffsets.values().stream().mapToInt(committed -> committed.get()).sum());
    }

    /**
     * Registers the observer twice under the name {@code "test"} and writes one element {@code i} times.
     *
     * <p>Each write asserts in its callback that it succeeded. The method does not wait for the
     * observer to receive anything; callers synchronize on their own.
     *
     * @param logObserver observer registered for both observations
     * @param accessor accessor providing the writer and the commit log reader
     * @param i number of times the element is written
     * @throws InterruptedException declared for callers; kept so the signature stays unchanged
     */
    private void testOnlineObserveWithRebalanceResetsOffsetCommitterWithObserver(LogObserver logObserver, LocalKafkaCommitLogDescriptor.Accessor accessor, int i) throws InterruptedException {
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        StreamElement update = StreamElement.upsert(
                this.entity,
                this.attr,
                UUID.randomUUID().toString(),
                "key",
                this.attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2});
        // Two observations under the same name; the returned handles are not needed here.
        reader.observe("test", Position.NEWEST, logObserver);
        reader.observe("test", Position.NEWEST, logObserver);
        for (int written = 0; written < i; written++) {
            writer.write(update, (ok, err) -> Assert.assertTrue(ok));
        }
    }

    /**
     * Verifies how an exception thrown from {@code onNext} is reported during {@code observe}.
     *
     * <p>The observer throws on every element. After one write the test expects {@code onError}
     * to have received an error with message {@code "FAIL!"}, {@code onNext} to have run exactly
     * once, three committed offsets to be reported and every current offset to be zero.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testObserveWithException() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor =
                kafka.createAccessor(direct, entity, storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger onNextCalls = new AtomicInteger();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        // Released by the error callback and by the write confirmation.
        final CountDownLatch latch = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
                entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        LogObserver failingObserver = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                onNextCalls.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                failure.set(th);
                latch.countDown();
                throw new RuntimeException(th);
            }
        };
        ObserveHandle handle = reader.observe("test", Position.NEWEST, failingObserver);
        writer.write(update, (ok, err) -> {
            Assert.assertTrue(ok);
            latch.countDown();
        });
        latch.await();
        Assert.assertEquals("FAIL!", failure.get().getMessage());
        Assert.assertEquals(1L, onNextCalls.get());
        Assert.assertEquals(3L, handle.getCommittedOffsets().size());
        handle.getCurrentOffsets().forEach(
                offset -> Assert.assertEquals(0L, ((TopicOffset) offset).getOffset()));
    }

    /**
     * Verifies how an exception thrown from {@code onNext} is reported during {@code observeBulk}.
     *
     * <p>Mirrors the non-bulk variant: one write, an observer that always throws, and the same
     * expectations on the captured error, the call count and the reported offsets.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testBulkObserveWithException() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor =
                kafka.createAccessor(direct, entity, storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger attempts = new AtomicInteger();
        final AtomicReference<Throwable> caught = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
                entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        LogObserver failingObserver = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                attempts.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                // Capture the error for the assertions, then rethrow it wrapped.
                caught.set(th);
                done.countDown();
                throw new RuntimeException(th);
            }
        };
        ObserveHandle handle = reader.observeBulk("test", Position.NEWEST, failingObserver);
        writer.write(update, (ok, err) -> {
            Assert.assertTrue(ok);
            done.countDown();
        });
        done.await();
        Assert.assertEquals("FAIL!", caught.get().getMessage());
        Assert.assertEquals(1L, attempts.get());
        Assert.assertEquals(3L, handle.getCommittedOffsets().size());
        handle.getCurrentOffsets().forEach(
                offset -> Assert.assertEquals(0L, ((TopicOffset) offset).getOffset()));
    }

    /**
     * Verifies that a retryable bulk observer calls {@code onNext} three times before {@code onError}.
     *
     * <p>The observer is created through {@code RetryableLogObserver.bulk(3, ...)} (presumably 3 is
     * the retry limit; confirm against that API) and throws on every element. A background task
     * writes the same element every 100 ms so there is always something to consume. Once
     * {@code onError} fires, {@code onNext} must have been invoked exactly three times.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testBulkObserveWithExceptionAndRetry() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor =
                this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final CountDownLatch failed = new CountDownLatch(1);
        final AtomicInteger attempts = new AtomicInteger();
        StreamElement update = StreamElement.upsert(
                this.entity,
                this.attr,
                UUID.randomUUID().toString(),
                "key",
                this.attr.getName(),
                System.currentTimeMillis(),
                new byte[]{1, 2});
        RetryableLogObserver.bulk(3, "test", reader, new LogObserver() {
            @Override
            public boolean onError(Throwable th) {
                failed.countDown();
                return false;
            }

            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                attempts.incrementAndGet();
                throw new RuntimeException("FAIL!");
            }
        }).start();
        // Keep a handle on the pool so the endless writer can be stopped when the test ends;
        // otherwise its thread keeps writing after the test has finished.
        final java.util.concurrent.ExecutorService writerPool = Executors.newCachedThreadPool();
        try {
            writerPool.execute(() -> {
                try {
                    while (true) {
                        TimeUnit.MILLISECONDS.sleep(100L);
                        writer.write(update, (ok, err) -> Assert.assertTrue(ok));
                    }
                } catch (InterruptedException e) {
                    // Raised by shutdownNow(); restore the flag and let the task end.
                    Thread.currentThread().interrupt();
                }
            });
            failed.await();
            Assert.assertEquals(3L, attempts.get());
        } finally {
            writerPool.shutdownNow();
        }
    }

    /**
     * Verifies the successful path of {@code observeBulk} for a single written element.
     *
     * <p>The observer confirms the element it receives. Afterwards the test expects no error,
     * exactly one {@code onRepartition} call, the received value to equal the written one, three
     * committed offsets to be reported and their sum to be one.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testBulkObserveSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor =
                kafka.createAccessor(direct, entity, storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger repartitions = new AtomicInteger();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final AtomicReference<StreamElement> received = new AtomicReference<>();
        // Released by the delivery of the element and by the write confirmation.
        final CountDownLatch latch = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
                entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        LogObserver observer = new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
                repartitions.incrementAndGet();
            }

            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                received.set(streamElement);
                onNextContext.confirm();
                latch.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                failure.set(th);
                throw new RuntimeException(th);
            }
        };
        ObserveHandle handle = reader.observeBulk("test", Position.NEWEST, observer);
        writer.write(update, (ok, err) -> {
            Assert.assertTrue(ok);
            latch.countDown();
        });
        latch.await();
        Assert.assertNull(failure.get());
        Assert.assertEquals(1L, repartitions.get());
        Assert.assertArrayEquals(update.getValue(), received.get().getValue());
        Assert.assertEquals(3L, handle.getCommittedOffsets().size());
        long committedSum = handle.getCommittedOffsets().stream()
                .mapToLong(offset -> ((TopicOffset) offset).getOffset())
                .sum();
        Assert.assertEquals(1L, committedSum);
    }

    /**
     * Reads one thousand previously written elements from the oldest position of all partitions.
     *
     * <p>The elements are written before the observation starts. The observer counts and confirms
     * each element; after {@code onCompleted} the count must be exactly one thousand.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testBulkObservePartitionsFromOldestSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor =
                kafka.createAccessor(direct, entity, storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger consumed = new AtomicInteger();
        StreamElement update = StreamElement.upsert(
                entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        int remaining = 1000;
        while (remaining-- > 0) {
            writer.write(update, (ok, err) -> Assert.assertTrue(ok));
        }
        final CountDownLatch completed = new CountDownLatch(1);
        LogObserver countingObserver = new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
            }

            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                consumed.incrementAndGet();
                onNextContext.confirm();
                return true;
            }

            @Override
            public void onCompleted() {
                completed.countDown();
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        reader.observeBulkPartitions(reader.getPartitions(), Position.OLDEST, true, countingObserver);
        completed.await();
        Assert.assertEquals(1000L, consumed.get());
    }

    /**
     * Verifies the successful path of {@code observeBulkPartitions} for a single written element.
     *
     * <p>All partitions of the reader are observed from the newest position. The expectations are
     * the same as for the named bulk observation: no error, one {@code onRepartition} call, the
     * received value equal to the written one, three committed offsets and an offset sum of one.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testBulkObservePartitionsSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor =
                kafka.createAccessor(direct, entity, storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger restarts = new AtomicInteger();
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final AtomicReference<StreamElement> lastElement = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(2);
        StreamElement update = StreamElement.upsert(
                entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        LogObserver observer = new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
                restarts.incrementAndGet();
            }

            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                lastElement.set(streamElement);
                onNextContext.confirm();
                done.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                error.set(th);
                throw new RuntimeException(th);
            }
        };
        ObserveHandle handle = reader.observeBulkPartitions(reader.getPartitions(), Position.NEWEST, observer);
        writer.write(update, (ok, err) -> {
            Assert.assertTrue(ok);
            done.countDown();
        });
        done.await();
        Assert.assertNull(error.get());
        Assert.assertEquals(1L, restarts.get());
        Assert.assertArrayEquals(update.getValue(), lastElement.get().getValue());
        Assert.assertEquals(3L, handle.getCommittedOffsets().size());
        long committedSum = handle.getCommittedOffsets().stream()
                .mapToLong(offset -> ((TopicOffset) offset).getOffset())
                .sum();
        Assert.assertEquals(1L, committedSum);
    }

    /**
     * Verifies that resetting offsets on an observe handle delivers the written element again.
     *
     * <p>One element is written and consumed, then every partition is reset to offset zero through
     * {@code resetOffsets}. The test waits for one more delivery and expects the committed offsets
     * to sum to one.
     *
     * <p>NOTE(review): despite the name, the observation is started with
     * {@code observePartitions}, not {@code observeBulkPartitions}. Confirm that is intended.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout = 10000)
    public void testBulkObservePartitionsResetOffsetsSuccess() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor accessor =
                kafka.createAccessor(direct, entity, storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final AtomicInteger restarts = new AtomicInteger();
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final AtomicReference<StreamElement> lastElement = new AtomicReference<>();
        // The latch is swapped between the two phases, so it is held in a reference.
        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(2));
        StreamElement update = StreamElement.upsert(
                entity, attr, UUID.randomUUID().toString(), "key", attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        LogObserver observer = new LogObserver() {
            @Override
            public void onRepartition(LogObserver.OnRepartitionContext onRepartitionContext) {
                restarts.incrementAndGet();
            }

            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                lastElement.set(streamElement);
                onNextContext.confirm();
                latch.get().countDown();
                return true;
            }

            @Override
            public boolean onError(Throwable th) {
                error.set(th);
                throw new RuntimeException(th);
            }
        };
        ObserveHandle handle = reader.observePartitions(reader.getPartitions(), Position.NEWEST, observer);
        handle.waitUntilReady();

        // Phase one: wait for the write confirmation and the first delivery.
        writer.write(update, (ok, err) -> {
            Assert.assertTrue(ok);
            latch.get().countDown();
        });
        latch.get().await();

        // Phase two: rewind every partition to offset zero and wait for one more delivery.
        latch.set(new CountDownLatch(1));
        handle.resetOffsets((List) reader.getPartitions().stream()
                .map(partition -> new TopicOffset(partition.getId(), 0L, Long.MIN_VALUE))
                .collect(Collectors.toList()));
        latch.get().await();

        long committedSum = handle.getCommittedOffsets().stream()
                .mapToLong(offset -> ((TopicOffset) offset).getOffset())
                .sum();
        Assert.assertEquals(1L, committedSum);
    }

    /**
     * Verifies that validating a topic that does not exist is rejected with an
     * {@link IllegalArgumentException} carrying the expected message.
     */
    @Test
    public void testObserveOnNonExistingTopic() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.LocalKafkaLogReader m2newReader = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3)).m2newReader(context());
        try {
            // NOTE(review): getPartitions() is presumably called first so that the consumer
            // passed to validateTopic() is initialized - confirm against the reader.
            Assert.assertNotNull(m2newReader.getPartitions());
            m2newReader.validateTopic(m2newReader.getConsumer(), "non-existing-topic");
            // Only reached when validateTopic() did not throw; a single fail() is enough,
            // anything after it would be unreachable.
            Assert.fail("Should throw IllegalArgumentException");
        } catch (IllegalArgumentException e) {
            Assert.assertEquals("Received null or empty partitions for topic [non-existing-topic]. Please check that the topic exists and has at least one partition.", e.getMessage());
        }
    }

    /**
     * Verifies that the offsets committed by a bulk observer can be captured and then used to
     * start a new bulk observation through {@code observeBulkOffsets}.
     *
     * <p>The first observation is closed once its committed offsets have been recorded per
     * partition id; the second observation is started from exactly those offsets.
     */
    @Test(timeout = 10000)
    public void testBulkObserveOffsets() throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor createAccessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m3newWriter = createAccessor.m3newWriter();
        CommitLogReader commitLogReader = (CommitLogReader) createAccessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing commit log reader");
        });
        final List<KafkaStreamElement> observed = new ArrayList<>();
        // Three count-downs: two write confirmations plus the one element the observer accepts.
        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(3));
        StreamElement upsert = StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2});
        // Committed offsets of the first observation, keyed by partition id.
        final Map<Integer, TopicOffset> committed = new HashMap<>();
        LogObserver logObserver = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                observed.add((KafkaStreamElement) streamElement);
                onNextContext.confirm();
                latch.get().countDown();
                // NOTE(review): returning false presumably stops the observation after this element.
                return false;
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        try (ObserveHandle handle = commitLogReader.observeBulkPartitions(commitLogReader.getPartitions(), Position.NEWEST, logObserver)) {
            // Write the same element twice.
            for (int i = 0; i < 2; i++) {
                m3newWriter.write(upsert, (z, th) -> {
                    Assert.assertTrue(z);
                    latch.get().countDown();
                });
            }
            latch.get().await();
            latch.set(new CountDownLatch(1));
            // Remember what was committed so that the second observation can start from there.
            handle.getCommittedOffsets().forEach(offset -> {
                committed.put(offset.getPartition().getId(), (TopicOffset) offset);
            });
        }
        // Every partition must have reported an offset; only one element was confirmed.
        Assert.assertEquals(3L, committed.size());
        Assert.assertEquals(1L, committed.values().stream().mapToLong(TopicOffset::getOffset).sum());
        ObserveHandle observeBulkOffsets = commitLogReader.observeBulkOffsets(Lists.newArrayList(committed.values()), logObserver);
        latch.get().await();
        Assert.assertEquals(2L, observed.size());
        Assert.assertEquals(0L, observed.get(0).getOffset());
        Assert.assertEquals(1L, observed.get(1).getOffset());
        Assert.assertEquals(2L, observeBulkOffsets.getCommittedOffsets().stream().mapToLong(offset -> {
            return ((TopicOffset) offset).getOffset();
        }).sum());
    }

    /**
     * Writes a value through the plain writer, assigns partitions 0..2 to the cached view and
     * checks the view serves that value; then writes a newer value and checks the view
     * reflects it after a one second pause.
     */
    @Test(timeout = 10000)
    public void testCachedView() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor =
                this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        final CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        final AtomicReference<CountDownLatch> pending = new AtomicReference<>(new CountDownLatch(1));

        // First value is written before the view is assigned.
        final StreamElement initial = StreamElement.upsert(
                this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(),
                System.currentTimeMillis(), new byte[]{1, 2});
        writer.write(initial, (ok, err) -> {
            Assert.assertTrue(ok);
            pending.get().countDown();
        });
        pending.get().await();
        pending.set(new CountDownLatch(1));

        final Collection allPartitions =
                (Collection) IntStream.range(0, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(allPartitions);
        Assert.assertArrayEquals(new byte[]{1, 2}, ((KeyValue) view.get("key", this.attr).get()).getValue());

        // Second value is written while the view is already assigned.
        final StreamElement updated = StreamElement.upsert(
                this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(),
                System.currentTimeMillis(), new byte[]{1, 2, 3});
        writer.write(updated, (ok, err) -> {
            Assert.assertTrue(ok);
            pending.get().countDown();
        });
        pending.get().await();
        TimeUnit.SECONDS.sleep(1L);
        Assert.assertArrayEquals(new byte[]{1, 2, 3}, ((KeyValue) view.get("key", this.attr).get()).getValue());
    }

    /**
     * Checks what the cached view exposes as its partition assignment changes: with only
     * partition 1 assigned {@code key1} is present and {@code key2} is not; after
     * re-assigning partitions 1 and 2 both keys are present.
     */
    @Test(timeout = 10000)
    public void testCachedViewReload() throws InterruptedException {
        final LocalKafkaCommitLogDescriptor.Accessor accessor = this.kafka.createAccessor(
                this.direct, this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        final CachedView view = (CachedView) accessor.getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        final AtomicReference<CountDownLatch> pending = new AtomicReference<>(new CountDownLatch(2));

        final List<StreamElement> initial = Arrays.asList(
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1",
                        this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2",
                        this.attr.getName(), System.currentTimeMillis(), new byte[]{2, 3}));
        for (StreamElement element : initial) {
            writer.write(element, (ok, err) -> {
                Assert.assertTrue(ok);
                pending.get().countDown();
            });
        }
        pending.get().await();
        pending.set(new CountDownLatch(1));

        // Only partition 1 is assigned here.
        final Collection onlyFirst =
                (Collection) IntStream.range(1, 2).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(onlyFirst);
        Assert.assertFalse(view.get("key2", this.attr).isPresent());
        Assert.assertTrue(view.get("key1", this.attr).isPresent());

        final StreamElement update = StreamElement.upsert(
                this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(),
                System.currentTimeMillis(), new byte[]{1, 2, 3});
        writer.write(update, (ok, err) -> {
            Assert.assertTrue(ok);
            pending.get().countDown();
        });
        pending.get().await();
        TimeUnit.SECONDS.sleep(1L);

        // Partitions 1 and 2 are assigned now.
        final Collection firstAndSecond =
                (Collection) IntStream.range(1, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(firstAndSecond);
        Assert.assertTrue(view.get("key2", this.attr).isPresent());
        Assert.assertTrue(view.get("key1", this.attr).isPresent());
    }

    /**
     * Writes two keys directly through the cached view and checks that both are readable
     * right after the writes complete and again after partitions 0..2 are assigned.
     */
    @Test(timeout = 10000)
    public void testCachedViewWrite() throws InterruptedException {
        final CachedView view = (CachedView) this.kafka
                .createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class))
                .getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        final List<StreamElement> elements = Arrays.asList(
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1",
                        this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2",
                        this.attr.getName(), System.currentTimeMillis(), new byte[]{2, 3}));
        final CountDownLatch written = new CountDownLatch(2);
        for (StreamElement element : elements) {
            view.write(element, (ok, err) -> {
                Assert.assertTrue("Exception: " + err, ok);
                written.countDown();
            });
        }
        written.await();
        Assert.assertTrue(view.get("key2", this.attr).isPresent());
        Assert.assertTrue(view.get("key1", this.attr).isPresent());

        final Collection allPartitions =
                (Collection) IntStream.range(0, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(allPartitions);
        Assert.assertTrue(view.get("key2", this.attr).isPresent());
        Assert.assertTrue(view.get("key1", this.attr).isPresent());
    }

    /**
     * Writes an upsert followed by a newer delete of the same key through the cached view
     * and checks that the key is absent, both before and after partitions 0..2 are assigned.
     */
    @Test(timeout = 10000)
    public void testCachedViewWriteAndDelete() throws InterruptedException {
        final CachedView view = (CachedView) this.kafka
                .createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class))
                .getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        final long now = System.currentTimeMillis();
        // The delete carries a stamp one second newer than the upsert.
        final List<StreamElement> elements = Arrays.asList(
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1",
                        this.attr.getName(), now - 1000, new byte[]{1, 2}),
                StreamElement.delete(this.entity, this.attr, UUID.randomUUID().toString(), "key1",
                        this.attr.getName(), now));
        final CountDownLatch written = new CountDownLatch(2);
        for (StreamElement element : elements) {
            view.write(element, (ok, err) -> {
                Assert.assertTrue("Exception: " + err, ok);
                written.countDown();
            });
        }
        written.await();
        Assert.assertFalse(view.get("key1", this.attr).isPresent());

        final Collection allPartitions =
                (Collection) IntStream.range(0, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(allPartitions);
        Assert.assertFalse(view.get("key1", this.attr).isPresent());
    }

    /**
     * Writes wildcard upserts around a wildcard delete and checks, at stamp {@code now + 500},
     * that only {@code wildcard.1} (re-written after the delete) is visible, with value
     * {@code {2, 3}}. The same expectations are re-checked after partitions 0..2 are assigned.
     */
    @Test(timeout = 10000)
    public void testCachedViewWriteAndDeleteWildcard() throws InterruptedException {
        final CachedView view = (CachedView) this.kafka
                .createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class))
                .getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        final long now = System.currentTimeMillis();
        final CountDownLatch written = new CountDownLatch(5);
        final List<StreamElement> elements = Arrays.asList(
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1",
                        "wildcard.1", now - 1000, new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1",
                        "wildcard.2", now - 500, new byte[]{1, 2}),
                StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", now),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1",
                        "wildcard.1", now + 500, new byte[]{2, 3}),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1",
                        "wildcard.3", now - 500, new byte[]{3, 4}));
        for (StreamElement element : elements) {
            view.write(element, (ok, err) -> {
                Assert.assertTrue("Exception: " + err, ok);
                written.countDown();
            });
        }
        written.await();

        // State straight after the writes.
        Assert.assertTrue(view.get("key1", "wildcard.1", this.attrWildcard, now + 500).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.2", this.attrWildcard, now + 500).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.3", this.attrWildcard, now + 500).isPresent());
        Assert.assertArrayEquals(
                new byte[]{2, 3},
                ((KeyValue) view.get("key1", "wildcard.1", this.attrWildcard, now + 500).get()).getValue());

        final Collection allPartitions =
                (Collection) IntStream.range(0, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(allPartitions);

        // State after the assignment must be the same.
        Assert.assertTrue(view.get("key1", "wildcard.1", this.attrWildcard, now + 500).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.2", this.attrWildcard, now + 500).isPresent());
        Assert.assertFalse(view.get("key1", "wildcard.3", this.attrWildcard, now + 500).isPresent());
        Assert.assertArrayEquals(
                new byte[]{2, 3},
                ((KeyValue) view.get("key1", "wildcard.1", this.attrWildcard, now + 500).get()).getValue());
    }

    /**
     * Writes one plain attribute and several wildcard attributes (with a wildcard delete in
     * between) through the cached view and checks that scanning the wildcard attribute for
     * {@code key1} yields two entries, both before and after partitions 0..2 are assigned.
     *
     * <p>NOTE(review): the two expected entries are presumably {@code wildcard.2} and
     * {@code wildcard.3}, whose stamps are newer than the wildcard delete - confirm.
     */
    @Test(timeout = 10000)
    public void testCachedViewWriteAndList() throws InterruptedException {
        CachedView cachedView = (CachedView) this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        Stream.of(
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis - 1000, new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis - 1000, new byte[]{1, 2}),
                StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", currentTimeMillis - 500),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.2", currentTimeMillis, new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.3", currentTimeMillis - 499, new byte[]{3, 4})
        ).forEach(streamElement -> {
            cachedView.write(streamElement, (z, th) -> {
                // Include the throwable so that a failed write is diagnosable.
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        List<KeyValue<?>> scanned = new ArrayList<>();
        cachedView.scanWildcard("key1", this.attrWildcard, scanned::add);
        Assert.assertEquals(2L, scanned.size());
        cachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        // Scan again after the assignment; the result must not change.
        scanned.clear();
        cachedView.scanWildcard("key1", this.attrWildcard, scanned::add);
        Assert.assertEquals(2L, scanned.size());
    }

    /**
     * Writes one plain attribute and several wildcard attributes (with a wildcard delete in
     * between) through the cached view and checks that scanning all attributes of
     * {@code key1} yields three entries, both before and after partitions 0..2 are assigned.
     *
     * <p>NOTE(review): the three expected entries are presumably the plain attribute plus
     * {@code wildcard.2} and {@code wildcard.3}, which are newer than the wildcard delete -
     * confirm.
     */
    @Test(timeout = 10000)
    public void testCachedViewWriteAndListAll() throws InterruptedException {
        CachedView cachedView = (CachedView) this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        Stream.of(
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), currentTimeMillis - 2000, new byte[]{0}),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis - 1000, new byte[]{1, 2}),
                StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", currentTimeMillis - 500),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.2", currentTimeMillis, new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.3", currentTimeMillis - 499, new byte[]{3, 4})
        ).forEach(streamElement -> {
            cachedView.write(streamElement, (z, th) -> {
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        List<KeyValue<?>> scanned = new ArrayList<>();
        cachedView.scanWildcardAll("key1", scanned::add);
        Assert.assertEquals(3L, scanned.size());
        cachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()));
        // Scan again after the assignment; the result must not change.
        scanned.clear();
        cachedView.scanWildcardAll("key1", scanned::add);
        Assert.assertEquals(3L, scanned.size());
    }

    /**
     * Writes two elements through the cached view and then assigns partitions 0..2 with an
     * update callback, asserting that the callback was invoked exactly twice.
     */
    @Test(timeout = 10000)
    public void testCachedViewWritePreUpdate() throws InterruptedException {
        final CachedView view = (CachedView) this.kafka
                .createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, FirstBytePartitioner.class))
                .getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        final List<StreamElement> elements = Arrays.asList(
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1",
                        this.attr.getName(), System.currentTimeMillis(), new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key2",
                        this.attr.getName(), System.currentTimeMillis(), new byte[]{2, 3}));
        final CountDownLatch written = new CountDownLatch(elements.size());
        for (StreamElement element : elements) {
            view.write(element, (ok, err) -> {
                Assert.assertTrue("Exception: " + err, ok);
                written.countDown();
            });
        }
        written.await();

        // Counts how many times the callback passed to assign() fires.
        final AtomicInteger callbacks = new AtomicInteger();
        final Collection allPartitions =
                (Collection) IntStream.range(0, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(allPartitions, (element, previous) -> {
            callbacks.incrementAndGet();
        });
        Assert.assertEquals(2L, callbacks.get());
    }

    /**
     * Writes a wildcard upsert, a wildcard delete and another wildcard upsert through the
     * cached view, then assigns partitions 0..2 with an update callback and asserts that the
     * callback was invoked three times (once per written element).
     */
    @Test(timeout = 10000)
    public void testCachedViewWritePreUpdateAndDeleteWildcard() throws InterruptedException {
        CachedView cachedView = (CachedView) this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, KeyPartitioner.class)).getCachedView(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing cached view");
        });
        long currentTimeMillis = System.currentTimeMillis();
        List asList = Arrays.asList(StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.1", currentTimeMillis, new byte[]{1, 2}), StreamElement.deleteWildcard(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", currentTimeMillis + 1000), StreamElement.upsert(this.entity, this.attrWildcard, UUID.randomUUID().toString(), "key1", "wildcard.2", currentTimeMillis + 500, new byte[]{2, 3}));
        CountDownLatch countDownLatch = new CountDownLatch(asList.size());
        asList.forEach(streamElement -> {
            cachedView.write(streamElement, (z, th) -> {
                // Message prefix matches the one used by the sibling cached-view tests.
                Assert.assertTrue("Exception: " + th, z);
                countDownLatch.countDown();
            });
        });
        countDownLatch.await();
        // Counts how many times the callback passed to assign() fires.
        AtomicInteger atomicInteger = new AtomicInteger();
        cachedView.assign((Collection) IntStream.range(0, 3).mapToObj(i -> {
            return () -> {
                return i;
            };
        }).collect(Collectors.toList()), (streamElement2, pair) -> {
            atomicInteger.incrementAndGet();
        });
        Assert.assertEquals(3L, atomicInteger.get());
    }

    /**
     * Rewrites the same key several times with an identical stamp and checks that the cached
     * view always serves the most recently written value: after the initial assignment,
     * after a further write, and after the view is closed and assigned again. Right after
     * {@code close()} the key is expected to be absent.
     */
    @Test(timeout = 10000)
    public void testRewriteAndPrefetch() throws InterruptedException, IOException {
        final CachedView view = (CachedView) this.kafka
                .createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(3, KeyPartitioner.class))
                .getCachedView(context())
                .orElseThrow(() -> new IllegalStateException("Missing cached view"));
        final long now = System.currentTimeMillis();
        // Both elements share key, attribute and stamp; only the payload differs.
        final List<StreamElement> elements = Arrays.asList(
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1",
                        this.attr.getName(), now, new byte[]{1, 2}),
                StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1",
                        this.attr.getName(), now, new byte[]{2, 3}));
        final CountDownLatch written = new CountDownLatch(elements.size());
        for (StreamElement element : elements) {
            view.write(element, (ok, err) -> {
                Assert.assertTrue("Exception: " + err, ok);
                written.countDown();
            });
        }
        written.await();

        final Collection firstAssignment =
                (Collection) IntStream.range(0, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(firstAssignment);
        Assert.assertArrayEquals(new byte[]{2, 3}, ((KeyValue) view.get("key1", this.attr).get()).getValue());

        final StreamElement rewrite = StreamElement.upsert(
                this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(),
                now, new byte[]{3, 4});
        view.write(rewrite, (ok, err) -> {
            Assert.assertTrue(ok);
        });
        Assert.assertArrayEquals(new byte[]{3, 4}, ((KeyValue) view.get("key1", this.attr).get()).getValue());

        view.close();
        Assert.assertFalse(view.get("key1", this.attr).isPresent());

        final Collection secondAssignment =
                (Collection) IntStream.range(0, 3).mapToObj(id -> () -> id).collect(Collectors.toList());
        view.assign(secondAssignment);
        Assert.assertArrayEquals(new byte[]{3, 4}, ((KeyValue) view.get("key1", this.attr).get()).getValue());
    }

    /**
     * Runs the sequential-consumption scenario with an argument of 3 and asserts that the
     * value it returns exceeds 500 milliseconds expressed in nanoseconds.
     */
    @Test(timeout = 5000)
    public void testMaxBytesPerSec() throws InterruptedException {
        final long maxLatency = testSequentialConsumption(3L);
        final long threshold = TimeUnit.MILLISECONDS.toNanos(500L);
        final String message =
                String.format("maxLatency should be greater than %d, got %d", threshold, maxLatency);
        final boolean throttled = maxLatency > threshold;
        Assert.assertTrue(message, throttled);
    }

    /**
     * Runs the sequential-consumption scenario with {@code Long.MAX_VALUE} and asserts that
     * the value it returns stays below 500 milliseconds expressed in nanoseconds.
     */
    @Test(timeout = 5000)
    public void testNoMaxBytesPerSec() throws InterruptedException {
        final long maxLatency = testSequentialConsumption(Long.MAX_VALUE);
        final long threshold = TimeUnit.MILLISECONDS.toNanos(500L);
        Assert.assertTrue(maxLatency < threshold);
    }

    /**
     * Replaces the descriptor with one whose accessor is configured with
     * {@code serializer-class = SingleAttrSerializer}, writes a single string element and
     * checks, via a raw Kafka consumer, that exactly one record with the expected key, topic
     * and value arrives in partition 0 of topic {@code "topic"}.
     */
    @Test(timeout = 10000)
    public void testCustomElementSerializer() throws InterruptedException {
        this.kafka = new LocalKafkaCommitLogDescriptor(
                accessor -> new LocalKafkaCommitLogDescriptor.Accessor(
                        accessor, Collections.singletonMap("serializer-class", SingleAttrSerializer.class.getName())));
        final LocalKafkaCommitLogDescriptor.Accessor configured =
                this.kafka.createAccessor(this.direct, this.entity, this.storageUri, partitionsCfg(1));
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = configured.m3newWriter();
        final KafkaConsumer consumer = configured.createConsumerFactory().create();
        final CountDownLatch written = new CountDownLatch(1);

        final StreamElement element = StreamElement.upsert(
                this.entity, this.strAttr, UUID.randomUUID().toString(), "key", this.attr.getName(),
                System.currentTimeMillis(), "this is test".getBytes());
        writer.write(element, (ok, err) -> {
            Assert.assertTrue(ok);
            Assert.assertNull(err);
            written.countDown();
        });
        written.await();

        final ConsumerRecords polled = consumer.poll(Duration.ofMillis(1000L));
        Assert.assertEquals(1L, polled.count());
        Assert.assertEquals(1L, polled.partitions().size());
        final TopicPartition only = (TopicPartition) Iterators.getOnlyElement(polled.partitions().iterator());
        Assert.assertEquals(0L, only.partition());
        Assert.assertEquals("topic", only.topic());

        // Walk the polled records, verifying each and counting them.
        int seen = 0;
        for (Object item : polled) {
            final ConsumerRecord record = (ConsumerRecord) item;
            Assert.assertEquals("key", record.key());
            Assert.assertEquals("topic", record.topic());
            Assert.assertEquals("this is test", record.value());
            seen++;
        }
        Assert.assertEquals(1L, seen);
    }

    /**
     * Configures {@code watermark.estimator-factory} with
     * {@code FixedWatermarkEstimatorFactory}, writes 100 elements and asserts that the
     * watermark seen by the observer on its last {@code onNext} call is 333.
     */
    @Test(timeout = 10000)
    public void testCustomWatermarkEstimator() throws InterruptedException {
        final Map<String, Object> settings = partitionsCfg(3);
        settings.put("watermark.estimator-factory", FixedWatermarkEstimatorFactory.class.getName());
        final LocalKafkaCommitLogDescriptor.Accessor accessor =
                this.kafka.createAccessor(this.direct, this.entity, this.storageUri, settings);
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        final CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        final long baseStamp = System.currentTimeMillis();
        final AtomicLong lastWatermark = new AtomicLong();
        final CountDownLatch remaining = new CountDownLatch(100);

        final LogObserver observer = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                // Remember the watermark reported with the most recent element.
                lastWatermark.set(onNextContext.getWatermark());
                remaining.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        final ObserveHandle handle = reader.observe("test", Position.NEWEST, observer);
        handle.waitUntilReady();

        // Element number i gets key "key<i>" and a stamp of baseStamp + i.
        for (int i = 0; i < 100; i++) {
            final StreamElement element = StreamElement.upsert(
                    this.entity, this.attr, UUID.randomUUID().toString(), "key" + i, this.attr.getName(),
                    baseStamp + i, new byte[]{1, 2});
            writer.write(element, (ok, err) -> {
            });
        }
        remaining.await();
        Assert.assertEquals(333L, lastWatermark.get());
    }

    /**
     * Configures {@code watermark.idle-policy-factory} with
     * {@code FixedWatermarkIdlePolicyFactory}, writes the same element twice with a two
     * second pause in between and asserts that the watermark seen by the observer on its
     * last {@code onNext} call is 555.
     */
    @Test(timeout = 10000)
    public void testCustomIdlePolicy() throws InterruptedException {
        final Map<String, Object> settings =
                and(partitionsCfg(3), cfg(Pair.of("poll.allowed-empty-before-watermark-move", "1000")));
        settings.put("watermark.idle-policy-factory", FixedWatermarkIdlePolicyFactory.class.getName());
        final LocalKafkaCommitLogDescriptor.Accessor accessor =
                this.kafka.createAccessor(this.direct, this.entity, this.storageUri, settings);
        final LocalKafkaCommitLogDescriptor.LocalKafkaWriter writer = accessor.m3newWriter();
        final CommitLogReader reader = (CommitLogReader) accessor.getCommitLogReader(context())
                .orElseThrow(() -> new IllegalStateException("Missing commit log reader"));
        // The element is stamped two seconds ahead of the current time.
        final StreamElement element = StreamElement.upsert(
                this.entity, this.attr, UUID.randomUUID().toString(), "key", this.attr.getName(),
                System.currentTimeMillis() + 2000, new byte[]{1, 2});
        final AtomicLong lastWatermark = new AtomicLong();
        final CountDownLatch remaining = new CountDownLatch(2);

        final LogObserver observer = new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                // Remember the watermark reported with the most recent element.
                lastWatermark.set(onNextContext.getWatermark());
                remaining.countDown();
                return true;
            }

            @Override
            public void onCompleted() {
                Assert.fail("This should not be called");
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
        final ObserveHandle handle = reader.observe("test", Position.NEWEST, observer);
        handle.waitUntilReady();

        writer.write(element, (ok, err) -> {
        });
        TimeUnit.SECONDS.sleep(2L);
        writer.write(element, (ok, err) -> {
        });
        remaining.await();
        Assert.assertEquals(555L, lastWatermark.get());
    }

    /**
     * Writes two elements and measures the time between consecutive {@code onNext} calls of
     * an observer reading from the oldest position.
     *
     * @param j value passed to the accessor as the {@code bytes-per-sec-max} setting
     * @return the largest gap, in nanoseconds, between two consecutive {@code onNext} calls
     *     (0 when fewer than two calls were timed)
     * @throws InterruptedException when interrupted while waiting for both elements
     */
    private long testSequentialConsumption(long j) throws InterruptedException {
        LocalKafkaCommitLogDescriptor.Accessor createAccessor = this.kafka.createAccessor(this.direct, this.entity, this.storageUri, cfg(Pair.of("assignment-timeout-ms", 1L), Pair.of("bytes-per-sec-max", Long.valueOf(j)), Pair.of("kafka.max.poll.records", 1)));
        LocalKafkaCommitLogDescriptor.LocalKafkaWriter m3newWriter = createAccessor.m3newWriter();
        CommitLogReader commitLogReader = (CommitLogReader) createAccessor.getCommitLogReader(context()).orElseThrow(() -> {
            return new IllegalStateException("Missing log reader");
        });
        // Long.MIN_VALUE marks "no onNext call has been timed yet".
        final AtomicLong lastOnNext = new AtomicLong(Long.MIN_VALUE);
        final AtomicLong maxLatency = new AtomicLong(0L);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        commitLogReader.observe("dummy", Position.OLDEST, new LogObserver() {
            @Override
            public boolean onNext(StreamElement streamElement, LogObserver.OnNextContext onNextContext) {
                long nanoTime = System.nanoTime();
                long previous = lastOnNext.getAndSet(nanoTime);
                // Compare against the sentinel rather than testing "> 0": System.nanoTime()
                // has an arbitrary origin and may return negative values.
                if (previous != Long.MIN_VALUE) {
                    long elapsed = nanoTime - previous;
                    maxLatency.getAndUpdate(current -> {
                        return Math.max(current, elapsed);
                    });
                }
                countDownLatch.countDown();
                return true;
            }

            @Override
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        });
        for (int i = 0; i < 2; i++) {
            m3newWriter.write(StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key1", this.attr.getName(), System.currentTimeMillis(), emptyValue()), (z, th) -> {
                Assert.assertTrue(z);
                Assert.assertNull(th);
            });
        }
        countDownLatch.await();
        return maxLatency.get();
    }

    /** Returns the {@link Context} obtained from the {@code direct} operator of this test. */
    private Context context() {
        final Context operatorContext = this.direct.getContext();
        return operatorContext;
    }

    /**
     * Creates a configuration with the given number of partitions and no explicit partitioner.
     *
     * @param i number of partitions
     * @return the configuration produced by the two-argument overload with a {@code null} partitioner
     */
    private static Map<String, Object> partitionsCfg(int i) {
        final Class<? extends Partitioner> noPartitioner = null;
        return partitionsCfg(i, noPartitioner);
    }

    /**
     * Creates a configuration with the given number of partitions and, optionally, a partitioner.
     *
     * @param i number of partitions, stored as a string under
     *     {@code LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS}
     * @param cls partitioner class whose name is stored under {@code "partitioner"}; when
     *     {@code null} the key is omitted
     * @return the resulting configuration map
     */
    private static Map<String, Object> partitionsCfg(int i, @Nullable Class<? extends Partitioner> cls) {
        Pair<String, Object> partitions = Pair.of(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS, String.valueOf(i));
        if (cls == null) {
            return cfg(partitions);
        }
        // Typed varargs call instead of a raw Pair[] holding a null placeholder.
        return cfg(partitions, Pair.of("partitioner", cls.getName()));
    }

    /**
     * Builds a configuration map from the given key/value pairs.
     *
     * <p>{@code null} array elements are skipped; each remaining pair contributes its first
     * component as the key and its second component as the value.
     *
     * @param pairArr key/value pairs, individual elements may be {@code null}
     * @return map collected from the non-null pairs
     */
    @SafeVarargs
    private static Map<String, Object> cfg(Pair<String, Object>... pairArr) {
        return Arrays.stream(pairArr)
                .filter(Objects::nonNull)
                .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
    }

    /**
     * Combines the entries of two configuration maps into a new map.
     *
     * <p>Entries are collected with {@link Collectors#toMap}, so a key present in both maps
     * results in an {@link IllegalStateException}.
     *
     * @param map first source map
     * @param map2 second source map
     * @return new map holding the entries of both arguments
     */
    private static Map<String, Object> and(Map<String, Object> map, Map<String, Object> map2) {
        return Stream.of(map, map2)
                .flatMap(source -> source.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /** Returns a freshly allocated zero-length byte array. */
    private static byte[] emptyValue() {
        final byte[] noBytes = {};
        return noBytes;
    }

    /**
     * Synthetic hook that rebuilds a serializable lambda of this class from its
     * {@code SerializedLambda} form.
     *
     * <p>The first switch maps {@code serializedLambda.getImplMethodName()} (by hash code, then by
     * {@code equals}) to a selector {@code z}. The second switch checks the remaining metadata
     * (implementation method kind, functional interface class, method name and signature,
     * implementing class and implementation signature) and, when everything matches, returns the
     * corresponding lambda, re-reading captured values through {@code getCapturedArg}.
     *
     * <p>NOTE(review): this is decompiler output and is not valid Java as written: the selector is
     * declared {@code boolean} but assigned integers, the second switch repeats {@code case true:},
     * some lambdas reference an undefined {@code r0}, and others use {@code this} inside a static
     * method. The captured locals ({@code list}, {@code localKafkaCommitLogDescriptorTest}, ...) are
     * presumably what those references were meant to be - confirm against the original sources.
     *
     * @param serializedLambda serialized form of the lambda to rebuild
     * @return the lambda instance matching the serialized description
     * @throws IllegalArgumentException if no known lambda matches the serialized description
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Stage 1: translate the implementation method name into a selector value.
        switch (implMethodName.hashCode()) {
            case -2088638189:
                if (implMethodName.equals("lambda$testCachedViewWrite$68716b58$1")) {
                    z = 16;
                    break;
                }
                break;
            case -1953011076:
                if (implMethodName.equals("lambda$testCustomWatermarkEstimator$e60dd9d1$1")) {
                    z = 10;
                    break;
                }
                break;
            case -1868602320:
                if (implMethodName.equals("lambda$testPollAfterWrite$89f1a89$1")) {
                    z = 14;
                    break;
                }
                break;
            case -1538853141:
                if (implMethodName.equals("lambda$testPollWithSeek$89f1a89$1")) {
                    z = 12;
                    break;
                }
                break;
            case -1531087543:
                if (implMethodName.equals("lambda$testCachedViewWriteAndDeleteWildcard$68716b58$1")) {
                    z = 18;
                    break;
                }
                break;
            case -1229009575:
                if (implMethodName.equals("lambda$testCachedViewWritePreUpdate$68716b58$1")) {
                    z = 4;
                    break;
                }
                break;
            case -1087973785:
                if (implMethodName.equals("lambda$testCachedViewWriteAndListAll$68716b58$1")) {
                    z = true;
                    break;
                }
                break;
            case -1054797498:
                if (implMethodName.equals("lambda$testCachedView$68716b58$1")) {
                    z = 3;
                    break;
                }
                break;
            case -659134205:
                if (implMethodName.equals("lambda$testCachedViewWritePreUpdateAndDeleteWildcard$306fe2c4$1")) {
                    z = 20;
                    break;
                }
                break;
            case -243693008:
                if (implMethodName.equals("lambda$testCachedViewWriteAndList$68716b58$1")) {
                    z = 7;
                    break;
                }
                break;
            case 96417:
                if (implMethodName.equals("add")) {
                    z = false;
                    break;
                }
                break;
            case 218366049:
                if (implMethodName.equals("lambda$testManualPartitionAssignment$89f1a89$1")) {
                    z = 2;
                    break;
                }
                break;
            case 224714349:
                if (implMethodName.equals("lambda$testCachedViewReload$68716b58$1")) {
                    z = 6;
                    break;
                }
                break;
            case 224714350:
                if (implMethodName.equals("lambda$testCachedViewReload$68716b58$2")) {
                    z = 8;
                    break;
                }
                break;
            case 288116889:
                if (implMethodName.equals("lambda$testCachedViewWritePreUpdate$306fe2c4$1")) {
                    z = 19;
                    break;
                }
                break;
            case 566474891:
                if (implMethodName.equals("lambda$testRewriteAndPrefetch$68716b58$1")) {
                    z = 5;
                    break;
                }
                break;
            case 566474892:
                if (implMethodName.equals("lambda$testRewriteAndPrefetch$68716b58$2")) {
                    z = 15;
                    break;
                }
                break;
            case 1229310441:
                if (implMethodName.equals("lambda$testObserveMovesWatermark$e60dd9d1$1")) {
                    z = 11;
                    break;
                }
                break;
            case 1509115140:
                if (implMethodName.equals("lambda$new$bdaa087c$1")) {
                    z = 21;
                    break;
                }
                break;
            case 1926104707:
                if (implMethodName.equals("lambda$testCachedViewWriteAndDelete$68716b58$1")) {
                    z = 13;
                    break;
                }
                break;
            case 2097764861:
                if (implMethodName.equals("lambda$new$60305140$1")) {
                    z = 17;
                    break;
                }
                break;
            case 2118706627:
                if (implMethodName.equals("lambda$testCachedViewWritePreUpdateAndDeleteWildcard$68716b58$1")) {
                    z = 9;
                    break;
                }
                break;
        }
        // Stage 2: verify the remaining lambda metadata and rebuild the matching lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list2 = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list3 = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("java/util/List") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    List list4 = (List) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        r0.add(v1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return () -> {
                        return 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue3 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue4 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue4;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue5 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue5;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue6 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue6;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue7 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue7;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue8 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue8;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(JLjava/lang/Integer;)Lcz/o2/proxima/storage/StreamElement;")) {
                    LocalKafkaCommitLogDescriptorTest localKafkaCommitLogDescriptorTest = (LocalKafkaCommitLogDescriptorTest) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    return num -> {
                        return StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key" + num, this.attr.getName(), longValue + num.intValue(), new byte[]{1, 2});
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(JLjava/lang/Integer;)Lcz/o2/proxima/storage/StreamElement;")) {
                    LocalKafkaCommitLogDescriptorTest localKafkaCommitLogDescriptorTest2 = (LocalKafkaCommitLogDescriptorTest) serializedLambda.getCapturedArg(0);
                    long longValue2 = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    return num2 -> {
                        return StreamElement.upsert(this.entity, this.attr, UUID.randomUUID().toString(), "key" + num2, this.attr.getName(), longValue2 + num2.intValue(), new byte[]{1, 2});
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return () -> {
                        return 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue9 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue9;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return () -> {
                        return 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue10 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue10;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue11 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue11;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Consumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/core/DirectDataOperator;)V")) {
                    LocalKafkaCommitLogDescriptorTest localKafkaCommitLogDescriptorTest3 = (LocalKafkaCommitLogDescriptorTest) serializedLambda.getCapturedArg(0);
                    return directDataOperator -> {
                        directDataOperator.withExecutorFactory(this.serviceFactory);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/storage/Partition") && serializedLambda.getFunctionalInterfaceMethodName().equals("getId") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()I") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(I)I")) {
                    int intValue12 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return () -> {
                        return intValue12;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/util/Pair;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (streamElement2, pair) -> {
                        atomicInteger.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/BiConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/util/Pair;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (streamElement22, pair2) -> {
                        atomicInteger2.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptorTest") && serializedLambda.getImplMethodSignature().equals("()Ljava/util/concurrent/ExecutorService;")) {
                    return () -> {
                        return Executors.newCachedThreadPool(runnable -> {
                            Thread thread = new Thread(runnable);
                            thread.setUncaughtExceptionHandler((thread2, th) -> {
                                th.printStackTrace(System.err);
                            });
                            return thread;
                        });
                    };
                }
                break;
        }
        // No selector matched, or the metadata did not match the selected lambda.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
