package io.zeebe.engine.processor.workflow.deployment.distribute;

import io.zeebe.engine.processor.ReadonlyProcessingContext;
import io.zeebe.engine.processor.SideEffectProducer;
import io.zeebe.engine.processor.TypedRecord;
import io.zeebe.engine.processor.TypedRecordProcessor;
import io.zeebe.engine.processor.TypedResponseWriter;
import io.zeebe.engine.processor.TypedStreamWriter;
import io.zeebe.engine.state.deployment.DeploymentsState;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.DeploymentIntent;
import io.zeebe.util.sched.ActorControl;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;

/* loaded from: input_file:io/zeebe/engine/processor/workflow/deployment/distribute/DeploymentDistributeProcessor.class */
public class DeploymentDistributeProcessor implements TypedRecordProcessor<DeploymentRecord> {
    private final LogStreamWriterImpl logStreamWriter;
    private final DeploymentsState deploymentsState;
    private ActorControl actor;
    private final DeploymentDistributor deploymentDistributor;
    private int streamProcessorId;
    private int partitionId;

    public DeploymentDistributeProcessor(DeploymentsState deploymentsState, LogStreamWriterImpl logStreamWriterImpl, DeploymentDistributor deploymentDistributor) {
        this.deploymentsState = deploymentsState;
        this.logStreamWriter = logStreamWriterImpl;
        this.deploymentDistributor = deploymentDistributor;
    }

    @Override // io.zeebe.engine.processor.StreamProcessorLifecycleAware
    public void onOpen(ReadonlyProcessingContext readonlyProcessingContext) {
        this.partitionId = readonlyProcessingContext.getLogStream().getPartitionId();
        this.streamProcessorId = readonlyProcessingContext.getProducerId();
        this.actor = readonlyProcessingContext.getActor();
        this.actor.submit(this::reprocessPendingDeployments);
    }

    private void reprocessPendingDeployments() {
        this.deploymentsState.foreachPending((pendingDeploymentDistribution, j) -> {
            ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer();
            DirectBuffer deployment = pendingDeploymentDistribution.getDeployment();
            expandableArrayBuffer.putBytes(0, deployment, 0, deployment.capacity());
            distributeDeployment(j, pendingDeploymentDistribution.getSourcePosition(), expandableArrayBuffer);
        });
    }

    @Override // io.zeebe.engine.processor.TypedRecordProcessor
    public void processRecord(long j, TypedRecord<DeploymentRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter, Consumer<SideEffectProducer> consumer) {
        DeploymentRecord mo8getValue = typedRecord.mo8getValue();
        long key = typedRecord.getKey();
        ExpandableArrayBuffer expandableArrayBuffer = new ExpandableArrayBuffer();
        mo8getValue.write(expandableArrayBuffer, 0);
        distributeDeployment(key, j, expandableArrayBuffer);
    }

    private void distributeDeployment(long j, long j2, DirectBuffer directBuffer) {
        this.actor.runOnCompletion(this.deploymentDistributor.pushDeployment(j, j2, directBuffer), (r7, th) -> {
            writeCreatingDeploymentCommand(j);
        });
    }

    private void writeCreatingDeploymentCommand(long j) {
        PendingDeploymentDistribution removePendingDeployment = this.deploymentDistributor.removePendingDeployment(j);
        DirectBuffer deployment = removePendingDeployment.getDeployment();
        long sourcePosition = removePendingDeployment.getSourcePosition();
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        deploymentRecord.wrap(deployment);
        RecordMetadata recordMetadata = new RecordMetadata();
        recordMetadata.partitionId(this.partitionId).intent(DeploymentIntent.DISTRIBUTED).valueType(ValueType.DEPLOYMENT).recordType(RecordType.EVENT);
        this.actor.runUntilDone(() -> {
            if (this.logStreamWriter.key(j).producerId(this.streamProcessorId).sourceRecordPosition(sourcePosition).valueWriter(deploymentRecord).metadataWriter(recordMetadata).tryWrite() < 0) {
                this.actor.yield();
            } else {
                this.actor.done();
            }
        });
    }
}
