package io.zeebe.engine.processing.message;

import io.zeebe.engine.processing.bpmn.ProcessInstanceStreamProcessorRule;
import io.zeebe.engine.processing.bpmn.multiinstance.MultiInstanceSubProcessTest;
import io.zeebe.engine.processing.streamprocessor.SkipFailingEventsTest;
import io.zeebe.engine.util.EngineRule;
import io.zeebe.engine.util.client.PublishMessageClient;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.zeebe.protocol.record.intent.ProcessInstanceSubscriptionIntent;
import io.zeebe.protocol.record.value.BpmnElementType;
import io.zeebe.protocol.record.value.MessageRecordValue;
import io.zeebe.protocol.record.value.ProcessInstanceSubscriptionRecordValue;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.record.ProcessInstances;
import io.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.groups.Tuple;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/zeebe/engine/processing/message/MessageCorrelationTest.class */
public final class MessageCorrelationTest {
    private static final String PROCESS_ID = "process";

    /** start -> receive task "receive-message" (message "message", correlated by "key") -> end. */
    private static final BpmnModelInstance RECEIVE_TASK_PROCESS =
        Bpmn.createExecutableProcess("process")
            .startEvent()
            .receiveTask("receive-message")
            .message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))
            .endEvent()
            .done();

    /** start -> catch event "receive-message" (message "message", correlated by "key") -> end. */
    private static final BpmnModelInstance SINGLE_MESSAGE_PROCESS =
        Bpmn.createExecutableProcess("process")
            .startEvent()
            .intermediateCatchEvent("receive-message")
            .message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))
            .endEvent()
            .done();

    /** Same shape as the single-message process, but the message name is given as an expression. */
    private static final BpmnModelInstance SINGLE_MESSAGE_PROCESS_WITH_FEEL_EXPRESSION_MESSAGE_NAME =
        Bpmn.createExecutableProcess("process")
            .startEvent()
            .intermediateCatchEvent("receive-message")
            .message(m -> m.nameExpression("\"message\"").zeebeCorrelationKeyExpression("key"))
            .endEvent()
            .done();

    /** Two consecutive catch events, "message1" then "message2", both waiting for message "ping". */
    private static final BpmnModelInstance TWO_MESSAGES_PROCESS =
        Bpmn.createExecutableProcess("process")
            .startEvent()
            .intermediateCatchEvent("message1")
            .message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"))
            .intermediateCatchEvent("message2")
            .message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"))
            .done();

    /**
     * Receive task (message "taskMsg") carrying two message boundary events, "msg1" and "msg2".
     * Each of the three paths leads to its own end event: "taskEnd", "msg1End" and "msg2End".
     */
    private static final BpmnModelInstance BOUNDARY_EVENTS_PROCESS =
        Bpmn.createExecutableProcess("process")
            .startEvent()
            .receiveTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID)
            .message(m -> m.name("taskMsg").zeebeCorrelationKeyExpression("key"))
            .boundaryEvent("msg1")
            .message(m -> m.name("msg1").zeebeCorrelationKeyExpression("key"))
            .endEvent("msg1End")
            .moveToActivity(MultiInstanceSubProcessTest.TASK_ELEMENT_ID)
            .boundaryEvent("msg2")
            .message(m -> m.name("msg2").zeebeCorrelationKeyExpression("key"))
            .endEvent("msg2End")
            .moveToActivity(MultiInstanceSubProcessTest.TASK_ELEMENT_ID)
            .endEvent("taskEnd")
            .done();

    // JUnit rule providing the engine used by every test; built via EngineRule.singlePartition().
    @Rule
    public final EngineRule engine = EngineRule.singlePartition();

    /**
     * Publishes the message only after a message subscription was created and checks that the
     * message variables are present once the catch event has completed.
     */
    @Test
    public void shouldCorrelateMessageIfEnteredBefore() {
        final String messageId = UUID.randomUUID().toString();
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        final long processInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        // The subscription has to exist before the message is published.
        Assertions.assertThat(
                RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.CREATED)
                    .exists())
            .isTrue();

        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withTimeToLive(1000L)
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "bar"))
            .withId(messageId)
            .publish();

        final long completedPosition =
            RecordingExporter.processInstanceRecords()
                .withElementId("receive-message")
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getPosition();
        Assertions.assertThat(
                ProcessInstances.getCurrentVariables(processInstanceKey, completedPosition))
            .containsOnly(
                Assertions.entry("key", "\"order-123\""),
                Assertions.entry(SkipFailingEventsTest.STREAM_NAME, "\"bar\""));
    }

    /**
     * Publishes the message before the process instance is created and checks that its variables
     * are present once the catch event has completed.
     */
    @Test
    public void shouldCorrelateMessageIfPublishedBefore() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "bar"))
            .publish();

        final long processInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        final long completedPosition =
            RecordingExporter.processInstanceRecords()
                .withElementId("receive-message")
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getPosition();
        Assertions.assertThat(
                ProcessInstances.getCurrentVariables(processInstanceKey, completedPosition))
            .containsOnly(
                Assertions.entry("key", "\"order-123\""),
                Assertions.entry(SkipFailingEventsTest.STREAM_NAME, "\"bar\""));
    }

    /**
     * Same scenario as publishing before instance creation, but against the process whose message
     * name is defined through {@code nameExpression("\"message\"")}.
     */
    @Test
    public void shouldCorrelateMessageToMessageWithFeelExpressionNameIfPublishedBefore() {
        engine.deployment()
            .withXmlResource(SINGLE_MESSAGE_PROCESS_WITH_FEEL_EXPRESSION_MESSAGE_NAME)
            .deploy();
        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "bar"))
            .publish();

        final long processInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        final long completedPosition =
            RecordingExporter.processInstanceRecords()
                .withElementId("receive-message")
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getPosition();
        Assertions.assertThat(
                ProcessInstances.getCurrentVariables(processInstanceKey, completedPosition))
            .containsOnly(
                Assertions.entry("key", "\"order-123\""),
                Assertions.entry(SkipFailingEventsTest.STREAM_NAME, "\"bar\""));
    }

    /**
     * Creates the process instance with a numeric {@code key} variable and checks that a message
     * published with the correlation key {@code "123"} is correlated to it.
     */
    @Test
    public void shouldCorrelateMessageIfCorrelationKeyIsANumber() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        engine.message()
            .withName("message")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "bar"))
            .publish();

        // The number is spelled out here: it has to line up with the message's correlation key
        // "123" and with the asserted variable value "123" below, so it must not be borrowed from
        // a constant of an unrelated test rule.
        final long processInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", 123)
                .create();

        final long completedPosition =
            RecordingExporter.processInstanceRecords()
                .withElementType(BpmnElementType.PROCESS)
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getPosition();
        Assertions.assertThat(
                ProcessInstances.getCurrentVariables(processInstanceKey, completedPosition))
            .containsOnly(
                Assertions.entry("key", "123"),
                Assertions.entry(SkipFailingEventsTest.STREAM_NAME, "\"bar\""));
    }

    /**
     * Publishes two messages with the same name and correlation key before the instance exists and
     * checks that the completed process carries the variable of the first one ({@code nr = 1}).
     */
    @Test
    public void shouldCorrelateFirstPublishedMessage() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();

        final PublishMessageClient messageClient =
            engine.message().withName("message").withCorrelationKey("order-123");
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 1)).publish();
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 2)).publish();

        final long processInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        final long completedPosition =
            RecordingExporter.processInstanceRecords()
                .withElementType(BpmnElementType.PROCESS)
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getPosition();
        Assertions.assertThat(
                ProcessInstances.getCurrentVariables(processInstanceKey, completedPosition))
            .containsOnly(Assertions.entry("key", "\"order-123\""), Assertions.entry("nr", "1"));
    }

    /**
     * Publishes a message with a time-to-live of zero after a message subscription was created and
     * checks that the completed catch event belongs to the created process instance.
     */
    @Test
    public void shouldCorrelateMessageWithZeroTTL() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        final long processInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        // The subscription has to exist before the message is published.
        Assertions.assertThat(
                RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.CREATED)
                    .exists())
            .isTrue();

        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "bar"))
            .withTimeToLive(0L)
            .publish();

        // No raw (Record) cast here: it would erase the value type and getProcessInstanceKey()
        // could no longer be resolved on the result of getValue().
        final long completedInstanceKey =
            RecordingExporter.processInstanceRecords()
                .withElementId("receive-message")
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getValue()
                .getProcessInstanceKey();
        Assertions.assertThat(completedInstanceKey).isEqualTo(processInstanceKey);
    }

    /**
     * Creates two instances with different {@code key} values, publishes one message per key and
     * checks that each instance received the variables of the message matching its own key.
     */
    @Test
    public void shouldCorrelateMessageByCorrelationKey() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();

        final long firstInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();
        final long secondInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-456")
                .create();

        engine.message()
            .withName("message")
            .withCorrelationKey("order-123")
            .withVariables(MsgPackUtil.asMsgPack("nr", 1))
            .publish();
        engine.message()
            .withName("message")
            .withCorrelationKey("order-456")
            .withVariables(MsgPackUtil.asMsgPack("nr", 2))
            .publish();

        final long firstCatchCompleted =
            RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(firstInstanceKey)
                .withElementType(BpmnElementType.INTERMEDIATE_CATCH_EVENT)
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getPosition();
        Assertions.assertThat(
                ProcessInstances.getCurrentVariables(firstInstanceKey, firstCatchCompleted))
            .containsOnly(Assertions.entry("key", "\"order-123\""), Assertions.entry("nr", "1"));

        final long secondCatchCompleted =
            RecordingExporter.processInstanceRecords()
                .withProcessInstanceKey(secondInstanceKey)
                .withElementType(BpmnElementType.INTERMEDIATE_CATCH_EVENT)
                .withIntent(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .getFirst()
                .getPosition();
        Assertions.assertThat(
                ProcessInstances.getCurrentVariables(secondInstanceKey, secondCatchCompleted))
            .containsOnly(Assertions.entry("key", "\"order-456\""), Assertions.entry("nr", "2"));
    }

    /**
     * Deploys two different processes waiting for the same message and correlation key, publishes
     * one message and checks that it is correlated to an instance of each process.
     */
    @Test
    public void shouldCorrelateMessageToDifferentProcesses() {
        final BpmnModelInstance secondProcess =
            Bpmn.createExecutableProcess("process-2")
                .startEvent()
                .intermediateCatchEvent(
                    "catch",
                    catchEvent ->
                        catchEvent.message(
                            m -> m.name("message").zeebeCorrelationKeyExpression("key")))
                .endEvent()
                .done();
        engine.deployment()
            .withXmlResource("wf-1.bpmn", SINGLE_MESSAGE_PROCESS)
            .withXmlResource("wf-2.bpmn", secondProcess)
            .deploy();

        final long firstInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();
        final long secondInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process-2")
                .withVariable("key", "order-123")
                .create();

        final Record<MessageRecordValue> message =
            engine.message().withName("message").withCorrelationKey("order-123").publish();

        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CORRELATED)
                    .limit(2L))
            .extracting(Record::getValue)
            .extracting(
                value -> Assertions.tuple(value.getMessageKey(), value.getProcessInstanceKey()))
            .contains(
                Assertions.tuple(message.getKey(), firstInstanceKey),
                Assertions.tuple(message.getKey(), secondInstanceKey));
    }

    /**
     * Creates two instances of the same process with the same correlation key, publishes two
     * messages and checks that the first message is correlated to the first instance and the
     * second message to the second instance.
     */
    @Test
    public void shouldCorrelateMessageOnlyOncePerProcess() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();

        final long firstInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();
        final long secondInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        // Publish both messages BEFORE awaiting the CORRELATED records. Java evaluates the
        // receiver chain assertThat(...).extracting(...) before the arguments of contains(...),
        // so publishing inside those arguments would make the assertion wait for correlations of
        // messages that do not exist yet.
        final long firstMessageKey =
            engine.message().withName("message").withCorrelationKey("order-123").publish().getKey();
        final long secondMessageKey =
            engine.message().withName("message").withCorrelationKey("order-123").publish().getKey();

        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CORRELATED)
                    .limit(2L))
            .extracting(Record::getValue)
            .extracting(
                value -> Assertions.tuple(value.getMessageKey(), value.getProcessInstanceKey()))
            .contains(
                Assertions.tuple(firstMessageKey, firstInstanceKey),
                Assertions.tuple(secondMessageKey, secondInstanceKey));
    }

    /**
     * Creates one instance, deploys another resource with the same process id, creates a second
     * instance, then publishes two messages and checks that the first message is correlated to the
     * first instance and the second message to the second instance.
     */
    @Test
    public void shouldCorrelateMessageOnlyOncePerProcessAcrossVersions() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        final long firstInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        final BpmnModelInstance secondVersion =
            Bpmn.createExecutableProcess("process")
                .startEvent()
                .intermediateCatchEvent(
                    "catch",
                    catchEvent ->
                        catchEvent.message(
                            m -> m.name("message").zeebeCorrelationKeyExpression("key")))
                .endEvent()
                .done();
        engine.deployment().withXmlResource("wf_v2.bpmn", secondVersion).deploy();
        final long secondInstanceKey =
            engine.processInstance()
                .ofBpmnProcessId("process")
                .withVariable("key", "order-123")
                .create();

        // Publish both messages BEFORE awaiting the CORRELATED records. Java evaluates the
        // receiver chain assertThat(...).extracting(...) before the arguments of contains(...),
        // so publishing inside those arguments would make the assertion wait for correlations of
        // messages that do not exist yet.
        final long firstMessageKey =
            engine.message().withName("message").withCorrelationKey("order-123").publish().getKey();
        final long secondMessageKey =
            engine.message().withName("message").withCorrelationKey("order-123").publish().getKey();

        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CORRELATED)
                    .limit(2L))
            .extracting(Record::getValue)
            .extracting(
                value -> Assertions.tuple(value.getMessageKey(), value.getProcessInstanceKey()))
            .contains(
                Assertions.tuple(firstMessageKey, firstInstanceKey),
                Assertions.tuple(secondMessageKey, secondInstanceKey));
    }

    /**
     * Publishes two "ping" messages before the instance of the two-catch-event process exists and
     * checks that "message1" completed with {@code nr = 1} and "message2" with {@code nr = 2}.
     */
    @Test
    public void shouldCorrelateMessageOnlyOnceIfPublishedBefore() {
        engine.deployment().withXmlResource(TWO_MESSAGES_PROCESS).deploy();

        final PublishMessageClient messageClient =
            engine.message().withName("ping").withCorrelationKey("123");
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 1)).publish();
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 2)).publish();

        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();

        // Pair each completed catch event with the value of "nr" at the moment it completed.
        final List<Tuple> completedCatchEvents =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .filter(completed -> completed.getValue().getElementId().startsWith("message"))
                .limit(2L)
                .map(
                    completed ->
                        Assertions.tuple(
                            completed.getValue().getElementId(),
                            ProcessInstances.getCurrentVariables(
                                    completed.getValue().getProcessInstanceKey(),
                                    completed.getPosition())
                                .get("nr")))
                .collect(Collectors.toList());

        Assertions.assertThat(completedCatchEvents)
            .contains(Assertions.tuple("message1", "1"), Assertions.tuple("message2", "2"));
    }

    /**
     * Publishes each "ping" message only after the corresponding subscription record was created
     * and checks that "message1" completed with {@code nr = 1} and "message2" with {@code nr = 2}.
     */
    @Test
    public void shouldCorrelateMessageOnlyOnceIfEnteredBefore() {
        engine.deployment().withXmlResource(TWO_MESSAGES_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();

        // First subscription created -> publish the first message.
        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CREATED)
                    .exists())
            .isTrue();
        final PublishMessageClient messageClient =
            engine.message().withName("ping").withCorrelationKey("123");
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 1)).publish();

        // Second subscription created -> publish the second message.
        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CREATED)
                    .limit(2L)
                    .count())
            .isEqualTo(2L);
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 2)).publish();

        // Pair each completed catch event with the value of "nr" at the moment it completed.
        final List<Tuple> completedCatchEvents =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .filter(completed -> completed.getValue().getElementId().startsWith("message"))
                .limit(2L)
                .map(
                    completed ->
                        Assertions.tuple(
                            completed.getValue().getElementId(),
                            ProcessInstances.getCurrentVariables(
                                    completed.getValue().getProcessInstanceKey(),
                                    completed.getPosition())
                                .get("nr")))
                .collect(Collectors.toList());

        Assertions.assertThat(completedCatchEvents)
            .contains(Assertions.tuple("message1", "1"), Assertions.tuple("message2", "2"));
    }

    /**
     * Uses a parallel gateway with two catch events waiting for the same message; after both
     * subscription records exist, two messages are published and the completed catch events are
     * expected to have seen {@code nr} values "1" and "2".
     */
    @Test
    public void shouldCorrelateMessageOnlyOnceToInstance() {
        final BpmnModelInstance parallelCatchEvents =
            Bpmn.createExecutableProcess("process")
                .startEvent()
                .parallelGateway()
                .intermediateCatchEvent("message1")
                .message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"))
                .moveToLastGateway()
                .intermediateCatchEvent("message2")
                .message(m -> m.name("ping").zeebeCorrelationKeyExpression("key"))
                .done();
        engine.deployment().withXmlResource(parallelCatchEvents).deploy();
        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();

        // Both subscriptions have to exist before the messages are published.
        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CREATED)
                    .limit(2L)
                    .count())
            .isEqualTo(2L);

        final PublishMessageClient messageClient =
            engine.message().withName("ping").withCorrelationKey("123");
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 1)).publish();
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 2)).publish();

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                    .filter(completed -> completed.getValue().getElementId().startsWith("message"))
                    .limit(2L)
                    .map(
                        completed ->
                            ProcessInstances.getCurrentVariables(
                                    completed.getValue().getProcessInstanceKey(),
                                    completed.getPosition())
                                .get("nr"))
                    .collect(Collectors.toList()))
            .contains("1", "2");
    }

    /**
     * Publishes two "ping" messages once a subscription record exists and checks that "message1"
     * completed with {@code nr = 1} and "message2" with {@code nr = 2}.
     */
    @Test
    public void shouldCorrelateOnlyOneMessagePerCatchElement() {
        engine.deployment().withXmlResource(TWO_MESSAGES_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();

        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CREATED)
                    .exists())
            .isTrue();

        final PublishMessageClient messageClient =
            engine.message().withName("ping").withCorrelationKey("123");
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 1)).publish();
        messageClient.withVariables(MsgPackUtil.asMsgPack("nr", 2)).publish();

        // Pair each completed catch event with the value of "nr" at the moment it completed.
        final List<Tuple> completedCatchEvents =
            RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                .filter(completed -> completed.getValue().getElementId().startsWith("message"))
                .limit(2L)
                .map(
                    completed ->
                        Assertions.tuple(
                            completed.getValue().getElementId(),
                            ProcessInstances.getCurrentVariables(
                                    completed.getValue().getProcessInstanceKey(),
                                    completed.getPosition())
                                .get("nr")))
                .collect(Collectors.toList());

        Assertions.assertThat(completedCatchEvents)
            .contains(Assertions.tuple("message1", "1"), Assertions.tuple("message2", "2"));
    }

    /**
     * Publishes "msg1" to the boundary-event process and checks that, among the activated
     * elements, "msg1End" is present while "taskEnd" and "msg2End" are not.
     */
    @Test
    public void shouldCorrelateCorrectBoundaryEvent() {
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();
        awaitSubscriptionsOpened(3);

        engine.message()
            .withName("msg1")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, 1))
            .publish();

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .filteredOn(
                candidate -> candidate.getIntent() == ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .extracting(Record::getValue)
            .extracting(value -> value.getElementId())
            .contains("msg1End")
            .doesNotContain("taskEnd", "msg2End");
    }

    /**
     * Publishes "taskMsg" to the boundary-event process and checks that, among the activated
     * elements, "taskEnd" is present while "msg1End" and "msg2End" are not.
     */
    @Test
    public void shouldNotTriggerBoundaryEventIfReceiveTaskTriggeredFirst() {
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();
        awaitSubscriptionsOpened(3);

        engine.message()
            .withName("taskMsg")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, 1))
            .publish();

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .filteredOn(
                candidate -> candidate.getIntent() == ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .extracting(Record::getValue)
            .extracting(value -> value.getElementId())
            .contains("taskEnd")
            .doesNotContain("msg1End", "msg2End");
    }

    /**
     * Publishes "msg2" to the boundary-event process and checks that, among the activated
     * elements, "msg2End" is present while "taskEnd" and "msg1End" are not.
     */
    @Test
    public void shouldNotTriggerReceiveTaskIfBoundaryEventTriggeredFirst() {
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();
        awaitSubscriptionsOpened(3);

        engine.message()
            .withName("msg2")
            .withCorrelationKey("123")
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, 1))
            .publish();

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords().limitToProcessInstanceCompleted())
            .filteredOn(
                candidate -> candidate.getIntent() == ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .extracting(Record::getValue)
            .extracting(value -> value.getElementId())
            .contains("msg2End")
            .doesNotContain("taskEnd", "msg1End");
    }

    /**
     * Checks the exact sequence of intents recorded for the catch event "receive-message" when the
     * message was published before the process instance was created.
     */
    @Test
    public void testIntermediateMessageEventLifeCycle() {
        engine.deployment().withXmlResource(SINGLE_MESSAGE_PROCESS).deploy();
        engine.message().withName("message").withCorrelationKey("order-123").publish();
        engine.processInstance()
            .ofBpmnProcessId("process")
            .withVariable("key", "order-123")
            .create();

        // The collected list is passed without a raw (List) cast: a raw assertion would hand the
        // filteredOn lambda a plain Object, on which getValue() cannot be resolved.
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .limitToProcessInstanceCompleted()
                    .collect(Collectors.toList()))
            .filteredOn(candidate -> candidate.getValue().getElementId().equals("receive-message"))
            .extracting(Record::getIntent)
            .containsExactly(
                ProcessInstanceIntent.ELEMENT_ACTIVATING,
                ProcessInstanceIntent.ELEMENT_ACTIVATED,
                ProcessInstanceIntent.EVENT_OCCURRED,
                ProcessInstanceIntent.ELEMENT_COMPLETING,
                ProcessInstanceIntent.ELEMENT_COMPLETED);
    }

    /**
     * Checks the exact sequence of intents recorded for the receive task "receive-message" when
     * the message was published before the process instance was created.
     */
    @Test
    public void testReceiveTaskLifeCycle() {
        engine.deployment().withXmlResource(RECEIVE_TASK_PROCESS).deploy();
        engine.message().withName("message").withCorrelationKey("order-123").publish();
        engine.processInstance()
            .ofBpmnProcessId("process")
            .withVariable("key", "order-123")
            .create();

        // The collected list is passed without a raw (List) cast: a raw assertion would hand the
        // filteredOn lambda a plain Object, on which getValue() cannot be resolved.
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .limitToProcessInstanceCompleted()
                    .collect(Collectors.toList()))
            .filteredOn(candidate -> candidate.getValue().getElementId().equals("receive-message"))
            .extracting(Record::getIntent)
            .containsExactly(
                ProcessInstanceIntent.ELEMENT_ACTIVATING,
                ProcessInstanceIntent.ELEMENT_ACTIVATED,
                ProcessInstanceIntent.EVENT_OCCURRED,
                ProcessInstanceIntent.ELEMENT_COMPLETING,
                ProcessInstanceIntent.ELEMENT_COMPLETED);
    }

    /**
     * Publishes "msg1" before the instance of the boundary-event process is created and checks
     * that the recorded (element id, intent) pairs contain the expected contiguous sequence: the
     * task is activated and terminated, then the boundary event "msg1" is activated and completed.
     */
    @Test
    public void testBoundaryMessageEventLifecycle() {
        engine.deployment().withXmlResource(BOUNDARY_EVENTS_PROCESS).deploy();
        engine.message().withName("msg1").withCorrelationKey("order-123").publish();
        engine.processInstance()
            .ofBpmnProcessId("process")
            .withVariable("key", "order-123")
            .create();

        final String taskId = MultiInstanceSubProcessTest.TASK_ELEMENT_ID;

        // The collected list is passed without a raw (List) cast: a raw assertion would hand the
        // extracting lambda a plain Object, on which getValue()/getIntent() cannot be resolved.
        Assertions.assertThat(
                RecordingExporter.processInstanceRecords()
                    .limitToProcessInstanceCompleted()
                    .collect(Collectors.toList()))
            .extracting(
                recorded ->
                    Assertions.tuple(recorded.getValue().getElementId(), recorded.getIntent()))
            .containsSequence(
                Assertions.tuple(taskId, ProcessInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple(taskId, ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple(taskId, ProcessInstanceIntent.EVENT_OCCURRED),
                Assertions.tuple(taskId, ProcessInstanceIntent.ELEMENT_TERMINATING),
                Assertions.tuple(taskId, ProcessInstanceIntent.ELEMENT_TERMINATED),
                Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_ACTIVATING),
                Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_ACTIVATED),
                Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_COMPLETING),
                Assertions.tuple("msg1", ProcessInstanceIntent.ELEMENT_COMPLETED));
    }

    /**
     * Attaches a boundary event with {@code cancelActivity(false)} to a service task, publishes
     * three messages and checks that three correlations and three completed boundary events are
     * recorded and that the message variable took the values "0", "1", "2" in that order.
     */
    @Test
    public void shouldCorrelateToNonInterruptingBoundaryEvent() {
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess("process")
                .startEvent()
                .serviceTask(
                    MultiInstanceSubProcessTest.TASK_ELEMENT_ID, task -> task.zeebeJobType("type"))
                .boundaryEvent("msg1")
                .cancelActivity(false)
                .message(m -> m.name("msg1").zeebeCorrelationKeyExpression("key"))
                .endEvent("msg1End")
                .moveToActivity(MultiInstanceSubProcessTest.TASK_ELEMENT_ID)
                .endEvent("taskEnd")
                .done();
        engine.deployment().withXmlResource(process).deploy();
        engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();

        final PublishMessageClient messageClient =
            engine.message().withName("msg1").withCorrelationKey("123");
        messageClient
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, 0))
            .publish();
        messageClient
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, 1))
            .publish();
        messageClient
            .withVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, 2))
            .publish();

        Assertions.assertThat(awaitMessagesCorrelated(3)).hasSize(3);

        Assertions.assertThat(
                RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
                    .withElementType(BpmnElementType.BOUNDARY_EVENT)
                    .limit(3L)
                    .count())
            .isEqualTo(3L);

        Assertions.assertThat(
                RecordingExporter.variableRecords()
                    .withName(SkipFailingEventsTest.STREAM_NAME)
                    .limit(3L))
            .extracting(variable -> variable.getValue().getValue())
            .containsExactly("0", "1", "2");
    }

    /**
     * Publishes messages "a" and "b" before deploying a process whose event-based gateway waits
     * for either of them and then for the other one. Checks that a rejected CORRELATE command is
     * followed by a message-subscription REJECT/REJECTED pair and a further CORRELATE command,
     * that both messages end up correlated, and that the process completes.
     */
    @Test
    public void shouldCorrelateMessageAgainAfterRejection() {
        engine.message().withName("a").withCorrelationKey("123").publish();
        engine.message().withName("b").withCorrelationKey("123").publish();

        final BpmnModelInstance process =
            Bpmn.createExecutableProcess("process")
                .startEvent()
                .eventBasedGateway("split")
                .intermediateCatchEvent(
                    "element-a",
                    c -> c.message(m -> m.name("a").zeebeCorrelationKeyExpression("key")))
                .intermediateCatchEvent(
                    "element-ab",
                    c -> c.message(m -> m.name("b").zeebeCorrelationKeyExpression("key")))
                .exclusiveGateway("merge")
                .endEvent()
                .moveToNode("split")
                .intermediateCatchEvent(
                    "element-b",
                    c -> c.message(m -> m.name("b").zeebeCorrelationKeyExpression("key")))
                .intermediateCatchEvent(
                    "element-ba",
                    c -> c.message(m -> m.name("a").zeebeCorrelationKeyExpression("key")))
                .connectTo("merge")
                .done();
        engine.deployment().withXmlResource(process).deploy();

        final long processInstanceKey =
            engine.processInstance().ofBpmnProcessId("process").withVariable("key", "123").create();

        // Typed method references instead of lambdas in a raw Function[]: inside a raw array the
        // lambda parameter is a plain Object, so getRecordType()/getIntent() cannot be resolved.
        Assertions.assertThat(RecordingExporter.records().limitToProcessInstance(processInstanceKey))
            .extracting(Record::getRecordType, Record::getIntent)
            .containsSubsequence(
                Assertions.tuple(
                    RecordType.COMMAND_REJECTION, ProcessInstanceSubscriptionIntent.CORRELATE),
                Assertions.tuple(RecordType.COMMAND, MessageSubscriptionIntent.REJECT),
                Assertions.tuple(RecordType.EVENT, MessageSubscriptionIntent.REJECTED),
                Assertions.tuple(RecordType.COMMAND, ProcessInstanceSubscriptionIntent.CORRELATE));

        Assertions.assertThat(
                RecordingExporter.processInstanceSubscriptionRecords(
                        ProcessInstanceSubscriptionIntent.CORRELATED)
                    .limit(2L))
            .extracting(correlated -> correlated.getValue().getMessageName())
            .containsExactlyInAnyOrder("a", "b");

        RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
            .withElementType(BpmnElementType.PROCESS)
            .await();
    }

    /**
     * Publishes three messages with time-to-live values of zero, ten and twenty seconds while the
     * instance is still at the service task, advances the engine clock by ten seconds, completes
     * the job and checks that the first recorded variable {@code x} has the value "3".
     */
    @Test
    public void shouldNotCorrelateMessageAfterTTL() {
        final BpmnModelInstance process =
            Bpmn.createExecutableProcess("wf")
                .startEvent()
                .serviceTask(
                    MultiInstanceSubProcessTest.TASK_ELEMENT_ID, task -> task.zeebeJobType("test"))
                .intermediateCatchEvent(
                    "catch", c -> c.message(m -> m.name("a").zeebeCorrelationKeyExpression("key")))
                .done();
        engine.deployment().withXmlResource(process).deploy();
        engine.processInstance().ofBpmnProcessId("wf").withVariable("key", "key-1").create();

        engine.message()
            .withName("a")
            .withCorrelationKey("key-1")
            .withVariables(Map.of("x", 1))
            .withTimeToLive(Duration.ZERO)
            .publish();

        final Duration timeToLive = Duration.ofSeconds(10L);
        engine.message()
            .withName("a")
            .withCorrelationKey("key-1")
            .withVariables(Map.of("x", 2))
            .withTimeToLive(timeToLive)
            .publish();
        engine.message()
            .withName("a")
            .withCorrelationKey("key-1")
            .withVariables(Map.of("x", 3))
            .withTimeToLive(timeToLive.multipliedBy(2L))
            .publish();

        final long jobKey = RecordingExporter.jobRecords(JobIntent.CREATED).getFirst().getKey();

        // Advance the clock by exactly the second message's time-to-live before the catch event
        // can be reached by completing the job.
        engine.getClock().addTime(timeToLive);
        engine.job().withKey(jobKey).complete();

        // No raw (Record) cast here: it would erase the value type, and the typed protocol
        // assertion providing hasValue(...) could no longer be selected.
        io.zeebe.protocol.record.Assertions.assertThat(
                RecordingExporter.variableRecords().withName("x").getFirst().getValue())
            .hasValue("3");
    }

    /**
     * Collects CORRELATED process-instance-subscription records from the recording exporter.
     *
     * @param i maximum number of records to collect
     * @return the collected records as a list
     */
    private List<Record<ProcessInstanceSubscriptionRecordValue>> awaitMessagesCorrelated(int i) {
        return RecordingExporter
            .processInstanceSubscriptionRecords(ProcessInstanceSubscriptionIntent.CORRELATED)
            .limit(i)
            .asList();
    }

    /**
     * Collects process-instance-subscription records with intent CREATED from the recording
     * exporter.
     *
     * @param i maximum number of records to collect
     * @return the collected records as a list
     */
    private List<Record<ProcessInstanceSubscriptionRecordValue>> awaitSubscriptionsOpened(int i) {
        return RecordingExporter
            .processInstanceSubscriptionRecords()
            .withIntent(ProcessInstanceSubscriptionIntent.CREATED)
            .limit(i)
            .asList();
    }
}
