package io.zeebe.engine.processing.message;

import io.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.zeebe.engine.state.immutable.MessageState;
import io.zeebe.engine.state.immutable.MessageSubscriptionState;
import io.zeebe.engine.state.message.StoredMessage;
import io.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord;
import io.zeebe.protocol.record.RecordValue;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/engine/processing/message/MessageSubscriptionRejectProcessor.class */
/**
 * Processes a {@link MessageSubscriptionRecord} command by either rejecting it or recording the
 * subscription as {@link MessageSubscriptionIntent#REJECTED} and trying to hand the message over
 * to another open subscription.
 *
 * <p>The command is rejected with {@link RejectionType#INVALID_STATE} when the message state holds
 * no correlation for the record's message key and BPMN process id. Otherwise a {@code REJECTED}
 * follow-up event is appended and the subscriptions registered under the same message name and
 * correlation key are visited to find one that can take the message.
 */
public final class MessageSubscriptionRejectProcessor implements TypedRecordProcessor<MessageSubscriptionRecord> {
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;
    private final SubscriptionCommandSender commandSender;
    private final StateWriter stateWriter;
    private final TypedRejectionWriter rejectionWriter;

    /**
     * @param messageState state used to check message correlations and to load stored messages
     * @param messageSubscriptionState state used to visit candidate subscriptions
     * @param subscriptionCommandSender sender used to issue the correlate command as a side effect
     * @param writers source of the state writer and the rejection writer used by this processor
     */
    public MessageSubscriptionRejectProcessor(MessageState messageState, MessageSubscriptionState messageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender, Writers writers) {
        this.messageState = messageState;
        this.subscriptionState = messageSubscriptionState;
        this.commandSender = subscriptionCommandSender;
        this.stateWriter = writers.state();
        this.rejectionWriter = writers.rejection();
    }

    /**
     * Rejects the command if no correlation exists for its message key and BPMN process id;
     * otherwise appends a {@code REJECTED} event for the record and looks for another subscription
     * to correlate the message to.
     *
     * @param typedRecord the command being processed
     * @param typedResponseWriter not used by this processor
     * @param typedStreamWriter not used by this processor
     * @param consumer receives the side effect that sends the correlate command, if any
     */
    @Override
    public void processRecord(TypedRecord<MessageSubscriptionRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        // Keep the concrete record type: the accessors used below are not declared on RecordValue.
        final MessageSubscriptionRecord subscriptionRecord = (MessageSubscriptionRecord) typedRecord.mo20getValue();

        final boolean correlationExists = this.messageState.existMessageCorrelation(subscriptionRecord.getMessageKey(), subscriptionRecord.getBpmnProcessIdBuffer());
        if (!correlationExists) {
            rejectCommand(typedRecord);
            return;
        }

        this.stateWriter.appendFollowUpEvent(typedRecord.getKey(), MessageSubscriptionIntent.REJECTED, subscriptionRecord);
        findSubscriptionToCorrelate(consumer, subscriptionRecord);
    }

    /**
     * Visits the subscriptions stored under the record's message name and correlation key and, for
     * the first one that belongs to the same BPMN process id and is not already correlating,
     * appends a {@code CORRELATING} event and registers a side effect that sends the correlate
     * command. Does nothing when the message referenced by the record is no longer stored.
     */
    private void findSubscriptionToCorrelate(Consumer<SideEffectProducer> consumer, MessageSubscriptionRecord messageSubscriptionRecord) {
        final long messageKey = messageSubscriptionRecord.getMessageKey();
        final StoredMessage storedMessage = this.messageState.getMessage(messageKey);
        if (storedMessage == null) {
            // Nothing left to correlate; the message is not (or no longer) in the state.
            return;
        }

        this.subscriptionState.visitSubscriptions(messageSubscriptionRecord.getMessageNameBuffer(), messageSubscriptionRecord.getCorrelationKeyBuffer(), candidate -> {
            // Keep the concrete record type: it is mutated and passed to sendCorrelateCommand below.
            final MessageSubscriptionRecord candidateRecord = candidate.getRecord();
            final boolean canBeCorrelated = candidateRecord.getBpmnProcessIdBuffer().equals(messageSubscriptionRecord.getBpmnProcessIdBuffer()) && !candidate.isCorrelating();
            if (canBeCorrelated) {
                candidateRecord.setMessageKey(messageKey).setVariables(storedMessage.getMessage().getVariablesBuffer());
                this.stateWriter.appendFollowUpEvent(candidate.getKey(), MessageSubscriptionIntent.CORRELATING, candidateRecord);
                consumer.accept(() -> sendCorrelateCommand(candidateRecord));
            }
            // NOTE(review): presumably returning false stops the visit after the first match -
            // confirm against MessageSubscriptionState#visitSubscriptions.
            return !canBeCorrelated;
        });
    }

    /**
     * Sends the correlate command for the given subscription record.
     *
     * @return the result reported by the command sender
     */
    private boolean sendCorrelateCommand(MessageSubscriptionRecord messageSubscriptionRecord) {
        return this.commandSender.correlateProcessMessageSubscription(
                messageSubscriptionRecord.getProcessInstanceKey(),
                messageSubscriptionRecord.getElementInstanceKey(),
                messageSubscriptionRecord.getBpmnProcessIdBuffer(),
                messageSubscriptionRecord.getMessageNameBuffer(),
                messageSubscriptionRecord.getMessageKey(),
                messageSubscriptionRecord.getVariablesBuffer(),
                messageSubscriptionRecord.getCorrelationKeyBuffer());
    }

    /** Appends an {@code INVALID_STATE} rejection naming the message key and BPMN process id. */
    private void rejectCommand(TypedRecord<MessageSubscriptionRecord> typedRecord) {
        final MessageSubscriptionRecord subscriptionRecord = typedRecord.mo20getValue();
        final String reason = String.format("Expected message '%d' to be correlated for process with BPMN process id '%s' but no correlation was found", subscriptionRecord.getMessageKey(), subscriptionRecord.getBpmnProcessId());
        this.rejectionWriter.appendRejection(typedRecord, RejectionType.INVALID_STATE, reason);
    }
}
