package org.apache.kafka.tiered.storage.actions;

import java.io.PrintStream;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
import org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher;
import org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;

/* loaded from: input_file:org/apache/kafka/tiered/storage/actions/ConsumeAction.class */
/**
 * Test action which consumes records from a topic-partition and then verifies:
 * <ol>
 *   <li>that the records found in the tiered storage from the fetch offset onwards correspond to
 *       the first {@code expectedFromSecondTierCount} consumed records, and</li>
 *   <li>that the number of {@code FETCH_SEGMENT} events recorded for the source broker since this
 *       action started matches the count given by the {@link RemoteFetchSpec}.</li>
 * </ol>
 */
public final class ConsumeAction implements TieredStorageTestAction {
    private final TopicPartition topicPartition;
    private final Long fetchOffset;
    private final Integer expectedTotalCount;
    private final Integer expectedFromSecondTierCount;
    private final RemoteFetchSpec remoteFetchSpec;
    private final Serde<String> serde = Serdes.String();

    /**
     * @param topicPartition  the topic-partition to consume from
     * @param l               the offset from which records are fetched
     * @param num             the total number of records expected to be consumed
     * @param num2            the number of consumed records expected to come from the tiered storage
     * @param remoteFetchSpec provides the source broker id, the expected number of fetch events and
     *                        the topic-partition reported in the failure message
     */
    public ConsumeAction(TopicPartition topicPartition, Long l, Integer num, Integer num2, RemoteFetchSpec remoteFetchSpec) {
        this.topicPartition = topicPartition;
        this.fetchOffset = l;
        this.expectedTotalCount = num;
        this.expectedFromSecondTierCount = num2;
        this.remoteFetchSpec = remoteFetchSpec;
    }

    /**
     * Consumes the records and runs the assertions described on the class.
     *
     * @param tieredStorageTestContext the context used to consume records and to access the tiered
     *                                 storage history and content
     * @throws InterruptedException declared by the implemented interface method
     * @throws ExecutionException   declared by the implemented interface method
     */
    @Override // org.apache.kafka.tiered.storage.TieredStorageTestAction
    public void doExecute(TieredStorageTestContext tieredStorageTestContext) throws InterruptedException, ExecutionException {
        LocalTieredStorageHistory history =
                tieredStorageTestContext.tieredStorageHistory(this.remoteFetchSpec.getSourceBrokerId());

        // Capture the latest FETCH_SEGMENT event BEFORE consuming, so that the events recorded as a
        // consequence of this consumption can be told apart from earlier ones. May be empty.
        Optional<LocalTieredStorageEvent> latestEventSoFar =
                history.latestEvent(LocalTieredStorageEvent.EventType.FETCH_SEGMENT, this.topicPartition);

        List<ConsumerRecord<String, String>> consumedRecords =
                tieredStorageTestContext.consume(this.topicPartition, this.expectedTotalCount, this.fetchOffset);

        // (A) Compare the consumed records with the records held in the tiered storage.
        List<Record> tieredStorageRecords =
                TieredStorageTestUtils.tieredStorageRecords(tieredStorageTestContext, this.topicPartition);

        Optional<Record> firstExpectedRecord = tieredStorageRecords.stream()
                .filter(record -> record.offset() >= this.fetchOffset)
                .findFirst();

        final int expectedFromTier = this.expectedFromSecondTierCount;

        if (!firstExpectedRecord.isPresent()) {
            // Nothing at or beyond the fetch offset in the tiered storage: this is only acceptable
            // when no record was expected to be served from it.
            if (expectedFromTier > 0) {
                Assertions.fail("Could not find any record with offset >= " + this.fetchOffset + " from tier storage.");
            }
            return;
        }

        int firstIndex = tieredStorageRecords.indexOf(firstExpectedRecord.get());
        int availableFromFirstIndex = tieredStorageRecords.size() - firstIndex;

        Assertions.assertFalse(expectedFromTier > availableFromFirstIndex,
                "Not enough records found in tiered storage from offset " + this.fetchOffset + " for "
                        + this.topicPartition + ". Expected: " + this.expectedFromSecondTierCount
                        + ", Was: " + availableFromFirstIndex);
        Assertions.assertFalse(expectedFromTier < availableFromFirstIndex,
                "Too many records found in tiered storage from offset " + this.fetchOffset + " for "
                        + this.topicPartition + ". Expected: " + this.expectedFromSecondTierCount
                        + ", Was: " + availableFromFirstIndex);

        List<Record> storedRecords = tieredStorageRecords.subList(firstIndex, tieredStorageRecords.size());
        List<ConsumerRecord<String, String>> readRecords = consumedRecords.subList(0, expectedFromTier);
        MatcherAssert.assertThat(storedRecords,
                RecordsKeyValueMatcher.correspondTo(readRecords, this.topicPartition, this.serde, this.serde));

        // (B) Check the number of fetch interactions recorded since this action started.
        List<LocalTieredStorageEvent> events =
                history.getEvents(LocalTieredStorageEvent.EventType.FETCH_SEGMENT, this.topicPartition);

        // Keep only the events which are after the one captured before consuming; when there was no
        // such prior event, every recorded event is in scope.
        List<LocalTieredStorageEvent> eventsInScope = latestEventSoFar
                .map(latestEvent -> events.stream()
                        .filter(event -> event.isAfter(latestEvent))
                        .collect(Collectors.toList()))
                .orElse(events);

        Assertions.assertEquals(this.remoteFetchSpec.getCount(), eventsInScope.size(),
                "Number of fetch requests from broker " + this.remoteFetchSpec.getSourceBrokerId()
                        + " to the tier storage does not match the expected value for topic-partition "
                        + this.remoteFetchSpec.getTopicPartition());
    }

    /**
     * Prints a description of this action and its parameters.
     *
     * @param printStream the stream the description is written to
     */
    @Override // org.apache.kafka.tiered.storage.TieredStorageTestAction
    public void describe(PrintStream printStream) {
        printStream.println("consume-action:");
        printStream.println("  topic-partition = " + this.topicPartition);
        printStream.println("  fetch-offset = " + this.fetchOffset);
        printStream.println("  expected-record-count = " + this.expectedTotalCount);
        printStream.println("  expected-record-from-tiered-storage = " + this.expectedFromSecondTierCount);
    }
}
