package io.zeebe.engine.processing.streamprocessor;

import io.zeebe.engine.processor.StreamProcessor;
import io.zeebe.engine.processor.workflow.multiinstance.MultiInstanceSubProcessTest;
import io.zeebe.engine.util.EngineRule;
import io.zeebe.engine.util.RecordToWrite;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.protocol.record.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.record.value.BpmnElementType;
import io.zeebe.protocol.record.value.JobRecordValue;
import io.zeebe.protocol.record.value.WorkflowInstanceRecordValue;
import io.zeebe.test.util.record.RecordingExporter;
import io.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.zeebe.util.health.HealthStatus;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/zeebe/engine/processing/streamprocessor/ReprocessingIssueDetectionTest.class */
public final class ReprocessingIssueDetectionTest {

    @Rule
    public final EngineRule engine = EngineRule.singlePartition();

    @Rule
    public final RecordingExporterTestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();
    private long workflowInstanceKey;
    private Record<JobRecordValue> jobCreated;
    private Record<WorkflowInstanceRecordValue> serviceTaskActivated;
    private Record<WorkflowInstanceRecordValue> processActivated;

    @Before
    public void setup() {
        this.engine.deployment().withXmlResource(Bpmn.createExecutableProcess("process").startEvent().serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType("test");
        }).done()).deploy();
        this.workflowInstanceKey = this.engine.workflowInstance().ofBpmnProcessId("process").create();
        this.processActivated = (Record) RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_ACTIVATED).withElementType(BpmnElementType.PROCESS).getFirst();
        this.jobCreated = (Record) RecordingExporter.jobRecords(JobIntent.CREATED).getFirst();
        this.serviceTaskActivated = (Record) RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_ACTIVATED).withElementType(BpmnElementType.SERVICE_TASK).getFirst();
        this.engine.stop();
    }

    @Test
    public void shouldDetectNoIssues() {
        this.engine.writeRecords(RecordToWrite.command().job(JobIntent.COMPLETE, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()), RecordToWrite.event().job(JobIntent.COMPLETED, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()).causedBy(0), RecordToWrite.event().workflowInstance(WorkflowInstanceIntent.ELEMENT_COMPLETING, (WorkflowInstanceRecordValue) this.serviceTaskActivated.getValue()).key(this.serviceTaskActivated.getKey()).causedBy(1));
        this.engine.startWithReprocessingDetection();
        Assertions.assertThat(RecordingExporter.workflowInstanceRecords().withWorkflowInstanceKey(this.workflowInstanceKey).filterRootScope().limitToWorkflowInstanceCompleted()).extracting((v0) -> {
            return v0.getIntent();
        }).contains(new Intent[]{WorkflowInstanceIntent.ELEMENT_COMPLETED});
        StreamProcessor streamProcessor = this.engine.getStreamProcessor(1);
        Assertions.assertThat(streamProcessor.isFailed()).isFalse();
        Assertions.assertThat(streamProcessor.getHealthStatus()).isEqualTo(HealthStatus.HEALTHY);
    }

    @Test
    public void shouldDetectDifferentKey() {
        this.engine.writeRecords(RecordToWrite.command().job(JobIntent.COMPLETE, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()), RecordToWrite.event().job(JobIntent.COMPLETED, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()).causedBy(0), RecordToWrite.event().workflowInstance(WorkflowInstanceIntent.ELEMENT_COMPLETING, (WorkflowInstanceRecordValue) this.serviceTaskActivated.getValue()).key(123L).causedBy(1));
        this.engine.startWithReprocessingDetection();
        StreamProcessor streamProcessor = this.engine.getStreamProcessor(1);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(streamProcessor.isFailed()).isTrue();
            Assertions.assertThat(streamProcessor.getHealthStatus()).isEqualTo(HealthStatus.UNHEALTHY);
        });
    }

    @Test
    public void shouldDetectDifferentIntent() {
        this.engine.writeRecords(RecordToWrite.command().job(JobIntent.COMPLETE, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()), RecordToWrite.event().job(JobIntent.COMPLETED, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()).causedBy(0), RecordToWrite.event().workflowInstance(WorkflowInstanceIntent.ELEMENT_TERMINATING, (WorkflowInstanceRecordValue) this.serviceTaskActivated.getValue()).key(this.serviceTaskActivated.getKey()).causedBy(1));
        this.engine.startWithReprocessingDetection();
        StreamProcessor streamProcessor = this.engine.getStreamProcessor(1);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(streamProcessor.isFailed()).isTrue();
            Assertions.assertThat(streamProcessor.getHealthStatus()).isEqualTo(HealthStatus.UNHEALTHY);
        });
    }

    @Test
    public void shouldDetectMissingRecordOnLogStream() {
        this.engine.writeRecords(RecordToWrite.command().workflowInstance(WorkflowInstanceIntent.CANCEL, (WorkflowInstanceRecordValue) this.processActivated.getValue()).key(this.workflowInstanceKey), RecordToWrite.event().workflowInstance(WorkflowInstanceIntent.ELEMENT_TERMINATING, (WorkflowInstanceRecordValue) this.processActivated.getValue()).key(this.workflowInstanceKey).causedBy(0), RecordToWrite.command().job(JobIntent.COMPLETE, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()), RecordToWrite.event().job(JobIntent.COMPLETED, (JobRecordValue) this.jobCreated.getValue()).key(this.jobCreated.getKey()).causedBy(2));
        this.engine.startWithReprocessingDetection();
        StreamProcessor streamProcessor = this.engine.getStreamProcessor(1);
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(streamProcessor.isFailed()).isTrue();
            Assertions.assertThat(streamProcessor.getHealthStatus()).isEqualTo(HealthStatus.UNHEALTHY);
        });
    }
}
