package io.zeebe.engine.processor;

import io.zeebe.engine.state.DefaultZeebeDbFactory;
import io.zeebe.engine.util.ListLogStorage;
import io.zeebe.engine.util.RecordStream;
import io.zeebe.engine.util.StreamProcessingComposite;
import io.zeebe.engine.util.TestStreams;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.WorkflowInstanceIntent;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.sched.clock.ControlledActorClock;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.util.Objects;
import org.agrona.CloseHelper;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/engine/processor/StreamProcessorInconsistentPositionTest.class */
public final class StreamProcessorInconsistentPositionTest {
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ControlledActorClock clock = new ControlledActorClock();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(this.clock);

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.actorSchedulerRule).around(this.closeables);
    private StreamProcessingComposite firstStreamProcessorComposite;
    private StreamProcessingComposite secondStreamProcessorComposite;
    private TestStreams testStreams;

    @Before
    public void setup() {
        this.testStreams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get());
        ListLogStorage listLogStorage = new ListLogStorage();
        this.testStreams.createLogStream(StreamProcessingComposite.getLogName(1), 1, listLogStorage);
        this.testStreams.createLogStream(StreamProcessingComposite.getLogName(2), 2, listLogStorage);
        this.firstStreamProcessorComposite = new StreamProcessingComposite(this.testStreams, 1, DefaultZeebeDbFactory.DEFAULT_DB_FACTORY);
        this.secondStreamProcessorComposite = new StreamProcessingComposite(this.testStreams, 2, DefaultZeebeDbFactory.DEFAULT_DB_FACTORY);
    }

    @After
    public void tearDown() {
        CloseHelper.quietClose(() -> {
            this.testStreams.closeProcessor(StreamProcessingComposite.getLogName(1));
        });
    }

    @Test
    public void shouldNotStartOnInconsistentLog() {
        long writeWorkflowInstanceEvent = this.firstStreamProcessorComposite.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATING);
        long writeWorkflowInstanceEvent2 = this.firstStreamProcessorComposite.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        TestUtil.waitUntil(() -> {
            return new RecordStream(this.testStreams.events(StreamProcessingComposite.getLogName(1))).onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_ACTIVATED).exists();
        });
        long writeWorkflowInstanceEvent3 = this.secondStreamProcessorComposite.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETING);
        long writeWorkflowInstanceEvent4 = this.secondStreamProcessorComposite.writeWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);
        TestUtil.waitUntil(() -> {
            return new RecordStream(this.testStreams.events(StreamProcessingComposite.getLogName(2))).onlyWorkflowInstanceRecords().withIntent(WorkflowInstanceIntent.ELEMENT_COMPLETED).exists();
        });
        Assertions.assertThat(writeWorkflowInstanceEvent).isEqualTo(writeWorkflowInstanceEvent3);
        Assertions.assertThat(writeWorkflowInstanceEvent2).isEqualTo(writeWorkflowInstanceEvent4);
        TypedRecordProcessor typedRecordProcessor = (TypedRecordProcessor) Mockito.mock(TypedRecordProcessor.class);
        StreamProcessor startTypedStreamProcessor = this.firstStreamProcessorComposite.startTypedStreamProcessor((typedRecordProcessors, readonlyProcessingContext) -> {
            return typedRecordProcessors.onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATING, typedRecordProcessor).onEvent(ValueType.WORKFLOW_INSTANCE, WorkflowInstanceIntent.ELEMENT_ACTIVATED, typedRecordProcessor);
        });
        Objects.requireNonNull(startTypedStreamProcessor);
        TestUtil.waitUntil(startTypedStreamProcessor::isFailed);
        Assertions.assertThat(startTypedStreamProcessor.isFailed()).isTrue();
    }
}
