package io.zeebe.engine.processor;

import io.zeebe.db.DbContext;
import io.zeebe.db.TransactionOperation;
import io.zeebe.db.ZeebeDbTransaction;
import io.zeebe.engine.state.ZeebeState;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.util.retry.EndlessRetryStrategy;
import io.zeebe.util.retry.RetryStrategy;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/engine/processor/ReProcessingStateMachine.class */
public final class ReProcessingStateMachine {
    /** Logger shared by the processor; assigned in the static initializer. */
    private static final Logger LOG;

    // Message templates. The '{}' variants are SLF4J patterns, the '%d'/'%s' variants are
    // String.format patterns used for exception messages.
    private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT = "Expected to find event processor for event '{}' with processor '{}', but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_SOURCE_EVENT = "Expected to find last source event position '%d', but last position was '%d'. Failed to reprocess on processor '%s'";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT = "Expected to find last source event position '%d', but found no next event. Failed to reprocess on processor '%s'";
    private static final String LOG_STMT_REPROCESSING_FINISHED = "Processor {} finished reprocessing at event position {}";
    private static final String LOG_STMT_FAILED_ON_PROCESSING = "Event {} failed on processing last time, will call #onError to update workflow instance blacklist.";

    /** Consumer that ignores its argument; handed to {@code tryToBlacklist}. */
    private static final Consumer<Long> NOOP_LONG_CONSUMER;
    /** Consumer that ignores its argument; handed to the record processor while reprocessing. */
    public static final Consumer NOOP_SIDE_EFFECT_CONSUMER;

    /** Only events written by this producer are considered when looking for source positions. */
    private final int producerId;
    private final ZeebeState zeebeState;
    private final ActorControl actor;
    private final String streamProcessorName;
    /** Reusable record value instances, looked up by value type and reset before every read. */
    private final Map<ValueType, UnifiedRecordValue> eventCache;
    private final RecordProcessorMap recordProcessorMap;
    /** Optional filter; when {@code null} every event is reprocessed. */
    private final EventFilter eventFilter;
    private final LogStreamReader logStreamReader;
    private final DbContext dbContext;
    /** Retry strategy used for committing the transaction. */
    private final RetryStrategy updateStateRetryStrategy;
    /** Retry strategy used for running the processing operation. */
    private final RetryStrategy processRetryStrategy;
    /** Passed to both retry strategies together with the operation to retry. */
    private final BooleanSupplier abortCondition;
    /** Highest source event position found by the log scan; reprocessing ends at this position. */
    private long lastSourceEventPosition;
    /** Completed when reprocessing finished, or completed exceptionally when it failed. */
    private ActorFuture<Void> recoveryFuture;
    /** The event most recently read from the log stream reader. */
    private LoggedEvent currentEvent;
    /** The processor resolved for the event that is currently being reprocessed. */
    private TypedRecordProcessor eventProcessor;
    /** The transaction of the current attempt; {@code null} once it was committed. */
    private ZeebeDbTransaction zeebeDbTransaction;
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Reused buffer for reading ERROR records during the log scan. */
    private final ErrorRecord errorRecord = new ErrorRecord();
    /** Reused buffer for the metadata of the event that is currently inspected. */
    protected final RecordMetadata metadata = new RecordMetadata();
    /** Reused typed view that is wrapped around the event handed to the processor. */
    private final TypedEventImpl typedEvent = new TypedEventImpl();
    private final TypedStreamWriter noopstreamWriter = new NoopStreamWriter();
    private final TypedResponseWriter noopResponseWriter = new NoopResponseWriter();
    /**
     * Positions referenced by ERROR records found during the log scan. Events at these positions
     * are blacklisted instead of being processed again.
     */
    private final Set<Long> failedEventPositions = new HashSet<>();

    /* loaded from: input_file:io/zeebe/engine/processor/ReProcessingStateMachine$NoopResponseWriter.class */
    /**
     * Response writer that drops everything it is given. It is handed to the record processors
     * while reprocessing, so that no response is produced for an already handled record.
     */
    private static final class NoopResponseWriter implements TypedResponseWriter {

        /** Ignores the rejection. */
        @Override
        public void writeRejectionOnCommand(TypedRecord<?> typedRecord, RejectionType rejectionType, String str) {
            // intentionally empty
        }

        /** Ignores the event. */
        @Override
        public void writeEvent(TypedRecord<?> typedRecord) {
            // intentionally empty
        }

        /** Ignores the event. */
        @Override
        public void writeEventOnCommand(long j, Intent intent, UnpackedObject unpackedObject, TypedRecord<?> typedRecord) {
            // intentionally empty
        }

        /**
         * Does nothing, as nothing was ever buffered.
         *
         * @return always {@code false}
         */
        @Override
        public boolean flush() {
            return false;
        }
    }

    /* loaded from: input_file:io/zeebe/engine/processor/ReProcessingStateMachine$NoopStreamWriter.class */
    /**
     * Stream writer that drops everything it is given. It is handed to the record processors
     * while reprocessing, so that no follow-up record is written for an already handled record.
     */
    private static final class NoopStreamWriter implements TypedStreamWriter {

        /** Ignores the rejection. */
        @Override
        public void appendRejection(TypedRecord<? extends UnpackedObject> typedRecord, RejectionType rejectionType, String str) {
            // intentionally empty
        }

        /** Ignores the rejection; the metadata consumer is never invoked. */
        @Override
        public void appendRejection(TypedRecord<? extends UnpackedObject> typedRecord, RejectionType rejectionType, String str, Consumer<RecordMetadata> consumer) {
            // intentionally empty
        }

        /** Ignores the event. */
        @Override
        public void appendNewEvent(long j, Intent intent, UnpackedObject unpackedObject) {
            // intentionally empty
        }

        /** Ignores the event. */
        @Override
        public void appendFollowUpEvent(long j, Intent intent, UnpackedObject unpackedObject) {
            // intentionally empty
        }

        /** Ignores the event; the metadata consumer is never invoked. */
        @Override
        public void appendFollowUpEvent(long j, Intent intent, UnpackedObject unpackedObject, Consumer<RecordMetadata> consumer) {
            // intentionally empty
        }

        /** Ignores the command. */
        @Override
        public void appendNewCommand(Intent intent, UnpackedObject unpackedObject) {
            // intentionally empty
        }

        /** Ignores the command. */
        @Override
        public void appendFollowUpCommand(long j, Intent intent, UnpackedObject unpackedObject) {
            // intentionally empty
        }

        /** Ignores the command; the metadata consumer is never invoked. */
        @Override
        public void appendFollowUpCommand(long j, Intent intent, UnpackedObject unpackedObject, Consumer<RecordMetadata> consumer) {
            // intentionally empty
        }

        /** Does nothing, as this writer holds no state. */
        @Override
        public void reset() {
            // intentionally empty
        }

        /** Ignores the source context. */
        @Override
        public void configureSourceContext(int i, long j) {
            // intentionally empty
        }

        /**
         * Does nothing, as nothing was ever buffered.
         *
         * @return always {@code 0}
         */
        @Override
        public long flush() {
            return 0;
        }
    }

    public ReProcessingStateMachine(ProcessingContext processingContext) {
        this.actor = processingContext.getActor();
        this.streamProcessorName = processingContext.getStreamProcessorName();
        this.eventFilter = processingContext.getEventFilter();
        this.logStreamReader = processingContext.getLogStreamReader();
        this.producerId = processingContext.getProducerId();
        this.eventCache = processingContext.getEventCache();
        this.recordProcessorMap = processingContext.getRecordProcessorMap();
        this.dbContext = processingContext.getDbContext();
        this.zeebeState = processingContext.getZeebeState();
        this.abortCondition = processingContext.getAbortCondition();
        this.updateStateRetryStrategy = new EndlessRetryStrategy(this.actor);
        this.processRetryStrategy = new EndlessRetryStrategy(this.actor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Scans the log for ERROR records and the last source event position, then reprocesses all
     * events up to that position.
     *
     * <p>Reprocessing only starts when the scan finds a source event position greater than
     * {@code j}; otherwise the returned future is completed right away.
     *
     * @param j lower bound for the scan result; presumably the position covered by the latest
     *     snapshot (not visible from this class)
     * @return future that completes when reprocessing is done, or completes exceptionally when a
     *     runtime exception occurs while reading or reprocessing an event
     */
    public ActorFuture<Void> startRecover(long j) {
        this.recoveryFuture = new CompletableActorFuture<>();
        // Remember where the reader stood, the scan below moves it.
        final long startPosition = this.logStreamReader.getPosition();

        LOG.info("Start scanning the log for error events.");
        this.lastSourceEventPosition = scanLog(j);
        LOG.info("Finished scanning the log for error events.");

        if (this.lastSourceEventPosition > j) {
            LOG.info("Processor {} starts reprocessing, until last source event position {}", this.streamProcessorName, this.lastSourceEventPosition);
            this.logStreamReader.seek(startPosition);
            reprocessNextEvent();
        } else {
            // Nothing to reprocess.
            this.recoveryFuture.complete(null);
        }
        return this.recoveryFuture;
    }

    /**
     * Reads the log to its end, collecting the positions referenced by ERROR records and
     * determining the highest source event position written by this producer.
     *
     * <p>Afterwards the reader is positioned at {@code j + 1}.
     *
     * @param j initial value for the result when the log has at least one event
     * @return {@code -1} when the reader has no event at all, otherwise the maximum of {@code j}
     *     and the highest positive source event position of this producer's events
     */
    private long scanLog(long j) {
        if (!this.logStreamReader.hasNext()) {
            return -1L;
        }

        long highestSourcePosition = j;
        while (this.logStreamReader.hasNext()) {
            final LoggedEvent scanned = (LoggedEvent) this.logStreamReader.next();
            this.metadata.reset();
            scanned.readMetadata(this.metadata);

            // ERROR records point at the event whose processing failed.
            if (this.metadata.getValueType() == ValueType.ERROR) {
                scanned.readValue(this.errorRecord);
                final long failedPosition = this.errorRecord.getErrorEventPosition();
                if (failedPosition >= 0) {
                    LOG.debug("Found error-prone event {} on reprocessing, will add position {} to the blacklist.", scanned, failedPosition);
                    this.failedEventPositions.add(failedPosition);
                }
            }

            // Events of other producers do not contribute to the source position.
            if (scanned.getProducerId() != this.producerId) {
                continue;
            }
            final long candidate = scanned.getSourceEventPosition();
            if (candidate > 0 && candidate > highestSourcePosition) {
                highestSourcePosition = candidate;
            }
        }

        this.logStreamReader.seek(j + 1);
        return highestSourcePosition;
    }

    /**
     * Advances the reader by one event and stores it as the current event.
     *
     * @throws IllegalStateException when the log has no further event, or when the event read
     *     lies beyond the last source event position
     */
    private void readNextEvent() {
        final boolean exhausted = !this.logStreamReader.hasNext();
        if (exhausted) {
            final String message = String.format(ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT, this.lastSourceEventPosition, this.streamProcessorName);
            throw new IllegalStateException(message);
        }

        final LoggedEvent next = (LoggedEvent) this.logStreamReader.next();
        this.currentEvent = next;

        // The last source event must be hit exactly; passing it means it is not in the log.
        final long position = next.getPosition();
        if (position > this.lastSourceEventPosition) {
            final String message = String.format(ERROR_MESSAGE_REPROCESSING_NO_SOURCE_EVENT, this.lastSourceEventPosition, position, this.streamProcessorName);
            throw new IllegalStateException(message);
        }
    }

    /**
     * Reads the next event and reprocesses it when the event filter accepts it; filtered events
     * are treated as already reprocessed. Any runtime exception fails the recovery future.
     */
    private void reprocessNextEvent() {
        try {
            readNextEvent();

            final boolean accepted = this.eventFilter == null || this.eventFilter.applies(this.currentEvent);
            if (!accepted) {
                onRecordReprocessed(this.currentEvent);
                return;
            }
            reprocessEvent(this.currentEvent);
        } catch (RuntimeException failure) {
            this.recoveryFuture.completeExceptionally(failure);
        }
    }

    /**
     * Resolves the processor for the given event and runs it. When no processor is registered,
     * or the lookup fails, the event is skipped.
     *
     * @param loggedEvent the event to reprocess
     */
    private void reprocessEvent(LoggedEvent loggedEvent) {
        // Forget the processor of the previous event. Without this, a failing lookup below would
        // leave the stale processor in place and it would be applied to this event instead of
        // the event being skipped as the error message promises.
        this.eventProcessor = null;
        try {
            this.metadata.reset();
            loggedEvent.readMetadata(this.metadata);
            this.eventProcessor = this.recordProcessorMap.get(this.metadata.getRecordType(), this.metadata.getValueType(), this.metadata.getIntent().value());
        } catch (Exception e) {
            // The trailing exception is logged by SLF4J as the throwable of the statement.
            LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, loggedEvent, this.streamProcessorName, e);
        }
        if (this.eventProcessor == null) {
            onRecordReprocessed(loggedEvent);
            return;
        }

        // Reuse the cached value instance for this value type.
        UnifiedRecordValue unifiedRecordValue = this.eventCache.get(this.metadata.getValueType());
        unifiedRecordValue.reset();
        loggedEvent.readValue(unifiedRecordValue);
        this.typedEvent.wrap(loggedEvent, this.metadata, unifiedRecordValue);
        processUntilDone(loggedEvent.getPosition(), this.typedEvent);
    }

    /**
     * Runs the operation chosen for the event inside a database transaction, using the process
     * retry strategy, and continues with committing once that has completed.
     *
     * @param j position of the event
     * @param typedRecord typed view of the event
     */
    private void processUntilDone(long j, TypedRecord<?> typedRecord) {
        final TransactionOperation operation = chooseOperationForEvent(j, typedRecord);

        final ActorFuture<Boolean> processed =
            this.processRetryStrategy.runWithRetry(
                () -> {
                    // The field is cleared after a commit, so a transaction still present here
                    // was left behind without being committed.
                    final ZeebeDbTransaction leftover = this.zeebeDbTransaction;
                    if (leftover != null) {
                        leftover.rollback();
                    }
                    final ZeebeDbTransaction current = this.dbContext.getCurrentTransaction();
                    this.zeebeDbTransaction = current;
                    current.run(operation);
                    return true;
                },
                this.abortCondition);

        this.actor.runOnCompletion(
            processed,
            (ignoredResult, failure) -> {
                final boolean assertionsEnabled = !$assertionsDisabled;
                if (assertionsEnabled && failure != null) {
                    throw new AssertionError("On reprocessing there shouldn't be any exception thrown.");
                }
                updateStateUntilDone();
            });
    }

    /**
     * Picks what has to happen inside the transaction for the given event.
     *
     * <p>An event whose position was collected from an ERROR record is only offered to the
     * blacklist. Every other event is processed, unless it is already blacklisted, and is then
     * marked as processed.
     *
     * @param j position of the event
     * @param typedRecord typed view of the event
     * @return the operation to run inside the transaction
     */
    private TransactionOperation chooseOperationForEvent(long j, TypedRecord<?> typedRecord) {
        final boolean failedBefore = this.failedEventPositions.contains(Long.valueOf(j));
        if (failedBefore) {
            LOG.info(LOG_STMT_FAILED_ON_PROCESSING, typedRecord);
            return () -> this.zeebeState.tryToBlacklist(typedRecord, NOOP_LONG_CONSUMER);
        }

        return () -> {
            final boolean blacklisted = this.zeebeState.isOnBlacklist(this.typedEvent);
            if (!blacklisted) {
                // The no-op writers and consumer discard everything the processor emits.
                this.eventProcessor.processRecord(j, this.typedEvent, this.noopResponseWriter, this.noopstreamWriter, NOOP_SIDE_EFFECT_CONSUMER);
            }
            this.zeebeState.markAsProcessed(j);
        };
    }

    /**
     * Commits the current transaction, using the update-state retry strategy, and afterwards
     * reports the current event as reprocessed.
     */
    private void updateStateUntilDone() {
        final ActorFuture<Boolean> committed =
            this.updateStateRetryStrategy.runWithRetry(
                () -> {
                    this.zeebeDbTransaction.commit();
                    // Cleared so that the next event does not roll this transaction back.
                    this.zeebeDbTransaction = null;
                    return true;
                },
                this.abortCondition);

        this.actor.runOnCompletion(
            committed,
            (ignoredResult, failure) -> {
                final boolean assertionsEnabled = !$assertionsDisabled;
                if (assertionsEnabled && failure != null) {
                    throw new AssertionError("On reprocessing there shouldn't be any exception thrown.");
                }
                onRecordReprocessed(this.currentEvent);
            });
    }

    /**
     * Decides how to go on after an event was handled: finishes the recovery when the event is
     * the last source event, otherwise schedules the next event on the actor.
     *
     * @param loggedEvent the event that was just reprocessed or skipped
     */
    private void onRecordReprocessed(LoggedEvent loggedEvent) {
        final long position = loggedEvent.getPosition();
        if (position == this.lastSourceEventPosition) {
            LOG.info(LOG_STMT_REPROCESSING_FINISHED, this.streamProcessorName, position);
            onRecovered();
            return;
        }
        this.actor.submit(this::reprocessNextEvent);
    }

    /**
     * Completes the recovery future and drops the failed event positions collected by the log
     * scan, as they are only consulted while reprocessing.
     */
    private void onRecovered() {
        this.recoveryFuture.complete(null);
        this.failedEventPositions.clear();
    }

    // Initializes the static constants of this class.
    static {
        // True when assertions are NOT enabled for this class.
        $assertionsDisabled = !ReProcessingStateMachine.class.desiredAssertionStatus();
        LOG = Loggers.PROCESSOR_LOGGER;
        // Both consumers deliberately do nothing with their argument.
        NOOP_LONG_CONSUMER = ignored -> { };
        NOOP_SIDE_EFFECT_CONSUMER = ignored -> { };
    }
}
