package io.zeebe.engine.processing.message;

import io.zeebe.engine.processing.common.EventHandle;
import io.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.state.KeyGenerator;
import io.zeebe.engine.state.ZeebeState;
import io.zeebe.engine.state.immutable.ElementInstanceState;
import io.zeebe.engine.state.immutable.WorkflowState;
import io.zeebe.engine.state.instance.ElementInstance;
import io.zeebe.engine.state.message.WorkflowInstanceSubscription;
import io.zeebe.engine.state.mutable.MutableWorkflowInstanceSubscriptionState;
import io.zeebe.protocol.impl.record.value.message.WorkflowInstanceSubscriptionRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/engine/processing/message/CorrelateWorkflowInstanceSubscription.class */
public final class CorrelateWorkflowInstanceSubscription implements TypedRecordProcessor<WorkflowInstanceSubscriptionRecord> {
    private static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10);
    private static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30);
    private static final String NO_EVENT_OCCURRED_MESSAGE = "Expected to correlate a workflow instance subscription with element key '%d' and message name '%s', but the subscription is not active anymore";
    private static final String NO_SUBSCRIPTION_FOUND_MESSAGE = "Expected to correlate workflow instance subscription with element key '%d' and message name '%s', but no such subscription was found";
    private static final String ALREADY_CLOSING_MESSAGE = "Expected to correlate workflow instance subscription with element key '%d' and message name '%s', but it is already closing";
    private final MutableWorkflowInstanceSubscriptionState subscriptionState;
    private final SubscriptionCommandSender subscriptionCommandSender;
    private final WorkflowState workflowState;
    private final ElementInstanceState elementInstanceState;
    private final KeyGenerator keyGenerator;
    private final EventHandle eventHandle;
    private final WorkflowInstanceRecord eventSubprocessRecord = new WorkflowInstanceRecord();
    private WorkflowInstanceSubscriptionRecord subscriptionRecord;
    private DirectBuffer correlationKey;

    public CorrelateWorkflowInstanceSubscription(MutableWorkflowInstanceSubscriptionState mutableWorkflowInstanceSubscriptionState, SubscriptionCommandSender subscriptionCommandSender, ZeebeState zeebeState) {
        this.subscriptionState = mutableWorkflowInstanceSubscriptionState;
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.workflowState = zeebeState.getWorkflowState();
        this.elementInstanceState = zeebeState.getElementInstanceState();
        this.keyGenerator = zeebeState.getKeyGenerator();
        this.eventHandle = new EventHandle(this.keyGenerator, zeebeState.getEventScopeInstanceState());
    }

    @Override // io.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware
    public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
        readonlyProcessingContext.getActor().runAtFixedRate(SUBSCRIPTION_CHECK_INTERVAL, new PendingWorkflowInstanceSubscriptionChecker(this.subscriptionCommandSender, this.subscriptionState, SUBSCRIPTION_TIMEOUT.toMillis()));
    }

    @Override // io.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware
    public void onClose() {
    }

    @Override // io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor
    public void processRecord(TypedRecord<WorkflowInstanceSubscriptionRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        this.subscriptionRecord = typedRecord.mo20getValue();
        WorkflowInstanceSubscription subscription = this.subscriptionState.getSubscription(this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getMessageNameBuffer());
        if (subscription == null || subscription.isClosing()) {
            RejectionType rejectionType = RejectionType.NOT_FOUND;
            String str = NO_SUBSCRIPTION_FOUND_MESSAGE;
            if (subscription != null) {
                rejectionType = RejectionType.INVALID_STATE;
                str = ALREADY_CLOSING_MESSAGE;
                this.correlationKey = subscription.getCorrelationKey();
            } else {
                this.correlationKey = this.subscriptionRecord.getCorrelationKeyBuffer();
            }
            consumer.accept(this::sendRejectionCommand);
            typedStreamWriter.appendRejection(typedRecord, rejectionType, String.format(str, Long.valueOf(this.subscriptionRecord.getElementInstanceKey()), BufferUtil.bufferAsString(this.subscriptionRecord.getMessageNameBuffer())));
            return;
        }
        if (subscription.shouldCloseOnCorrelate()) {
            this.subscriptionState.remove(subscription);
        }
        if (triggerCatchEvent(typedStreamWriter, subscription, typedRecord.mo20getValue().getVariablesBuffer())) {
            consumer.accept(this::sendAcknowledgeCommand);
            typedStreamWriter.appendFollowUpEvent(typedRecord.getKey(), WorkflowInstanceSubscriptionIntent.CORRELATED, this.subscriptionRecord);
        } else {
            this.correlationKey = subscription.getCorrelationKey();
            consumer.accept(this::sendRejectionCommand);
            typedStreamWriter.appendRejection(typedRecord, RejectionType.INVALID_STATE, String.format(NO_EVENT_OCCURRED_MESSAGE, Long.valueOf(this.subscriptionRecord.getElementInstanceKey()), BufferUtil.bufferAsString(this.subscriptionRecord.getMessageNameBuffer())));
        }
    }

    private boolean triggerCatchEvent(TypedStreamWriter typedStreamWriter, WorkflowInstanceSubscription workflowInstanceSubscription, DirectBuffer directBuffer) {
        ElementInstance elementInstanceState = this.elementInstanceState.getInstance(workflowInstanceSubscription.getElementInstanceKey());
        if (elementInstanceState == null) {
            return false;
        }
        return this.eventHandle.triggerEvent(typedStreamWriter, elementInstanceState, this.workflowState.getFlowElement(elementInstanceState.getValue().getWorkflowKey(), workflowInstanceSubscription.getTargetElementId(), ExecutableFlowElement.class), directBuffer);
    }

    private boolean sendAcknowledgeCommand() {
        return this.subscriptionCommandSender.correlateMessageSubscription(this.subscriptionRecord.getSubscriptionPartitionId(), this.subscriptionRecord.getWorkflowInstanceKey(), this.subscriptionRecord.getElementInstanceKey(), this.subscriptionRecord.getBpmnProcessIdBuffer(), this.subscriptionRecord.getMessageNameBuffer());
    }

    private boolean sendRejectionCommand() {
        return this.subscriptionCommandSender.rejectCorrelateMessageSubscription(this.subscriptionRecord.getWorkflowInstanceKey(), this.subscriptionRecord.getBpmnProcessIdBuffer(), this.subscriptionRecord.getMessageKey(), this.subscriptionRecord.getMessageNameBuffer(), this.correlationKey);
    }
}
