package io.zeebe.broker.logstreams;

import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.snapshot.PendingSnapshot;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalReader;
import io.atomix.utils.time.WallClockTimestamp;
import io.zeebe.broker.clustering.atomix.storage.snapshot.AtomixRecordEntrySupplierImpl;
import io.zeebe.broker.clustering.atomix.storage.snapshot.AtomixSnapshotStorage;
import io.zeebe.broker.clustering.atomix.storage.snapshot.DbSnapshotStore;
import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.logstreams.state.SnapshotMetrics;
import io.zeebe.logstreams.storage.atomix.AtomixLogStorageReader;
import io.zeebe.logstreams.storage.atomix.ZeebeIndexAdapter;
import io.zeebe.logstreams.util.AtomixLogStorageRule;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:io/zeebe/broker/logstreams/AtomixLogDeletionServiceTest.class */
public final class AtomixLogDeletionServiceTest {
    private final ActorSchedulerRule actorScheduler = new ActorSchedulerRule();
    private final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private final AtomixLogStorageRule logStorageRule = new AtomixLogStorageRule(this.temporaryFolder, PARTITION_ID, builder -> {
        return builder(builder, this.temporaryFolder);
    });

    @Rule
    public final RuleChain chain = RuleChain.outerRule(this.temporaryFolder).around(this.actorScheduler).around(this.logStorageRule);
    private AtomixSnapshotStorage snapshotStorage;
    private AtomixLogStorageReader storageReader;
    private LogDeletionService deletionService;
    private Compactor compactor;
    private static final int PARTITION_ID = 1;
    private static final ByteBuffer DATA = ByteBuffer.allocate(4).putInt(0, PARTITION_ID);

    /* loaded from: input_file:io/zeebe/broker/logstreams/AtomixLogDeletionServiceTest$Compactor.class */
    private final class Compactor implements LogCompactor {
        private final Map<Long, CompletableFuture<Void>> compactions = new ConcurrentHashMap();

        private Compactor() {
        }

        private void awaitCompaction(long j, Duration duration) {
            this.compactions.computeIfAbsent(Long.valueOf(j), l -> {
                return new CompletableFuture();
            }).orTimeout(duration.toNanos(), TimeUnit.NANOSECONDS).join();
        }

        public CompletableFuture<Void> compactLog(long j) {
            CompletableFuture<Void> compute = this.compactions.compute(Long.valueOf(j), (l, completableFuture) -> {
                return (completableFuture == null || completableFuture.isDone()) ? new CompletableFuture() : completableFuture;
            });
            AtomixLogDeletionServiceTest.this.logStorageRule.compact(j).whenComplete((r4, th) -> {
                if (th != null) {
                    compute.completeExceptionally(th);
                } else {
                    compute.complete(r4);
                }
            });
            return compute;
        }
    }

    @Before
    public void setUp() throws IOException {
        Path path = this.temporaryFolder.newFolder().toPath();
        Path path2 = this.temporaryFolder.newFolder().toPath();
        this.storageReader = new AtomixLogStorageReader(ZeebeIndexAdapter.ofDensity(5), this.logStorageRule.getRaftLog().openReader(-1L, JournalReader.Mode.COMMITS));
        this.snapshotStorage = new AtomixSnapshotStorage(path, path2, this.logStorageRule.getSnapshotStore(), new AtomixRecordEntrySupplierImpl(this.storageReader), new SnapshotMetrics(PARTITION_ID));
        this.compactor = new Compactor();
        this.deletionService = new LogDeletionService(0, PARTITION_ID, this.compactor, this.snapshotStorage);
        this.actorScheduler.submitActor(this.deletionService).join();
    }

    @After
    public void tearDown() {
        this.storageReader.close();
        this.snapshotStorage.close();
        this.deletionService.close();
    }

    @Test
    public void shouldDeleteUpToCompactionBound() {
        RaftLogReader openReader = this.logStorageRule.getRaftLog().openReader(-1L);
        this.logStorageRule.appendEntry(1L, 1L, DATA).index();
        this.logStorageRule.appendEntry(2L, 2L, DATA).index();
        this.logStorageRule.appendEntry(3L, 3L, DATA).index();
        createSnapshot(2L);
        this.compactor.awaitCompaction(2L, Duration.ofSeconds(5L));
        openReader.reset();
        Assertions.assertThat(readAllEntries(openReader)).isNotEmpty().hasSize(2).extracting((v0) -> {
            return v0.index();
        }).containsExactly(new Long[]{2L, 3L});
    }

    @Test
    public void shouldNotDeleteOnLowerCompactionBound() {
        RaftLogReader openReader = this.logStorageRule.getRaftLog().openReader(-1L);
        this.logStorageRule.appendEntry(1L, 1L, DATA).index();
        this.logStorageRule.appendEntry(2L, 2L, DATA).index();
        this.logStorageRule.appendEntry(3L, 3L, DATA).index();
        createSnapshot(0L);
        this.compactor.awaitCompaction(0L, Duration.ofSeconds(5L));
        openReader.reset();
        Assertions.assertThat(readAllEntries(openReader)).isNotEmpty().hasSize(3).extracting((v0) -> {
            return v0.index();
        }).containsExactly(new Long[]{1L, 2L, 3L});
    }

    @Test
    public void shouldDeleteLowerEntriesEvenIfIndexNotFound() {
        RaftLogReader openReader = this.logStorageRule.getRaftLog().openReader(-1L);
        this.logStorageRule.appendEntry(1L, 1L, DATA).index();
        this.logStorageRule.appendEntry(2L, 2L, DATA).index();
        this.logStorageRule.appendEntry(3L, 3L, DATA).index();
        createSnapshot(5L);
        this.compactor.awaitCompaction(5L, Duration.ofSeconds(5L));
        openReader.reset();
        Assertions.assertThat(readAllEntries(openReader)).isNotEmpty().hasSize(PARTITION_ID).extracting((v0) -> {
            return v0.index();
        }).containsExactly(new Long[]{3L});
    }

    private void createSnapshot(long j) {
        PendingSnapshot newPendingSnapshot = this.logStorageRule.getSnapshotStore().newPendingSnapshot(j, 0L, WallClockTimestamp.from(System.currentTimeMillis()));
        newPendingSnapshot.write(ByteBuffer.wrap("foo".getBytes()), ByteBuffer.wrap(TestJarExporter.FOO.getBytes()));
        newPendingSnapshot.commit();
    }

    private static RaftStorage.Builder builder(RaftStorage.Builder builder, TemporaryFolder temporaryFolder) {
        try {
            return builder.withMaxSegmentSize(96).withSnapshotStore(new DbSnapshotStore(temporaryFolder.newFolder("runtime").toPath(), temporaryFolder.newFolder("snapshots").toPath(), new ConcurrentSkipListMap()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private List<Indexed<?>> readAllEntries(RaftLogReader raftLogReader) {
        ArrayList arrayList = new ArrayList();
        while (raftLogReader.hasNext()) {
            arrayList.add(raftLogReader.next());
        }
        return arrayList;
    }
}
