package io.zeebe.engine.processor;

import io.zeebe.db.DbContext;
import io.zeebe.db.ZeebeDb;
import io.zeebe.engine.state.ZeebeState;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamBatchWriter;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.util.LangUtil;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/engine/processor/StreamProcessor.class */
public class StreamProcessor extends Actor {
    private static final String ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED = "Expected to find event with the snapshot position %s in log stream, but nothing was found. Failed to recover '%s'.";
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private final ActorScheduler actorScheduler;
    private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
    private final LogStream logStream;
    private final int partitionId;
    private final ZeebeDb zeebeDb;
    private final ProcessingContext processingContext;
    private final TypedRecordProcessorFactory typedRecordProcessorFactory;
    private LogStreamReader logStreamReader;
    private ActorCondition onCommitPositionUpdatedCondition;
    private ProcessingStateMachine processingStateMachine;
    private CompletableActorFuture<Void> openFuture;
    private final String actorName;
    private final AtomicBoolean isOpened = new AtomicBoolean(false);
    private long snapshotPosition = -1;
    private Phase phase = Phase.REPROCESSING;
    private CompletableActorFuture<Void> closeFuture = CompletableActorFuture.completed((Object) null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zeebe/engine/processor/StreamProcessor$Phase.class */
    public enum Phase {
        REPROCESSING,
        PROCESSING,
        FAILED
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamProcessor(StreamProcessorBuilder streamProcessorBuilder) {
        this.actorScheduler = streamProcessorBuilder.getActorScheduler();
        this.lifecycleAwareListeners = streamProcessorBuilder.getLifecycleListeners();
        this.typedRecordProcessorFactory = streamProcessorBuilder.getTypedRecordProcessorFactory();
        this.zeebeDb = streamProcessorBuilder.getZeebeDb();
        this.processingContext = streamProcessorBuilder.getProcessingContext().eventCache(new RecordValues()).actor(this.actor).abortCondition(this::isClosed);
        this.logStream = this.processingContext.getLogStream();
        this.partitionId = this.logStream.getPartitionId();
        this.actorName = buildActorName(streamProcessorBuilder.getNodeId(), "StreamProcessor-" + this.partitionId);
    }

    public static StreamProcessorBuilder builder() {
        return new StreamProcessorBuilder();
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarting() {
        this.actor.runOnCompletionBlockingCurrentPhase(this.logStream.newLogStreamBatchWriter(), this::onRetrievingWriter);
    }

    private void onRetrievingWriter(LogStreamBatchWriter logStreamBatchWriter, Throwable th) {
        if (th == null) {
            this.processingContext.maxFragmentSize(logStreamBatchWriter.getMaxFragmentLength()).logStreamWriter(new TypedStreamWriterImpl(logStreamBatchWriter));
            this.actor.runOnCompletionBlockingCurrentPhase(this.logStream.newLogStreamReader(), this::onRetrievingReader);
        } else {
            LOG.error("Unexpected error on retrieving batch writer from log stream.", th);
            this.actor.close();
        }
    }

    private void onRetrievingReader(LogStreamReader logStreamReader, Throwable th) {
        if (th == null) {
            this.logStreamReader = logStreamReader;
            this.processingContext.logStreamReader(logStreamReader);
        } else {
            LOG.error("Unexpected error on retrieving reader from log stream.", th);
            this.actor.close();
        }
    }

    protected void onActorStarted() {
        try {
            LOG.debug("Recovering state of partition {} from snapshot", Integer.valueOf(this.partitionId));
            this.snapshotPosition = recoverFromSnapshot();
            initProcessors();
        } catch (Throwable th) {
            onFailure(th);
            LangUtil.rethrowUnchecked(th);
        }
        try {
            this.processingStateMachine = new ProcessingStateMachine(this.processingContext, this::isOpened);
            this.openFuture.complete((Object) null);
            this.actor.runOnCompletion(new ReProcessingStateMachine(this.processingContext).startRecover(this.snapshotPosition), (r5, th2) -> {
                if (th2 == null) {
                    onRecovered();
                } else {
                    LOG.error("Unexpected error on recovery happens.", th2);
                    onFailure(th2);
                }
            });
        } catch (RuntimeException e) {
            onFailure(e);
            throw e;
        }
    }

    protected void onActorClosing() {
        this.processingContext.getLogStreamReader().close();
        this.processingContext.getLogStreamWriter().close();
        if (this.onCommitPositionUpdatedCondition != null) {
            this.logStream.removeOnCommitPositionUpdatedCondition(this.onCommitPositionUpdatedCondition);
            this.onCommitPositionUpdatedCondition = null;
        }
    }

    protected void onActorClosed() {
        this.closeFuture.complete((Object) null);
        LOG.debug("Closed stream processor controller {}.", getName());
    }

    protected void onActorCloseRequested() {
        if (isFailed()) {
            return;
        }
        this.lifecycleAwareListeners.forEach((v0) -> {
            v0.onClose();
        });
    }

    public ActorFuture<Void> openAsync() {
        if (this.isOpened.compareAndSet(false, true)) {
            this.openFuture = new CompletableActorFuture<>();
            this.actorScheduler.submitActor(this);
        }
        return this.openFuture;
    }

    private void initProcessors() {
        TypedRecordProcessors createProcessors = this.typedRecordProcessorFactory.createProcessors(this.processingContext);
        this.lifecycleAwareListeners.addAll(createProcessors.getLifecycleListeners());
        RecordProcessorMap recordProcessorMap = createProcessors.getRecordProcessorMap();
        Iterator<TypedRecordProcessor> values = recordProcessorMap.values();
        List<StreamProcessorLifecycleAware> list = this.lifecycleAwareListeners;
        Objects.requireNonNull(list);
        values.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        this.processingContext.recordProcessorMap(recordProcessorMap);
    }

    private long recoverFromSnapshot() {
        long lastSuccessfulProcessedRecordPosition = recoverState().getLastSuccessfulProcessedRecordPosition();
        if (!this.logStreamReader.seekToNextEvent(lastSuccessfulProcessedRecordPosition)) {
            throw new IllegalStateException(String.format(ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED, Long.valueOf(lastSuccessfulProcessedRecordPosition), getName()));
        }
        LOG.info("Recovered state of partition {} from snapshot at position {}", Integer.valueOf(this.partitionId), Long.valueOf(lastSuccessfulProcessedRecordPosition));
        return lastSuccessfulProcessedRecordPosition;
    }

    private ZeebeState recoverState() {
        DbContext createContext = this.zeebeDb.createContext();
        ZeebeState zeebeState = new ZeebeState(this.partitionId, this.zeebeDb, createContext);
        this.processingContext.dbContext(createContext);
        this.processingContext.zeebeState(zeebeState);
        return zeebeState;
    }

    private void onRecovered() {
        this.phase = Phase.PROCESSING;
        ActorControl actorControl = this.actor;
        String str = getName() + "-on-commit-position-updated";
        ProcessingStateMachine processingStateMachine = this.processingStateMachine;
        Objects.requireNonNull(processingStateMachine);
        this.onCommitPositionUpdatedCondition = actorControl.onCondition(str, processingStateMachine::readNextEvent);
        this.logStream.registerOnCommitPositionUpdatedCondition(this.onCommitPositionUpdatedCondition);
        this.lifecycleAwareListeners.forEach(streamProcessorLifecycleAware -> {
            streamProcessorLifecycleAware.onRecovered(this.processingContext);
        });
        ActorControl actorControl2 = this.actor;
        ProcessingStateMachine processingStateMachine2 = this.processingStateMachine;
        Objects.requireNonNull(processingStateMachine2);
        actorControl2.submit(processingStateMachine2::readNextEvent);
    }

    public ActorFuture<Void> closeAsync() {
        if (this.isOpened.compareAndSet(true, false)) {
            this.closeFuture = new CompletableActorFuture<>();
            this.actor.close();
        }
        return this.closeFuture;
    }

    private void onFailure(Throwable th) {
        this.phase = Phase.FAILED;
        this.openFuture.completeExceptionally(th);
        this.closeFuture = new CompletableActorFuture<>();
        this.isOpened.set(false);
        this.actor.close();
    }

    public boolean isOpened() {
        return this.isOpened.get();
    }

    public boolean isClosed() {
        return !this.isOpened.get();
    }

    public boolean isFailed() {
        return this.phase == Phase.FAILED;
    }

    public ActorFuture<Long> getLastProcessedPositionAsync() {
        ActorControl actorControl = this.actor;
        ProcessingStateMachine processingStateMachine = this.processingStateMachine;
        Objects.requireNonNull(processingStateMachine);
        return actorControl.call(processingStateMachine::getLastSuccessfulProcessedEventPosition);
    }

    public ActorFuture<Long> getLastWrittenPositionAsync() {
        ActorControl actorControl = this.actor;
        ProcessingStateMachine processingStateMachine = this.processingStateMachine;
        Objects.requireNonNull(processingStateMachine);
        return actorControl.call(processingStateMachine::getLastWrittenEventPosition);
    }
}
