package org.apache.rocketmq.tieredstore;

import com.google.common.base.Stopwatch;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.View;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.TopicMetadata;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;

/* loaded from: input_file:org/apache/rocketmq/tieredstore/TieredMessageStore.class */
/**
 * Plugin message store that layers tiered storage on top of the wrapped ("next") store.
 *
 * <p>Read requests are routed to the tiered storage fetcher when {@link #viaTieredStorage}
 * says so and are otherwise delegated to the next store. Most tiered reads fall back to the
 * next store when the tiered lookup yields nothing usable.
 */
public class TieredMessageStore extends AbstractPluginMessageStore {
    protected static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);

    protected final String brokerName;
    protected final TieredMessageStoreConfig storeConfig;
    protected final TieredMetadataStore metadataStore;
    protected final TieredDispatcher dispatcher;
    protected final TieredMessageFetcher fetcher;
    protected final TieredFlatFileManager flatFileManager;

    /**
     * Creates the tiered store, registers its configuration with the plugin context and hooks
     * the tiered dispatcher into the wrapped store.
     *
     * @param messageStorePluginContext plugin context used to register the tiered store configuration
     * @param messageStore the wrapped store that requests are delegated to
     */
    public TieredMessageStore(MessageStorePluginContext messageStorePluginContext, MessageStore messageStore) {
        super(messageStorePluginContext, messageStore);
        this.storeConfig = new TieredMessageStoreConfig();
        messageStorePluginContext.registerConfiguration(this.storeConfig);
        this.brokerName = this.storeConfig.getBrokerName();

        // The cluster name and the broker name are both registered as system topics.
        TieredStoreUtil.addSystemTopic(this.storeConfig.getBrokerClusterName());
        TieredStoreUtil.addSystemTopic(this.brokerName);

        TieredStoreExecutor.init();
        this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
        this.fetcher = new TieredMessageFetcher(this.storeConfig);
        this.dispatcher = new TieredDispatcher(messageStore, this.storeConfig);
        this.flatFileManager = TieredFlatFileManager.getInstance(this.storeConfig);
        messageStore.addDispatcher(this.dispatcher);
    }

    /**
     * Loads the flat file manager and then the next store; the dispatcher is started only when
     * both report success. The next store is not loaded if the flat file manager fails.
     *
     * @return {@code true} when both loads succeeded
     */
    public boolean load() {
        boolean loaded = this.flatFileManager.load() && this.next.load();
        if (loaded) {
            this.dispatcher.start();
        }
        return loaded;
    }

    /**
     * @return the tiered store configuration created by this instance
     */
    public TieredMessageStoreConfig getStoreConfig() {
        return this.storeConfig;
    }

    /**
     * Same as {@link #viaTieredStorage(String, int, long, int)} with a last argument of {@code 1}.
     *
     * @param str topic
     * @param i queue id
     * @param j queue offset
     * @return {@code true} when the read should be served by tiered storage
     */
    public boolean viaTieredStorage(String str, int i, long j) {
        return viaTieredStorage(str, i, j, 1);
    }

    /**
     * Decides whether a read at the given queue offset should be served by tiered storage.
     *
     * <ul>
     *   <li>{@code FORCE} level: always {@code true}.</li>
     *   <li>Level not enabled, no flat file for the queue, or the offset is at or beyond the flat
     *       file's consume queue commit offset: {@code false}.</li>
     *   <li>{@code NOT_IN_DISK} check passes and the next store does not hold the offset:
     *       {@code true}.</li>
     *   <li>Otherwise {@code true} only when the {@code NOT_IN_MEM} check passes and the next
     *       store reports the offset as not in memory.</li>
     * </ul>
     *
     * @param str topic
     * @param i queue id
     * @param j queue offset
     * @param i2 forwarded to the next store's in-memory check (presumably the batch size; confirm)
     * @return {@code true} when the read should be served by tiered storage
     */
    public boolean viaTieredStorage(String str, int i, long j, int i2) {
        TieredMessageStoreConfig.TieredStorageLevel level = this.storeConfig.getTieredStorageLevel();
        if (level.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) {
            return true;
        }
        if (!level.isEnable()) {
            return false;
        }

        CompositeQueueFlatFile flatFile = this.flatFileManager.getFlatFile(new MessageQueue(str, this.brokerName, i));
        if (flatFile == null || j >= flatFile.getConsumeQueueCommitOffset()) {
            return false;
        }

        if (level.check(TieredMessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)
            && !this.next.checkInStoreByConsumeOffset(str, i, j)) {
            return true;
        }
        return level.check(TieredMessageStoreConfig.TieredStorageLevel.NOT_IN_MEM)
            && !this.next.checkInMemByConsumeOffset(str, i, j, i2);
    }

    /**
     * Blocking variant of {@link #getMessageAsync}; waits for the asynchronous result.
     */
    public GetMessageResult getMessage(String str, String str2, int i, long j, int i2, MessageFilter messageFilter) {
        return getMessageAsync(str, str2, i, j, i2, messageFilter).join();
    }

    /**
     * Fetches messages, preferring tiered storage when {@link #viaTieredStorage} allows it.
     *
     * <p>A tiered result is replaced by a (blocking) read from the next store when:
     * <ul>
     *   <li>its status is {@code OFFSET_FOUND_NULL}, {@code OFFSET_OVERFLOW_ONE} or
     *       {@code OFFSET_OVERFLOW_BADLY} and the next store holds the offset;</li>
     *   <li>its status is {@code NO_MATCHED_LOGIC_QUEUE} for a system topic or a topic with the
     *       revive prefix;</li>
     *   <li>the tiered fetch fails with an exception.</li>
     * </ul>
     * Otherwise the tiered result is returned with its min/max offsets widened to cover the
     * next store's offsets.
     *
     * @param str consumer group
     * @param str2 topic
     * @param i queue id
     * @param j queue offset
     * @param i2 forwarded to the fetcher and the next store (presumably max message count; confirm)
     * @param messageFilter filter forwarded to the fetcher and the next store
     * @return future holding the fetch result
     */
    public CompletableFuture<GetMessageResult> getMessageAsync(String str, String str2, int i, long j, int i2, MessageFilter messageFilter) {
        if (!viaTieredStorage(str2, i, j, i2)) {
            logger.debug("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", str2, i, j);
            return this.next.getMessageAsync(str, str2, i, j, i2, messageFilter);
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        return this.fetcher.getMessageAsync(str, str2, i, j, i2, messageFilter)
            .thenApply(result -> {
                Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
                    .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE)
                    .put(TieredStoreMetricsConstant.LABEL_TOPIC, str2)
                    .put(TieredStoreMetricsConstant.LABEL_GROUP, str)
                    .build();
                TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);

                GetMessageStatus status = result.getStatus();
                boolean offsetMiss = status == GetMessageStatus.OFFSET_FOUND_NULL
                    || status == GetMessageStatus.OFFSET_OVERFLOW_ONE
                    || status == GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (offsetMiss && this.next.checkInStoreByConsumeOffset(str2, i, j)) {
                    TieredStoreMetricsManager.fallbackTotal.add(1L, latencyAttributes);
                    logger.debug("GetMessageAsync not found then try back to next store, result: {}, topic: {}, queue: {}, queue offset: {}, offset range: {}-{}",
                        status, str2, i, j, result.getMinOffset(), result.getMaxOffset());
                    return this.next.getMessage(str, str2, i, j, i2, messageFilter);
                }

                if (status == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE
                    && (TieredStoreUtil.isSystemTopic(str2) || PopAckConstants.isStartWithRevivePrefix(str2))) {
                    return this.next.getMessage(str, str2, i, j, i2, messageFilter);
                }

                if (status != GetMessageStatus.FOUND
                    && status != GetMessageStatus.OFFSET_OVERFLOW_ONE
                    && status != GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
                    logger.warn("GetMessageAsync not found and message is not in next store, result: {}, topic: {}, queue: {}, queue offset: {}, offset range: {}-{}",
                        status, str2, i, j, result.getMinOffset(), result.getMaxOffset());
                }

                if (status == GetMessageStatus.FOUND) {
                    Attributes messagesOutAttributes = TieredStoreMetricsManager.newAttributesBuilder()
                        .put(TieredStoreMetricsConstant.LABEL_TOPIC, str2)
                        .put(TieredStoreMetricsConstant.LABEL_GROUP, str)
                        .build();
                    TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(), messagesOutAttributes);
                }

                // Widen the reported offset range so that it also covers the next store.
                long nextMinOffset = this.next.getMinOffsetInQueue(str2, i);
                if (nextMinOffset >= 0 && nextMinOffset < result.getMinOffset()) {
                    result.setMinOffset(nextMinOffset);
                }
                long nextMaxOffset = this.next.getMaxOffsetInQueue(str2, i);
                if (nextMaxOffset >= 0 && nextMaxOffset > result.getMaxOffset()) {
                    result.setMaxOffset(nextMaxOffset);
                }
                return result;
            })
            .exceptionally(throwable -> {
                logger.error("GetMessageAsync from tiered store failed: ", throwable);
                return this.next.getMessage(str, str2, i, j, i2, messageFilter);
            });
    }

    /**
     * Writes are not intercepted; this simply defers to the superclass implementation.
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner messageExtBrokerInner) {
        return super.asyncPutMessage(messageExtBrokerInner);
    }

    /**
     * Returns the smaller of the next store's minimum offset and the flat file's consume queue
     * minimum offset. The next store's value is returned as is when the queue has no flat file
     * or the flat file's minimum offset is negative.
     *
     * @param str topic
     * @param i queue id
     * @return minimum queue offset
     */
    public long getMinOffsetInQueue(String str, int i) {
        long nextMinOffset = this.next.getMinOffsetInQueue(str, i);
        CompositeQueueFlatFile flatFile = this.flatFileManager.getFlatFile(new MessageQueue(str, this.brokerName, i));
        if (flatFile == null) {
            return nextMinOffset;
        }
        long tieredMinOffset = flatFile.getConsumeQueueMinOffset();
        if (tieredMinOffset < 0) {
            return nextMinOffset;
        }
        return Math.min(nextMinOffset, tieredMinOffset);
    }

    /**
     * Blocking variant of {@link #getEarliestMessageTimeAsync(String, int)}.
     */
    public long getEarliestMessageTime(String str, int i) {
        return getEarliestMessageTimeAsync(str, i).join();
    }

    /**
     * Resolves the earliest message time across the next store and tiered storage.
     *
     * <p>A non-positive time from the next store is treated as "unknown". The result is the
     * smaller of the two known times; when tiered storage reports a negative value the next
     * store's time is used, or {@code -1} if that is unknown as well.
     *
     * @param str topic
     * @param i queue id
     * @return future holding the earliest message time, or {@code -1}
     */
    public CompletableFuture<Long> getEarliestMessageTimeAsync(String str, int i) {
        long nextStoreTime = this.next.getEarliestMessageTime(str, i);
        // Long.MAX_VALUE marks "unknown" so that Math.min picks the tiered time.
        long nextStoreTimeOrMax = nextStoreTime > 0 ? nextStoreTime : Long.MAX_VALUE;
        Stopwatch stopwatch = Stopwatch.createStarted();
        return this.fetcher.getEarliestMessageTimeAsync(str, i).thenApply(tieredTime -> {
            Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
                .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME)
                .put(TieredStoreMetricsConstant.LABEL_TOPIC, str)
                .build();
            TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
            if (tieredTime >= 0) {
                return Math.min(nextStoreTimeOrMax, tieredTime);
            }
            logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest message time failed, try to get earliest message time from next store: topic: {}, queue: {}", str, i);
            return nextStoreTimeOrMax != Long.MAX_VALUE ? nextStoreTimeOrMax : -1L;
        });
    }

    /**
     * Resolves the store timestamp of the message at the given queue offset. Tiered storage is
     * consulted when {@link #viaTieredStorage(String, int, long)} allows it; a tiered result of
     * {@code -1} falls back to a blocking lookup in the next store.
     *
     * @param str topic
     * @param i queue id
     * @param j queue offset
     * @return future holding the store timestamp
     */
    public CompletableFuture<Long> getMessageStoreTimeStampAsync(String str, int i, long j) {
        if (!viaTieredStorage(str, i, j)) {
            return this.next.getMessageStoreTimeStampAsync(str, i, j);
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        return this.fetcher.getMessageStoreTimeStampAsync(str, i, j).thenApply(tieredTime -> {
            Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
                .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_TIME_BY_OFFSET)
                .put(TieredStoreMetricsConstant.LABEL_TOPIC, str)
                .build();
            TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
            if (tieredTime != -1) {
                return tieredTime;
            }
            logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message time failed, try to get message time from next store: topic: {}, queue: {}, queue offset: {}", str, i, j);
            return this.next.getMessageStoreTimeStamp(str, i, j);
        });
    }

    /**
     * Same as {@link #getOffsetInQueueByTime(String, int, long, BoundaryType)} with
     * {@link BoundaryType#LOWER}.
     */
    public long getOffsetInQueueByTime(String str, int i, long j) {
        return getOffsetInQueueByTime(str, i, j, BoundaryType.LOWER);
    }

    /**
     * Looks up the queue offset for a timestamp.
     *
     * <p>The next store answers when its earliest message time is unavailable (non-positive),
     * or when the timestamp is not older than that time and the storage level is not
     * {@code FORCE}. Otherwise tiered storage is queried; a result of {@code -1} falls back to
     * the next store unless the level is {@code FORCE}. Note that {@code boundaryType} is only
     * passed to the tiered fetcher; lookups in the next store do not receive it.
     *
     * @param str topic
     * @param i queue id
     * @param j timestamp to search for
     * @param boundaryType boundary type forwarded to the tiered fetcher
     * @return the resolved queue offset
     */
    public long getOffsetInQueueByTime(String str, int i, long j, BoundaryType boundaryType) {
        long earliestTimeInNextStore = this.next.getEarliestMessageTime();
        if (earliestTimeInNextStore <= 0) {
            logger.warn("TieredMessageStore#getOffsetInQueueByTimeAsync: get earliest message time in next store failed: {}", earliestTimeInNextStore);
            return this.next.getOffsetInQueueByTime(str, i, j);
        }

        boolean forceTiered = this.storeConfig.getTieredStorageLevel() == TieredMessageStoreConfig.TieredStorageLevel.FORCE;
        if (j >= earliestTimeInNextStore && !forceTiered) {
            return this.next.getOffsetInQueueByTime(str, i, j);
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        long tieredOffset = this.fetcher.getOffsetInQueueByTime(str, i, j, boundaryType);
        Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
            .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_OFFSET_BY_TIME)
            .put(TieredStoreMetricsConstant.LABEL_TOPIC, str)
            .build();
        TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);

        if (tieredOffset == -1 && !forceTiered) {
            return this.next.getOffsetInQueueByTime(str, i, j);
        }
        return tieredOffset;
    }

    /**
     * Blocking variant of {@link #queryMessageAsync(String, String, int, long, long)}.
     */
    public QueryMessageResult queryMessage(String str, String str2, int i, long j, long j2) {
        return queryMessageAsync(str, str2, i, j, j2).join();
    }

    /**
     * Queries messages from the next store and, when needed, tops the result up from tiered
     * storage.
     *
     * <p>The next store is skipped when the range ends before its earliest message time or the
     * storage level is {@code FORCE}. Tiered storage is skipped when the level is not
     * {@code FORCE} and either the next store already returned {@code i} messages or the range
     * starts at or after the next store's earliest message time. If the tiered query fails,
     * synchronously or asynchronously, the failure is logged and whatever the next store
     * returned is used.
     *
     * @param str topic
     * @param str2 forwarded to both stores (presumably the message key; confirm)
     * @param i maximum number of messages wanted
     * @param j start of the queried time range
     * @param j2 end of the queried time range
     * @return future holding the combined query result
     */
    public CompletableFuture<QueryMessageResult> queryMessageAsync(String str, String str2, int i, long j, long j2) {
        long earliestTimeInNextStore = this.next.getEarliestMessageTime();
        if (earliestTimeInNextStore <= 0) {
            logger.warn("TieredMessageStore#queryMessageAsync: get earliest message time in next store failed: {}", earliestTimeInNextStore);
        }

        boolean forceTiered = this.storeConfig.getTieredStorageLevel() == TieredMessageStoreConfig.TieredStorageLevel.FORCE;
        QueryMessageResult result = (j2 < earliestTimeInNextStore || forceTiered)
            ? new QueryMessageResult()
            : this.next.queryMessage(str, str2, i, j, j2);

        int foundInNextStore = result.getMessageBufferList().size();
        if ((foundInNextStore >= i || j >= earliestTimeInNextStore) && !forceTiered) {
            return CompletableFuture.completedFuture(result);
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            return this.fetcher.queryMessageAsync(str, str2, i - foundInNextStore, j, forceTiered ? j2 : earliestTimeInNextStore)
                .thenApply(tieredResult -> {
                    Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
                        .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_QUERY_MESSAGE)
                        .put(TieredStoreMetricsConstant.LABEL_TOPIC, str)
                        .build();
                    TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
                    for (SelectMappedBufferResult message : tieredResult.getMessageMapedList()) {
                        result.addMessage(message);
                    }
                    return result;
                })
                // The catch below only sees failures raised while the future is being created;
                // failures that complete the future exceptionally are handled here instead.
                .exceptionally(throwable -> {
                    logger.error("TieredMessageStore#queryMessageAsync: query message in tiered store failed", throwable);
                    return result;
                });
        } catch (Exception e) {
            logger.error("TieredMessageStore#queryMessageAsync: query message in tiered store failed", e);
            return CompletableFuture.completedFuture(result);
        }
    }

    /**
     * @return the superclass metric views with the tiered store metric views appended
     */
    public List<Pair<InstrumentSelector, View>> getMetricsView() {
        List<Pair<InstrumentSelector, View>> views = super.getMetricsView();
        views.addAll(TieredStoreMetricsManager.getMetricsView());
        return views;
    }

    /**
     * Initializes the superclass metrics and then the tiered store metrics.
     *
     * @param meter meter used to create instruments
     * @param supplier supplier of the base attributes builder
     */
    public void initMetrics(Meter meter, Supplier<AttributesBuilder> supplier) {
        super.initMetrics(meter, supplier);
        TieredStoreMetricsManager.init(meter, supplier, this.storeConfig, this.fetcher, this.next);
    }

    /**
     * Shuts down, in order: the next store, the dispatcher, the flat file manager and the
     * tiered store executor.
     */
    public void shutdown() {
        this.next.shutdown();
        this.dispatcher.shutdown();
        TieredFlatFileManager.getInstance(this.storeConfig).shutdown();
        TieredStoreExecutor.shutdown();
    }

    /**
     * Destroys the next store, the flat file manager and the metadata store. A failure while
     * destroying the metadata store is logged and not rethrown.
     */
    public void destroy() {
        this.next.destroy();
        TieredFlatFileManager.getInstance(this.storeConfig).destroy();
        try {
            this.metadataStore.destroy();
        } catch (Exception e) {
            logger.error("TieredMessageStore#destroy: destroy metadata store failed", e);
        }
    }

    /**
     * Removes tiered data of every topic in the metadata store that is not in {@code set}, is
     * not a system topic and is not an LMQ topic, then lets the next store clean up as well.
     * Failures are logged per topic and do not stop the cleanup.
     *
     * @param set topics that must be kept
     * @return the value returned by the next store's {@code cleanUnusedTopic}
     */
    public int cleanUnusedTopic(Set<String> set) {
        try {
            this.metadataStore.iterateTopic(topicMetadata -> {
                String topic = topicMetadata.getTopic();
                boolean keep = set.contains(topic) || TopicValidator.isSystemTopic(topic) || MixAll.isLmq(topic);
                if (keep) {
                    return;
                }
                logger.info("TieredMessageStore#cleanUnusedTopic: start deleting topic {}", topic);
                try {
                    destroyCompositeFlatFile(topicMetadata);
                } catch (Exception e) {
                    logger.error("TieredMessageStore#cleanUnusedTopic: delete topic {} failed", topic, e);
                }
            });
        } catch (Exception e) {
            logger.error("TieredMessageStore#cleanUnusedTopic: iterate topic metadata failed", e);
        }
        return this.next.cleanUnusedTopic(set);
    }

    /**
     * Removes tiered data of the given topics, then deletes them from the next store. Topics
     * without metadata and per-topic failures are logged and skipped.
     *
     * @param set topics to delete
     * @return the value returned by the next store's {@code deleteTopics}
     */
    public int deleteTopics(Set<String> set) {
        for (String topic : set) {
            logger.info("TieredMessageStore#deleteTopics: start deleting topic {}", topic);
            try {
                TopicMetadata topicMetadata = this.metadataStore.getTopic(topic);
                if (topicMetadata == null) {
                    logger.error("TieredMessageStore#deleteTopics: delete topic {} failed, can not obtain metadata", topic);
                } else {
                    destroyCompositeFlatFile(topicMetadata);
                }
            } catch (Exception e) {
                logger.error("TieredMessageStore#deleteTopics: delete topic {} failed", topic, e);
            }
        }
        return this.next.deleteTopics(set);
    }

    /**
     * Destroys the composite flat file and queue metadata of every queue of the topic that
     * currently has a flat file, then deletes the topic metadata.
     *
     * @param topicMetadata metadata of the topic to remove
     * @throws IllegalStateException if deleting a queue's metadata fails
     */
    public void destroyCompositeFlatFile(TopicMetadata topicMetadata) {
        this.metadataStore.iterateQueue(topicMetadata.getTopic(), queueMetadata -> {
            MessageQueue queue = queueMetadata.getQueue();
            if (this.flatFileManager.getFlatFile(queue) == null) {
                return;
            }
            this.flatFileManager.destroyCompositeFile(queue);
            try {
                this.metadataStore.deleteQueue(queue);
                logger.info("TieredMessageStore#destroyCompositeFlatFile: destroy flatFile success: topic: {}, queueId: {}", queue.getTopic(), queue.getQueueId());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        });
        this.metadataStore.deleteTopic(topicMetadata.getTopic());
    }
}
