package io.zeebe.engine.processing.message;

import io.zeebe.engine.util.EngineRule;
import io.zeebe.engine.util.client.WorkflowInstanceClient;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.protocol.impl.SubscriptionUtil;
import io.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.collection.Maps;
import io.zeebe.test.util.record.RecordingExporter;
import io.zeebe.test.util.record.WorkflowInstances;
import io.zeebe.util.buffer.BufferUtil;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.MapEntry;
import org.assertj.core.groups.Tuple;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/zeebe/engine/processing/message/MessageCorrelationMultiplePartitionsTest.class */
/**
 * Engine tests for message subscriptions and message correlation on an engine that is started
 * with three partitions.
 *
 * <p>{@link #CORRELATION_KEYS} pairs every partition id with a correlation key for which
 * {@link SubscriptionUtil#getSubscriptionPartitionId} is expected to return that partition id;
 * {@link #init()} re-checks this expectation before every test.
 */
public final class MessageCorrelationMultiplePartitionsTest {
    private static final String PROCESS_ID = "process";

    /** Partition id mapped to the correlation key used to target that partition. */
    private static final Map<Integer, String> CORRELATION_KEYS =
        Maps.of(MapEntry.entry(1, "item-2"), MapEntry.entry(2, "item-1"), MapEntry.entry(3, "item-0"));

    /**
     * Process under test: start event, intermediate catch event for the message named
     * {@code "message"} (correlation key expression {@code "key"}), end event.
     */
    private static final BpmnModelInstance WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .intermediateCatchEvent("receive-message")
            .message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))
            .endEvent("end")
            .done();

    @Rule
    public final EngineRule engine = EngineRule.multiplePartition(3);

    /**
     * Verifies that every correlation key still resolves to its expected partition and deploys
     * the process under test.
     */
    @Before
    public void init() {
        CORRELATION_KEYS.forEach((partitionId, correlationKey) ->
            Assertions.assertThat(getPartitionId(correlationKey))
                .as("subscription partition of correlation key '%s'", correlationKey)
                .isEqualTo(partitionId));

        engine.deployment().withXmlResource(WORKFLOW).deploy();
    }

    /**
     * Creates 30 instances (10 per correlation key) and expects the 30 OPENED message
     * subscription records to pair each partition id with its own correlation key.
     */
    @Test
    public void shouldOpenMessageSubscriptionsOnDifferentPartitions() {
        // when
        createInstancesForEachCorrelationKey(engine.workflowInstance().ofBpmnProcessId(PROCESS_ID), 10);

        // then
        Assertions.assertThat(
                RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.OPENED).limit(30L))
            .extracting(record ->
                Assertions.tuple(record.getPartitionId(), record.getValue().getCorrelationKey()))
            // containsOnly alone would also pass with fewer than 30 records; pin the count the
            // same way the restart test does.
            .hasSize(30)
            .containsOnly(expectedPartitionAndKeyTuples());
    }

    /**
     * Publishes one message per partition (variable {@code p} set to {@code "p" + partitionId}),
     * then creates one instance per correlation key and expects each instance to end up with the
     * {@code p} value of the message published for its key.
     */
    @Test
    public void shouldCorrelateMessageOnDifferentPartitions() {
        // given
        engine.forEachPartition(partitionId ->
            engine.message()
                .onPartition(partitionId)
                .withName("message")
                .withCorrelationKey(CORRELATION_KEYS.get(partitionId))
                .withVariables(MsgPackUtil.asMsgPack("p", "p" + partitionId))
                .publish());

        // when
        final WorkflowInstanceClient.WorkflowInstanceCreationClient creationClient =
            engine.workflowInstance().ofBpmnProcessId(PROCESS_ID);
        final String correlatedOnPartition1 = createInstanceAndReadCorrelatedValue(creationClient, 1);
        final String correlatedOnPartition2 = createInstanceAndReadCorrelatedValue(creationClient, 2);
        final String correlatedOnPartition3 = createInstanceAndReadCorrelatedValue(creationClient, 3);

        // then
        // The list is built in partition order, so assert the order as well: this checks that
        // every instance received the message of its own correlation key, not just that all
        // three values showed up somewhere.
        Assertions.assertThat(
                Arrays.asList(correlatedOnPartition1, correlatedOnPartition2, correlatedOnPartition3))
            .containsExactly("\"p1\"", "\"p2\"", "\"p3\"");
    }

    /**
     * Creates 15 instances, restarts the engine (resetting the recording exporter in between),
     * creates 15 more and expects 30 OPENED message subscription records that all pair a
     * partition id with its own correlation key.
     */
    @Test
    public void shouldOpenMessageSubscriptionsOnSamePartitionsAfterRestart() {
        // given
        final WorkflowInstanceClient.WorkflowInstanceCreationClient creationClient =
            engine.workflowInstance().ofBpmnProcessId(PROCESS_ID);
        createInstancesForEachCorrelationKey(creationClient, 5);

        Assertions.assertThat(
                RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.OPENED)
                    .limit(15L)
                    .count())
            .isEqualTo(15L);

        // when
        engine.stop();
        RecordingExporter.reset();
        engine.start();

        createInstancesForEachCorrelationKey(creationClient, 5);

        // then
        // NOTE(review): 30 = 15 subscriptions from before the restart + 15 from after it;
        // presumably the pre-restart records are exported again once the engine is started --
        // confirm against EngineRule / RecordingExporter.
        Assertions.assertThat(
                RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.OPENED).limit(30L))
            .extracting(record ->
                Assertions.tuple(record.getPartitionId(), record.getValue().getCorrelationKey()))
            .hasSize(30)
            .containsOnly(expectedPartitionAndKeyTuples());
    }

    /**
     * Creates, {@code rounds} times, one instance for each of the three correlation keys (in
     * partition order 1, 2, 3), passing the key as variable {@code "key"}.
     */
    private static void createInstancesForEachCorrelationKey(
            final WorkflowInstanceClient.WorkflowInstanceCreationClient creationClient, final int rounds) {
        IntStream.range(0, rounds).forEach(round -> {
            creationClient.withVariable("key", CORRELATION_KEYS.get(1)).create();
            creationClient.withVariable("key", CORRELATION_KEYS.get(2)).create();
            creationClient.withVariable("key", CORRELATION_KEYS.get(3)).create();
        });
    }

    /**
     * Creates one instance using the correlation key of the given partition and returns the
     * value of its variable {@code "p"} as reported by {@link WorkflowInstances#getCurrentVariables}.
     */
    private static String createInstanceAndReadCorrelatedValue(
            final WorkflowInstanceClient.WorkflowInstanceCreationClient creationClient, final int partitionId) {
        return WorkflowInstances.getCurrentVariables(
                creationClient.withVariable("key", CORRELATION_KEYS.get(partitionId)).create())
            .get("p");
    }

    /** Returns the only (partition id, correlation key) pairs a subscription record may carry. */
    private static Tuple[] expectedPartitionAndKeyTuples() {
        return new Tuple[] {
            Assertions.tuple(1, CORRELATION_KEYS.get(1)),
            Assertions.tuple(2, CORRELATION_KEYS.get(2)),
            Assertions.tuple(3, CORRELATION_KEYS.get(3))
        };
    }

    /**
     * Returns the subscription partition id that {@link SubscriptionUtil} computes for the given
     * correlation key, based on the number of partitions of the running engine.
     */
    private int getPartitionId(final String correlationKey) {
        return SubscriptionUtil.getSubscriptionPartitionId(
            BufferUtil.wrapString(correlationKey), engine.getPartitionIds().size());
    }
}
