package io.zeebe.engine.processing.message;

import io.zeebe.engine.processing.common.EventHandle;
import io.zeebe.engine.processing.common.EventTriggerBehavior;
import io.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.zeebe.engine.state.immutable.ElementInstanceState;
import io.zeebe.engine.state.immutable.ProcessMessageSubscriptionState;
import io.zeebe.engine.state.immutable.ProcessState;
import io.zeebe.engine.state.instance.ElementInstance;
import io.zeebe.engine.state.message.ProcessMessageSubscription;
import io.zeebe.engine.state.mutable.MutableZeebeState;
import io.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
import io.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.zeebe.protocol.record.RecordValue;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.zeebe.util.buffer.BufferUtil;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/engine/processing/message/ProcessMessageSubscriptionCorrelateProcessor.class */
public final class ProcessMessageSubscriptionCorrelateProcessor implements TypedRecordProcessor<ProcessMessageSubscriptionRecord> {
    private static final String NO_EVENT_OCCURRED_MESSAGE = "Expected to correlate a process message subscription with element key '%d' and message name '%s', but the subscription is not active anymore";
    private static final String NO_SUBSCRIPTION_FOUND_MESSAGE = "Expected to correlate process message subscription with element key '%d' and message name '%s', but no such subscription was found";
    private static final String ALREADY_CLOSING_MESSAGE = "Expected to correlate process message subscription with element key '%d' and message name '%s', but it is already closing";
    private final ProcessMessageSubscriptionState subscriptionState;
    private final SubscriptionCommandSender subscriptionCommandSender;
    private final ProcessState processState;
    private final ElementInstanceState elementInstanceState;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;
    private final EventHandle eventHandle;

    public ProcessMessageSubscriptionCorrelateProcessor(ProcessMessageSubscriptionState processMessageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender, MutableZeebeState mutableZeebeState, EventTriggerBehavior eventTriggerBehavior, Writers writers) {
        this.subscriptionState = processMessageSubscriptionState;
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.processState = mutableZeebeState.getProcessState();
        this.elementInstanceState = mutableZeebeState.getElementInstanceState();
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
        this.eventHandle = new EventHandle(mutableZeebeState.getKeyGenerator(), mutableZeebeState.getEventScopeInstanceState(), writers, this.processState, eventTriggerBehavior);
    }

    @Override // io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor
    public void processRecord(TypedRecord<ProcessMessageSubscriptionRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        RecordValue recordValue = (ProcessMessageSubscriptionRecord) typedRecord.mo24getValue();
        long elementInstanceKey = recordValue.getElementInstanceKey();
        ProcessMessageSubscription subscription = this.subscriptionState.getSubscription(elementInstanceKey, recordValue.getMessageNameBuffer());
        if (subscription == null) {
            rejectCommand(typedRecord, RejectionType.NOT_FOUND, NO_SUBSCRIPTION_FOUND_MESSAGE);
            return;
        }
        if (subscription.isClosing()) {
            rejectCommand(typedRecord, RejectionType.INVALID_STATE, ALREADY_CLOSING_MESSAGE);
            return;
        }
        ElementInstance elementInstanceState = this.elementInstanceState.getInstance(elementInstanceKey);
        if (!this.eventHandle.canTriggerElement(elementInstanceState)) {
            rejectCommand(typedRecord, RejectionType.INVALID_STATE, NO_EVENT_OCCURRED_MESSAGE);
            return;
        }
        recordValue.setElementId(subscription.getTargetElementId()).setInterrupting(subscription.shouldCloseOnCorrelate());
        this.stateWriter.appendFollowUpEvent(typedRecord.getKey(), ProcessMessageSubscriptionIntent.CORRELATED, recordValue);
        this.eventHandle.activateElement(getCatchEvent(elementInstanceState.getValue(), recordValue.getElementIdBuffer()), elementInstanceKey, elementInstanceState.getValue(), recordValue.getVariablesBuffer());
        sendAcknowledgeCommand(recordValue);
    }

    private ExecutableFlowElement getCatchEvent(ProcessInstanceRecord processInstanceRecord, DirectBuffer directBuffer) {
        return this.processState.getFlowElement(processInstanceRecord.getProcessDefinitionKey(), directBuffer, ExecutableFlowElement.class);
    }

    private void rejectCommand(TypedRecord<ProcessMessageSubscriptionRecord> typedRecord, RejectionType rejectionType, String str) {
        ProcessMessageSubscriptionRecord mo24getValue = typedRecord.mo24getValue();
        this.rejectionWriter.appendRejection(typedRecord, rejectionType, String.format(str, Long.valueOf(mo24getValue.getElementInstanceKey()), BufferUtil.bufferAsString(mo24getValue.getMessageNameBuffer())));
        sendRejectionCommand(mo24getValue);
    }

    private void sendAcknowledgeCommand(ProcessMessageSubscriptionRecord processMessageSubscriptionRecord) {
        this.subscriptionCommandSender.correlateMessageSubscription(processMessageSubscriptionRecord.getSubscriptionPartitionId(), processMessageSubscriptionRecord.getProcessInstanceKey(), processMessageSubscriptionRecord.getElementInstanceKey(), processMessageSubscriptionRecord.getBpmnProcessIdBuffer(), processMessageSubscriptionRecord.getMessageNameBuffer());
    }

    private void sendRejectionCommand(ProcessMessageSubscriptionRecord processMessageSubscriptionRecord) {
        this.subscriptionCommandSender.rejectCorrelateMessageSubscription(processMessageSubscriptionRecord.getProcessInstanceKey(), processMessageSubscriptionRecord.getBpmnProcessIdBuffer(), processMessageSubscriptionRecord.getMessageKey(), processMessageSubscriptionRecord.getMessageNameBuffer(), processMessageSubscriptionRecord.getCorrelationKeyBuffer());
    }
}
