package io.zeebe.engine.processor;

import io.zeebe.db.impl.DefaultColumnFamily;
import io.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.logstreams.util.TestSnapshotStorage;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.clock.ControlledActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/engine/processor/AsyncSnapshotingTest.class */
public final class AsyncSnapshotingTest {
    private final TemporaryFolder tempFolderRule = new TemporaryFolder();
    private final AutoCloseableRule autoCloseableRule = new AutoCloseableRule();
    private final ControlledActorClock clock = new ControlledActorClock();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(this.clock);

    @Rule
    public final RuleChain chain = RuleChain.outerRule(this.autoCloseableRule).around(this.tempFolderRule).around(this.actorSchedulerRule);
    private StateSnapshotController snapshotController;
    private LogStream logStream;
    private AsyncSnapshotDirector asyncSnapshotDirector;
    private StreamProcessor mockStreamProcessor;
    private List<ActorCondition> conditionList;

    @Before
    public void setup() throws IOException {
        TestSnapshotStorage testSnapshotStorage = new TestSnapshotStorage(this.tempFolderRule.getRoot().toPath());
        this.snapshotController = new StateSnapshotController(ZeebeRocksDbFactory.newFactory(DefaultColumnFamily.class), testSnapshotStorage);
        this.snapshotController.openDb();
        this.autoCloseableRule.manage(this.snapshotController);
        this.autoCloseableRule.manage(testSnapshotStorage);
        this.snapshotController = (StateSnapshotController) Mockito.spy(this.snapshotController);
        this.logStream = (LogStream) Mockito.mock(LogStream.class);
        Mockito.when(this.logStream.getCommitPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        this.conditionList = new ArrayList();
        ((LogStream) Mockito.doAnswer(invocationOnMock -> {
            this.conditionList.add((ActorCondition) invocationOnMock.getArguments()[0]);
            return null;
        }).when(this.logStream)).registerOnCommitPositionUpdatedCondition((ActorCondition) ArgumentMatchers.any());
        ActorScheduler actorScheduler = this.actorSchedulerRule.get();
        createStreamProcessorControllerMock();
        createAsyncSnapshotDirector(actorScheduler);
    }

    private void setCommitPosition(long j) {
        Mockito.when(this.logStream.getCommitPositionAsync()).thenReturn(CompletableActorFuture.completed(Long.valueOf(j)));
        this.conditionList.forEach((v0) -> {
            v0.signal();
        });
    }

    private void createStreamProcessorControllerMock() {
        this.mockStreamProcessor = (StreamProcessor) Mockito.mock(StreamProcessor.class);
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(0L)).thenReturn(CompletableActorFuture.completed(25L)).thenReturn(CompletableActorFuture.completed(32L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(99L), new ActorFuture[]{CompletableActorFuture.completed(100L)});
    }

    private void createAsyncSnapshotDirector(ActorScheduler actorScheduler) {
        this.asyncSnapshotDirector = new AsyncSnapshotDirector(0, this.mockStreamProcessor, this.snapshotController, this.logStream, Duration.ofMinutes(1L));
        actorScheduler.submitActor(this.asyncSnapshotDirector).join();
    }

    @Test
    public void shouldStartToTakeSnapshot() {
        this.clock.addTime(Duration.ofMinutes(1L));
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(0);
    }

    @Test
    public void shouldValidSnapshotWhenCommitPositionGreaterEquals() {
        this.clock.addTime(Duration.ofMinutes(1L));
        setCommitPosition(100L);
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 1;
        });
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(1);
    }

    @Test
    public void shouldNotStopTakingSnapshotsAfterFailingReplication() {
        ((StateSnapshotController) Mockito.doThrow(new Throwable[]{new RuntimeException("expected")}).when(this.snapshotController)).replicateLatestSnapshot((Consumer) ArgumentMatchers.any());
        this.clock.addTime(Duration.ofMinutes(1L));
        setCommitPosition(99L);
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 1;
        });
        this.clock.addTime(Duration.ofMinutes(1L));
        setCommitPosition(100L);
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 2;
        });
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(2);
    }

    @Test
    public void shouldTakeSnapshotsOneByOne() {
        this.clock.addTime(Duration.ofMinutes(1L));
        setCommitPosition(99L);
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 1;
        });
        this.clock.addTime(Duration.ofMinutes(1L));
        setCommitPosition(100L);
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 2;
        });
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(2);
    }

    @Test
    public void shouldSucceedToTakeSnapshotOnNextIntervalWhenLastWritePosRetrievingFailed() {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completedExceptionally(new RuntimeException("getLastWrittenPositionAsync fails")));
        setCommitPosition(100L);
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StreamProcessor) Mockito.verify(this.mockStreamProcessor, Mockito.timeout(5000L).times(1))).getLastWrittenPositionAsync();
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(26L));
        this.clock.addTime(Duration.ofMinutes(1L));
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 1;
        });
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(1);
    }

    @Test
    public void shouldSucceedToTakeSnapshotOnNextIntervalWhenLastProcessedPosRetrievingFailed() {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completedExceptionally(new RuntimeException("getLastProcessedPositionAsync fails")));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(26L));
        setCommitPosition(100L);
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StreamProcessor) Mockito.verify(this.mockStreamProcessor, Mockito.timeout(5000L).times(1))).getLastProcessedPositionAsync();
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        this.clock.addTime(Duration.ofMinutes(1L));
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 1;
        });
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(1);
    }

    @Test
    public void shouldSucceedToTakeSnapshotOnNextIntervalWhenCommitPositionRetrievingFailed() {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(26L));
        Mockito.when(this.logStream.getCommitPositionAsync()).thenReturn(CompletableActorFuture.completedExceptionally(new RuntimeException("getCommitPositionAsync fails")));
        this.clock.addTime(Duration.ofMinutes(1L));
        ((LogStream) Mockito.verify(this.logStream, Mockito.timeout(5000L).times(1))).getCommitPositionAsync();
        setCommitPosition(100L);
        this.clock.addTime(Duration.ofMinutes(1L));
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 1;
        });
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(1);
    }

    @Test
    public void shouldTakeSnapshotEvenExistsAfterRestart() {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        setCommitPosition(100L);
        this.clock.addTime(Duration.ofMinutes(1L));
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() == 1;
        });
        this.asyncSnapshotDirector.closeAsync().join();
        createAsyncSnapshotDirector(this.actorSchedulerRule.get());
        this.clock.addTime(Duration.ofMinutes(1L));
        TestUtil.waitUntil(() -> {
            return this.snapshotController.getValidSnapshotsCount() >= 2;
        });
        Assertions.assertThat(this.snapshotController.getValidSnapshotsCount()).isEqualTo(2);
    }
}
