package io.zeebe.broker.system.partitions.impl;

import io.atomix.raft.snapshot.TransientSnapshot;
import io.zeebe.broker.system.partitions.StateController;
import io.zeebe.engine.processor.RandomDuration;
import io.zeebe.engine.processor.StreamProcessor;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.SchedulingHints;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/system/partitions/impl/AsyncSnapshotDirector.class */
public final class AsyncSnapshotDirector extends Actor {
    public static final Duration MINIMUM_SNAPSHOT_PERIOD = Duration.ofMinutes(1);
    private static final Logger LOG = Loggers.SNAPSHOT_LOGGER;
    private static final String LOG_MSG_WAIT_UNTIL_COMMITTED = "Finished taking snapshot, need to wait until last written event position {} is committed, current commit position is {}. After that snapshot can be marked as valid.";
    private static final String ERROR_MSG_ON_RESOLVE_PROCESSED_POS = "Unexpected error in resolving last processed position.";
    private static final String ERROR_MSG_ON_RESOLVE_WRITTEN_POS = "Unexpected error in resolving last written position.";
    private static final String ERROR_MSG_MOVE_SNAPSHOT = "Unexpected exception occurred on moving valid snapshot.";
    private final StateController stateController;
    private final LogStream logStream;
    private final Duration snapshotRate;
    private final String processorName;
    private final StreamProcessor streamProcessor;
    private final String actorName;
    private ActorCondition commitCondition;
    private Long lastWrittenEventPosition;
    private TransientSnapshot pendingSnapshot;
    private long lowerBoundSnapshotPosition;
    private boolean takingSnapshot;

    public AsyncSnapshotDirector(int i, StreamProcessor streamProcessor, StateController stateController, LogStream logStream, Duration duration) {
        this.streamProcessor = streamProcessor;
        this.stateController = stateController;
        this.logStream = logStream;
        this.processorName = streamProcessor.getName();
        this.snapshotRate = duration;
        this.actorName = buildActorName(i, "SnapshotDirector-" + logStream.getPartitionId());
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarting() {
        this.actor.setSchedulingHints(SchedulingHints.ioBound());
        this.actor.runDelayed(RandomDuration.getRandomDurationMinuteBased(MINIMUM_SNAPSHOT_PERIOD, this.snapshotRate), this::scheduleSnapshotOnRate);
        this.lastWrittenEventPosition = null;
        this.commitCondition = this.actor.onCondition(getConditionNameForPosition(), this::onCommitCheck);
        this.logStream.registerOnCommitPositionUpdatedCondition(this.commitCondition);
    }

    protected void onActorCloseRequested() {
        this.logStream.removeOnCommitPositionUpdatedCondition(this.commitCondition);
    }

    public ActorFuture<Void> closeAsync() {
        return this.actor.isClosed() ? CompletableActorFuture.completed((Object) null) : super.closeAsync();
    }

    private void scheduleSnapshotOnRate() {
        this.actor.runAtFixedRate(this.snapshotRate, this::prepareTakingSnapshot);
        prepareTakingSnapshot();
    }

    private String getConditionNameForPosition() {
        return getName() + "-wait-for-endPosition-committed";
    }

    private void prepareTakingSnapshot() {
        if (this.takingSnapshot) {
            return;
        }
        this.takingSnapshot = true;
        this.actor.runOnCompletion(this.streamProcessor.getLastProcessedPositionAsync(), (l, th) -> {
            if (th != null) {
                LOG.error(ERROR_MSG_ON_RESOLVE_PROCESSED_POS, th);
                this.takingSnapshot = false;
            } else if (l.longValue() == -1) {
                LOG.debug("We will skip taking this snapshot, because we haven't processed something yet.");
                this.takingSnapshot = false;
            } else {
                this.lowerBoundSnapshotPosition = l.longValue();
                takeSnapshot();
            }
        });
    }

    private void takeSnapshot() {
        long j = this.lowerBoundSnapshotPosition;
        this.logStream.getCommitPositionAsync().onComplete((l, th) -> {
            if (th != null) {
                this.takingSnapshot = false;
                LOG.error("Unexpected error on retrieving commit position", th);
                return;
            }
            Optional<TransientSnapshot> takeTransientSnapshot = this.stateController.takeTransientSnapshot(j);
            if (takeTransientSnapshot.isEmpty()) {
                LOG.warn("Failed to obtain a pending snapshot directory for position {}", Long.valueOf(j));
                this.takingSnapshot = false;
            } else {
                LOG.debug("Created snapshot for {}", this.processorName);
                this.pendingSnapshot = takeTransientSnapshot.get();
                this.actor.runOnCompletion(this.streamProcessor.getLastWrittenPositionAsync(), (l, th) -> {
                    if (th == null) {
                        LOG.info(LOG_MSG_WAIT_UNTIL_COMMITTED, l, l);
                        this.lastWrittenEventPosition = l;
                        onCommitCheck();
                    } else {
                        this.lastWrittenEventPosition = null;
                        this.takingSnapshot = false;
                        this.pendingSnapshot = null;
                        LOG.error(ERROR_MSG_ON_RESOLVE_WRITTEN_POS, th);
                    }
                });
            }
        });
    }

    private void onCommitCheck() {
        this.logStream.getCommitPositionAsync().onComplete((l, th) -> {
            if (this.pendingSnapshot == null || this.lastWrittenEventPosition == null || l.longValue() < this.lastWrittenEventPosition.longValue()) {
                return;
            }
            LOG.info("Current commit position {} is greater then {}, snapshot is valid.", l, this.lastWrittenEventPosition);
            try {
                try {
                    this.pendingSnapshot.persist();
                    this.lastWrittenEventPosition = null;
                    this.takingSnapshot = false;
                    this.pendingSnapshot = null;
                } catch (Exception e) {
                    LOG.error(ERROR_MSG_MOVE_SNAPSHOT, e);
                    this.lastWrittenEventPosition = null;
                    this.takingSnapshot = false;
                    this.pendingSnapshot = null;
                }
            } catch (Throwable th) {
                this.lastWrittenEventPosition = null;
                this.takingSnapshot = false;
                this.pendingSnapshot = null;
                throw th;
            }
        });
    }
}
