package io.zeebe.engine.processor;

import io.zeebe.engine.state.ZeebeState;
import io.zeebe.engine.util.StreamProcessorRule;
import io.zeebe.logstreams.state.Snapshot;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.DeploymentIntent;
import io.zeebe.protocol.record.intent.WorkflowInstanceIntent;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.exception.RecoverableException;
import io.zeebe.util.sched.ActorControl;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/zeebe/engine/processor/StreamProcessorTest.class */
public final class StreamProcessorTest {
    private static final Duration SNAPSHOT_INTERVAL = Duration.ofMinutes(1);
    private static final long TIMEOUT_MILLIS = 2000;
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);

    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();
    private ActorControl processingContextActor;

    @Test
    public void shouldCallStreamProcessorLifecycle() throws Exception {
        StreamProcessorLifecycleAware streamProcessorLifecycleAware = (StreamProcessorLifecycleAware) Mockito.mock(StreamProcessorLifecycleAware.class);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.withListener(streamProcessorLifecycleAware).withListener(new StreamProcessorLifecycleAware() { // from class: io.zeebe.engine.processor.StreamProcessorTest.1
                public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
                    countDownLatch.countDown();
                }
            });
        });
        countDownLatch.await();
        this.streamProcessorRule.closeStreamProcessor();
        InOrder inOrder = Mockito.inOrder(new Object[]{streamProcessorLifecycleAware});
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, Mockito.times(1))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        ((StreamProcessorLifecycleAware) inOrder.verify(streamProcessorLifecycleAware, Mockito.times(1))).onClose();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldCallRecordProcessorLifecycle() throws Exception {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.DEPLOYMENT, DeploymentIntent.CREATE, typedRecordProcessor).withListener(new StreamProcessorLifecycleAware() { // from class: io.zeebe.engine.processor.StreamProcessorTest.2
                public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
                    countDownLatch.countDown();
                }
            });
        });
        countDownLatch.await();
        this.streamProcessorRule.closeStreamProcessor();
        InOrder inOrder = Mockito.inOrder(new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, Mockito.times(1))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, Mockito.times(1))).onClose();
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldProcessRecord() {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, typedRecordProcessor);
        });
        long writeWorkflowInstanceEvent = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        InOrder inOrder = Mockito.inOrder(new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, TIMEOUT.times(1))).processRecord(ArgumentMatchers.eq(writeWorkflowInstanceEvent), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
        Assertions.assertThat(this.streamProcessorRule.getZeebeState().getLastSuccessfulProcessedRecordPosition()).isEqualTo(writeWorkflowInstanceEvent);
    }

    @Test
    public void shouldRetryProcessingRecordOnRecoverableException() {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ((TypedRecordProcessor) Mockito.doAnswer(invocationOnMock -> {
            if (atomicInteger.getAndIncrement() == 0) {
                throw new RecoverableException("recoverable");
            }
            return null;
        }).when(typedRecordProcessor)).processRecord(ArgumentMatchers.anyLong(), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, typedRecordProcessor);
        });
        long writeWorkflowInstanceEvent = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        InOrder inOrder = Mockito.inOrder(new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, TIMEOUT.times(2))).processRecord(ArgumentMatchers.eq(writeWorkflowInstanceEvent), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldIgnoreRecordWhenNoProcessorExistForThisType() {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, typedRecordProcessor);
        });
        long writeWorkflowInstanceEvent = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        long writeWorkflowInstanceEvent2 = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        InOrder inOrder = Mockito.inOrder(new Object[]{typedRecordProcessor});
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, TIMEOUT.times(1))).onRecovered((ReadonlyProcessingContext) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, TIMEOUT.times(1))).processRecord(ArgumentMatchers.eq(writeWorkflowInstanceEvent), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        ((TypedRecordProcessor) inOrder.verify(typedRecordProcessor, Mockito.never())).processRecord(ArgumentMatchers.eq(writeWorkflowInstanceEvent2), (TypedRecord) ArgumentMatchers.any(), (TypedResponseWriter) ArgumentMatchers.any(), (TypedStreamWriter) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldWriteFollowUpEvent() {
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.3
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    typedStreamWriter.appendFollowUpEvent(typedRecord.getKey(), WorkflowInstanceIntent.ELEMENT_ACTIVATED, typedRecord.getValue());
                }
            });
        });
        long writeWorkflowInstanceEvent = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        Record record = (Record) ((Optional) TestUtil.doRepeatedly(() -> {
            return this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).findAny();
        }).until((v0) -> {
            return v0.isPresent();
        })).get();
        Assertions.assertThat(record).isNotNull();
        Assertions.assertThat(record.getSourceRecordPosition()).isEqualTo(writeWorkflowInstanceEvent);
        Assertions.assertThat((Long) startTypedStreamProcessor.getLastWrittenPositionAsync().join()).isEqualTo(record.getPosition());
        Assertions.assertThat((Long) startTypedStreamProcessor.getLastProcessedPositionAsync().join()).isEqualTo(writeWorkflowInstanceEvent);
    }

    @Test
    public void shouldExecuteSideEffects() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.4
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    CountDownLatch countDownLatch2 = countDownLatch;
                    consumer.accept(() -> {
                        countDownLatch2.countDown();
                        return true;
                    });
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
    }

    @Test
    public void shouldRepeatExecuteSideEffects() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.5
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    CountDownLatch countDownLatch2 = countDownLatch;
                    consumer.accept(() -> {
                        countDownLatch2.countDown();
                        return countDownLatch2.getCount() < 1;
                    });
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
    }

    @Test
    public void shouldSkipSideEffectsOnException() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.6
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    consumer.accept(() -> {
                        throw new RuntimeException("expected");
                    });
                    countDownLatch.countDown();
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
    }

    @Test
    public void shouldNotUpdateStateOnExceptionInProcessing() throws Exception {
        AtomicLong atomicLong = new AtomicLong(-1L);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor(processingContext -> {
            this.processingContextActor = processingContext.getActor();
            final ZeebeState zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(zeebeState.getKeyGenerator()).onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.8
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    atomicLong.set(zeebeState.getKeyGenerator().nextKey());
                    countDownLatch.countDown();
                    throw new RuntimeException("expected");
                }
            }).onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATED, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.7
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    countDownLatch.countDown();
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 2);
        countDownLatch.await();
        this.processingContextActor.call(() -> {
            Assertions.assertThat(atomicLong.get()).isEqualTo(this.streamProcessorRule.getZeebeState().getKeyGenerator().nextKey());
        }).join();
    }

    @Test
    public void shouldUpdateStateAfterProcessing() throws Exception {
        AtomicLong atomicLong = new AtomicLong(-1L);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor(processingContext -> {
            this.processingContextActor = processingContext.getActor();
            final ZeebeState zeebeState = processingContext.getZeebeState();
            return TypedRecordProcessors.processors(zeebeState.getKeyGenerator()).onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.10
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    atomicLong.set(zeebeState.getKeyGenerator().nextKey());
                }
            }).onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATED, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.9
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    countDownLatch.countDown();
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 2);
        countDownLatch.await();
        this.processingContextActor.call(() -> {
            long nextKey = this.streamProcessorRule.getZeebeState().getKeyGenerator().nextKey();
            Assertions.assertThat(atomicLong.get()).isGreaterThan(0L);
            Assertions.assertThat(atomicLong.get()).isLessThan(nextKey);
        }).join();
    }

    @Test
    public void shouldCreateSnapshot() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamProcessor startTypedStreamProcessor = this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.11
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    countDownLatch.countDown();
                }
            });
        });
        long writeWorkflowInstanceEvent = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
        TestUtil.waitUntil(() -> {
            return ((Long) startTypedStreamProcessor.getLastProcessedPositionAsync().join()).longValue() > -1;
        });
        this.streamProcessorRule.getClock().addTime(SNAPSHOT_INTERVAL);
        StateSnapshotController stateSnapshotController = this.streamProcessorRule.getStateSnapshotController();
        InOrder inOrder = Mockito.inOrder(new Object[]{stateSnapshotController});
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).openDb();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == writeWorkflowInstanceEvent;
        }));
    }

    @Test
    public void shouldCreateSnapshotOnClose() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.12
                public void processRecord(TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    countDownLatch.countDown();
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
        this.streamProcessorRule.closeStreamProcessor();
        StateSnapshotController stateSnapshotController = this.streamProcessorRule.getStateSnapshotController();
        InOrder inOrder = Mockito.inOrder(new Object[]{stateSnapshotController});
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).openDb();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).getLastValidSnapshotPosition();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).takeSnapshot(ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldNotCreateSnapshotWhenNoEventProcessed() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.13
                public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
                    countDownLatch.countDown();
                }
            });
        });
        countDownLatch.await();
        this.streamProcessorRule.closeStreamProcessor();
        StateSnapshotController stateSnapshotController = this.streamProcessorRule.getStateSnapshotController();
        InOrder inOrder = Mockito.inOrder(new Object[]{stateSnapshotController});
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).openDb();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, Mockito.never())).takeSnapshot(ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldNotCreateSnapshotsIfNoProcessorProcessEvent() throws Exception {
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors;
        });
        long writeWorkflowInstanceEvent = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        this.streamProcessorRule.getClock().addTime(SNAPSHOT_INTERVAL);
        StateSnapshotController stateSnapshotController = this.streamProcessorRule.getStateSnapshotController();
        InOrder inOrder = Mockito.inOrder(new Object[]{stateSnapshotController});
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).openDb();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).getLastValidSnapshotPosition();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, Mockito.never())).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, Mockito.never())).commitSnapshot((Snapshot) ArgumentMatchers.argThat(snapshot -> {
            return snapshot.getPosition() == writeWorkflowInstanceEvent;
        }));
    }

    @Test
    public void shouldNotCreateSnapshotsIfNewEventExist() throws Exception {
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, typedRecordProcessor);
        });
        this.streamProcessorRule.getClock().addTime(SNAPSHOT_INTERVAL);
        StateSnapshotController stateSnapshotController = this.streamProcessorRule.getStateSnapshotController();
        InOrder inOrder = Mockito.inOrder(new Object[]{stateSnapshotController});
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).openDb();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, TIMEOUT.times(1))).getLastValidSnapshotPosition();
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, Mockito.never())).takeTempSnapshot(ArgumentMatchers.anyLong());
        ((StateSnapshotController) inOrder.verify(stateSnapshotController, Mockito.never())).commitSnapshot((Snapshot) ArgumentMatchers.any());
    }

    @Test
    public void shouldWriteResponse() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.14
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    typedResponseWriter.writeEventOnCommand(3L, WorkflowInstanceIntent.ELEMENT_COMPLETING, typedRecord.getValue(), typedRecord);
                    countDownLatch.countDown();
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
        CommandResponseWriter commandResponseWriter = this.streamProcessorRule.getCommandResponseWriter();
        InOrder inOrder = Mockito.inOrder(new Object[]{commandResponseWriter});
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).key(3L);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).intent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).recordType(RecordType.EVENT);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).valueType(ValueType.WORKFLOW_INSTANCE);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldNotWriteResponseOnFailedEventProcessing() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.15
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    typedResponseWriter.writeEventOnCommand(3L, WorkflowInstanceIntent.ELEMENT_COMPLETING, typedRecord.getValue(), typedRecord);
                    countDownLatch.countDown();
                    throw new RuntimeException("expected");
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
        CommandResponseWriter commandResponseWriter = this.streamProcessorRule.getCommandResponseWriter();
        InOrder inOrder = Mockito.inOrder(new Object[]{commandResponseWriter});
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).key(3L);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).intent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).recordType(RecordType.EVENT);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, TIMEOUT.times(1))).valueType(ValueType.WORKFLOW_INSTANCE);
        ((CommandResponseWriter) inOrder.verify(commandResponseWriter, Mockito.never())).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    @Test
    public void shouldInvokeOnProcessedListener() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() { // from class: io.zeebe.engine.processor.StreamProcessorTest.16
                public void processRecord(long j, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
                    countDownLatch.countDown();
                }
            });
        });
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        countDownLatch.await();
        ((Consumer) Mockito.verify(this.streamProcessorRule.getProcessedListener(), Mockito.timeout(1000L).times(1))).accept((TypedRecord) ArgumentMatchers.any());
    }
}
