package io.zeebe.engine.processing.deployment.distribute;

import io.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.state.deployment.DeploymentsState;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.DeploymentIntent;
import io.zeebe.util.sched.ActorControl;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/zeebe/engine/processing/deployment/distribute/DeploymentDistributeProcessor.class */
public final class DeploymentDistributeProcessor implements TypedRecordProcessor<DeploymentRecord> {
    private final DeploymentsState deploymentsState;
    private final DeploymentDistributor deploymentDistributor;
    private final ActorControl actor;

    public DeploymentDistributeProcessor(ActorControl actorControl, DeploymentsState deploymentsState, DeploymentDistributor deploymentDistributor) {
        this.deploymentsState = deploymentsState;
        this.deploymentDistributor = deploymentDistributor;
        this.actor = actorControl;
    }

    @Override // io.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware
    public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
        this.actor.submit(() -> {
            reprocessPendingDeployments(readonlyProcessingContext.getLogStreamWriter());
        });
    }

    private void reprocessPendingDeployments(TypedStreamWriter typedStreamWriter) {
        this.deploymentsState.foreachPending((pendingDeploymentDistribution, j) -> {
            ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer();
            DirectBuffer deployment = pendingDeploymentDistribution.getDeployment();
            expandableArrayBuffer.putBytes(0, deployment, 0, deployment.capacity());
            distributeDeployment(j, pendingDeploymentDistribution.getSourcePosition(), expandableArrayBuffer, typedStreamWriter);
        });
    }

    @Override // io.zeebe.engine.processing.streamprocessor.TypedRecordProcessor
    public void processRecord(long j, TypedRecord<DeploymentRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        DeploymentRecord mo22getValue = typedRecord.mo22getValue();
        long key = typedRecord.getKey();
        ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer();
        mo22getValue.write(expandableArrayBuffer, 0);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer();
        unsafeBuffer.wrap(expandableArrayBuffer, 0, mo22getValue.getLength());
        distributeDeployment(key, j, unsafeBuffer, typedStreamWriter);
    }

    private void distributeDeployment(long j, long j2, DirectBuffer directBuffer, TypedStreamWriter typedStreamWriter) {
        this.actor.runOnCompletion(this.deploymentDistributor.pushDeployment(j, j2, directBuffer), (r9, th) -> {
            writeCreatingDeploymentCommand(typedStreamWriter, j);
        });
    }

    private void writeCreatingDeploymentCommand(TypedStreamWriter typedStreamWriter, long j) {
        PendingDeploymentDistribution removePendingDeployment = this.deploymentDistributor.removePendingDeployment(j);
        DirectBuffer deployment = removePendingDeployment.getDeployment();
        long sourcePosition = removePendingDeployment.getSourcePosition();
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        deploymentRecord.wrap(deployment);
        new RecordMetadata().intent(DeploymentIntent.DISTRIBUTED).valueType(ValueType.DEPLOYMENT).recordType(RecordType.EVENT);
        this.actor.runUntilDone(() -> {
            typedStreamWriter.reset();
            typedStreamWriter.configureSourceContext(sourcePosition);
            typedStreamWriter.appendFollowUpEvent(j, DeploymentIntent.DISTRIBUTED, deploymentRecord);
            if (typedStreamWriter.flush() < 0) {
                this.actor.yield();
            } else {
                this.actor.done();
            }
        });
    }
}
