package io.zeebe.engine.processing.streamprocessor;

import io.zeebe.db.DbContext;
import io.zeebe.db.TransactionOperation;
import io.zeebe.db.ZeebeDbTransaction;
import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.NoopResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.ReprocessingStreamWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.state.ZeebeState;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.util.retry.EndlessRetryStrategy;
import io.zeebe.util.retry.RetryStrategy;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/engine/processing/streamprocessor/ReProcessingStateMachine.class */
public final class ReProcessingStateMachine {
    // Logger shared by the stream processor; assigned in the static initializer.
    private static final Logger LOG;

    // Message templates. The ones containing '{}' are SLF4J patterns, the ones containing '%d'
    // are String.format patterns.
    private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT = "Expected to find event processor for event '{} {}', but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_FOLLOW_UP_EVENT = "Expected to find last follow-up event position '%d', but last position was '%d'. Failed to reprocess on processor";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT = "Expected to find last follow-up event position '%d', but found no next event. Failed to reprocess on processor";
    private static final String LOG_STMT_REPROCESSING_FINISHED = "Processor finished reprocessing at event position {}";
    private static final String LOG_STMT_FAILED_ON_PROCESSING = "Event {} failed on processing last time, will call #onError to update workflow instance blacklist.";
    private static final String ERROR_INCONSISTENT_LOG = "Expected that position '%d' of current event is higher then position '%d' of last event, but was not. Inconsistent log detected!";

    // No-op callbacks handed to collaborators while reprocessing; assigned in the static initializer.
    private static final Consumer<SideEffectProducer> NOOP_SIDE_EFFECT_CONSUMER;
    private static final Consumer<Long> NOOP_LONG_CONSUMER;

    // Collaborators taken from the ProcessingContext in the constructor.
    private final ZeebeState zeebeState;
    private final ActorControl actor;
    private final TypedEventImpl typedEvent;
    private final RecordValues recordValues;
    private final RecordProcessorMap recordProcessorMap;
    private final EventFilter eventFilter;
    private final LogStreamReader logStreamReader;
    private final DbContext dbContext;
    private final RetryStrategy updateStateRetryStrategy;
    private final RetryStrategy processRetryStrategy;
    private final BooleanSupplier abortCondition;

    // Highest source event position found by scanLog; records up to this position are reprocessed.
    private long lastSourceEventPosition;
    // Highest position of a record that has a source event position; reading stops there.
    private long lastFollowUpEventPosition;
    // Position handed to startRecover.
    private long snapshotPosition;
    // Future returned by startRecover and completed when recovery ends.
    private ActorFuture<Long> recoveryFuture;
    // Record most recently read by readNextEvent.
    private LoggedEvent currentEvent;
    // Processor looked up for the record that is currently reprocessed.
    private TypedRecordProcessor eventProcessor;
    // Transaction of the record in flight; set back to null after a successful commit.
    private ZeebeDbTransaction zeebeDbTransaction;
    private final boolean detectReprocessingInconsistency;
    static final /* synthetic */ boolean $assertionsDisabled;

    // Reusable buffers and writers.
    protected final RecordMetadata metadata = new RecordMetadata();
    private final ErrorRecord errorRecord = new ErrorRecord();
    private final ReprocessingStreamWriter reprocessingStreamWriter = new ReprocessingStreamWriter();
    private final TypedResponseWriter noopResponseWriter = new NoopResponseWriter();
    // Positions referenced by ERROR records on the log; filled by scanLog, cleared by onRecovered.
    private final Set<Long> failedEventPositions = new HashSet<>();

    public ReProcessingStateMachine(ProcessingContext processingContext) {
        this.actor = processingContext.getActor();
        this.eventFilter = processingContext.getEventFilter();
        this.logStreamReader = processingContext.getLogStreamReader();
        this.recordValues = processingContext.getRecordValues();
        this.recordProcessorMap = processingContext.getRecordProcessorMap();
        this.dbContext = processingContext.getDbContext();
        this.zeebeState = processingContext.getZeebeState();
        this.abortCondition = processingContext.getAbortCondition();
        this.typedEvent = new TypedEventImpl(processingContext.getLogStream().getPartitionId());
        this.updateStateRetryStrategy = new EndlessRetryStrategy(this.actor);
        this.processRetryStrategy = new EndlessRetryStrategy(this.actor);
        this.detectReprocessingInconsistency = processingContext.isDetectReprocessingInconsistency();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Scans the log for error records and follow-up records and, if there is anything to replay,
     * starts reprocessing.
     *
     * @param j the snapshot position; it is passed to the log scan and used as the position after
     *     which reading resumes
     * @return a future that is completed with
     *     <ul>
     *       <li>the last source event position once reprocessing has finished,
     *       <li>{@code j} if no source event position beyond {@code j} was found and {@code j > 0},
     *       <li>{@code -1} otherwise;
     *     </ul>
     *     it is completed exceptionally when reading or reprocessing a record throws a
     *     {@link RuntimeException}
     */
    public ActorFuture<Long> startRecover(long j) {
        // Diamond instead of the raw type keeps the assignment to ActorFuture<Long> checked.
        recoveryFuture = new CompletableActorFuture<>();
        snapshotPosition = j;

        LOG.trace("Start scanning the log for error events.");
        lastSourceEventPosition = scanLog(j);
        LOG.trace("Finished scanning the log for error events.");

        if (lastSourceEventPosition > j) {
            LOG.info("Processor starts reprocessing, until last source event position {}", lastSourceEventPosition);
            logStreamReader.seekToNextEvent(j);
            reprocessNextEvent();
        } else if (j > 0) {
            // Nothing to replay: recovery ends at the snapshot position.
            recoveryFuture.complete(j);
        } else {
            // Nothing to replay and no positive snapshot position.
            recoveryFuture.complete(-1L);
        }
        return recoveryFuture;
    }

    /**
     * Walks over every record the reader still offers and collects what reprocessing needs:
     * the positions referenced by ERROR records (added to {@code failedEventPositions}), the
     * highest source event position, and the highest position of a record that has a source event
     * position (stored in {@code lastFollowUpEventPosition}). Afterwards the reader is moved to
     * {@code j + 1}.
     *
     * @param j lower bound used for the monotonicity check and as the initial result
     * @return {@code -1} when the reader has no record at all, otherwise the highest source event
     *     position seen, but at least {@code j}
     * @throws IllegalStateException if a record's position is not strictly higher than the
     *     position of the record before it (or than {@code j} for the first record)
     */
    private long scanLog(long j) {
        if (!logStreamReader.hasNext()) {
            return -1L;
        }

        long highestSourcePosition = j;
        long previousPosition = j;
        while (logStreamReader.hasNext()) {
            final LoggedEvent event = (LoggedEvent) logStreamReader.next();
            final long eventPosition = event.getPosition();
            if (eventPosition <= previousPosition) {
                throw new IllegalStateException(String.format(ERROR_INCONSISTENT_LOG, eventPosition, previousPosition));
            }
            previousPosition = eventPosition;

            metadata.reset();
            event.readMetadata(metadata);
            if (metadata.getValueType() == ValueType.ERROR) {
                event.readValue(errorRecord);
                final long failedPosition = errorRecord.getErrorEventPosition();
                if (failedPosition >= 0) {
                    LOG.debug("Found error-prone event {} on reprocessing, will add position {} to the blacklist.", event, failedPosition);
                    failedEventPositions.add(failedPosition);
                }
            }

            final long sourcePosition = event.getSourceEventPosition();
            if (sourcePosition > 0) {
                highestSourcePosition = Math.max(highestSourcePosition, sourcePosition);
                lastFollowUpEventPosition = Math.max(lastFollowUpEventPosition, eventPosition);
            }
        }

        // Rewind so that reading can start over after the scan.
        logStreamReader.seek(j + 1);
        return highestSourcePosition;
    }

    /**
     * Reads the next record from the log stream reader into {@code currentEvent}.
     *
     * @throws IllegalStateException if the reader has no further record, or if the record read
     *     lies beyond {@code lastFollowUpEventPosition}
     */
    private void readNextEvent() {
        final boolean exhausted = !logStreamReader.hasNext();
        if (exhausted) {
            final String message = String.format(ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT, lastFollowUpEventPosition);
            throw new IllegalStateException(message);
        }

        final LoggedEvent next = (LoggedEvent) logStreamReader.next();
        currentEvent = next;

        final long position = next.getPosition();
        if (position > lastFollowUpEventPosition) {
            final String message = String.format(ERROR_MESSAGE_REPROCESSING_NO_FOLLOW_UP_EVENT, lastFollowUpEventPosition, position);
            throw new IllegalStateException(message);
        }
    }

    /**
     * Reads the next record and either reprocesses it or, when an event filter is set and does
     * not apply to the record, marks it as done right away. Any {@link RuntimeException} raised on
     * the way completes the recovery future exceptionally, wrapped in a
     * {@code ProcessingException}.
     */
    private void reprocessNextEvent() {
        try {
            readNextEvent();

            final boolean filteredOut = eventFilter != null && !eventFilter.applies(currentEvent);
            if (filteredOut) {
                onRecordReprocessed(currentEvent);
            } else {
                reprocessEvent(currentEvent);
            }
        } catch (RuntimeException e) {
            final ProcessingException failure = new ProcessingException("Unable to reprocess record", currentEvent, metadata, e);
            recoveryFuture.completeExceptionally(failure);
        }
    }

    /**
     * Reprocesses a single record.
     *
     * <p>The processor for the record is looked up from its metadata. If there is none, or the
     * lookup fails, the record is skipped. Otherwise the record is wrapped into the reusable typed
     * event, optionally checked against the records produced so far during reprocessing, and then
     * either skipped (its position lies beyond {@code lastSourceEventPosition}) or processed.
     *
     * @param loggedEvent the record to reprocess
     */
    private void reprocessEvent(LoggedEvent loggedEvent) {
        // Forget the processor of the previous record first. Without this reset a failing lookup
        // would leave the old processor in place and the record would be handled by the wrong
        // processor, although the error message below announces that the record is skipped.
        eventProcessor = null;
        try {
            metadata.reset();
            loggedEvent.readMetadata(metadata);
            eventProcessor = recordProcessorMap.get(metadata.getRecordType(), metadata.getValueType(), metadata.getIntent().value());
        } catch (Exception e) {
            // The exception is the trailing argument, so SLF4J logs it together with its stack trace.
            LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, loggedEvent, metadata, e);
        }

        if (eventProcessor == null) {
            onRecordReprocessed(loggedEvent);
            return;
        }

        typedEvent.wrap(loggedEvent, metadata, recordValues.readRecordValue(loggedEvent, metadata.getValueType()));
        if (detectReprocessingInconsistency) {
            verifyRecordMatchesToReprocessing(typedEvent);
        }

        final long position = loggedEvent.getPosition();
        if (position > lastSourceEventPosition) {
            // No record on the log refers to a source beyond lastSourceEventPosition, so there is
            // nothing to replay for this record.
            onRecordReprocessed(loggedEvent);
        } else {
            reprocessingStreamWriter.configureSourceContext(position);
            processUntilDone(position, typedEvent);
        }
    }

    /**
     * Runs the operation chosen for the record inside a database transaction, retrying through
     * {@code processRetryStrategy} until it succeeds or the abort condition holds, and then
     * continues with committing the transaction.
     *
     * @param j position of the record
     * @param typedRecord the record to process
     */
    private void processUntilDone(long j, TypedRecord<?> typedRecord) {
        final TransactionOperation operation = chooseOperationForEvent(j, typedRecord);

        actor.runOnCompletion(
            processRetryStrategy.runWithRetry(
                () -> {
                    // The field is only set back to null after a successful commit, so a non-null
                    // value here is a transaction that was opened but never committed.
                    if (zeebeDbTransaction != null) {
                        zeebeDbTransaction.rollback();
                    }
                    final ZeebeDbTransaction transaction = dbContext.getCurrentTransaction();
                    zeebeDbTransaction = transaction;
                    transaction.run(operation);
                    return true;
                },
                abortCondition),
            (result, failure) -> {
                if (failure != null && !$assertionsDisabled) {
                    throw new AssertionError("On reprocessing there shouldn't be any exception thrown.");
                }
                updateStateUntilDone();
            });
    }

    /**
     * Selects what has to happen inside the transaction for the given record.
     *
     * @param j position of the record
     * @param typedRecord the record; used when the position is a known failed position
     * @return for a position contained in {@code failedEventPositions}, an operation that tries to
     *     blacklist the record; otherwise an operation that runs the current event processor on
     *     the typed event (unless it is on the blacklist) and marks the position as processed
     */
    private TransactionOperation chooseOperationForEvent(long j, TypedRecord<?> typedRecord) {
        final boolean failedBefore = failedEventPositions.contains(j);
        if (!failedBefore) {
            return () -> {
                final boolean blacklisted = zeebeState.isOnBlacklist(typedEvent);
                if (!blacklisted) {
                    eventProcessor.processRecord(j, typedEvent, noopResponseWriter, reprocessingStreamWriter, NOOP_SIDE_EFFECT_CONSUMER);
                }
                zeebeState.markAsProcessed(j);
            };
        }

        LOG.info(LOG_STMT_FAILED_ON_PROCESSING, typedRecord);
        return () -> zeebeState.tryToBlacklist(typedRecord, NOOP_LONG_CONSUMER);
    }

    /**
     * Commits the transaction of the record in flight, retrying through
     * {@code updateStateRetryStrategy} until it succeeds or the abort condition holds, and then
     * reports the current record as reprocessed.
     */
    private void updateStateUntilDone() {
        actor.runOnCompletion(
            updateStateRetryStrategy.runWithRetry(
                () -> {
                    zeebeDbTransaction.commit();
                    // Cleared only after the commit went through; see the rollback in processUntilDone.
                    zeebeDbTransaction = null;
                    return true;
                },
                abortCondition),
            (result, failure) -> {
                if (failure != null && !$assertionsDisabled) {
                    throw new AssertionError("On reprocessing there shouldn't be any exception thrown.");
                }
                onRecordReprocessed(currentEvent);
            });
    }

    /**
     * Finishes the handling of one record: removes it from the reprocessing writer and then
     * either schedules the next record or, once the last follow-up position is reached, ends
     * recovery.
     *
     * @param loggedEvent the record that has just been handled
     */
    private void onRecordReprocessed(LoggedEvent loggedEvent) {
        reprocessingStreamWriter.removeRecord(loggedEvent.getKey(), loggedEvent.getSourceEventPosition());

        final long position = loggedEvent.getPosition();
        final boolean reachedLastFollowUp = position >= lastFollowUpEventPosition;
        if (!reachedLastFollowUp) {
            // Continue with the next record as a separate actor job.
            actor.submit(this::reprocessNextEvent);
        } else {
            LOG.info(LOG_STMT_REPROCESSING_FINISHED, position);
            logStreamReader.seekToNextEvent(lastSourceEventPosition);
            onRecovered(lastSourceEventPosition);
        }
    }

    /**
     * Completes the recovery future and drops the collected failed positions.
     *
     * @param j the position the recovery future is completed with
     */
    private void onRecovered(long j) {
        final Long recoveredPosition = j;
        recoveryFuture.complete(recoveredPosition);
        failedEventPositions.clear();
    }

    /**
     * Compares a record read from the log with the records that reprocessing has produced so far.
     *
     * <p>Records without a source record position, or with one that is not after the snapshot
     * position, are not checked.
     *
     * @param typedRecord the record read from the log
     * @throws InconsistentReprocessingException if reprocessing produced a record with a lower
     *     source record position than the given one, or if the first produced record with the
     *     same source record position differs in key or intent
     */
    private void verifyRecordMatchesToReprocessing(TypedRecord<?> typedRecord) {
        final long sourcePosition = typedRecord.getSourceRecordPosition();
        if (sourcePosition < 0) {
            return;
        }
        if (sourcePosition <= snapshotPosition) {
            return;
        }

        // A produced record that still waits with an older source position was never written to the log.
        reprocessingStreamWriter.getRecords().stream()
            .filter(produced -> produced.getSourceRecordPosition() < sourcePosition)
            .findFirst()
            .ifPresent(missing -> {
                throw new InconsistentReprocessingException("Records were created on reprocessing but not written on the log stream.", typedEvent, missing);
            });

        reprocessingStreamWriter.getRecords().stream()
            .filter(produced -> produced.getSourceRecordPosition() == sourcePosition)
            .findFirst()
            .ifPresent(expected -> {
                final boolean sameKey = expected.getKey() == typedRecord.getKey();
                if (!sameKey) {
                    throw new InconsistentReprocessingException("The key of the record on the log stream doesn't match to the record from reprocessing.", typedEvent, expected);
                }
                final boolean sameIntent = expected.getIntent() == typedRecord.getIntent();
                if (!sameIntent) {
                    throw new InconsistentReprocessingException("The intent of the record on the log stream doesn't match to the record from reprocessing.", typedEvent, expected);
                }
            });
    }

    static {
        $assertionsDisabled = !ReProcessingStateMachine.class.desiredAssertionStatus();
        LOG = Loggers.PROCESSOR_LOGGER;
        NOOP_SIDE_EFFECT_CONSUMER = sideEffectProducer -> {
        };
        NOOP_LONG_CONSUMER = l -> {
        };
    }
}
