package io.zeebe.engine.processor.workflow.message;

import io.zeebe.engine.processor.SideEffectProducer;
import io.zeebe.engine.processor.TypedRecord;
import io.zeebe.engine.processor.TypedRecordProcessor;
import io.zeebe.engine.processor.TypedResponseWriter;
import io.zeebe.engine.processor.TypedStreamWriter;
import io.zeebe.engine.processor.workflow.message.command.SubscriptionCommandSender;
import io.zeebe.engine.state.message.Message;
import io.zeebe.engine.state.message.MessageState;
import io.zeebe.engine.state.message.MessageSubscription;
import io.zeebe.engine.state.message.MessageSubscriptionState;
import io.zeebe.protocol.RejectionType;
import io.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/engine/processor/workflow/message/RejectMessageCorrelationProcessor.class */
/**
 * Record processor for {@link MessageSubscriptionRecord}s that undoes a stored
 * message correlation and then tries to hand the same message to another
 * subscription of the same workflow instance.
 *
 * <p>Processing steps, in order:
 * <ol>
 *   <li>If no correlation is stored for the (message key, workflow instance key)
 *       pair, the record is rejected with {@link RejectionType#INVALID_STATE}.</li>
 *   <li>Otherwise the stored correlation is removed.</li>
 *   <li>If the message can still be loaded, the subscriptions matching the
 *       record's message name and correlation key are visited; the first one that
 *       belongs to the same workflow instance and is not already correlating is
 *       switched to the correlating state and a correlate command is scheduled
 *       as a side effect.</li>
 *   <li>A {@link MessageSubscriptionIntent#REJECTED} follow-up event is appended.</li>
 * </ol>
 *
 * <p>NOTE(review): the instance keeps the chosen subscription in a mutable field
 * so the deferred side effect can read it; this presumably relies on records
 * being processed one at a time — confirm against the stream processor.
 */
public class RejectMessageCorrelationProcessor implements TypedRecordProcessor<MessageSubscriptionRecord> {
    /** Rejection reason template; arguments are the message key and the workflow instance key. */
    private static final String NO_CORRELATION_REASON = "Expected message %d to be correlated in workflow instance %d but no correlation was found";

    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final SubscriptionCommandSender commandSender;

    /** Subscription selected by the most recent correlation attempt; read by the deferred side effect. */
    private MessageSubscription subscription;

    /**
     * Creates the processor.
     *
     * @param messageState state holding messages and their correlations
     * @param messageSubscriptionState state holding the message subscriptions
     * @param subscriptionCommandSender sender used to issue the correlate command
     */
    public RejectMessageCorrelationProcessor(MessageState messageState, MessageSubscriptionState messageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender) {
        this.messageState = messageState;
        this.subscriptionState = messageSubscriptionState;
        this.commandSender = subscriptionCommandSender;
    }

    /**
     * Removes the correlation referenced by the record, attempts to correlate the
     * message to another subscription, and appends a {@code REJECTED} event. If no
     * such correlation is stored, a rejection is appended instead and nothing else
     * happens.
     *
     * @param typedRecord the record being processed
     * @param typedResponseWriter not used by this processor
     * @param typedStreamWriter receives either the rejection or the follow-up event
     * @param consumer receives the side effect that sends the correlate command, if any
     */
    @Override
    public void processRecord(TypedRecord<MessageSubscriptionRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        final MessageSubscriptionRecord subscriptionRecord = typedRecord.mo8getValue();
        final long messageKey = subscriptionRecord.getMessageKey();
        final long workflowInstanceKey = subscriptionRecord.getWorkflowInstanceKey();

        final boolean correlationExists = this.messageState.existMessageCorrelation(messageKey, workflowInstanceKey);
        if (correlationExists) {
            // Drop the old correlation first so the re-correlation below can store a fresh one.
            this.messageState.removeMessageCorrelation(messageKey, workflowInstanceKey);
            findSubscriptionToCorrelate(consumer, subscriptionRecord, messageKey, workflowInstanceKey);
            typedStreamWriter.appendFollowUpEvent(typedRecord.getKey(), MessageSubscriptionIntent.REJECTED, typedRecord.mo8getValue());
            return;
        }

        final String reason = String.format(NO_CORRELATION_REASON, messageKey, workflowInstanceKey);
        typedStreamWriter.appendRejection(typedRecord, RejectionType.INVALID_STATE, reason);
    }

    /**
     * Visits the subscriptions matching the record's message name and correlation
     * key and correlates the message to the first one that belongs to the given
     * workflow instance and is not already correlating.
     *
     * @param sideEffects receives the side effect that sends the correlate command
     * @param subscriptionRecord supplies the message name and correlation key to look up
     * @param messageKey key of the message to correlate
     * @param workflowInstanceKey key of the workflow instance whose subscriptions qualify
     */
    private void findSubscriptionToCorrelate(Consumer<SideEffectProducer> sideEffects, MessageSubscriptionRecord subscriptionRecord, long messageKey, long workflowInstanceKey) {
        final Message storedMessage = this.messageState.getMessage(messageKey);
        if (storedMessage == null) {
            // Nothing to re-correlate; presumably the message expired or was removed — TODO confirm.
            return;
        }

        this.subscriptionState.visitSubscriptions(
                subscriptionRecord.getMessageNameBuffer(),
                subscriptionRecord.getCorrelationKeyBuffer(),
                candidate -> {
                    final boolean eligible = candidate.getWorkflowInstanceKey() == workflowInstanceKey
                            && !candidate.isCorrelating();
                    if (eligible) {
                        candidate.setMessageKey(messageKey);
                        candidate.setMessageVariables(storedMessage.getVariables());
                        correlateMessage(candidate, sideEffects);
                        // One subscription is enough; false presumably stops the visit — confirm.
                        return false;
                    }
                    return true;
                });
    }

    /**
     * Moves the subscription into the correlating state, stores the new message
     * correlation, remembers the subscription, and schedules the correlate command.
     *
     * @param target the subscription to correlate the message to
     * @param sideEffects receives the side effect that sends the correlate command
     */
    private void correlateMessage(MessageSubscription target, Consumer<SideEffectProducer> sideEffects) {
        this.subscriptionState.updateToCorrelatingState(
                target,
                target.getMessageVariables(),
                ActorClock.currentTimeMillis(),
                target.getMessageKey());
        this.messageState.putMessageCorrelation(target.getMessageKey(), target.getWorkflowInstanceKey());

        // Stored on the instance because the side effect runs later and reads it from there.
        this.subscription = target;
        sideEffects.accept(this::sendCorrelateCommand);
    }

    /**
     * Sends the correlate command for the remembered subscription.
     *
     * @return the result reported by the command sender
     */
    private boolean sendCorrelateCommand() {
        final MessageSubscription pending = this.subscription;
        return this.commandSender.correlateWorkflowInstanceSubscription(
                pending.getWorkflowInstanceKey(),
                pending.getElementInstanceKey(),
                pending.getMessageName(),
                pending.getMessageKey(),
                pending.getMessageVariables());
    }
}
