package io.streamnative.pulsar.handlers.kop;

import com.google.common.collect.Lists;
import io.netty.util.Recycler;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/MessageFetchContext.class */
public final class MessageFetchContext {
    private KafkaRequestHandler requestHandler;
    private final Recycler.Handle<MessageFetchContext> recyclerHandle;
    private static final Logger log = LoggerFactory.getLogger(MessageFetchContext.class);
    private static final Recycler<MessageFetchContext> RECYCLER = new Recycler<MessageFetchContext>() { // from class: io.streamnative.pulsar.handlers.kop.MessageFetchContext.1
        protected MessageFetchContext newObject(Recycler.Handle<MessageFetchContext> handle) {
            return new MessageFetchContext(handle);
        }

        /* renamed from: newObject, reason: collision with other method in class */
        protected /* bridge */ /* synthetic */ Object m22newObject(Recycler.Handle handle) {
            return newObject((Recycler.Handle<MessageFetchContext>) handle);
        }
    };

    public static MessageFetchContext get(KafkaRequestHandler kafkaRequestHandler) {
        MessageFetchContext messageFetchContext = (MessageFetchContext) RECYCLER.get();
        messageFetchContext.requestHandler = kafkaRequestHandler;
        return messageFetchContext;
    }

    private MessageFetchContext(Recycler.Handle<MessageFetchContext> handle) {
        this.recyclerHandle = handle;
    }

    public void recycle() {
        this.requestHandler = null;
        this.recyclerHandle.recycle(this);
    }

    public CompletableFuture<AbstractResponse> handleFetch(CompletableFuture<AbstractResponse> completableFuture, KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        Map map = (Map) kafkaHeaderAndRequest.getRequest().fetchData().entrySet().stream().map(entry -> {
            return Pair.of((TopicPartition) entry.getKey(), this.requestHandler.getTopicManager().getTopicConsumerManager(KopTopic.toString((TopicPartition) entry.getKey())));
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        CompletableFuture.allOf((CompletableFuture[]) map.entrySet().stream().map((v0) -> {
            return v0.getValue();
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r11, th) -> {
            readMessages(kafkaHeaderAndRequest, (Map) map.entrySet().stream().map(entry2 -> {
                try {
                    KafkaTopicConsumerManager kafkaTopicConsumerManager = (KafkaTopicConsumerManager) ((CompletableFuture) entry2.getValue()).get();
                    if (kafkaTopicConsumerManager == null) {
                        this.requestHandler.getTopicManager().getConsumerTopicManagers().remove(KopTopic.toString((TopicPartition) entry2.getKey()));
                        throw new NullPointerException("topic not owned, and return null TCM in fetch.");
                    }
                    long j = ((FetchRequest.PartitionData) kafkaHeaderAndRequest.getRequest().fetchData().get(entry2.getKey())).fetchOffset;
                    if (log.isDebugEnabled()) {
                        log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} - {}.", new Object[]{entry2.getKey(), Long.valueOf(j), MessageIdUtils.getPosition(j)});
                    }
                    Pair<ManagedCursor, Long> remove = kafkaTopicConsumerManager.remove(j);
                    if (remove != null) {
                        return Pair.of((TopicPartition) entry2.getKey(), remove);
                    }
                    log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. Fetch for topic return error.", Long.valueOf(j), entry2.getKey());
                    linkedHashMap.put((TopicPartition) entry2.getKey(), new FetchResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L, -1L, (List) null, MemoryRecords.EMPTY));
                    return null;
                } catch (Exception e) {
                    log.warn("Error for get KafkaTopicConsumerManager.", e);
                    linkedHashMap.put((TopicPartition) entry2.getKey(), new FetchResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L, -1L, (List) null, MemoryRecords.EMPTY));
                    return null;
                }
            }).filter(pair -> {
                return pair != null;
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            })), completableFuture, linkedHashMap);
        });
        return completableFuture;
    }

    private void readMessages(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, Map<TopicPartition, Pair<ManagedCursor, Long>> map, CompletableFuture<AbstractResponse> completableFuture, LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> linkedHashMap) {
        readMessagesInternal(kafkaHeaderAndRequest, map, new AtomicInteger(0), new ConcurrentHashMap(), completableFuture, linkedHashMap);
    }

    private void readMessagesInternal(KafkaCommandDecoder.KafkaHeaderAndRequest kafkaHeaderAndRequest, Map<TopicPartition, Pair<ManagedCursor, Long>> map, AtomicInteger atomicInteger, Map<TopicPartition, List<Entry>> map2, CompletableFuture<AbstractResponse> completableFuture, LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> linkedHashMap) {
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        Map<TopicPartition, CompletableFuture<List<Entry>>> readAllCursorOnce = readAllCursorOnce(map);
        CompletableFuture.allOf((CompletableFuture[]) readAllCursorOnce.values().stream().toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r19, th) -> {
            readAllCursorOnce.entrySet().parallelStream().forEach(entry -> {
                TopicPartition topicPartition = (TopicPartition) entry.getKey();
                try {
                    List list = (List) ((CompletableFuture) entry.getValue()).get();
                    List list2 = (List) map2.computeIfAbsent(topicPartition, topicPartition2 -> {
                        return Lists.newArrayList();
                    });
                    if (list != null && !list.isEmpty()) {
                        list2.addAll(list);
                        atomicInteger2.addAndGet(list.size());
                        atomicInteger.addAndGet(((Integer) ((Stream) list2.stream().parallel()).map(entry -> {
                            return Integer.valueOf(entry.getLength());
                        }).reduce(0, (v0, v1) -> {
                            return Integer.sum(v0, v1);
                        })).intValue());
                        if (log.isDebugEnabled()) {
                            log.debug("Request {}: For topic {}, entries in list: {}.", new Object[]{kafkaHeaderAndRequest.getHeader(), topicPartition.toString(), Integer.valueOf(list2.size())});
                        }
                    }
                } catch (Exception e) {
                    log.error("Request {}: Failed readEntry.get for topic: {}. ", new Object[]{kafkaHeaderAndRequest.getHeader(), topicPartition, e});
                    this.requestHandler.getTopicManager().getTopicConsumerManager(KopTopic.toString(topicPartition)).thenAccept(kafkaTopicConsumerManager -> {
                        if (kafkaTopicConsumerManager != null) {
                            kafkaTopicConsumerManager.deleteOneCursorAsync((ManagedCursor) ((Pair) map.get(topicPartition)).getLeft(), "cursor.readEntry fail. deleteCursor");
                        } else {
                            this.requestHandler.getTopicManager().getConsumerTopicManagers().remove(KopTopic.toString(topicPartition));
                            log.warn("Cursor deleted while TCM close.");
                        }
                    });
                    map.remove(topicPartition);
                    linkedHashMap.put(topicPartition, new FetchResponse.PartitionData(Errors.NONE, -1L, -1L, -1L, (List) null, MemoryRecords.EMPTY));
                }
            });
            FetchRequest request = kafkaHeaderAndRequest.getRequest();
            int maxBytes = request.maxBytes();
            int minBytes = request.minBytes();
            int maxWait = request.maxWait();
            long currentTimeMillis = maxWait > 0 ? System.currentTimeMillis() + maxWait : maxWait;
            int i2 = atomicInteger.get();
            if (log.isDebugEnabled()) {
                log.debug("Request {}: One round read {} entries, allSize/maxBytes/minBytes/endTime: {}/{}/{}/{}", new Object[]{kafkaHeaderAndRequest.getHeader(), Integer.valueOf(atomicInteger2.get()), Integer.valueOf(i2), Integer.valueOf(maxBytes), Integer.valueOf(minBytes), new Date(currentTimeMillis)});
            }
            if (!(i2 == 0 && atomicInteger2.get() == 0) && ((currentTimeMillis <= 0 || currentTimeMillis > System.currentTimeMillis()) && i2 <= minBytes && i2 <= maxBytes)) {
                if (log.isDebugEnabled()) {
                    log.debug("Request {}: Read time or size not reach, do another round of read before return.", kafkaHeaderAndRequest.getHeader());
                }
                readMessagesInternal(kafkaHeaderAndRequest, map, atomicInteger, map2, completableFuture, linkedHashMap);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug(" Request {}: Complete read {} entries with size {}", new Object[]{kafkaHeaderAndRequest.getHeader(), Integer.valueOf(atomicInteger2.get()), Integer.valueOf(i2)});
            }
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            map2.entrySet().parallelStream().forEach(entry2 -> {
                FetchResponse.PartitionData partitionData;
                TopicPartition topicPartition = (TopicPartition) entry2.getKey();
                List<Entry> list = (List) entry2.getValue();
                Pair pair = (Pair) map.get(topicPartition);
                this.requestHandler.getTopicManager().getTopicConsumerManager(KopTopic.toString(topicPartition)).thenAccept(kafkaTopicConsumerManager -> {
                    if (kafkaTopicConsumerManager != null) {
                        kafkaTopicConsumerManager.add(((Long) pair.getRight()).longValue(), pair);
                    } else {
                        this.requestHandler.getTopicManager().getConsumerTopicManagers().remove(KopTopic.toString(topicPartition));
                        log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM.");
                    }
                });
                if (list.isEmpty()) {
                    partitionData = new FetchResponse.PartitionData(Errors.NONE, -1L, -1L, -1L, (List) null, MemoryRecords.EMPTY);
                } else {
                    atomicBoolean.set(false);
                    Entry entry2 = list.get(list.size() - 1);
                    long offset = MessageIdUtils.getOffset(entry2.getLedgerId(), entry2.getEntryId()) + ((ManagedCursor) ((Pair) map.get(topicPartition)).getLeft()).getNumberOfEntries();
                    short apiVersion = kafkaHeaderAndRequest.getHeader().apiVersion();
                    byte b = 2;
                    if (apiVersion <= 1) {
                        b = 0;
                    } else if (apiVersion <= 3) {
                        b = 1;
                    }
                    partitionData = new FetchResponse.PartitionData(Errors.NONE, offset, offset, offset, (List) null, this.requestHandler.getEntryFormatter().decode(list, b));
                }
                linkedHashMap.put(topicPartition, partitionData);
            });
            if (!atomicBoolean.get()) {
                completableFuture.complete(new FetchResponse(Errors.NONE, linkedHashMap, ((Integer) CommonFields.THROTTLE_TIME_MS.defaultValue).intValue(), kafkaHeaderAndRequest.getRequest().metadata().sessionId()));
                recycle();
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Request {}: All partitions for request read 0 entry", kafkaHeaderAndRequest.getHeader());
                }
                this.requestHandler.getPulsarService().getExecutor().schedule(() -> {
                    completableFuture.complete(new FetchResponse(Errors.NONE, linkedHashMap, ((Integer) CommonFields.THROTTLE_TIME_MS.defaultValue).intValue(), kafkaHeaderAndRequest.getRequest().metadata().sessionId()));
                    recycle();
                }, maxWait, TimeUnit.MILLISECONDS);
            }
        });
    }

    private Map<TopicPartition, CompletableFuture<List<Entry>>> readAllCursorOnce(Map<TopicPartition, Pair<ManagedCursor, Long>> map) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        map.entrySet().parallelStream().forEach(entry -> {
            final CompletableFuture completableFuture = new CompletableFuture();
            final ManagedCursor managedCursor = (ManagedCursor) ((Pair) entry.getValue()).getLeft();
            final long longValue = ((Long) ((Pair) entry.getValue()).getRight()).longValue();
            managedCursor.asyncReadEntries(this.requestHandler.getMaxReadEntriesNum(), new AsyncCallbacks.ReadEntriesCallback() { // from class: io.streamnative.pulsar.handlers.kop.MessageFetchContext.2
                public void readEntriesComplete(List<Entry> list, Object obj) {
                    String kopTopic = KopTopic.toString((TopicPartition) entry.getKey());
                    if (!list.isEmpty()) {
                        Stream stream = StreamSupport.stream(list.spliterator(), true);
                        ManagedCursor managedCursor2 = managedCursor;
                        Map map2 = map;
                        Map.Entry entry = entry;
                        long j = longValue;
                        stream.forEachOrdered(entry2 -> {
                            long offset = MessageIdUtils.getOffset(entry2.getLedgerId(), entry2.getEntryId());
                            PositionImpl positionImpl = PositionImpl.get(entry2.getLedgerId(), entry2.getEntryId());
                            MessageFetchContext.commitOffset((NonDurableCursorImpl) managedCursor2, positionImpl);
                            PositionImpl nextAvailablePosition = ((NonDurableCursorImpl) managedCursor2).getNextAvailablePosition(positionImpl);
                            long offset2 = MessageIdUtils.getOffset(nextAvailablePosition.getLedgerId(), nextAvailablePosition.getEntryId());
                            map2.put((TopicPartition) entry.getKey(), Pair.of(managedCursor2, Long.valueOf(offset2)));
                            if (MessageFetchContext.log.isDebugEnabled()) {
                                MessageFetchContext.log.debug("Topic {} success read entry: ledgerId: {}, entryId: {}, size: {}, ConsumerManager original offset: {}, entryOffset: {} - {}, nextOffset: {} - {}", new Object[]{kopTopic, Long.valueOf(entry2.getLedgerId()), Long.valueOf(entry2.getEntryId()), Integer.valueOf(entry2.getLength()), Long.valueOf(j), Long.valueOf(offset), positionImpl, Long.valueOf(offset2), nextAvailablePosition});
                            }
                        });
                    }
                    completableFuture.complete(list);
                }

                public void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    MessageFetchContext.log.error("Error read entry for topic: {}", KopTopic.toString((TopicPartition) entry.getKey()));
                    completableFuture.completeExceptionally(managedLedgerException);
                }
            }, (Object) null);
            concurrentHashMap.putIfAbsent((TopicPartition) entry.getKey(), completableFuture);
        });
        return concurrentHashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void commitOffset(NonDurableCursorImpl nonDurableCursorImpl, final PositionImpl positionImpl) {
        nonDurableCursorImpl.asyncMarkDelete(positionImpl, new AsyncCallbacks.MarkDeleteCallback() { // from class: io.streamnative.pulsar.handlers.kop.MessageFetchContext.3
            public void markDeleteComplete(Object obj) {
                if (MessageFetchContext.log.isDebugEnabled()) {
                    MessageFetchContext.log.debug("Mark delete success for position: {}", positionImpl);
                }
            }

            public void markDeleteFailed(ManagedLedgerException managedLedgerException, Object obj) {
                MessageFetchContext.log.warn("Mark delete success for position: {} with error:", positionImpl, managedLedgerException);
            }
        }, (Object) null);
    }
}
