package io.zeebe.engine.processing.message;

import io.zeebe.engine.processing.common.EventTriggerBehavior;
import io.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.zeebe.engine.processing.streamprocessor.SkipFailingEventsTest;
import io.zeebe.engine.util.StreamProcessorRule;
import io.zeebe.engine.util.TypedRecordStream;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.intent.MessageIntent;
import io.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:io/zeebe/engine/processing/message/MessageStreamProcessorTest.class */
/**
 * Stream-processor level tests for the message and message-subscription processors.
 *
 * <p>Each test writes commands through the {@link StreamProcessorRule}, waits for the follow-up
 * records to appear and then checks either the written rejection record or the calls received by a
 * mocked {@link SubscriptionCommandSender}.
 */
public final class MessageStreamProcessorTest {

    /** Upper bound, in milliseconds, that Mockito waits for an expected sender interaction. */
    private static final long VERIFICATION_TIMEOUT_MS = 5000L;

    @Rule
    public final StreamProcessorRule rule = new StreamProcessorRule();

    @Mock
    private SubscriptionCommandSender mockSubscriptionCommandSender;

    /**
     * Initializes the mocks, stubs every sender operation used by the tests to return
     * {@code true}, and starts a stream processor that only has the message processors registered.
     */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);

        Mockito.when(
                mockSubscriptionCommandSender.openProcessMessageSubscription(
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.anyBoolean()))
            .thenReturn(true);
        Mockito.when(
                mockSubscriptionCommandSender.correlateProcessMessageSubscription(
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any()))
            .thenReturn(true);
        Mockito.when(
                mockSubscriptionCommandSender.closeProcessMessageSubscription(
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.any(DirectBuffer.class)))
            .thenReturn(true);

        rule.startTypedStreamProcessor(
            (typedRecordProcessors, processingContext) -> {
                MessageEventProcessors.addMessageProcessors(
                    Mockito.mock(EventTriggerBehavior.class),
                    typedRecordProcessors,
                    processingContext.getZeebeState(),
                    mockSubscriptionCommandSender,
                    processingContext.getWriters());
                return typedRecordProcessors;
            });
    }

    /**
     * Writing the same CREATE command twice must produce an INVALID_STATE rejection, while the
     * sender is still asked to open the process message subscription twice.
     */
    @Test
    public void shouldRejectDuplicatedOpenMessageSubscription() {
        final MessageSubscriptionRecord subscription = messageSubscription();

        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);

        final Record<MessageSubscriptionRecord> rejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getIntent()).isEqualTo(MessageSubscriptionIntent.CREATE);
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.INVALID_STATE);

        verifySender(2)
            .openProcessMessageSubscription(
                ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                ArgumentMatchers.any(),
                ArgumentMatchers.anyBoolean());
    }

    /**
     * Subscription first, message second: after the clock is advanced by the check interval plus
     * the subscription timeout, the correlate command must have been sent twice.
     */
    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterPublishedMessage() {
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();

        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);
        awaitMessagePublished();

        rule.getClock()
            .addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));

        final long messageKey = firstPublishedMessageKey();
        verifySender(2)
            .correlateProcessMessageSubscription(
                subscription.getProcessInstanceKey(),
                subscription.getElementInstanceKey(),
                subscription.getBpmnProcessIdBuffer(),
                subscription.getMessageNameBuffer(),
                messageKey,
                message.getVariablesBuffer(),
                subscription.getCorrelationKeyBuffer());
    }

    /**
     * Message first, subscription second: after the clock is advanced by the check interval plus
     * the subscription timeout, the correlate command must have been sent twice.
     */
    @Test
    public void shouldRetryToCorrelateMessageSubscriptionAfterOpenedSubscription() {
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();

        rule.writeCommand(MessageIntent.PUBLISH, message);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        rule.getClock()
            .addTime(MessageObserver.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageObserver.SUBSCRIPTION_TIMEOUT));

        final long messageKey = firstPublishedMessageKey();
        verifySender(2)
            .correlateProcessMessageSubscription(
                subscription.getProcessInstanceKey(),
                subscription.getElementInstanceKey(),
                subscription.getBpmnProcessIdBuffer(),
                subscription.getMessageNameBuffer(),
                messageKey,
                message.getVariablesBuffer(),
                subscription.getCorrelationKeyBuffer());
    }

    /** A CORRELATE command written after the subscription was deleted is rejected as NOT_FOUND. */
    @Test
    public void shouldRejectCorrelateIfMessageSubscriptionClosed() {
        final MessageSubscriptionRecord subscription = messageSubscription();

        rule.writeCommand(MessageIntent.PUBLISH, message());
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        rule.writeCommand(MessageSubscriptionIntent.DELETE, subscription);
        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);

        final Record<MessageSubscriptionRecord> rejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getIntent()).isEqualTo(MessageSubscriptionIntent.CORRELATE);
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);
    }

    /**
     * Writing the same DELETE command twice must produce a NOT_FOUND rejection, while the sender
     * is still asked to close the process message subscription twice.
     */
    @Test
    public void shouldRejectDuplicatedCloseMessageSubscription() {
        final MessageSubscriptionRecord subscription = messageSubscription();

        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        rule.writeCommand(MessageSubscriptionIntent.DELETE, subscription);
        rule.writeCommand(MessageSubscriptionIntent.DELETE, subscription);

        final Record<MessageSubscriptionRecord> rejection = awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getIntent()).isEqualTo(MessageSubscriptionIntent.DELETE);
        Assertions.assertThat(rejection.getRejectionType()).isEqualTo(RejectionType.NOT_FOUND);

        verifySender(2)
            .closeProcessMessageSubscription(
                ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                ArgumentMatchers.any(DirectBuffer.class));
    }

    /**
     * With an interrupting subscription and no CORRELATE acknowledgement, publishing the same
     * message a second time must leave exactly one correlate call for the first message key.
     */
    @Test
    public void shouldNotCorrelateNewMessagesIfSubscriptionNotCorrelatable() {
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();

        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);
        awaitMessagePublished();

        rule.writeCommand(MessageIntent.PUBLISH, message);

        final long firstMessageKey = firstPublishedMessageKey();
        verifySender(1)
            .correlateProcessMessageSubscription(
                ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(),
                ArgumentMatchers.eq(firstMessageKey),
                ArgumentMatchers.any(),
                ArgumentMatchers.any());
    }

    /**
     * With a non-interrupting subscription that is acknowledged via CORRELATE between two
     * publishes, a correlate call is expected for each of the two published message keys.
     */
    @Test
    public void shouldCorrelateNewMessagesIfSubscriptionIsReusable() {
        final MessageSubscriptionRecord subscription = messageSubscription();
        final MessageRecord message = message();
        subscription.setInterrupting(false);

        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);
        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, message);
        awaitTwoMessagesPublished();

        final long firstMessageKey = firstPublishedMessageKey();
        final long secondMessageKey = secondPublishedMessageKey();

        for (final long messageKey : new long[] {firstMessageKey, secondMessageKey}) {
            verifySender()
                .correlateProcessMessageSubscription(
                    ArgumentMatchers.eq(subscription.getProcessInstanceKey()),
                    ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                    ArgumentMatchers.eq(subscription.getBpmnProcessIdBuffer()),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.eq(messageKey),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.any());
        }
    }

    /**
     * One message is published before the non-interrupting subscription is created and one after
     * it was acknowledged via CORRELATE; both must be correlated.
     */
    @Test
    public void shouldCorrelateMultipleMessagesOneBeforeOpenOneAfter() {
        final MessageSubscriptionRecord subscription = messageSubscription();
        subscription.setInterrupting(false);

        // NOTE(review): SkipFailingEventsTest.STREAM_NAME is used as the variable name here; it
        // looks like a string literal the decompiler attributed to that class - confirm and
        // replace it with a local literal.
        final MessageRecord firstMessage = message();
        firstMessage.setVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "bar"));
        final MessageRecord secondMessage = message();
        secondMessage.setVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "baz"));

        rule.writeCommand(MessageIntent.PUBLISH, firstMessage);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);
        rule.writeCommand(MessageIntent.PUBLISH, secondMessage);

        assertAllMessagesReceived(subscription);
    }

    /**
     * Two messages are published before the non-interrupting subscription is created; after the
     * subscription is acknowledged via CORRELATE, both must be correlated.
     */
    @Test
    public void shouldCorrelateMultipleMessagesTwoBeforeOpen() {
        final MessageSubscriptionRecord subscription = messageSubscription();
        subscription.setInterrupting(false);

        // NOTE(review): SkipFailingEventsTest.STREAM_NAME is used as the variable name here; it
        // looks like a string literal the decompiler attributed to that class - confirm and
        // replace it with a local literal.
        final MessageRecord firstMessage = message();
        firstMessage.setVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "bar"));
        final MessageRecord secondMessage = message();
        secondMessage.setVariables(MsgPackUtil.asMsgPack(SkipFailingEventsTest.STREAM_NAME, "baz"));

        rule.writeCommand(MessageIntent.PUBLISH, firstMessage);
        rule.writeCommand(MessageIntent.PUBLISH, secondMessage);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, subscription);
        awaitSubscriptionCreated();

        rule.writeCommand(MessageSubscriptionIntent.CORRELATE, subscription);

        assertAllMessagesReceived(subscription);
    }

    /**
     * Two subscriptions (element instance keys 5 and 10) are created for one published message.
     * After a REJECT command carrying the message key is written for the first subscription, a
     * correlate call for that message key is expected for each of the two subscriptions.
     */
    @Test
    public void shouldCorrelateToFirstSubscriptionAfterRejection() {
        final MessageRecord message = message();
        final MessageSubscriptionRecord firstSubscription = messageSubscription();
        firstSubscription.setElementInstanceKey(5L);
        final MessageSubscriptionRecord secondSubscription = messageSubscription();
        secondSubscription.setElementInstanceKey(10L);

        rule.writeCommand(MessageIntent.PUBLISH, message);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, firstSubscription);
        rule.writeCommand(MessageSubscriptionIntent.CREATE, secondSubscription);

        // Wait for the CREATED event of the second subscription specifically.
        TestUtil.waitUntil(
            () ->
                rule.events()
                    .onlyMessageSubscriptionRecords()
                    .withIntent(MessageSubscriptionIntent.CREATED)
                    .filter(
                        record ->
                            record.getValue().getElementInstanceKey()
                                == secondSubscription.getElementInstanceKey())
                    .exists());

        final long messageKey = firstPublishedMessageKey();
        firstSubscription.setMessageKey(messageKey);
        rule.writeCommand(MessageSubscriptionIntent.REJECT, firstSubscription);

        verifySender()
            .correlateProcessMessageSubscription(
                ArgumentMatchers.eq(firstSubscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(firstSubscription.getElementInstanceKey()),
                ArgumentMatchers.eq(firstSubscription.getBpmnProcessIdBuffer()),
                ArgumentMatchers.any(DirectBuffer.class),
                ArgumentMatchers.eq(messageKey),
                ArgumentMatchers.any(DirectBuffer.class),
                ArgumentMatchers.eq(firstSubscription.getCorrelationKeyBuffer()));
        verifySender()
            .correlateProcessMessageSubscription(
                ArgumentMatchers.eq(secondSubscription.getProcessInstanceKey()),
                ArgumentMatchers.eq(secondSubscription.getElementInstanceKey()),
                ArgumentMatchers.eq(secondSubscription.getBpmnProcessIdBuffer()),
                ArgumentMatchers.any(DirectBuffer.class),
                ArgumentMatchers.eq(messageKey),
                ArgumentMatchers.any(DirectBuffer.class),
                ArgumentMatchers.eq(secondSubscription.getCorrelationKeyBuffer()));
    }

    /**
     * Waits until two messages were published and verifies that a correlate command was sent for
     * each of them using the identifying fields of the given subscription.
     *
     * @param messageSubscriptionRecord the subscription both messages are expected to correlate to
     */
    private void assertAllMessagesReceived(MessageSubscriptionRecord messageSubscriptionRecord) {
        awaitTwoMessagesPublished();

        final long firstMessageKey = firstPublishedMessageKey();
        final long secondMessageKey = secondPublishedMessageKey();

        for (final long messageKey : new long[] {firstMessageKey, secondMessageKey}) {
            verifySender()
                .correlateProcessMessageSubscription(
                    ArgumentMatchers.eq(messageSubscriptionRecord.getProcessInstanceKey()),
                    ArgumentMatchers.eq(messageSubscriptionRecord.getElementInstanceKey()),
                    ArgumentMatchers.eq(messageSubscriptionRecord.getBpmnProcessIdBuffer()),
                    ArgumentMatchers.eq(messageSubscriptionRecord.getMessageNameBuffer()),
                    ArgumentMatchers.eq(messageKey),
                    ArgumentMatchers.any(),
                    ArgumentMatchers.eq(messageSubscriptionRecord.getCorrelationKeyBuffer()));
        }
    }

    /**
     * Creates the default interrupting subscription used by the tests; its message name and
     * correlation key match the record built by {@link #message()}.
     */
    private MessageSubscriptionRecord messageSubscription() {
        final MessageSubscriptionRecord subscription = new MessageSubscriptionRecord();
        subscription
            .setProcessInstanceKey(1L)
            .setElementInstanceKey(2L)
            .setBpmnProcessId(BufferUtil.wrapString("process"))
            .setMessageKey(-1L)
            .setMessageName(BufferUtil.wrapString("order canceled"))
            .setCorrelationKey(BufferUtil.wrapString("order-123"))
            .setInterrupting(true);
        return subscription;
    }

    /** Creates the default message used by the tests, with a time-to-live of ten seconds. */
    private MessageRecord message() {
        final MessageRecord message = new MessageRecord();
        message
            .setName(BufferUtil.wrapString("order canceled"))
            .setCorrelationKey(BufferUtil.wrapString("order-123"))
            .setTimeToLive(Duration.ofSeconds(10L).toMillis())
            .setVariables(MsgPackUtil.asMsgPack("orderId", "order-123"));
        return message;
    }

    /**
     * Blocks until a message-subscription rejection has been written and returns the first one.
     *
     * @return the first rejection record among the message-subscription records
     */
    private Record<MessageSubscriptionRecord> awaitAndGetFirstSubscriptionRejection() {
        TestUtil.waitUntil(
            () ->
                rule.events()
                    .onlyMessageSubscriptionRecords()
                    .onlyRejections()
                    .findFirst()
                    .isPresent());
        return rule.events().onlyMessageSubscriptionRecords().onlyRejections().findFirst().get();
    }

    /** Starts a verification on the sender mock that waits up to the timeout for one call. */
    private SubscriptionCommandSender verifySender() {
        return Mockito.verify(
            mockSubscriptionCommandSender, Mockito.timeout(VERIFICATION_TIMEOUT_MS));
    }

    /**
     * Starts a verification on the sender mock that waits up to the timeout for the given number
     * of calls.
     *
     * @param times the expected number of invocations
     */
    private SubscriptionCommandSender verifySender(final int times) {
        return Mockito.verify(
            mockSubscriptionCommandSender, Mockito.timeout(VERIFICATION_TIMEOUT_MS).times(times));
    }

    /** Blocks until at least one PUBLISHED message record exists. */
    private void awaitMessagePublished() {
        TestUtil.waitUntil(
            () -> rule.events().onlyMessageRecords().withIntent(MessageIntent.PUBLISHED).exists());
    }

    /** Blocks until two PUBLISHED message records exist. */
    private void awaitTwoMessagesPublished() {
        TestUtil.waitUntil(
            () ->
                rule.events()
                        .onlyMessageRecords()
                        .withIntent(MessageIntent.PUBLISHED)
                        .limit(2L)
                        .count()
                    == 2);
    }

    /** Blocks until at least one CREATED message-subscription record exists. */
    private void awaitSubscriptionCreated() {
        TestUtil.waitUntil(
            () ->
                rule.events()
                    .onlyMessageSubscriptionRecords()
                    .withIntent(MessageSubscriptionIntent.CREATED)
                    .exists());
    }

    /** Returns the key of the first PUBLISHED message record. */
    private long firstPublishedMessageKey() {
        return rule.events()
            .onlyMessageRecords()
            .withIntent(MessageIntent.PUBLISHED)
            .getFirst()
            .getKey();
    }

    /** Returns the key of the second PUBLISHED message record. */
    private long secondPublishedMessageKey() {
        return rule.events()
            .onlyMessageRecords()
            .withIntent(MessageIntent.PUBLISHED)
            .skip(1L)
            .getFirst()
            .getKey();
    }
}
