package org.apache.rocketmq.tieredstore;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;

/* loaded from: input_file:org/apache/rocketmq/tieredstore/TieredMessageFetcher.class */
public class TieredMessageFetcher {
    /** Logger obtained by the name held in {@code TieredStoreUtil.TIERED_STORE_LOGGER_NAME}. */
    private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
    /** Configuration handed to the constructor; read for the read-ahead thresholds and factors. */
    private final TieredMessageStoreConfig storeConfig;
    /** Broker name read from the configuration; used when building {@link MessageQueue} lookup keys. */
    private final String brokerName;
    /** Metadata store resolved in the constructor; remains {@code null} when that lookup throws. */
    private TieredMetadataStore metadataStore;
    /** Manager used to resolve the flat file that backs a given message queue. */
    private final TieredFlatFileManager flatFileManager;
    /** Read-ahead cache keyed by (flat file, queue offset); built and bounded in the constructor. */
    protected final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> readAheadCache;

    /**
     * Synthetic switch-map holder (originally {@code TieredMessageFetcher$1}): maps the ordinal of a
     * {@link TieredStoreErrorCode} to the case index used by a {@code switch} in the enclosing class.
     * Only {@code NO_NEW_DATA} is mapped (to 1); every other slot keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$tieredstore$exception$TieredStoreErrorCode;

        static {
            int[] caseIndexByOrdinal = new int[TieredStoreErrorCode.values().length];
            try {
                caseIndexByOrdinal[TieredStoreErrorCode.NO_NEW_DATA.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // The constant is absent at run time: its slot simply stays 0 (the default case).
            }
            $SwitchMap$org$apache$rocketmq$tieredstore$exception$TieredStoreErrorCode = caseIndexByOrdinal;
        }
    }

    public TieredMessageFetcher(TieredMessageStoreConfig tieredMessageStoreConfig) {
        this.storeConfig = tieredMessageStoreConfig;
        this.brokerName = tieredMessageStoreConfig.getBrokerName();
        this.flatFileManager = TieredFlatFileManager.getInstance(tieredMessageStoreConfig);
        this.readAheadCache = Caffeine.newBuilder().scheduler(Scheduler.systemScheduler()).expireAfterWrite(tieredMessageStoreConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS).maximumWeight((long) (Runtime.getRuntime().maxMemory() * tieredMessageStoreConfig.getReadAheadCacheSizeThresholdRate())).weigher((messageCacheKey, selectMappedBufferResultWrapper) -> {
            return selectMappedBufferResultWrapper.getDuplicateResult().getSize();
        }).recordStats().build();
        try {
            this.metadataStore = TieredStoreUtil.getMetadataStore(tieredMessageStoreConfig);
        } catch (Exception e) {
        }
    }

    /**
     * Exposes the read-ahead cache held by this fetcher.
     *
     * @return the same cache instance this fetcher reads from and writes to (not a copy)
     */
    public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> getReadAheadCache() {
        return readAheadCache;
    }

    /**
     * Reads up to {@code i} messages starting at queue offset {@code j}, preferring the read-ahead
     * cache. Delegates to the five-argument overload with waiting for an in-flight request enabled.
     *
     * @param compositeQueueFlatFile flat file of the queue being read
     * @param str consumer group on whose behalf the read is made
     * @param j first queue offset requested
     * @param i maximum number of messages to return
     * @return future holding the read result
     */
    public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQueueFlatFile compositeQueueFlatFile, String str, long j, int i) {
        final boolean waitInflightRequest = true;
        return getMessageFromCacheAsync(compositeQueueFlatFile, str, j, i, waitInflightRequest);
    }

    /**
     * Caches one message without marking it as already accessed; shorthand for the seven-argument
     * overload with the last argument set to {@code false}.
     *
     * @param compositeFlatFile flat file the message belongs to (part of the cache key)
     * @param j queue offset of the message (part of the cache key)
     * @param selectMappedBufferResult buffer holding the message
     * @param j2 third constructor argument of the wrapper; callers in this class pass the first offset of the batch
     * @param j3 fourth constructor argument of the wrapper; callers in this class pass the last offset of the batch
     * @param i fifth constructor argument of the wrapper; callers in this class pass the batch size
     * @return the wrapper that was stored in the cache
     */
    protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile compositeFlatFile, long j, SelectMappedBufferResult selectMappedBufferResult, long j2, long j3, int i) {
        final boolean markAccessed = false;
        return putMessageToCache(compositeFlatFile, j, selectMappedBufferResult, j2, j3, i, markAccessed);
    }

    /**
     * Wraps one message and stores it in the read-ahead cache under the key
     * ({@code compositeFlatFile}, {@code j}), replacing any entry already stored under that key.
     *
     * @param compositeFlatFile flat file the message belongs to (part of the cache key)
     * @param j queue offset of the message (part of the cache key)
     * @param selectMappedBufferResult buffer holding the message
     * @param j2 third constructor argument of the wrapper; callers in this class pass the first offset of the batch
     * @param j3 fourth constructor argument of the wrapper; callers in this class pass the last offset of the batch
     * @param i fifth constructor argument of the wrapper; callers in this class pass the batch size
     * @param z when {@code true}, the wrapper's access count is incremented once before it is cached
     * @return the wrapper that was stored in the cache
     */
    protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile compositeFlatFile, long j, SelectMappedBufferResult selectMappedBufferResult, long j2, long j3, int i, boolean z) {
        SelectMappedBufferResultWrapper wrapper =
            new SelectMappedBufferResultWrapper(selectMappedBufferResult, j, j2, j3, i);
        if (z) {
            wrapper.addAccessCount();
        }
        MessageCacheKey cacheKey = new MessageCacheKey(compositeFlatFile, j);
        readAheadCache.put(cacheKey, wrapper);
        return wrapper;
    }

    /**
     * Looks up a single cached message without loading it on a miss.
     *
     * @param compositeFlatFile flat file the message belongs to
     * @param j queue offset of the message
     * @return the cached wrapper, or {@code null} when no entry is present for that key
     */
    @Nullable
    protected SelectMappedBufferResultWrapper getMessageFromCache(CompositeFlatFile compositeFlatFile, long j) {
        MessageCacheKey cacheKey = new MessageCacheKey(compositeFlatFile, j);
        return readAheadCache.getIfPresent(cacheKey);
    }

    /**
     * Records that a group consumed the given cached messages and evicts the ones that every active
     * group has now read.
     *
     * <p>The offset reported to the flat file is the current offset of the last wrapper in
     * {@code list}, or {@code j} when the list is empty. Each wrapper's access count is incremented;
     * once it reaches the flat file's active group count the entry is invalidated in the cache.
     *
     * @param compositeFlatFile flat file the messages belong to
     * @param str consumer group that performed the read
     * @param j offset to report when {@code list} is empty
     * @param list wrappers that were served from the cache
     */
    protected void recordCacheAccess(CompositeFlatFile compositeFlatFile, String str, long j, List<SelectMappedBufferResultWrapper> list) {
        long accessedOffset = j;
        if (!list.isEmpty()) {
            accessedOffset = list.get(list.size() - 1).getCurOffset();
        }
        compositeFlatFile.recordGroupAccess(str, accessedOffset);

        for (SelectMappedBufferResultWrapper wrapper : list) {
            wrapper.addAccessCount();
            if (wrapper.getAccessCount() < compositeFlatFile.getActiveGroupCount()) {
                continue;
            }
            MessageCacheKey cacheKey = new MessageCacheKey(compositeFlatFile, wrapper.getCurOffset());
            readAheadCache.invalidate(cacheKey);
        }
    }

    /**
     * Schedules read-ahead requests for the offsets that follow a cache hit.
     *
     * <p>Nothing is done when {@code i} is 1, when the flat file's read-ahead factor is 1, or while
     * an earlier in-flight request is still running. Otherwise, under the flat file's monitor:
     * if the message at {@code j} is no longer cached the previous read-ahead is treated as expired,
     * the read-ahead factor is decreased and fetching restarts at {@code j}; if it is still cached
     * (and the previous request succeeded and started at or before {@code j}) the factor is increased
     * and fetching continues after the previous request's maximum offset. The total amount is split
     * into batches of at most {@code i * readAheadBatchSizeFactorThreshold} messages, with a smaller
     * leading batch for any remainder, and the resulting futures are registered as the group's
     * in-flight request.
     *
     * @param compositeQueueFlatFile flat file of the queue being read
     * @param str consumer group the read-ahead is made for
     * @param i batch size of the triggering request
     * @param j offset immediately after the last message served from cache
     */
    private void preFetchMessage(CompositeQueueFlatFile compositeQueueFlatFile, String str, int i, long j) {
        if (i == 1 || compositeQueueFlatFile.getReadAheadFactor() == 1) {
            return;
        }
        MessageQueue messageQueue = compositeQueueFlatFile.getMessageQueue();

        // Cheap unsynchronized check first; it is repeated under the lock below.
        int prefetchBatchSize = Math.min(
            i * compositeQueueFlatFile.getReadAheadFactor(), this.storeConfig.getReadAheadMessageCountThreshold());
        if (!compositeQueueFlatFile.getInflightRequest(str, j, prefetchBatchSize).isAllDone()) {
            return;
        }

        synchronized (compositeQueueFlatFile) {
            InFlightRequestFuture lastRequest = compositeQueueFlatFile.getInflightRequest(j, i);
            if (!lastRequest.isAllDone()) {
                return;
            }

            long maxOffsetOfLastRequest = lastRequest.getLastFuture().join();
            boolean lastRequestIsExpired = getMessageFromCache(compositeQueueFlatFile, j) == null;
            LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, cacheRemainCount={}",
                str, j, maxOffsetOfLastRequest, lastRequestIsExpired, (int) (maxOffsetOfLastRequest - j));

            if (!lastRequestIsExpired
                && (maxOffsetOfLastRequest == -1 || j < lastRequest.getStartOffset())) {
                return;
            }

            long requestOffset;
            if (lastRequestIsExpired) {
                requestOffset = j;
                compositeQueueFlatFile.decreaseReadAheadFactor();
            } else {
                requestOffset = maxOffsetOfLastRequest + 1;
                compositeQueueFlatFile.increaseReadAheadFactor();
            }

            int factor = Math.min(
                compositeQueueFlatFile.getReadAheadFactor(), this.storeConfig.getReadAheadMessageCountThreshold() / i);
            // flag == 1 means the factor is not a multiple of the per-batch threshold, so one extra,
            // smaller batch is needed; concurrency is the total number of batches to issue.
            int flag = 0;
            int concurrency = 1;
            if (factor > this.storeConfig.getReadAheadBatchSizeFactorThreshold()) {
                flag = factor % this.storeConfig.getReadAheadBatchSizeFactorThreshold() == 0 ? 0 : 1;
                concurrency = factor / this.storeConfig.getReadAheadBatchSizeFactorThreshold() + flag;
            }
            int requestBatchSize = i * Math.min(factor, this.storeConfig.getReadAheadBatchSizeFactorThreshold());

            List<Pair<Integer, CompletableFuture<Long>>> futureList = new ArrayList<>();
            long nextQueueOffset = requestOffset;
            if (flag == 1) {
                int firstBatchSize = (factor % this.storeConfig.getReadAheadBatchSizeFactorThreshold()) * i;
                CompletableFuture<Long> firstBatch =
                    prefetchAndPutMsgToCache(compositeQueueFlatFile, messageQueue, nextQueueOffset, firstBatchSize);
                futureList.add(Pair.of(firstBatchSize, firstBatch));
                nextQueueOffset += firstBatchSize;
            }

            int fullBatchCount = concurrency - flag;
            long batchOffset = nextQueueOffset;
            for (int batch = 0; batch < fullBatchCount; batch++) {
                CompletableFuture<Long> fullBatch =
                    prefetchAndPutMsgToCache(compositeQueueFlatFile, messageQueue, batchOffset, requestBatchSize);
                futureList.add(Pair.of(requestBatchSize, fullBatch));
                batchOffset += requestBatchSize;
            }

            compositeQueueFlatFile.putInflightRequest(str, requestOffset, i * factor, futureList);
            LOGGER.debug("TieredMessageFetcher#preFetchMessage: try to prefetch messages for later requests: next begin offset: {}, request offset: {}, factor: {}, flag: {}, request batch: {}, concurrency: {}",
                j, requestOffset, factor, flag, requestBatchSize, concurrency);
        }
    }

    /**
     * Reads a batch from the tiered store and puts every returned message into the read-ahead cache.
     *
     * <p>The returned future yields the queue offset of the last message cached, or {@code -1} when
     * the read did not report {@code FOUND}, when the offset and message lists differ in length, or
     * when the result is empty. A result that is shorter than requested (or ends at an unexpected
     * offset) is still cached; it only produces a warning.
     *
     * @param compositeQueueFlatFile flat file to read from
     * @param messageQueue queue used for log context only
     * @param j first queue offset to read
     * @param i number of messages requested
     * @return future of the last cached queue offset, or {@code -1} on failure
     */
    private CompletableFuture<Long> prefetchAndPutMsgToCache(CompositeQueueFlatFile compositeQueueFlatFile, MessageQueue messageQueue, long j, int i) {
        return getMessageFromTieredStoreAsync(compositeQueueFlatFile, j, i).thenApplyAsync(fetched -> {
            if (fetched.getStatus() != GetMessageStatus.FOUND) {
                LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed: topic: {}, queue: {}, queue offset: {}, batch size: {}, result: {}",
                    messageQueue.getTopic(), messageQueue.getQueueId(), j, i, fetched.getStatus());
                return -1L;
            }

            List<Long> offsets = fetched.getMessageQueueOffset();
            List<SelectMappedBufferResult> messages = fetched.getMessageMapedList();
            if (offsets.size() != messages.size()) {
                LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is illegal: topic: {}, queue: {}, queue offset: {}, batch size: {}, offsetList size: {}, msgList size: {}",
                    messageQueue.getTopic(), messageQueue.getQueueId(), j, i, offsets.size(), messages.size());
                return -1L;
            }
            if (offsets.isEmpty()) {
                LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is FOUND but msgList is empty: topic: {}, queue: {}, queue offset: {}, batch size: {}",
                    messageQueue.getTopic(), messageQueue.getQueueId(), j, i);
                return -1L;
            }

            int fetchedCount = offsets.size();
            Long firstOffset = offsets.get(0);
            Long lastOffset = offsets.get(fetchedCount - 1);
            for (int index = 0; index < offsets.size(); index++) {
                putMessageToCache(compositeQueueFlatFile, offsets.get(index), messages.get(index), firstOffset, lastOffset, fetchedCount);
            }

            long expectedLastOffset = (j + i) - 1;
            if (fetchedCount != i || lastOffset != expectedLastOffset) {
                LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: size not match: except: {}, actual: {}, queue offset: {}, min offset: {}, except offset: {}, max offset: {}",
                    i, fetchedCount, j, firstOffset, expectedLastOffset, lastOffset);
            }
            return lastOffset;
        }, (Executor) TieredStoreExecutor.fetchDataExecutor);
    }

    /**
     * Serves a read from the read-ahead cache, falling back to the tiered store on a complete miss.
     *
     * <ol>
     *   <li>Collects the contiguous run of cached messages starting at {@code j} (at most {@code i}).</li>
     *   <li>When {@code z} is set, cache access/hit metrics are recorded; if nothing was cached and
     *       the group's in-flight request covering {@code j} is still running, the method waits for
     *       it and retries once with {@code z == false}.</li>
     *   <li>Tries the cache again for the entries still missing, then records the access.</li>
     *   <li>On a complete miss, reads {@code i * readAheadMinFactor} messages from the tiered store,
     *       caches all of them (marked as accessed once), returns at most {@code i}, and registers
     *       the read as the group's in-flight request.</li>
     *   <li>On a hit, triggers read-ahead for the following offsets and returns the cached messages.</li>
     * </ol>
     *
     * @param compositeQueueFlatFile flat file of the queue being read
     * @param str consumer group on whose behalf the read is made
     * @param j first queue offset requested
     * @param i maximum number of messages to return
     * @param z whether to record metrics and wait for a running in-flight request on a miss
     * @return future holding the read result
     */
    private CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQueueFlatFile compositeQueueFlatFile, String str, long j, int i, boolean z) {
        MessageQueue messageQueue = compositeQueueFlatFile.getMessageQueue();

        // First pass: contiguous cached messages starting at j.
        long lastGetOffset = j - 1;
        List<SelectMappedBufferResultWrapper> cachedMessages = new ArrayList<>(i);
        for (int count = 0; count < i; count++) {
            SelectMappedBufferResultWrapper wrapper = getMessageFromCache(compositeQueueFlatFile, lastGetOffset + 1);
            if (wrapper == null) {
                break;
            }
            lastGetOffset++;
            cachedMessages.add(wrapper);
        }

        if (z) {
            Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
                .put(TieredStoreMetricsConstant.LABEL_TOPIC, messageQueue.getTopic())
                .put(TieredStoreMetricsConstant.LABEL_GROUP, str)
                .build();
            TieredStoreMetricsManager.cacheAccess.add(i, attributes);
            TieredStoreMetricsManager.cacheHit.add(cachedMessages.size(), attributes);
        }

        // Nothing cached yet but a request covering this offset is running: wait for it, then retry
        // once without waiting again.
        if (cachedMessages.isEmpty() && z) {
            CompletableFuture<Long> inflightFuture = compositeQueueFlatFile.getInflightRequest(str, j, i).getFuture(j);
            if (!inflightFuture.isDone()) {
                Stopwatch stopwatch = Stopwatch.createStarted();
                return inflightFuture.thenCompose(ignored -> {
                    LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
                    return getMessageFromCacheAsync(compositeQueueFlatFile, str, j, i, false);
                });
            }
        }

        // Second pass for whatever is still missing. The remaining count is computed once: bounding
        // the loop by "i - size()" while the list grows would stop after about half of the entries.
        int missingCount = i - cachedMessages.size();
        for (int count = 0; count < missingCount; count++) {
            SelectMappedBufferResultWrapper wrapper = getMessageFromCache(compositeQueueFlatFile, lastGetOffset + 1);
            if (wrapper == null) {
                break;
            }
            lastGetOffset++;
            cachedMessages.add(wrapper);
        }

        recordCacheAccess(compositeQueueFlatFile, str, j, cachedMessages);

        if (cachedMessages.isEmpty()) {
            LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: topic: {}, queue: {}, queue offset: {}, max message num: {}",
                messageQueue.getTopic(), messageQueue.getQueueId(), j, i);

            CompletableFuture<GetMessageResult> resultFuture;
            synchronized (compositeQueueFlatFile) {
                int batchSize = i * this.storeConfig.getReadAheadMinFactor();
                resultFuture = getMessageFromTieredStoreAsync(compositeQueueFlatFile, j, batchSize).thenApplyAsync(storeResult -> {
                    if (storeResult.getStatus() != GetMessageStatus.FOUND) {
                        return storeResult;
                    }

                    // Build a separate result so the caller gets at most i messages while every
                    // message read from the store still lands in the cache.
                    GetMessageResult limitedResult = new GetMessageResult();
                    limitedResult.setStatus(GetMessageStatus.FOUND);
                    limitedResult.setMinOffset(compositeQueueFlatFile.getConsumeQueueMinOffset());
                    limitedResult.setMaxOffset(compositeQueueFlatFile.getConsumeQueueCommitOffset());

                    List<Long> offsets = storeResult.getMessageQueueOffset();
                    List<SelectMappedBufferResult> messages = storeResult.getMessageMapedList();
                    Long firstOffset = offsets.get(0);
                    Long lastOffset = offsets.get(offsets.size() - 1);
                    int fetchedCount = offsets.size();
                    for (int index = 0; index < offsets.size(); index++) {
                        Long offset = offsets.get(index);
                        SelectMappedBufferResultWrapper wrapper = putMessageToCache(
                            compositeQueueFlatFile, offset, messages.get(index), firstOffset, lastOffset, fetchedCount, true);
                        if (limitedResult.getMessageMapedList().size() < i) {
                            limitedResult.addMessage(wrapper.getDuplicateResult(), offset);
                        }
                    }
                    limitedResult.setNextBeginOffset(j + limitedResult.getMessageMapedList().size());
                    return limitedResult;
                }, (Executor) TieredStoreExecutor.fetchDataExecutor);

                // Register this read as in-flight; its future yields the last returned offset or -1.
                CompletableFuture<Long> maxOffsetFuture = resultFuture.thenApply(finalResult -> {
                    if (finalResult.getStatus() == GetMessageStatus.FOUND) {
                        List<Long> returnedOffsets = finalResult.getMessageQueueOffset();
                        return returnedOffsets.get(returnedOffsets.size() - 1);
                    }
                    return -1L;
                });
                List<Pair<Integer, CompletableFuture<Long>>> futureList = new ArrayList<>();
                futureList.add(Pair.of(batchSize, maxOffsetFuture));
                compositeQueueFlatFile.putInflightRequest(str, j, batchSize, futureList);
            }
            return resultFuture;
        }

        LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
            messageQueue.getTopic(), messageQueue.getQueueId(), j, i, cachedMessages.size());
        preFetchMessage(compositeQueueFlatFile, str, i, lastGetOffset + 1);

        GetMessageResult hitResult = new GetMessageResult();
        hitResult.setStatus(GetMessageStatus.FOUND);
        hitResult.setMinOffset(compositeQueueFlatFile.getConsumeQueueMinOffset());
        hitResult.setMaxOffset(compositeQueueFlatFile.getConsumeQueueCommitOffset());
        hitResult.setNextBeginOffset(j + cachedMessages.size());
        for (SelectMappedBufferResultWrapper wrapper : cachedMessages) {
            hitResult.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset());
        }
        return CompletableFuture.completedFuture(hitResult);
    }

    /**
     * Reads up to {@code i} messages starting at queue offset {@code j} directly from the tiered
     * store, bypassing the read-ahead cache.
     *
     * <p>The consume-queue items are read first; the commit-log range they span is then read in one
     * request and split into individual messages. If that range exceeds the configured read-ahead
     * size threshold, items are dropped from the tail until it fits (at least one item is kept).
     *
     * <p>Resulting status:
     * <ul>
     *   <li>{@code FOUND} - at least one message was split out of the commit-log buffer;</li>
     *   <li>{@code MESSAGE_WAS_REMOVING} - the buffer could not be split; the next begin offset skips
     *       the requested items;</li>
     *   <li>{@code OFFSET_OVERFLOW_ONE} - the consume-queue read failed synchronously with
     *       {@code NO_NEW_DATA};</li>
     *   <li>{@code OFFSET_FOUND_NULL} - any other failure, synchronous or asynchronous.</li>
     * </ul>
     *
     * @param compositeQueueFlatFile flat file to read from
     * @param j first queue offset to read
     * @param i number of messages requested
     * @return future of the read result; it does not complete exceptionally
     */
    public CompletableFuture<GetMessageResult> getMessageFromTieredStoreAsync(CompositeQueueFlatFile compositeQueueFlatFile, long j, int i) {
        // NOTE(review): 20 is used throughout as the byte size of one consume-queue item - confirm
        // against the consume-queue store unit size constant.
        final int cqItemSize = 20;

        GetMessageResult result = new GetMessageResult();
        result.setMinOffset(compositeQueueFlatFile.getConsumeQueueMinOffset());
        result.setMaxOffset(compositeQueueFlatFile.getConsumeQueueCommitOffset());

        try {
            CompletableFuture<ByteBuffer> readConsumeQueueFuture = compositeQueueFlatFile.getConsumeQueueAsync(j, i);

            CompletableFuture<ByteBuffer> readCommitLogFuture = readConsumeQueueFuture.thenComposeAsync(cqBuffer -> {
                long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
                // Move to the last item to learn where the requested range ends.
                cqBuffer.position(cqBuffer.remaining() - cqItemSize);
                long lastCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
                if (lastCommitLogOffset < firstCommitLogOffset) {
                    MessageQueue messageQueue = compositeQueueFlatFile.getMessageQueue();
                    LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: message is not in order, try to fetch data in next store, topic: {}, queueId: {}, batch size: {}, queue offset {}",
                        messageQueue.getTopic(), messageQueue.getQueueId(), i, j);
                    throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, "message is not in order");
                }

                long length = (lastCommitLogOffset - firstCommitLogOffset) + CQItemBufferUtil.getSize(cqBuffer);
                // Remember the untrimmed length so the trimming below can be detected and reported.
                final long originLength = length;
                // Drop trailing items until the range fits under the size threshold.
                while (cqBuffer.limit() > cqItemSize && length > this.storeConfig.getReadAheadMessageSizeThreshold()) {
                    cqBuffer.limit(cqBuffer.position());
                    cqBuffer.position(cqBuffer.limit() - cqItemSize);
                    length = (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - firstCommitLogOffset) + CQItemBufferUtil.getSize(cqBuffer);
                }
                if (originLength != length) {
                    MessageQueue messageQueue = compositeQueueFlatFile.getMessageQueue();
                    LOGGER.info("TieredMessageFetcher#getMessageFromTieredStoreAsync: msg data is too large, topic: {}, queueId: {}, batch size: {}, fix it from {} to {}",
                        messageQueue.getTopic(), messageQueue.getQueueId(), i, originLength, length);
                }
                return compositeQueueFlatFile.getCommitLogAsync(firstCommitLogOffset, (int) length);
            }, (Executor) TieredStoreExecutor.fetchDataExecutor);

            return readConsumeQueueFuture.thenCombineAsync(readCommitLogFuture, (cqBuffer, msgBuffer) -> {
                List<Pair<Integer, Integer>> messageRanges = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
                if (messageRanges.isEmpty()) {
                    long nextBeginOffset = j + (cqBuffer.remaining() / cqItemSize);
                    LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, consume queue buffer size: {}, message buffer size: {}, change offset from {} to {}",
                        cqBuffer.remaining(), msgBuffer.remaining(), j, nextBeginOffset);
                    result.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
                    result.setNextBeginOffset(nextBeginOffset);
                    return result;
                }

                int requestSize = cqBuffer.remaining() / cqItemSize;
                result.setStatus(GetMessageStatus.FOUND);
                result.setNextBeginOffset(j + messageRanges.size());
                for (Pair<Integer, Integer> range : messageRanges) {
                    int position = range.getLeft();
                    int size = range.getRight();
                    msgBuffer.position(position);
                    ByteBuffer slice = msgBuffer.slice();
                    slice.limit(size);
                    result.addMessage(new SelectMappedBufferResult(position, slice, size, (MappedFile) null), MessageBufferUtil.getQueueOffset(slice));
                }

                if (requestSize != messageRanges.size()) {
                    HashSet<Long> requestedOffsets = new HashSet<>();
                    for (int index = 0; index < requestSize; index++) {
                        requestedOffsets.add(j + index);
                    }
                    LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, batch size: {}, request message count: {}, actual message count: {}, these messages may lost: {}",
                        i, requestSize, messageRanges.size(), Sets.difference(requestedOffsets, Sets.newHashSet(result.getMessageQueueOffset())));
                } else if (requestSize != i) {
                    LOGGER.debug("TieredMessageFetcher#getMessageFromTieredStoreAsync: message count does not meet batch size, maybe dispatch delay: batch size: {}, request message count: {}", i, requestSize);
                }
                return result;
            }, (Executor) TieredStoreExecutor.fetchDataExecutor).exceptionally(th -> {
                MessageQueue messageQueue = compositeQueueFlatFile.getMessageQueue();
                LOGGER.warn("TieredMessageFetcher#getMessageFromTieredStoreAsync: get message failed: topic: {} queueId: {}",
                    messageQueue.getTopic(), messageQueue.getQueueId(), th);
                result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL);
                result.setNextBeginOffset(j);
                return result;
            });
        } catch (TieredStoreException e) {
            // NO_NEW_DATA means the offset is just past the stored data; anything else is a failed read.
            GetMessageStatus status = e.getErrorCode() == TieredStoreErrorCode.NO_NEW_DATA
                ? GetMessageStatus.OFFSET_OVERFLOW_ONE
                : GetMessageStatus.OFFSET_FOUND_NULL;
            result.setStatus(status);
            result.setNextBeginOffset(j);
            return CompletableFuture.completedFuture(result);
        }
    }

    /**
     * Entry point for reading messages of a queue on behalf of a consumer group.
     *
     * <p>The request offset is validated against the flat file's consume-queue range before any data
     * is read; only offsets inside the range are forwarded to the cache-backed read. The outcomes
     * decided here are:
     * <ul>
     *   <li>no flat file for the queue: {@code NO_MATCHED_LOGIC_QUEUE};</li>
     *   <li>commit offset not positive: {@code NO_MESSAGE_IN_QUEUE};</li>
     *   <li>offset below the minimum: {@code OFFSET_TOO_SMALL}, next offset is the minimum;</li>
     *   <li>offset equal to the commit offset: {@code OFFSET_OVERFLOW_ONE};</li>
     *   <li>offset above the commit offset: {@code OFFSET_OVERFLOW_BADLY}.</li>
     * </ul>
     *
     * @param str consumer group
     * @param str2 topic
     * @param i queue id
     * @param j queue offset to start reading from
     * @param i2 maximum number of messages to return
     * @param messageFilter not used by this method
     * @return future holding the read result
     */
    public CompletableFuture<GetMessageResult> getMessageAsync(String str, String str2, int i, long j, int i2, MessageFilter messageFilter) {
        CompositeQueueFlatFile flatFile = this.flatFileManager.getFlatFile(new MessageQueue(str2, this.brokerName, i));
        if (flatFile == null) {
            GetMessageResult noQueueResult = new GetMessageResult();
            noQueueResult.setNextBeginOffset(j);
            noQueueResult.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
            return CompletableFuture.completedFuture(noQueueResult);
        }

        GetMessageResult result = new GetMessageResult();
        long minQueueOffset = flatFile.getConsumeQueueMinOffset();
        result.setMinOffset(minQueueOffset);
        long maxQueueOffset = flatFile.getConsumeQueueCommitOffset();
        result.setMaxOffset(maxQueueOffset);

        GetMessageStatus rejectStatus;
        long nextBeginOffset;
        if (flatFile.getConsumeQueueCommitOffset() <= 0) {
            rejectStatus = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            nextBeginOffset = j;
        } else if (j < minQueueOffset) {
            rejectStatus = GetMessageStatus.OFFSET_TOO_SMALL;
            nextBeginOffset = flatFile.getConsumeQueueMinOffset();
        } else if (j == maxQueueOffset) {
            rejectStatus = GetMessageStatus.OFFSET_OVERFLOW_ONE;
            nextBeginOffset = flatFile.getConsumeQueueCommitOffset();
        } else if (j > maxQueueOffset) {
            rejectStatus = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
            nextBeginOffset = flatFile.getConsumeQueueCommitOffset();
        } else {
            // Offset lies inside [min, commit): serve it from cache or the tiered store.
            return getMessageFromCacheAsync(flatFile, str, j, i2);
        }

        result.setStatus(rejectStatus);
        result.setNextBeginOffset(nextBeginOffset);
        return CompletableFuture.completedFuture(result);
    }

    /**
     * Reads the store timestamp of the message located at the flat file's minimum commit-log offset.
     *
     * @param str topic
     * @param i queue id
     * @return future of the store timestamp, or an already completed future of {@code -1} when no
     *         flat file exists for the queue
     */
    public CompletableFuture<Long> getEarliestMessageTimeAsync(String str, int i) {
        CompositeQueueFlatFile flatFile = this.flatFileManager.getFlatFile(new MessageQueue(str, this.brokerName, i));
        if (flatFile == null) {
            return CompletableFuture.completedFuture(-1L);
        }
        // 64 bytes are requested from the start of the first message; the timestamp is decoded from them.
        return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), 64)
            .thenApply(MessageBufferUtil::getStoreTimeStamp);
    }

    /**
     * Reads the store timestamp of the message at a given queue offset.
     *
     * <p>The consume-queue item at {@code j} is read first to find the message's commit-log offset
     * and size; the message is then read and its store timestamp decoded.
     *
     * @param str topic
     * @param i queue id
     * @param j queue offset of the message
     * @return future of the store timestamp; yields {@code -1} when no flat file exists for the
     *         queue or when reading or decoding fails (the failure is logged)
     */
    public CompletableFuture<Long> getMessageStoreTimeStampAsync(String str, int i, long j) {
        CompositeQueueFlatFile flatFile = this.flatFileManager.getFlatFile(new MessageQueue(str, this.brokerName, i));
        if (flatFile == null) {
            return CompletableFuture.completedFuture(-1L);
        }
        return flatFile.getConsumeQueueAsync(j)
            .thenComposeAsync(
                cqItem -> flatFile.getCommitLogAsync(CQItemBufferUtil.getCommitLogOffset(cqItem), CQItemBufferUtil.getSize(cqItem)),
                (Executor) TieredStoreExecutor.fetchDataExecutor)
            .thenApply(MessageBufferUtil::getStoreTimeStamp)
            .exceptionally(th -> {
                LOGGER.error("TieredMessageFetcher#getMessageStoreTimeStampAsync: get or decode message failed: topic: {}, queue: {}, offset: {}",
                    str, i, j, th);
                return -1L;
            });
    }

    /**
     * Resolves a queue offset from a timestamp by delegating to the queue's flat file.
     *
     * @param str topic
     * @param i queue id
     * @param j timestamp to look up
     * @param boundaryType boundary selection passed through to the flat file
     * @return the offset reported by the flat file, or {@code -1} when no flat file exists for the
     *         queue or the lookup throws (the failure is logged)
     */
    public long getOffsetInQueueByTime(String str, int i, long j, BoundaryType boundaryType) {
        CompositeQueueFlatFile flatFile = this.flatFileManager.getFlatFile(new MessageQueue(str, this.brokerName, i));
        long offset = -1L;
        if (flatFile != null) {
            try {
                offset = flatFile.getOffsetInConsumeQueueByTime(j, boundaryType);
            } catch (Exception e) {
                LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}",
                    str, i, j, boundaryType, e);
            }
        }
        return offset;
    }

    /**
     * Queries messages by key through the tiered index file.
     *
     * <p>Each index buffer returned by the index file is scanned in 28-byte items. An item is a
     * candidate when its first int equals the hash of the (topic, key) pair and its second int equals
     * the topic id from the metadata store. For a candidate, the following fields are read in order:
     * an int used as the queue id, a long used as the commit-log offset, an int used as the message
     * size, and an int added to the buffer's key value. The message is fetched only when that sum
     * lies within [{@code j}, {@code j2}]. Scanning stops once {@code i} fetches have been issued.
     *
     * <p>An empty result is returned when the topic has no metadata, or when any exception is thrown
     * synchronously while looking up the topic or starting the index query (the exception is logged).
     *
     * @param str topic
     * @param str2 message key
     * @param i maximum number of messages to fetch
     * @param j lower bound passed to the index query and applied to each candidate (inclusive)
     * @param j2 upper bound passed to the index query and applied to each candidate (inclusive)
     * @return future of the query result, completed after all issued message reads complete
     */
    public CompletableFuture<QueryMessageResult> queryMessageAsync(String str, String str2, int i, long j, long j2) {
        TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(this.storeConfig);
        int keyHash = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(str, str2));
        try {
            TopicMetadata topicMetadata = this.metadataStore.getTopic(str);
            if (topicMetadata == null) {
                LOGGER.info("TieredMessageFetcher#queryMessageAsync: get topic id from metadata failed, topic metadata not found: topic: {}", str);
                return CompletableFuture.completedFuture(new QueryMessageResult());
            }
            long topicId = topicMetadata.getTopicId();

            return indexFile.queryAsync(str, str2, j, j2).thenCompose(indexBuffers -> {
                QueryMessageResult result = new QueryMessageResult();
                List<CompletableFuture<Void>> readFutures = new ArrayList<>(i);
                int issued = 0;

                for (Pair<Long, ByteBuffer> entry : indexBuffers) {
                    Long baseValue = entry.getKey();
                    ByteBuffer indexBuffer = entry.getValue();
                    if (indexBuffer.remaining() % 28 != 0) {
                        LOGGER.error("[Bug]TieredMessageFetcher#queryMessageAsync: index buffer size {} is not multiple of index item size {}", Integer.valueOf(indexBuffer.remaining()), 28);
                        continue;
                    }

                    for (int pos = indexBuffer.position(); pos < indexBuffer.limit(); pos += 28) {
                        if (indexBuffer.getInt(pos) != keyHash || indexBuffer.getInt(pos + 4) != topicId) {
                            continue;
                        }
                        int queueId = indexBuffer.getInt(pos + 8);
                        CompositeQueueFlatFile flatFile = TieredFlatFileManager.getInstance(this.storeConfig)
                            .getFlatFile(new MessageQueue(str, this.brokerName, queueId));
                        if (flatFile == null) {
                            continue;
                        }
                        long commitLogOffset = indexBuffer.getLong(pos + 12);
                        int messageSize = indexBuffer.getInt(pos + 20);
                        long candidateValue = baseValue + indexBuffer.getInt(pos + 24);
                        if (candidateValue < j || candidateValue > j2) {
                            continue;
                        }

                        readFutures.add(flatFile.getCommitLogAsync(commitLogOffset, messageSize).thenAccept(messageBuffer -> {
                            result.addMessage(new SelectMappedBufferResult(0L, messageBuffer, messageSize, (MappedFile) null));
                        }));
                        issued++;
                        if (issued >= i) {
                            break;
                        }
                    }

                    // Checked after every scanned buffer, not only after a fetch was issued.
                    if (issued >= i) {
                        break;
                    }
                }

                return CompletableFuture.allOf(readFutures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> result);
            });
        } catch (Exception e) {
            LOGGER.error("TieredMessageFetcher#queryMessageAsync: get topic id from metadata failed: topic: {}", str, e);
            return CompletableFuture.completedFuture(new QueryMessageResult());
        }
    }
}
