package io.zeebe.engine.processor;

import io.zeebe.db.impl.DefaultColumnFamily;
import io.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.state.Snapshot;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.logstreams.util.TestSnapshotStorage;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.clock.ControlledActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/zeebe/engine/processor/AsyncSnapshotingTest.class */
public final class AsyncSnapshotingTest {
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(2000);
    private final TemporaryFolder tempFolderRule = new TemporaryFolder();
    private final AutoCloseableRule autoCloseableRule = new AutoCloseableRule();
    private final ControlledActorClock clock = new ControlledActorClock();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(this.clock);

    @Rule
    public final RuleChain chain = RuleChain.outerRule(this.autoCloseableRule).around(this.tempFolderRule).around(this.actorSchedulerRule);
    private StateSnapshotController snapshotController;
    private LogStream logStream;
    private AsyncSnapshotDirector asyncSnapshotDirector;
    private StreamProcessor mockStreamProcessor;
    private List<ActorCondition> conditionList;

    @Before
    public void setup() throws IOException {
        TestSnapshotStorage testSnapshotStorage = new TestSnapshotStorage(this.tempFolderRule.getRoot().toPath());
        this.snapshotController = new StateSnapshotController(ZeebeRocksDbFactory.newFactory(DefaultColumnFamily.class), testSnapshotStorage);
        this.snapshotController.openDb();
        this.autoCloseableRule.manage(this.snapshotController);
        this.autoCloseableRule.manage(testSnapshotStorage);
        this.snapshotController = (StateSnapshotController) Mockito.spy(this.snapshotController);
        this.logStream = (LogStream) Mockito.mock(LogStream.class);
        Mockito.when(this.logStream.getCommitPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        this.conditionList = new ArrayList();
        ((LogStream) Mockito.doAnswer(invocationOnMock -> {
            this.conditionList.add((ActorCondition) invocationOnMock.getArguments()[0]);
            return null;
        }).when(this.logStream)).registerOnCommitPositionUpdatedCondition((ActorCondition) ArgumentMatchers.any());
        ActorScheduler actorScheduler = this.actorSchedulerRule.get();
        createStreamProcessorControllerMock();
        createAsyncSnapshotDirector(actorScheduler);
    }

    private void setCommitPosition(long j) {
        Mockito.when(this.logStream.getCommitPositionAsync()).thenReturn(CompletableActorFuture.completed(Long.valueOf(j)));
        this.conditionList.forEach(actorCondition -> {
            actorCondition.signal();
        });
    }

    private void createStreamProcessorControllerMock() {
        this.mockStreamProcessor = (StreamProcessor) Mockito.mock(StreamProcessor.class);
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L)).thenReturn(CompletableActorFuture.completed(32L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(99L), new ActorFuture[]{CompletableActorFuture.completed(100L)});
    }

    private void createAsyncSnapshotDirector(ActorScheduler actorScheduler) {
        this.asyncSnapshotDirector = new AsyncSnapshotDirector(0, this.mockStreamProcessor, this.snapshotController, this.logStream, Duration.ofMinutes(1L));
        actorScheduler.submitActor(this.asyncSnapshotDirector).join();
    }

    @Test
    public void shouldStartToTakeSnapshot() {
        this.clock.addTime(Duration.ofMinutes(1L));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.snapshotController, this.logStream});
        ((LogStream) inOrder.verify(this.logStream, TIMEOUT.times(1))).registerOnCommitPositionUpdatedCondition((ActorCondition) ArgumentMatchers.any());
        ((LogStream) inOrder.verify(this.logStream, TIMEOUT.times(1))).getCommitPositionAsync();
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((LogStream) inOrder.verify(this.logStream, TIMEOUT.times(1))).getCommitPositionAsync();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldValidSnapshotWhenCommitPositionGreaterEquals() throws Exception {
        this.clock.addTime(Duration.ofMinutes(1L));
        setCommitPosition(100L);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.snapshotController});
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == 25;
        }));
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT.times(1))).replicateLatestSnapshot((Consumer) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldNotStopTakingSnapshotsAfterFailingReplication() throws Exception {
        ((StateSnapshotController) Mockito.doThrow(new Throwable[]{new RuntimeException("expected")}).when(this.snapshotController)).replicateLatestSnapshot((Consumer) ArgumentMatchers.any());
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        setCommitPosition(99L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == 25;
        }));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).replicateLatestSnapshot((Consumer) ArgumentMatchers.any());
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(2))).takeTempSnapshot(ArgumentMatchers.anyLong());
        setCommitPosition(100L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot2 -> {
            return snapshot2.getPosition() == 32;
        }));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(2))).replicateLatestSnapshot((Consumer) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotTakeMoreThenOneSnapshot() {
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        this.clock.addTime(Duration.ofMinutes(1L));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.snapshotController, this.logStream});
        ((LogStream) inOrder.verify(this.logStream, TIMEOUT.times(1))).registerOnCommitPositionUpdatedCondition((ActorCondition) ArgumentMatchers.any());
        ((LogStream) inOrder.verify(this.logStream, TIMEOUT.times(1))).getCommitPositionAsync();
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((LogStream) inOrder.verify(this.logStream, TIMEOUT.times(1))).getCommitPositionAsync();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldTakeSnapshotsOneByOne() throws Exception {
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        setCommitPosition(99L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == 25;
        }));
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(2))).takeTempSnapshot(ArgumentMatchers.anyLong());
        setCommitPosition(100L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot2 -> {
            return snapshot2.getPosition() == 32;
        }));
    }

    @Test
    public void shouldDeleteDataOnMaxSnapshots() throws IOException {
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        setCommitPosition(99L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == 25;
        }));
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(2))).takeTempSnapshot(ArgumentMatchers.anyLong());
        setCommitPosition(100L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot2 -> {
            return snapshot2.getPosition() == 32;
        }));
    }

    @Test
    public void shouldNotTakeSameSnapshotTwice() throws Exception {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(26L));
        setCommitPosition(100L);
        this.clock.addTime(Duration.ofMinutes(1L));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.snapshotController, this.mockStreamProcessor});
        ((StreamProcessor) inOrder.verify(this.mockStreamProcessor, TIMEOUT)).getLastProcessedPositionAsync();
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == 25;
        }));
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).replicateLatestSnapshot((Consumer) ArgumentMatchers.any());
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StreamProcessor) inOrder.verify(this.mockStreamProcessor, TIMEOUT)).getLastProcessedPositionAsync();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldEnforceSnapshotCreation() {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(26L));
        setCommitPosition(100L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT)).getLastValidSnapshotPosition();
        this.asyncSnapshotDirector.enforceSnapshotCreation(100L, 27L, 26L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT)).takeSnapshot(26L);
    }

    @Test
    public void shouldNotEnforceSnapshotCreationIfExists() throws Exception {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(26L));
        setCommitPosition(100L);
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) Mockito.verify(this.snapshotController, TIMEOUT)).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == 25;
        }));
        this.asyncSnapshotDirector.enforceSnapshotCreation(100L, 26L, 25L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, Mockito.never())).takeSnapshot(25L);
    }

    @Test
    public void shouldNotEnforceSnapshotCreationIfNotCommitted() {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(101L));
        setCommitPosition(100L);
        this.asyncSnapshotDirector.enforceSnapshotCreation(100L, 101L, 25L);
        ((StateSnapshotController) Mockito.verify(this.snapshotController, Mockito.never())).takeSnapshot(25L);
    }

    @Test
    public void shouldNotTakeSnapshotIfExistsAfterRestart() throws Exception {
        Mockito.when(this.mockStreamProcessor.getLastProcessedPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        Mockito.when(this.mockStreamProcessor.getLastWrittenPositionAsync()).thenReturn(CompletableActorFuture.completed(25L));
        setCommitPosition(100L);
        this.clock.addTime(Duration.ofMinutes(1L));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.snapshotController, this.mockStreamProcessor});
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).getLastValidSnapshotPosition();
        ((StreamProcessor) inOrder.verify(this.mockStreamProcessor, TIMEOUT)).getLastProcessedPositionAsync();
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == 25;
        }));
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).replicateLatestSnapshot((Consumer) ArgumentMatchers.any());
        createAsyncSnapshotDirector(this.actorSchedulerRule.get());
        this.clock.addTime(Duration.ofMinutes(1L));
        ((StateSnapshotController) inOrder.verify(this.snapshotController, TIMEOUT)).getLastValidSnapshotPosition();
        ((StreamProcessor) inOrder.verify(this.mockStreamProcessor, TIMEOUT.atLeastOnce())).getLastProcessedPositionAsync();
        ((StateSnapshotController) inOrder.verify(this.snapshotController, Mockito.never())).takeTempSnapshot(ArgumentMatchers.anyLong());
    }
}
