package cz.o2.proxima.direct.kafka;

import com.google.auto.service.AutoService;
import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessorFactory;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.Node;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.record.TimestampType;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.storage.internal.AbstractDataAccessorFactory;
import cz.o2.proxima.util.Classpath;
import cz.o2.proxima.util.Pair;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import lombok.Generated;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService({DataAccessorFactory.class})
/* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor.class */
public class LocalKafkaCommitLogDescriptor implements DataAccessorFactory {
    private static final long serialVersionUID = 1;
    public static final String CFG_NUM_PARTITIONS = "local-kafka-num-partitions";
    public static final String CFG_RETENTION = "retention-elements";
    final String id;
    private final Function<Accessor, Accessor> accessorModifier;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(LocalKafkaCommitLogDescriptor.class);
    private static final Map<String, Map<URI, Accessor>> ACCESSORS = new ConcurrentHashMap();

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor$Accessor.class */
    public static class Accessor extends KafkaAccessor {
        private static final long serialVersionUID = 1;
        String descriptorId;
        int numPartitions;
        long perPartitionRetention;
        transient Map<String, ConsumerGroup> consumerGroups;
        transient List<List<StreamElement>> written;
        transient Map<ConsumerId, Map<Integer, Integer>> consumerOffsets;
        transient Map<Pair<String, Integer>, AtomicInteger> committedOffsets;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Accessor(Accessor accessor, Map<String, Object> map) {
            super(accessor.getEntityDescriptor(), accessor.getUri(), map);
            this.numPartitions = 1;
            this.perPartitionRetention = -1L;
            this.descriptorId = accessor.descriptorId;
            this.numPartitions = accessor.numPartitions;
            this.consumerGroups = accessor.consumerGroups;
            this.written = accessor.written;
            this.consumerOffsets = accessor.consumerOffsets;
            this.committedOffsets = accessor.committedOffsets;
            configure(accessor.getUri(), map);
        }

        public Accessor(EntityDescriptor entityDescriptor, URI uri, Map<String, Object> map, String str) {
            super(entityDescriptor, uri, map);
            this.numPartitions = 1;
            this.perPartitionRetention = -1L;
            this.descriptorId = str;
            this.consumerOffsets = new ConcurrentHashMap();
            this.written = Collections.synchronizedList(new ArrayList());
            this.consumerGroups = new ConcurrentHashMap();
            this.committedOffsets = Collections.synchronizedMap(new HashMap());
            configure(uri, map);
        }

        public boolean isAcceptable(AttributeFamilyDescriptor attributeFamilyDescriptor) {
            return true;
        }

        private void configure(URI uri, Map<String, Object> map) {
            this.numPartitions = ((Integer) Optional.ofNullable(map.get(LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS)).map(obj -> {
                return Integer.valueOf(obj.toString());
            }).orElse(Integer.valueOf(this.numPartitions))).intValue();
            for (int i = 0; i < this.numPartitions; i++) {
                this.written.add(Collections.synchronizedList(new ArrayList()));
            }
            this.serializerClass = (Class) Optional.ofNullable(map.get("serializer-class")).map((v0) -> {
                return v0.toString();
            }).map(str -> {
                return Classpath.findClass(str, ElementSerializer.class);
            }).orElse(this.serializerClass);
            this.perPartitionRetention = ((Long) Optional.ofNullable(map.get(LocalKafkaCommitLogDescriptor.CFG_RETENTION)).map(obj2 -> {
                return Long.valueOf(obj2.toString());
            }).orElse(Long.valueOf(this.perPartitionRetention))).longValue();
            LocalKafkaCommitLogDescriptor.log.info("Created accessor with URI {} and {} partitions", uri, Integer.valueOf(this.numPartitions));
        }

        public <K, V> KafkaConsumerFactory<K, V> createConsumerFactory() {
            final ElementSerializer serializer = getSerializer();
            return new KafkaConsumerFactory<K, V>(getUri(), new Properties(), serializer.keySerde(), serializer.valueSerde()) { // from class: cz.o2.proxima.direct.kafka.LocalKafkaCommitLogDescriptor.Accessor.1
                public KafkaConsumer<K, V> create() {
                    return create(Position.NEWEST, allPartitions());
                }

                public KafkaConsumer<K, V> create(Position position, Collection<Partition> collection) {
                    String str = "unnamed-consumer-" + UUID.randomUUID();
                    return Accessor.this.mockKafkaConsumer(str, new ConsumerGroup(str, Accessor.this.getTopic(), Accessor.this.numPartitions, false), serializer, collection, null);
                }

                public KafkaConsumer<K, V> create(String str) {
                    return create(str, Position.NEWEST, null);
                }

                public KafkaConsumer<K, V> create(String str, Position position, @Nullable ConsumerRebalanceListener consumerRebalanceListener) {
                    KafkaConsumer<K, V> mockKafkaConsumer;
                    synchronized (LocalKafkaCommitLogDescriptor.class) {
                        ConsumerGroup consumerGroup = Accessor.this.consumerGroups.get(str);
                        if (consumerGroup == null) {
                            consumerGroup = new ConsumerGroup(str, Accessor.this.getTopic(), Accessor.this.numPartitions, true);
                            Accessor.this.consumerGroups.put(str, consumerGroup);
                        }
                        mockKafkaConsumer = Accessor.this.mockKafkaConsumer(str, consumerGroup, serializer, null, consumerRebalanceListener);
                    }
                    return mockKafkaConsumer;
                }

                private List<Partition> allPartitions() {
                    ArrayList arrayList = new ArrayList();
                    for (int i = 0; i < Accessor.this.numPartitions; i++) {
                        arrayList.add(new PartitionWithTopic(Accessor.this.getTopic(), i));
                    }
                    return arrayList;
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* JADX WARN: Multi-variable type inference failed */
        public <K, V> KafkaConsumer<K, V> mockKafkaConsumer(String str, ConsumerGroup consumerGroup, ElementSerializer<K, V> elementSerializer, @Nullable Collection<Partition> collection, @Nullable ConsumerRebalanceListener consumerRebalanceListener) {
            LocalKafkaCommitLogDescriptor.log.info("Creating mock kafka consumer name {}, with committed offsets {}", str, this.committedOffsets);
            KafkaConsumer<K, V> kafkaConsumer = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
            int add = collection != null ? consumerGroup.add(collection) : consumerGroup.add(consumerRebalanceListener);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            ConsumerId of = ConsumerId.of(str, add);
            this.consumerOffsets.put(of, consumerGroup.getAssignment(of.getId()).stream().map(partition -> {
                int committedOffset = getCommittedOffset(str, partition.getId());
                return Pair.of(Integer.valueOf(partition.getId()), Integer.valueOf(committedOffset >= 0 ? committedOffset : this.written.get(partition.getId()).size()));
            }).collect(Collectors.toConcurrentMap((v0) -> {
                return v0.getFirst();
            }, (v0) -> {
                return v0.getSecond();
            })));
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock -> {
                atomicBoolean.set(true);
                return pollConsumer(consumerGroup, Math.min(200L, ((Duration) invocationOnMock.getArguments()[0]).toMillis()), of, elementSerializer, consumerRebalanceListener);
            }).when(kafkaConsumer)).poll((Duration) Mockito.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock2 -> {
                return getEndOffsets(str, (Collection) invocationOnMock2.getArguments()[0]);
            }).when(kafkaConsumer)).endOffsets((Collection) Mockito.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock3 -> {
                Collection collection2 = (Collection) invocationOnMock3.getArguments()[0];
                Map map = (Map) collection2.stream().collect(Collectors.toMap(topicPartition -> {
                    return topicPartition;
                }, topicPartition2 -> {
                    return 0L;
                }));
                LocalKafkaCommitLogDescriptor.log.debug("Consumer {} beginningOffsets {}: {}", new Object[]{str, collection2, map});
                return map;
            }).when(kafkaConsumer)).beginningOffsets((Collection) Mockito.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock4 -> {
                seekConsumerToBeginning(of, (Collection) invocationOnMock4.getArguments()[0]);
                atomicBoolean.set(true);
                return null;
            }).when(kafkaConsumer)).seekToBeginning((Collection) Mockito.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock5 -> {
                TopicPartition topicPartition = (TopicPartition) invocationOnMock5.getArguments()[0];
                seekConsumerTo(of, topicPartition.partition(), ((Long) invocationOnMock5.getArguments()[1]).longValue());
                atomicBoolean.set(true);
                return null;
            }).when(kafkaConsumer)).seek((TopicPartition) Mockito.any(), Mockito.anyLong());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock6 -> {
                commitConsumer(str, (Map) invocationOnMock6.getArguments()[0]);
                return null;
            }).when(kafkaConsumer)).commitSync((Map) Mockito.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock7 -> {
                return ((Set) invocationOnMock7.getArguments()[0]).stream().map(topicPartition -> {
                    int committedOffset = getCommittedOffset(str, topicPartition.partition());
                    return committedOffset >= 0 ? Pair.of(topicPartition, new OffsetAndMetadata(committedOffset)) : Pair.of(topicPartition, (Object) null);
                }).filter(pair -> {
                    return pair.getSecond() != null;
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getFirst();
                }, (v0) -> {
                    return v0.getSecond();
                }));
            }).when(kafkaConsumer)).committed((Set) Mockito.any());
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock8 -> {
                return atomicBoolean.get() ? consumerGroup.getAssignment(of.getId()).stream().map(partition2 -> {
                    return new TopicPartition(getTopic(), partition2.getId());
                }).collect(Collectors.toSet()) : Collections.emptySet();
            }).when(kafkaConsumer)).assignment();
            Mockito.when(kafkaConsumer.partitionsFor((String) Mockito.eq(consumerGroup.getTopic()))).thenReturn(IntStream.range(0, consumerGroup.getNumPartitions()).mapToObj(i -> {
                return new PartitionInfo(consumerGroup.getTopic(), i, (Node) null, (Node[]) null, (Node[]) null);
            }).collect(Collectors.toList()));
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock9 -> {
                consumerGroup.remove(of.getId());
                return null;
            }).when(kafkaConsumer)).close();
            ((KafkaConsumer) Mockito.doAnswer(invocationOnMock10 -> {
                return Long.valueOf(((Integer) Optional.ofNullable(this.consumerOffsets.get(of).get(Integer.valueOf(((TopicPartition) invocationOnMock10.getArguments()[0]).partition()))).orElse(-1)).intValue());
            }).when(kafkaConsumer)).position((TopicPartition) Mockito.any());
            return kafkaConsumer;
        }

        private int getCommittedOffset(String str, int i) {
            AtomicInteger atomicInteger = this.committedOffsets.get(Pair.of(str, Integer.valueOf(i)));
            if (atomicInteger != null) {
                return atomicInteger.get();
            }
            return -1;
        }

        private void commitConsumer(String str, Map<TopicPartition, OffsetAndMetadata> map) {
            synchronized (this.committedOffsets) {
                map.forEach((topicPartition, offsetAndMetadata) -> {
                    int partition = topicPartition.partition();
                    long offset = offsetAndMetadata.offset();
                    this.committedOffsets.compute(Pair.of(str, Integer.valueOf(partition)), (pair, atomicInteger) -> {
                        if (atomicInteger == null) {
                            return new AtomicInteger((int) offset);
                        }
                        atomicInteger.set((int) offset);
                        return atomicInteger;
                    });
                });
            }
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} committed offsets {}, offsets now {}", new Object[]{str, map, this.committedOffsets});
        }

        private void seekConsumerTo(ConsumerId consumerId, int i, long j) {
            Preconditions.checkArgument(j >= 0, "Cannot seek to negative offset %s", j);
            this.consumerOffsets.get(consumerId).put(Integer.valueOf(i), Integer.valueOf((int) j));
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} seeked to offset {} in partition {}", new Object[]{consumerId, Long.valueOf(j), Integer.valueOf(i)});
        }

        private void seekConsumerToBeginning(ConsumerId consumerId, Collection<TopicPartition> collection) {
            this.consumerOffsets.compute(consumerId, (consumerId2, map) -> {
                collection.forEach(topicPartition -> {
                });
                return map;
            });
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} seeked to beginning of {}", consumerId.getName(), collection);
        }

        private Map<TopicPartition, Long> getEndOffsets(String str, Collection<TopicPartition> collection) {
            HashMap hashMap = new HashMap();
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                hashMap.put(new TopicPartition(getTopic(), it.next().partition()), Long.valueOf(this.written.get(r0.partition()).size()));
            }
            LocalKafkaCommitLogDescriptor.log.debug("Consumer {} endOffsets {}: {}", new Object[]{str, collection, hashMap});
            return hashMap;
        }

        private <K, V> ConsumerRecords<K, V> pollConsumer(ConsumerGroup consumerGroup, long j, ConsumerId consumerId, ElementSerializer<K, V> elementSerializer, @Nullable ConsumerRebalanceListener consumerRebalanceListener) throws InterruptedException {
            ConsumerRecords<K, V> consumerRecords;
            synchronized (consumerId) {
                String name = consumerId.getName();
                if (!consumerId.isAssigned()) {
                    LocalKafkaCommitLogDescriptor.log.debug("Initializing consumer {} after first time poll with listener {}", name, consumerRebalanceListener);
                    if (!consumerGroup.rebalanceIfNeeded() && consumerRebalanceListener != null) {
                        consumerRebalanceListener.onPartitionsAssigned((Collection) consumerGroup.getAssignment(consumerId.getId()).stream().map(partition -> {
                            return new TopicPartition(getTopic(), partition.getId());
                        }).collect(Collectors.toList()));
                    }
                    consumerId.setAssigned(true);
                }
                LocalKafkaCommitLogDescriptor.log.debug("Sleeping {} ms before attempting to poll", Long.valueOf(j));
                Thread.sleep(j);
                HashMap hashMap = new HashMap();
                ArrayList<Partition> newArrayList = Lists.newArrayList(consumerGroup.getAssignment(consumerId.getId()));
                Map<Integer, Integer> map = this.consumerOffsets.get(consumerId);
                if (LocalKafkaCommitLogDescriptor.log.isDebugEnabled()) {
                    LocalKafkaCommitLogDescriptor.log.debug("Polling consumerId {}.{} with assignment {} and offsets {}", new Object[]{this.descriptorId, consumerId, newArrayList.stream().map((v0) -> {
                        return v0.getId();
                    }).collect(Collectors.toList()), map});
                }
                int maxPollRecords = getMaxPollRecords();
                for (Partition partition2 : newArrayList) {
                    int id = partition2.getId();
                    if (id < this.written.size()) {
                        List<StreamElement> list = this.written.get(id);
                        int size = list.size();
                        ArrayList arrayList = new ArrayList();
                        int intValue = ((Integer) Optional.ofNullable(map.get(Integer.valueOf(id))).orElse(Integer.valueOf(getCommittedOffset(name, partition2.getId())))).intValue();
                        LocalKafkaCommitLogDescriptor.log.trace("Partition {} has last {}, reading from {}", new Object[]{Integer.valueOf(id), Integer.valueOf(size), Integer.valueOf(intValue)});
                        while (intValue < size) {
                            int i = maxPollRecords;
                            maxPollRecords--;
                            if (i <= 0) {
                                break;
                            }
                            if (intValue >= 0) {
                                arrayList.add(toConsumerRecord(list.get(intValue), elementSerializer, partition2.getId(), intValue));
                            }
                            intValue++;
                        }
                        map.put(Integer.valueOf(id), Integer.valueOf(intValue));
                        LocalKafkaCommitLogDescriptor.log.trace("Advanced offset of consumer ID {} on partition {} to {}", new Object[]{consumerId, Integer.valueOf(id), Integer.valueOf(intValue)});
                        if (!arrayList.isEmpty()) {
                            hashMap.put(new TopicPartition(getTopic(), id), arrayList);
                        }
                    }
                }
                LocalKafkaCommitLogDescriptor.log.debug("Consumer {} id {} polled records {}", new Object[]{name, consumerId, hashMap});
                consumerRecords = new ConsumerRecords<>(hashMap);
            }
            return consumerRecords;
        }

        private <K, V> ConsumerRecord<K, V> toConsumerRecord(StreamElement streamElement, ElementSerializer<K, V> elementSerializer, int i, int i2) {
            ProducerRecord write = elementSerializer.write(getTopic(), i, streamElement);
            return new ConsumerRecord<>(getTopic(), i, i2, streamElement.getStamp(), TimestampType.CREATE_TIME, 0L, elementSerializer.keySerde().serializer().serialize(getTopic(), write.key()).length, streamElement.isDelete() ? 0 : elementSerializer.valueSerde().serializer().serialize(getTopic(), write.value()).length, write.key(), write.value(), write.headers());
        }

        public boolean allConsumed(List<Integer> list) {
            return ((Boolean) this.consumerGroups.keySet().stream().map(str -> {
                int i = 0;
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    int intValue = ((Integer) it.next()).intValue();
                    int i2 = i;
                    if (this.consumerOffsets.get(ConsumerId.of(str, i2)).entrySet().stream().filter(entry -> {
                        return ((Integer) entry.getKey()).intValue() == i2;
                    }).anyMatch(entry2 -> {
                        return ((Integer) entry2.getValue()).intValue() < intValue;
                    })) {
                        return false;
                    }
                    i++;
                }
                return true;
            }).reduce(true, (bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            })).booleanValue();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: newWriter, reason: merged with bridge method [inline-methods] */
        public LocalKafkaWriter<?, ?> m3newWriter() {
            return new LocalKafkaWriter<>(this, this.numPartitions, this.descriptorId);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: newReader, reason: merged with bridge method [inline-methods] */
        public LocalKafkaLogReader m2newReader(Context context) {
            return new LocalKafkaLogReader(this, context);
        }

        private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
            objectOutputStream.defaultWriteObject();
        }

        private void readObject(ObjectInputStream objectInputStream) throws ClassNotFoundException, IOException {
            objectInputStream.defaultReadObject();
            Accessor accessor = (Accessor) ((Map) LocalKafkaCommitLogDescriptor.ACCESSORS.get(this.descriptorId)).get(getUri());
            this.committedOffsets = accessor.committedOffsets;
            this.consumerGroups = accessor.consumerGroups;
            this.consumerOffsets = accessor.consumerOffsets;
            this.written = accessor.written;
        }

        public void clear() {
            this.consumerOffsets.clear();
            this.written.clear();
            this.consumerGroups.clear();
            this.committedOffsets.clear();
        }

        public int getPerPartitionElementsRetention() {
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor$ConsumerId.class */
    public static class ConsumerId {
        final String name;
        final int id;
        boolean assigned = false;

        static ConsumerId of(String str, int i) {
            return new ConsumerId(str, i);
        }

        private ConsumerId(String str, int i) {
            this.name = str;
            this.id = i;
        }

        public int hashCode() {
            return Objects.hash(this.name, Integer.valueOf(this.id));
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof ConsumerId)) {
                return false;
            }
            ConsumerId consumerId = (ConsumerId) obj;
            return this.id == consumerId.id && Objects.equals(this.name, consumerId.name);
        }

        public String toString() {
            return "ConsumerId(" + this.name + ", " + this.id + ")";
        }

        @Generated
        public String getName() {
            return this.name;
        }

        @Generated
        public int getId() {
            return this.id;
        }

        @Generated
        public boolean isAssigned() {
            return this.assigned;
        }

        @Generated
        public void setAssigned(boolean z) {
            this.assigned = z;
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor$LocalKafkaLogReader.class */
    public static class LocalKafkaLogReader extends KafkaLogReader {

        @Generated
        private static final Logger log = LoggerFactory.getLogger(LocalKafkaLogReader.class);
        private KafkaConsumer<Object, Object> consumer;

        public LocalKafkaLogReader(KafkaAccessor kafkaAccessor, Context context) {
            super(kafkaAccessor, context);
            this.consumer = null;
        }

        public ObserveHandle observePartitions(String str, Collection<Partition> collection, Position position, boolean z, CommitLogObserver commitLogObserver) {
            ObserveHandle observePartitions = super.observePartitions(str, collection, position, z, commitLogObserver);
            log.debug("Started to observe partitions {} of LocalKafkaCommitLog URI {}", collection, getUri());
            return observePartitions;
        }

        public ObserveHandle observe(String str, Position position, CommitLogObserver commitLogObserver) {
            ObserveHandle observe = super.observe(str, position, commitLogObserver);
            log.debug("Started to observe LocalKafkaCommitLog with URI {} by consumer {}", getUri(), str);
            return observe;
        }

        ObserveHandle observeKafka(@Nullable String str, Collection<Partition> collection, Position position, boolean z, CommitLogObserver commitLogObserver) {
            ObserveHandle observeKafka = super.observeKafka(str, collection, position, z, commitLogObserver);
            log.debug("Started to observe partitions {} of LocalKafkaCommitLog with URI {} by consumer {}", new Object[]{collection, getUri(), str});
            return observeKafka;
        }

        public ObserveHandle observeBulk(String str, Position position, CommitLogObserver commitLogObserver) {
            ObserveHandle observeBulk = super.observeBulk(str, position, commitLogObserver);
            log.debug("Started to bulk observe LocalKafkaCommitLog with URI {} by {}", getUri(), str);
            return observeBulk;
        }

        /* renamed from: getAccessor, reason: merged with bridge method [inline-methods] */
        public Accessor m5getAccessor() {
            return (Accessor) this.accessor;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        public KafkaConsumer<Object, Object> getConsumer() {
            return (KafkaConsumer) Objects.requireNonNull(this.consumer);
        }

        KafkaConsumer<Object, Object> createConsumer(String str, Collection<Offset> collection, ConsumerRebalanceListener consumerRebalanceListener, Position position) {
            KafkaConsumer<Object, Object> createConsumer = super.createConsumer(str, collection, consumerRebalanceListener, position);
            this.consumer = createConsumer;
            return createConsumer;
        }

        public CommitLogReader.Factory<?> asFactory() {
            Accessor m5getAccessor = m5getAccessor();
            Context context = getContext();
            return repository -> {
                return new LocalKafkaLogReader(m5getAccessor, context);
            };
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 1663431888:
                    if (implMethodName.equals("lambda$asFactory$5d3aa07e$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/commitlog/CommitLogReader$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor$LocalKafkaLogReader") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/kafka/KafkaAccessor;Lcz/o2/proxima/direct/core/Context;Lcz/o2/proxima/repository/Repository;)Lcz/o2/proxima/direct/commitlog/CommitLogReader;")) {
                        KafkaAccessor kafkaAccessor = (KafkaAccessor) serializedLambda.getCapturedArg(0);
                        Context context = (Context) serializedLambda.getCapturedArg(1);
                        return repository -> {
                            return new LocalKafkaLogReader(kafkaAccessor, context);
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor$LocalKafkaWriter.class */
    public static class LocalKafkaWriter<K, V> extends KafkaWriter<K, V> {
        private final int numPartitions;
        private final String descriptorId;

        public LocalKafkaWriter(Accessor accessor, int i, String str) {
            super(accessor);
            this.numPartitions = i;
            this.descriptorId = str;
        }

        public void write(StreamElement streamElement, CommitCallback commitCallback) {
            int partitionId = (this.accessor.getPartitioner().getPartitionId(streamElement) & Integer.MAX_VALUE) % this.numPartitions;
            Accessor accessor = (Accessor) this.accessor;
            List<StreamElement> list = accessor.written.get(partitionId);
            list.add(streamElement);
            LocalKafkaCommitLogDescriptor.log.debug("Written data {} to LocalKafkaCommitLog descriptorId {} URI {}, partition {} at offset {}", new Object[]{streamElement, this.descriptorId, getUri(), Integer.valueOf(partitionId), Long.valueOf(list.size() - 1)});
            if (m6getAccessor().getPerPartitionElementsRetention() > 0 && list.size() > m6getAccessor().getPerPartitionElementsRetention()) {
                synchronized (list) {
                    accessor.written.set(partitionId, list.subList(list.size() - m6getAccessor().getPerPartitionElementsRetention(), m6getAccessor().getPerPartitionElementsRetention()));
                }
            }
            commitCallback.commit(true, (Throwable) null);
        }

        /* renamed from: getAccessor, reason: merged with bridge method [inline-methods] */
        public Accessor m6getAccessor() {
            return (Accessor) this.accessor;
        }

        /* renamed from: asFactory, reason: merged with bridge method [inline-methods] */
        public OnlineAttributeWriter.Factory<?> m7asFactory() {
            Accessor m6getAccessor = m6getAccessor();
            int i = this.numPartitions;
            String str = this.descriptorId;
            return repository -> {
                return new LocalKafkaWriter(m6getAccessor, i, str);
            };
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -1259578078:
                    if (implMethodName.equals("lambda$asFactory$6f543363$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/OnlineAttributeWriter$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor$LocalKafkaWriter") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/kafka/LocalKafkaCommitLogDescriptor$Accessor;ILjava/lang/String;Lcz/o2/proxima/repository/Repository;)Lcz/o2/proxima/direct/core/OnlineAttributeWriter;")) {
                        Accessor accessor = (Accessor) serializedLambda.getCapturedArg(0);
                        int intValue = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                        String str = (String) serializedLambda.getCapturedArg(2);
                        return repository -> {
                            return new LocalKafkaWriter(accessor, intValue, str);
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    public LocalKafkaCommitLogDescriptor() {
        this(Function.identity());
    }

    public LocalKafkaCommitLogDescriptor(Function<Accessor, Accessor> function) {
        this.id = UUID.randomUUID().toString();
        ACCESSORS.put(this.id, Collections.synchronizedMap(new HashMap()));
        this.accessorModifier = function;
    }

    @Override // 
    public Accessor createAccessor(DirectDataOperator directDataOperator, AttributeFamilyDescriptor attributeFamilyDescriptor) {
        return ACCESSORS.get(this.id).computeIfAbsent(attributeFamilyDescriptor.getStorageUri(), uri -> {
            return this.accessorModifier.apply(new Accessor(attributeFamilyDescriptor.getEntity(), uri, attributeFamilyDescriptor.getCfg(), this.id));
        });
    }

    public AbstractDataAccessorFactory.Accept accepts(URI uri) {
        return uri.getScheme().equals("kafka-test") ? AbstractDataAccessorFactory.Accept.ACCEPT : AbstractDataAccessorFactory.Accept.REJECT;
    }
}
