package io.zeebe.engine.processing.streamprocessor;

import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.util.StreamProcessorRule;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.WorkflowInstanceIntent;
import io.zeebe.test.util.TestUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationWithTimeout;

/* loaded from: input_file:io/zeebe/engine/processing/streamprocessor/StreamProcessorReprocessingTest.class */
public final class StreamProcessorReprocessingTest {
    // Upper bound, in milliseconds, from which the shared TIMEOUT verification mode is built.
    private static final long TIMEOUT_MILLIS = 2000;
    // Mockito verification mode (Mockito.timeout(TIMEOUT_MILLIS)) reused by the tests in this class.
    private static final VerificationWithTimeout TIMEOUT = Mockito.timeout(TIMEOUT_MILLIS);

    // JUnit rule through which the tests write records and start/close/snapshot the stream processor.
    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();

    /**
     * Writes an ELEMENT_ACTIVATING record and an ELEMENT_ACTIVATED record whose source is the
     * first one, runs a mocked processor over them and checks the callback order on the mock:
     * processRecord(first) -> onRecovered -> processRecord(second) -> onClose.
     */
    @Test
    public void shouldCallRecordProcessorLifecycle() {
        // given: two records on the log, the second referencing the first as its source
        final long activatingPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        final long activatedPosition = this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, activatingPosition);
        TestUtil.waitUntil(() -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).exists());

        // when: the same mock is registered for both intents
        final TypedRecordProcessor<?> processor = Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, processor)
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATED, processor));

        // wait for the last record to be handled before closing
        Mockito.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(activatedPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        this.streamProcessorRule.closeStreamProcessor();

        // then
        final InOrder ordered = Mockito.inOrder(processor);
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(activatingPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        // two calls expected — presumably one per registration of the mock
        ordered.verify(processor, TIMEOUT.times(2)).onRecovered(ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(activatedPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(2)).onClose();
        ordered.verifyNoMoreInteractions();
    }

    /**
     * Fills the log with 5000 ELEMENT_ACTIVATING records plus one ACTIVATING/ACTIVATED pair,
     * starts a processor, pauses it, waits for the onRecovered callback and then writes one more
     * record. The mocked processor must not receive any processRecord call for it.
     *
     * <p>The negative check uses {@code Mockito.after(..).never()}: a {@code timeout(..).times(0)}
     * verification returns as soon as it passes, i.e. immediately, and therefore never gives the
     * stream processor a chance to (wrongly) process the record.
     *
     * @throws Exception if waiting for the recovery latch is interrupted
     */
    @Test
    public void shouldStopProcessingWhenPaused() throws Exception {
        // given
        for (int key = 0; key < 5000; key++) {
            this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, key);
        }
        final long sourcePosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, sourcePosition);
        Awaitility.await().until(
            () -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED),
            records -> records.exists());

        final CountDownLatch recovered = new CountDownLatch(1);
        final TypedRecordProcessor<?> processor = Mockito.mock(TypedRecordProcessor.class);
        final StreamProcessor streamProcessor = this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, processor)
                    .withListener(new StreamProcessorLifecycleAware() {
                        @Override
                        public void onRecovered(ReadonlyProcessingContext recoveredContext) {
                            recovered.countDown();
                        }
                    }));

        // when
        streamProcessor.pauseProcessing();
        Assertions.assertThat(recovered.await(15L, TimeUnit.SECONDS)).isTrue();
        // drop everything recorded on the mock so far; only calls made from here on are checked
        Mockito.clearInvocations(processor);
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 51966);

        // then: wait the full period once, then check the remaining overloads without waiting again
        Mockito.verify(processor, Mockito.after(TIMEOUT_MILLIS).never())
            .processRecord(ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        Mockito.verify(processor, Mockito.never())
            .processRecord(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        Mockito.verify(processor, Mockito.never())
            .processRecord(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Fills the log with 5000 ELEMENT_ACTIVATING records plus one ACTIVATING/ACTIVATED pair,
     * starts a processor, pauses and immediately resumes it, waits for onRecovered and then writes
     * one more record, which the mocked processor is expected to receive exactly once.
     *
     * @throws Exception if waiting for the recovery latch is interrupted
     */
    @Test
    public void shouldContinueToProcessWhenResumed() throws Exception {
        // given
        for (int key = 0; key < 5000; key++) {
            this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, key);
        }
        final long sourcePosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, sourcePosition);
        Awaitility.await().until(
            () -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED),
            records -> records.exists());

        final CountDownLatch recovered = new CountDownLatch(1);
        final TypedRecordProcessor<?> processor = Mockito.mock(TypedRecordProcessor.class);
        final StreamProcessor streamProcessor = this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, processor)
                    .withListener(new StreamProcessorLifecycleAware() {
                        @Override
                        public void onRecovered(ReadonlyProcessingContext recoveredContext) {
                            recovered.countDown();
                        }
                    }));

        // when
        streamProcessor.pauseProcessing();
        streamProcessor.resumeProcessing();
        Assertions.assertThat(recovered.await(15L, TimeUnit.SECONDS)).isTrue();
        // only invocations made after this point are of interest
        Mockito.clearInvocations(processor);
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 51966);

        // then
        Mockito.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Starts a stream processor with a mocked lifecycle listener on a pre-filled log, calls
     * pause then resume, and checks that the listener sees onRecovered, onPaused and onResumed
     * exactly once each, in that order, with nothing afterwards.
     */
    @Test
    public void shouldCallOnPausedAfterOnRecovered() {
        // given
        for (int key = 0; key < 5000; key++) {
            this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, key);
        }
        final long sourcePosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, sourcePosition);
        Awaitility.await().until(
            () -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED),
            records -> records.exists());

        final StreamProcessorLifecycleAware listener = Mockito.mock(StreamProcessorLifecycleAware.class);
        final StreamProcessor streamProcessor = this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) -> processors.withListener(listener));

        // when
        streamProcessor.pauseProcessing();
        streamProcessor.resumeProcessing();

        // then
        final InOrder ordered = Mockito.inOrder(listener);
        ordered.verify(listener, TIMEOUT.times(1)).onRecovered(ArgumentMatchers.any());
        ordered.verify(listener, TIMEOUT.times(1)).onPaused();
        ordered.verify(listener, TIMEOUT.times(1)).onResumed();
        ordered.verifyNoMoreInteractions();
    }

    /**
     * Same setup as the pause/resume ordering test, but resume is requested once before the
     * pause as well. The mocked listener is still expected to see exactly one onRecovered, one
     * onPaused and one onResumed, in that order, and nothing else afterwards.
     */
    @Test
    public void shouldCallOnPausedBeforeOnResumedNoMatterWhenResumedWasCalled() {
        // given
        for (int key = 0; key < 5000; key++) {
            this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, key);
        }
        final long sourcePosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, sourcePosition);
        Awaitility.await().until(
            () -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED),
            records -> records.exists());

        final StreamProcessorLifecycleAware listener = Mockito.mock(StreamProcessorLifecycleAware.class);
        final StreamProcessor streamProcessor = this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) -> processors.withListener(listener));

        // when: a resume without a preceding pause, followed by a regular pause/resume cycle
        streamProcessor.resumeProcessing();
        streamProcessor.pauseProcessing();
        streamProcessor.resumeProcessing();

        // then
        final InOrder ordered = Mockito.inOrder(listener);
        ordered.verify(listener, TIMEOUT.times(1)).onRecovered(ArgumentMatchers.any());
        ordered.verify(listener, TIMEOUT.times(1)).onPaused();
        ordered.verify(listener, TIMEOUT.times(1)).onResumed();
        ordered.verifyNoMoreInteractions();
    }

    /**
     * Writes three ELEMENT_ACTIVATING records and one ELEMENT_ACTIVATED record whose source is
     * the third, then checks the callback order on the mocked processor: processRecord for the
     * three ACTIVATING positions, onRecovered, processRecord for the ACTIVATED position, onClose.
     */
    @Test
    public void shouldReprocessUntilLastSource() {
        // given
        final long firstPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        final long secondPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        final long lastSourcePosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        final long followUpPosition = this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, lastSourcePosition);
        TestUtil.waitUntil(() -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).exists());

        // when
        final TypedRecordProcessor<?> processor = Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, processor)
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATED, processor));

        // wait for the last record to be handled before closing
        Mockito.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(followUpPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        this.streamProcessorRule.closeStreamProcessor();

        // then
        final InOrder ordered = Mockito.inOrder(processor);
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(firstPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(secondPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(lastSourcePosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        // two calls expected — presumably one per registration of the mock
        ordered.verify(processor, TIMEOUT.times(2)).onRecovered(ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(followUpPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(2)).onClose();
        ordered.verifyNoMoreInteractions();
    }

    /**
     * Writes an ELEMENT_ACTIVATING and an ELEMENT_ACTIVATED record, neither written with a source
     * position, and checks that the mocked processor sees onRecovered first and only afterwards
     * processRecord for both positions, followed by onClose.
     */
    @Test
    public void shouldNotReprocessWithoutSourcePosition() {
        // given
        final long activatingPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        final long activatedPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1);
        TestUtil.waitUntil(() -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).exists());

        // when
        final TypedRecordProcessor<?> processor = Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, processor)
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATED, processor));

        // wait for the last record to be handled before closing
        Mockito.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(activatedPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        this.streamProcessorRule.closeStreamProcessor();

        // then
        final InOrder ordered = Mockito.inOrder(processor);
        // two calls expected — presumably one per registration of the mock
        ordered.verify(processor, TIMEOUT.times(2)).onRecovered(ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(activatingPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(activatedPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(2)).onClose();
        ordered.verifyNoMoreInteractions();
    }

    /**
     * Stubs the mocked processor so that its first processRecord call throws a RuntimeException
     * and later calls succeed, then checks that processRecord is invoked twice for the
     * ELEMENT_ACTIVATING position before onRecovered is called.
     */
    @Test
    public void shouldRetryProcessingRecordOnException() {
        // given
        final long activatingPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, activatingPosition);
        TestUtil.waitUntil(() -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).exists());

        final TypedRecordProcessor<?> processor = Mockito.mock(TypedRecordProcessor.class);
        final AtomicInteger attempts = new AtomicInteger(0);
        Mockito.doAnswer(invocation -> {
            // fail only on the very first invocation
            final boolean firstAttempt = attempts.getAndIncrement() == 0;
            if (firstAttempt) {
                throw new RuntimeException("recoverable");
            }
            return null;
        }).when(processor)
            .processRecord(ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        // when
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, processor));

        // then
        final InOrder ordered = Mockito.inOrder(processor);
        ordered.verify(processor, TIMEOUT.times(2))
            .processRecord(ArgumentMatchers.eq(activatingPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1)).onRecovered(ArgumentMatchers.any());
        ordered.verifyNoMoreInteractions();
    }

    /**
     * Writes an ELEMENT_ACTIVATED record followed by an ELEMENT_ACTIVATING record that uses it as
     * source, and registers a mocked processor for ELEMENT_ACTIVATING only. The mock must see
     * onRecovered and then processRecord for the ACTIVATING position, and must never be invoked
     * for the ACTIVATED position.
     *
     * <p>The "never" check is performed after the timed positive verifications. Checking it
     * directly after starting the processor would pass trivially, because the mock has not
     * necessarily been touched yet at that point.
     */
    @Test
    public void shouldIgnoreRecordWhenNoProcessorExistForThisType() {
        // given
        final long unhandledPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1);
        final long handledPosition = this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1, unhandledPosition);

        // when
        final TypedRecordProcessor<?> processor = Mockito.mock(TypedRecordProcessor.class);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, processor));

        // then
        final InOrder ordered = Mockito.inOrder(processor);
        ordered.verify(processor, TIMEOUT.times(1)).onRecovered(ArgumentMatchers.any());
        ordered.verify(processor, TIMEOUT.times(1))
            .processRecord(ArgumentMatchers.eq(handledPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        // global (not in-order) check over everything recorded on the mock so far
        Mockito.verify(processor, Mockito.never())
            .processRecord(ArgumentMatchers.eq(unhandledPosition), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
        ordered.verifyNoMoreInteractions();
    }

    /**
     * Writes an ELEMENT_ACTIVATING record and an ELEMENT_ACTIVATED record that uses it as source,
     * then starts a processor whose ELEMENT_ACTIVATING handler appends an ELEMENT_ACTIVATED
     * follow-up event. Once the ELEMENT_ACTIVATED handler has run, the log must still contain
     * exactly one ELEMENT_ACTIVATED record.
     *
     * <p>The wait for the ELEMENT_ACTIVATED handler is bounded so that a processor that never
     * reaches it fails the test instead of blocking the build forever.
     *
     * @throws Exception if waiting for the latch is interrupted
     */
    @Test
    public void shouldNotWriteFollowUpEvent() throws Exception {
        // given
        final long activatingPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATED, 1, activatingPosition);
        TestUtil.waitUntil(() -> this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).exists());

        // when
        final CountDownLatch activatedProcessed = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() {
                        @Override
                        public void processRecord(long position, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                            // attempts to write a second ELEMENT_ACTIVATED record
                            streamWriter.appendFollowUpEvent(typedRecord.getKey(), WorkflowInstanceIntent.ELEMENT_ACTIVATED, typedRecord.getValue());
                        }
                    })
                    .onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATED, new TypedRecordProcessor<UnifiedRecordValue>() {
                        @Override
                        public void processRecord(long position, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                            activatedProcessed.countDown();
                        }
                    }));

        // then
        Assertions.assertThat(activatedProcessed.await(15L, TimeUnit.SECONDS)).isTrue();
        final long activatedCount = this.streamProcessorRule.events().onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).count();
        Assertions.assertThat(activatedCount).isEqualTo(1L);
    }

    /**
     * Processes two ELEMENT_ACTIVATING records, takes a snapshot and closes the processor. After
     * a restart and one more written record, the new processor must have been invoked only for
     * that last record's position.
     *
     * <p>All latch waits are bounded and asserted so a stalled processor fails the test instead
     * of hanging it; the collected positions are held in a typed list rather than a raw one.
     *
     * @throws Exception if waiting for a latch is interrupted
     */
    @Test
    public void shouldStartAfterLastProcessedEventInSnapshot() throws Exception {
        // given: two records processed, then snapshot + close
        final CountDownLatch processedBeforeSnapshot = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() {
                    @Override
                    public void processRecord(long position, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                        // nothing to do; the callback passed to the rule counts processed records
                    }
                }),
            processedRecord -> processedBeforeSnapshot.countDown());
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        Assertions.assertThat(processedBeforeSnapshot.await(15L, TimeUnit.SECONDS)).isTrue();
        this.streamProcessorRule.snapshot();
        this.streamProcessorRule.closeStreamProcessor();

        // when: restart and write one more record
        final List<Long> processedPositions = new ArrayList<>();
        final CountDownLatch processedAfterRestart = new CountDownLatch(1);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() {
                    @Override
                    public void processRecord(long position, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                        // add before counting down so the list content is visible after await()
                        processedPositions.add(position);
                        processedAfterRestart.countDown();
                    }
                }));
        final long positionAfterSnapshot = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);

        // then
        Assertions.assertThat(processedAfterRestart.await(15L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(processedPositions).containsExactly(positionAfterSnapshot);
    }

    /**
     * Processes two ELEMENT_ACTIVATING records, takes a snapshot and closes the processor, then
     * writes a third record and a fourth that uses the third as source. After a restart the new
     * processor must not be invoked for the second (already snapshotted) position, and the last
     * two invocations must be for the third and fourth positions.
     *
     * <p>All latch waits are bounded and asserted so a stalled processor fails the test instead
     * of hanging it; the collected positions are held in a typed list rather than a raw one.
     *
     * @throws Exception if waiting for a latch is interrupted
     */
    @Test
    public void shouldNotReprocessEventAtLastProcessedEvent() throws Exception {
        // given: two records processed, then snapshot + close
        final CountDownLatch processedBeforeSnapshot = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() {
                    @Override
                    public void processRecord(long position, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                        // nothing to do; the callback passed to the rule counts processed records
                    }
                }),
            processedRecord -> processedBeforeSnapshot.countDown());
        this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        final long snapshotPosition = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        Assertions.assertThat(processedBeforeSnapshot.await(15L, TimeUnit.SECONDS)).isTrue();
        this.streamProcessorRule.snapshot();
        this.streamProcessorRule.closeStreamProcessor();

        // two more records written while no processor is running
        final long sourcePositionAfterSnapshot = this.streamProcessorRule.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1);
        final long lastPosition = this.streamProcessorRule.writeWorkflowInstanceEventWithSource(WorkflowInstanceIntent.ELEMENT_ACTIVATING, 1, sourcePositionAfterSnapshot);

        // when
        final List<Long> processedPositions = new ArrayList<>();
        final CountDownLatch processedAfterRestart = new CountDownLatch(2);
        this.streamProcessorRule.startTypedStreamProcessor(
            (processors, context) ->
                processors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, new TypedRecordProcessor<UnifiedRecordValue>() {
                    @Override
                    public void processRecord(long position, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                        // add before counting down so the list content is visible after await()
                        processedPositions.add(position);
                        processedAfterRestart.countDown();
                    }
                }));

        // then
        Assertions.assertThat(processedAfterRestart.await(15L, TimeUnit.SECONDS)).isTrue();
        Assertions.assertThat(processedPositions).doesNotContain(snapshotPosition);
        Assertions.assertThat(processedPositions).endsWith(sourcePositionAfterSnapshot, lastPosition);
    }
}
