package io.zeebe.broker.engine.impl;

import io.atomix.cluster.MemberId;
import io.atomix.core.Atomix;
import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.topology.TopologyPartitionListenerImpl;
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.broker.system.management.deployment.PushDeploymentRequest;
import io.zeebe.broker.system.management.deployment.PushDeploymentResponse;
import io.zeebe.engine.processor.workflow.deployment.distribute.DeploymentDistributor;
import io.zeebe.engine.processor.workflow.deployment.distribute.PendingDeploymentDistribution;
import io.zeebe.engine.state.deployment.DeploymentsState;
import io.zeebe.protocol.impl.encoding.ErrorResponse;
import io.zeebe.protocol.record.ErrorCode;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/engine/impl/DeploymentDistributorImpl.class */
/**
 * Pushes a deployment to the leaders of all configured partitions except
 * {@link #DEPLOYMENT_PARTITION_ID} and completes the future returned by
 * {@link #pushDeployment(long, long, DirectBuffer)} once the pending distribution count stored in
 * {@link DeploymentsState} reaches zero.
 *
 * <p>Two retry mechanisms are used:
 *
 * <ul>
 *   <li>a push that fails, or is answered with an error response, is re-sent after
 *       {@link #RETRY_DELAY};
 *   <li>partitions that have not acknowledged the deployment on the response topic within
 *       {@link #PUSH_REQUEST_TIMEOUT} are pushed to again.
 * </ul>
 *
 * <p>All mutable state is touched from callbacks scheduled on the supplied {@link ActorControl};
 * the class itself performs no additional synchronization.
 */
public final class DeploymentDistributorImpl implements DeploymentDistributor {
    public static final Duration PUSH_REQUEST_TIMEOUT = Duration.ofSeconds(15);
    public static final Duration RETRY_DELAY = Duration.ofMillis(100);
    private static final Logger LOG = Loggers.WORKFLOW_REPOSITORY_LOGGER;

    /** Subject on which push requests are sent to the partition leaders. */
    private static final String DEPLOYMENT_PUSH_TOPIC = "deployment";

    /**
     * Partition that is excluded from distribution. NOTE(review): presumably the partition that
     * owns deployments (the protocol's deployment partition) - confirm against the protocol
     * constants.
     */
    private static final int DEPLOYMENT_PARTITION_ID = 1;

    private final TopologyPartitionListenerImpl partitionListener;
    private final ActorControl actor;
    private final DeploymentsState deploymentsState;
    private final IntArrayList partitionsToDistributeTo;
    private final Atomix atomix;

    // Reusable decoders for incoming responses; re-wrapped on every use.
    private final PushDeploymentResponse pushDeploymentResponse = new PushDeploymentResponse();
    private final ErrorResponse errorResponse = new ErrorResponse();

    /** Futures handed out by {@link #pushDeployment}, keyed by deployment key. */
    private final Long2ObjectHashMap<ActorFuture<Void>> pendingDeploymentFutures = new Long2ObjectHashMap<>();

    /**
     * Partitions that have not yet acknowledged a deployment, keyed by response topic. Entries are
     * created lazily and are never removed by this class.
     */
    private final Map<String, IntArrayList> deploymentResponses = new HashMap<>();

    public DeploymentDistributorImpl(ClusterCfg clusterCfg, Atomix atomix, TopologyPartitionListenerImpl topologyPartitionListenerImpl, DeploymentsState deploymentsState, ActorControl actorControl) {
        this.atomix = atomix;
        this.partitionListener = topologyPartitionListenerImpl;
        this.actor = actorControl;
        this.deploymentsState = deploymentsState;
        this.partitionsToDistributeTo = partitionsToDistributeTo(clusterCfg);
    }

    /** Returns all configured partition ids without {@link #DEPLOYMENT_PARTITION_ID}. */
    private IntArrayList partitionsToDistributeTo(ClusterCfg clusterCfg) {
        final IntArrayList partitions = new IntArrayList();
        partitions.addAll(clusterCfg.getPartitionIds());
        partitions.removeInt(DEPLOYMENT_PARTITION_ID);
        return partitions;
    }

    /**
     * Registers the deployment as pending and starts pushing it to the other partitions.
     *
     * @param key deployment key; also used to derive the response topic
     * @param position forwarded unchanged to {@link PendingDeploymentDistribution}
     *     (NOTE(review): presumably the source record position - confirm)
     * @param buffer serialized deployment that is sent to the other partitions
     * @return a future that is completed with {@code null} once every target partition has
     *     acknowledged the deployment, or immediately if there is no other partition
     */
    @Override
    public ActorFuture<Void> pushDeployment(long key, long position, DirectBuffer buffer) {
        final ActorFuture<Void> pushedFuture = new CompletableActorFuture<>();
        final PendingDeploymentDistribution pending =
            new PendingDeploymentDistribution(buffer, position, partitionsToDistributeTo.size());

        deploymentsState.putPendingDeployment(key, pending);
        pendingDeploymentFutures.put(key, pushedFuture);
        pushDeploymentToPartitions(key);
        return pushedFuture;
    }

    /**
     * Removes the pending deployment for the given key from the state.
     *
     * @param key deployment key
     * @return whatever {@link DeploymentsState#removePendingDeployment(long)} returns
     */
    @Override
    public PendingDeploymentDistribution removePendingDeployment(long key) {
        return deploymentsState.removePendingDeployment(key);
    }

    private void pushDeploymentToPartitions(long key) {
        if (partitionsToDistributeTo.isEmpty()) {
            LOG.trace("No other partitions to distribute deployment {}. Deployment finished", key);
            // The future was registered by pushDeployment right before this call.
            pendingDeploymentFutures.remove(key).complete(null);
            return;
        }
        deployOnMultiplePartitions(key);
    }

    private void deployOnMultiplePartitions(long key) {
        LOG.trace("Distribute deployment {} to other partitions.", key);
        final PendingDeploymentDistribution pending = deploymentsState.getPendingDeployment(key);
        final PushDeploymentRequest pushRequest =
            new PushDeploymentRequest().deployment(pending.getDeployment()).deploymentKey(key);

        // Work on a copy: distribution removes partitions from the list it is given.
        prepareToDistribute(copyOf(partitionsToDistributeTo), pushRequest);
    }

    /**
     * Starts distributing to the given partitions and schedules a check after
     * {@link #PUSH_REQUEST_TIMEOUT} that re-pushes to every partition still missing a response.
     *
     * @param partitions partitions to push to; this list is consumed (emptied) while pushing
     * @param pushRequest request that is re-used for every partition
     */
    private void prepareToDistribute(IntArrayList partitions, PushDeploymentRequest pushRequest) {
        actor.runDelayed(PUSH_REQUEST_TIMEOUT, () -> {
            final long deploymentKey = pushRequest.deploymentKey();
            if (deploymentsState.getPendingDeployment(deploymentKey) == null) {
                // The pending deployment is gone, so there is nothing left to acknowledge;
                // responses for it would be ignored anyway (see handlePushResponse).
                return;
            }

            final String topic = getDeploymentResponseTopic(deploymentKey);
            final IntArrayList missingResponses = getPartitionResponses(topic);
            if (missingResponses.isEmpty()) {
                return;
            }

            LOG.warn("Failed to receive deployment response for partitions {} (topic '{}'). Retrying", missingResponses, topic);
            // Hand out a copy: distributeDeploymentToPartitions removes entries as soon as a push
            // is sent, which must not erase the record of partitions that have not answered yet.
            prepareToDistribute(copyOf(missingResponses), pushRequest);
        });
        distributeDeployment(partitions, pushRequest);
    }

    /** Pushes to all partitions with a known leader; retries the rest after {@link #RETRY_DELAY}. */
    private void distributeDeployment(IntArrayList partitions, PushDeploymentRequest pushRequest) {
        final IntArrayList withoutLeader = distributeDeploymentToPartitions(partitions, pushRequest);
        if (withoutLeader.isEmpty()) {
            LOG.debug("Pushed deployment {} to all partitions.", pushRequest.deploymentKey());
            return;
        }
        actor.runDelayed(RETRY_DELAY, () -> {
            distributeDeployment(withoutLeader, pushRequest);
        });
    }

    /**
     * Pushes the request to every partition in {@code remaining} whose leader is currently known
     * and removes those partitions from the list.
     *
     * @return the same list instance, now containing only partitions without a known leader
     */
    private IntArrayList distributeDeploymentToPartitions(IntArrayList remaining, PushDeploymentRequest pushRequest) {
        final Int2IntHashMap partitionLeaders = partitionListener.getPartitionLeaders();
        final Iterator<Integer> iterator = remaining.iterator();
        while (iterator.hasNext()) {
            final int partitionId = iterator.next();
            if (partitionLeaders.containsKey(partitionId)) {
                final int leaderNodeId = partitionLeaders.get(partitionId);
                iterator.remove();
                pushDeploymentToPartition(leaderNodeId, partitionId, pushRequest);
            }
        }
        return remaining;
    }

    /**
     * Sends the push request for one partition to the given node. A failed send or an error
     * response schedules a retry; any other reply to the send is ignored here because the
     * acknowledgement is received through the response topic subscription.
     */
    private void pushDeploymentToPartition(int leaderNodeId, int partitionId, PushDeploymentRequest pushRequest) {
        // The request object is shared between partitions, so serialize it right after
        // setting the partition id.
        pushRequest.partitionId(partitionId);
        final byte[] requestBytes = pushRequest.toBytes();
        final MemberId leader = new MemberId(Integer.toString(leaderNodeId));

        createResponseSubscription(pushRequest.deploymentKey());

        final CompletableFuture<byte[]> pushFuture =
            atomix.getCommunicationService().send(DEPLOYMENT_PUSH_TOPIC, requestBytes, leader, PUSH_REQUEST_TIMEOUT);
        pushFuture.whenComplete((response, error) -> {
            actor.call(() -> {
                if (error != null) {
                    LOG.warn("Failed to push deployment to node {} for partition {}", leaderNodeId, partitionId, error);
                    handleRetry(leaderNodeId, partitionId, pushRequest);
                    return;
                }

                final DirectBuffer responseBuffer = new UnsafeBuffer(response);
                if (!errorResponse.tryWrap(responseBuffer)) {
                    return;
                }

                errorResponse.wrap(responseBuffer, 0, responseBuffer.capacity());
                if (errorResponse.getErrorCode() == ErrorCode.PARTITION_LEADER_MISMATCH) {
                    LOG.debug("Received partition leader mismatch error from partition {} for deployment {}. Retrying.", partitionId, pushRequest.deploymentKey());
                } else {
                    LOG.warn("Received rejected deployment push due to error of type {}: '{}'", errorResponse.getErrorCode().name(), BufferUtil.bufferAsString(errorResponse.getErrorData()));
                }
                handleRetry(leaderNodeId, partitionId, pushRequest);
            });
        });
    }

    /** Subscribes to the response topic of the deployment unless a subscription already exists. */
    private void createResponseSubscription(long deploymentKey) {
        final String topic = getDeploymentResponseTopic(deploymentKey);
        if (!atomix.getEventService().getSubscriptions(topic).isEmpty()) {
            return;
        }

        LOG.trace("Setting up deployment subscription for topic {}", topic);
        atomix.getEventService().subscribe(topic, (byte[] response) -> {
            final CompletableFuture<Void> handled = new CompletableFuture<>();
            // Process the response on the actor, where all other state is accessed.
            actor.call(() -> {
                LOG.trace("Receiving deployment response on topic {}", topic);
                handleResponse(response, topic);
                handled.complete(null);
                return handled;
            });
            return handled;
        });
    }

    /** Decodes a message received on a response topic and records a successful acknowledgement. */
    private void handleResponse(byte[] response, String topic) {
        final DirectBuffer buffer = new UnsafeBuffer(response);

        if (pushDeploymentResponse.tryWrap(buffer)) {
            pushDeploymentResponse.wrap(buffer);
            if (handlePushResponse()) {
                getPartitionResponses(topic).removeInt(pushDeploymentResponse.partitionId());
            }
            return;
        }

        if (errorResponse.tryWrap(buffer)) {
            errorResponse.wrap(buffer, 0, buffer.capacity());
            LOG.warn("Received rejected deployment push due to error of type {}: '{}'", errorResponse.getErrorCode().name(), BufferUtil.bufferAsString(errorResponse.getErrorData()));
        } else {
            LOG.warn("Received unknown deployment response on topic {}", topic);
        }
    }

    /**
     * Re-sends the push after {@link #RETRY_DELAY}, to the currently known leader of the partition
     * if there is one, otherwise to the node that was tried before.
     */
    private void handleRetry(int previousNodeId, int partitionId, PushDeploymentRequest pushRequest) {
        LOG.trace("Retry deployment push to partition {} after {}", partitionId, RETRY_DELAY);
        actor.runDelayed(RETRY_DELAY, () -> {
            final Int2IntHashMap partitionLeaders = partitionListener.getPartitionLeaders();
            final int targetNodeId =
                partitionLeaders.containsKey(partitionId) ? partitionLeaders.get(partitionId) : previousNodeId;
            pushDeploymentToPartition(targetNodeId, partitionId, pushRequest);
        });
    }

    /**
     * Applies the currently wrapped {@link #pushDeploymentResponse} to the pending deployment.
     *
     * @return {@code true} if a pending deployment exists for the response's deployment key,
     *     {@code false} if the response was ignored
     */
    private boolean handlePushResponse() {
        final long deploymentKey = pushDeploymentResponse.deploymentKey();
        final PendingDeploymentDistribution pending = deploymentsState.getPendingDeployment(deploymentKey);
        if (pending == null) {
            LOG.trace("Ignoring unexpected push deployment response for deployment key {}", deploymentKey);
            return false;
        }

        final long remaining = pending.decrementDistributionCount();
        deploymentsState.putPendingDeployment(deploymentKey, pending);
        LOG.trace("Deployment {} was pushed to partition {} successfully.", deploymentKey, pushDeploymentResponse.partitionId());

        if (remaining == 0) {
            LOG.debug("Deployment {} distributed to all partitions successfully.", deploymentKey);
            final ActorFuture<Void> pushedFuture = pendingDeploymentFutures.remove(deploymentKey);
            // Guard against a missing future instead of failing with a NullPointerException.
            if (pushedFuture != null) {
                pushedFuture.complete(null);
            }
        }
        return true;
    }

    /**
     * Returns the name of the topic on which acknowledgements for the given deployment are
     * published.
     *
     * @param j deployment key
     * @return {@code "deployment-response-"} followed by the decimal key
     */
    public static String getDeploymentResponseTopic(long j) {
        return String.format("deployment-response-%d", j);
    }

    /**
     * Returns the live list of partitions that have not yet acknowledged on the given topic,
     * creating it with all target partitions on first access. Callers that pass the list to code
     * which mutates it must copy it first.
     */
    private IntArrayList getPartitionResponses(String topic) {
        return deploymentResponses.computeIfAbsent(topic, ignored -> copyOf(partitionsToDistributeTo));
    }

    /** Returns a new list with the same elements as {@code source}. */
    private static IntArrayList copyOf(IntArrayList source) {
        final IntArrayList copy = new IntArrayList();
        copy.addAll(source);
        return copy;
    }
}
