package io.zeebe.engine.processor;

import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.spi.SnapshotController;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.SchedulingHints;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/engine/processor/AsyncSnapshotDirector.class */
public class AsyncSnapshotDirector extends Actor {
    private static final Logger LOG = Loggers.SNAPSHOT_LOGGER;
    private static final String LOG_MSG_WAIT_UNTIL_COMMITTED = "Finished taking snapshot, need to wait until last written event position {} is committed, current commit position is {}. After that snapshot can be marked as valid.";
    private static final String ERROR_MSG_ON_RESOLVE_PROCESSED_POS = "Unexpected error in resolving last processed position.";
    private static final String ERROR_MSG_ON_RESOLVE_WRITTEN_POS = "Unexpected error in resolving last written position.";
    private static final String ERROR_MSG_MOVE_SNAPSHOT = "Unexpected exception occurred on moving valid snapshot.";
    private static final String LOG_MSG_ENFORCE_SNAPSHOT = "Enforce snapshot creation. Last successful processed position is {}.";
    private static final String ERROR_MSG_ENFORCED_SNAPSHOT = "Unexpected exception occurred on creating snapshot, was enforced to do so.";
    private static final int INITIAL_POSITION = -1;
    private final SnapshotController snapshotController;
    private final LogStream logStream;
    private final String name;
    private final Duration snapshotRate;
    private final SnapshotMetrics metrics;
    private final String processorName;
    private final StreamProcessor streamProcessor;
    private ActorCondition commitCondition;
    private boolean pendingSnapshot;
    private long lowerBoundSnapshotPosition;
    private long lastValidSnapshotPosition;
    private final Runnable prepareTakingSnapshot = this::prepareTakingSnapshot;
    private long lastWrittenEventPosition = -1;

    public AsyncSnapshotDirector(StreamProcessor streamProcessor, SnapshotController snapshotController, LogStream logStream, Duration duration, SnapshotMetrics snapshotMetrics) {
        this.streamProcessor = streamProcessor;
        this.snapshotController = snapshotController;
        this.logStream = logStream;
        this.processorName = streamProcessor.getName();
        this.name = this.processorName + "-snapshot-director";
        this.snapshotRate = duration;
        this.metrics = snapshotMetrics;
    }

    protected void onActorStarting() {
        this.actor.setSchedulingHints(SchedulingHints.ioBound());
        this.actor.runAtFixedRate(this.snapshotRate, this.prepareTakingSnapshot);
        this.commitCondition = this.actor.onCondition(getConditionNameForPosition(), this::onCommitCheck);
        this.logStream.registerOnCommitPositionUpdatedCondition(this.commitCondition);
        this.lastValidSnapshotPosition = this.snapshotController.getLastValidSnapshotPosition();
        LOG.debug("The position of the last valid snapshot is '{}'. Taking snapshots beyond this position.", Long.valueOf(this.lastValidSnapshotPosition));
    }

    public String getName() {
        return this.name;
    }

    private String getConditionNameForPosition() {
        return getName() + "-wait-for-endPosition-committed";
    }

    private void prepareTakingSnapshot() {
        if (this.pendingSnapshot) {
            return;
        }
        this.actor.runOnCompletion(this.streamProcessor.getLastProcessedPositionAsync(), (l, th) -> {
            if (th != null) {
                LOG.error(ERROR_MSG_ON_RESOLVE_PROCESSED_POS, th);
            } else if (l.longValue() <= this.lastValidSnapshotPosition) {
                LOG.debug("No changes since last snapshot we will skip snapshot creation. Last valid snapshot position {}, new lower bound position {}", Long.valueOf(this.lastValidSnapshotPosition), l);
            } else {
                this.lowerBoundSnapshotPosition = l.longValue();
                takeSnapshot();
            }
        });
    }

    private void takeSnapshot() {
        this.pendingSnapshot = true;
        SnapshotController snapshotController = this.snapshotController;
        snapshotController.getClass();
        createSnapshot(snapshotController::takeTempSnapshot);
        this.actor.runOnCompletion(this.streamProcessor.getLastWrittenPositionAsync(), (l, th) -> {
            if (th != null) {
                this.pendingSnapshot = false;
                LOG.error(ERROR_MSG_ON_RESOLVE_WRITTEN_POS, th);
            } else {
                long commitPosition = this.logStream.getCommitPosition();
                this.lastWrittenEventPosition = l.longValue();
                LOG.debug(LOG_MSG_WAIT_UNTIL_COMMITTED, l, Long.valueOf(commitPosition));
                onCommitCheck();
            }
        });
    }

    private void createSnapshot(Runnable runnable) {
        long currentTimeMillis = System.currentTimeMillis();
        runnable.run();
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        LOG.info("Creation of snapshot for {} took {} ms.", this.processorName, Long.valueOf(currentTimeMillis2));
        this.metrics.recordSnapshotCreationTime(currentTimeMillis2);
    }

    private void onCommitCheck() {
        long commitPosition = this.logStream.getCommitPosition();
        if (this.pendingSnapshot) {
            try {
                if (commitPosition >= this.lastWrittenEventPosition) {
                    try {
                        this.lastValidSnapshotPosition = this.lowerBoundSnapshotPosition;
                        this.snapshotController.moveValidSnapshot(this.lowerBoundSnapshotPosition);
                        SnapshotController snapshotController = this.snapshotController;
                        ActorControl actorControl = this.actor;
                        actorControl.getClass();
                        snapshotController.replicateLatestSnapshot(actorControl::submit);
                        this.pendingSnapshot = false;
                    } catch (Exception e) {
                        LOG.error(ERROR_MSG_MOVE_SNAPSHOT, e);
                        this.pendingSnapshot = false;
                    }
                }
            } catch (Throwable th) {
                this.pendingSnapshot = false;
                throw th;
            }
        }
    }

    protected void enforceSnapshotCreation(long j, long j2) {
        if (this.logStream.getCommitPosition() < j || j2 <= this.lastValidSnapshotPosition) {
            return;
        }
        LOG.debug(LOG_MSG_ENFORCE_SNAPSHOT, Long.valueOf(j2));
        try {
            createSnapshot(() -> {
                this.snapshotController.takeSnapshot(j2);
            });
        } catch (Exception e) {
            LOG.error(ERROR_MSG_ENFORCED_SNAPSHOT, e);
        }
    }

    protected void onActorCloseRequested() {
        this.logStream.removeOnCommitPositionUpdatedCondition(this.commitCondition);
    }

    public ActorFuture<Void> closeAsync() {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.call(() -> {
            this.actor.runOnCompletion(this.streamProcessor.getLastWrittenPositionAsync(), (l, th) -> {
                if (th == null) {
                    this.actor.runOnCompletion(this.streamProcessor.getLastProcessedPositionAsync(), (l, th) -> {
                        if (th == null) {
                            enforceSnapshotCreation(l.longValue(), l.longValue());
                            close();
                            completableActorFuture.complete((Object) null);
                        } else {
                            LOG.error(ERROR_MSG_ON_RESOLVE_PROCESSED_POS, th);
                            close();
                            completableActorFuture.completeExceptionally(th);
                        }
                    });
                    return;
                }
                LOG.error(ERROR_MSG_ON_RESOLVE_WRITTEN_POS, th);
                close();
                completableActorFuture.completeExceptionally(th);
            });
        });
        return completableActorFuture;
    }

    private void close() {
        this.metrics.close();
        this.actor.close();
    }
}
