package io.zeebe.engine.processing.streamprocessor;

import io.zeebe.db.TransactionContext;
import io.zeebe.db.ZeebeDbTransaction;
import io.zeebe.engine.metrics.StreamProcessorMetrics;
import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriterImpl;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.state.ZeebeState;
import io.zeebe.engine.state.mutable.MutableBlackListState;
import io.zeebe.engine.state.mutable.MutableLastProcessedPositionState;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.intent.ErrorIntent;
import io.zeebe.util.exception.RecoverableException;
import io.zeebe.util.retry.AbortableRetryStrategy;
import io.zeebe.util.retry.RecoverableRetryStrategy;
import io.zeebe.util.retry.RetryStrategy;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/engine/processing/streamprocessor/ProcessingStateMachine.class */
/**
 * Drives the processing of records read from a log stream, one record at a time.
 *
 * <p>The steps visible in this class, per record, are:
 *
 * <ol>
 *   <li>{@link #readNextEvent()} / {@code tryToReadNextEvent()} - read the next record and either
 *       skip it (filter does not apply, no processor found) or process it;
 *   <li>{@code processEvent} / {@code processInTransaction} - run the chosen processor inside the
 *       current database transaction;
 *   <li>{@code writeEvent} - flush the stream writer, with retries;
 *   <li>{@code updateState} - commit the transaction, with retries;
 *   <li>{@code executeSideEffects} - flush the side-effect producer, notify the processed
 *       listener and schedule the next read.
 * </ol>
 *
 * <p>When processing throws a non-recoverable exception, {@code onError} rolls the transaction
 * back, writes a rejection (for commands) plus an error record, tries to blacklist the record and
 * then continues with the step that was passed to it. A {@link RecoverableException} thrown while
 * processing instead causes the same record to be processed again after
 * {@code PROCESSING_RETRY_DELAY}.
 *
 * <p>All continuations are scheduled through the {@link ActorControl} taken from the processing
 * context. NOTE(review): apart from {@code onErrorHandlingLoop}, which is {@code volatile}, the
 * mutable fields are unsynchronized - presumably they are only touched from the actor; confirm.
 */
public final class ProcessingStateMachine {
    public static final String ERROR_MESSAGE_WRITE_EVENT_ABORTED = "Expected to write one or more follow up events for event '{}' without errors, but exception was thrown.";
    private static final String ERROR_MESSAGE_ROLLBACK_ABORTED = "Expected to roll back the current transaction for event '{}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED = "Expected to execute side effects for event '{}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_UPDATE_STATE_FAILED = "Expected to successfully update state for event '{}', but caught an exception. Retry.";
    private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT = "Expected to find event processor for event '{}', but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT = "Expected to successfully process event '{}' with processor, but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING = "Expected to process event '{}' successfully on stream processor, but caught recoverable exception. Retry processing.";
    private static final String PROCESSING_ERROR_MESSAGE = "Expected to process event '%s' without errors, but exception occurred with message '%s' .";
    private static final String NOTIFY_PROCESSED_LISTENER_ERROR_MESSAGE = "Expected to invoke processed listener for record {} successfully, but exception was thrown.";
    private static final String NOTIFY_SKIPPED_LISTENER_ERROR_MESSAGE = "Expected to invoke skipped listener for record {} successfully, but exception was thrown.";
    private static final String LOG_ERROR_EVENT_COMMITTED = "Error event was committed, we continue with processing.";
    private static final String LOG_ERROR_EVENT_WRITTEN = "Error record was written at {}, we will continue with processing if event was committed. Current commit position is {}.";

    private static final Logger LOG = Loggers.PROCESSOR_LOGGER;

    /** Delay before a record is processed again after a {@link RecoverableException}. */
    private static final Duration PROCESSING_RETRY_DELAY = Duration.ofMillis(250);

    /** Accepts commands of any value type, and any record whose value type is not migrated. */
    private static final MetadataFilter PROCESSING_FILTER =
        recordMetadata ->
            recordMetadata.getRecordType() == RecordType.COMMAND
                || !MigratedStreamProcessors.isMigrated(recordMetadata.getValueType());

    private final ZeebeState zeebeState;
    private final MutableLastProcessedPositionState lastProcessedPositionState;
    private final TypedResponseWriterImpl responseWriter;
    private final ActorControl actor;
    private final LogStream logStream;
    private final LogStreamReader logStreamReader;
    private final TypedStreamWriter logStreamWriter;
    private final TransactionContext transactionContext;
    private final RetryStrategy writeRetryStrategy;
    private final RetryStrategy sideEffectsRetryStrategy;
    private final RetryStrategy updateStateRetryStrategy;
    private final BooleanSupplier shouldProcessNext;
    private final BooleanSupplier abortCondition;
    private final RecordValues recordValues;
    private final RecordProcessorMap recordProcessorMap;
    private final TypedEventImpl typedEvent;
    private final StreamProcessorMetrics metrics;
    private final Consumer<TypedRecord> onProcessedListener;
    private final Consumer<LoggedEvent> onSkippedListener;

    private final EventFilter eventFilter = new MetadataEventFilter(new RecordProtocolVersionFilter().and(PROCESSING_FILTER));
    // Reused for every record; reset/re-initialised before each use.
    private final RecordMetadata metadata = new RecordMetadata();
    private final ErrorRecord errorRecord = new ErrorRecord();

    private SideEffectProducer sideEffectProducer;
    private LoggedEvent currentEvent;
    // Non-null while a record is in flight; tryToReadNextEvent() refuses to read while it is set.
    private TypedRecordProcessor<?> currentProcessor;
    private ZeebeDbTransaction zeebeDbTransaction;
    // True from the moment an error record has been prepared until its position is committed.
    private boolean onErrorHandling;
    // True once onError() has been entered more than once without an intervening readNextEvent().
    private volatile boolean onErrorHandlingLoop;
    private int onErrorRetries;
    private long processingStartTime;
    private long writtenEventPosition = -1;
    private long lastSuccessfulProcessedEventPosition = -1;
    private long lastWrittenEventPosition = -1;
    private long errorRecordPosition = -1;

    /**
     * Creates the state machine from the collaborators exposed by the processing context.
     *
     * @param processingContext source of the actor, log stream reader/writer, state, transaction
     *     context, abort condition and listeners used by this state machine
     * @param booleanSupplier consulted before every read; the next record is only read while it
     *     returns {@code true}
     */
    public ProcessingStateMachine(ProcessingContext processingContext, BooleanSupplier booleanSupplier) {
        this.actor = processingContext.getActor();
        this.recordProcessorMap = processingContext.getRecordProcessorMap();
        this.recordValues = processingContext.getRecordValues();
        this.logStreamReader = processingContext.getLogStreamReader();
        this.logStreamWriter = processingContext.getLogStreamWriter();
        this.logStream = processingContext.getLogStream();
        this.zeebeState = processingContext.getZeebeState();
        this.transactionContext = processingContext.getTransactionContext();
        this.abortCondition = processingContext.getAbortCondition();
        this.lastProcessedPositionState = processingContext.getLastProcessedPositionState();
        this.writeRetryStrategy = new AbortableRetryStrategy(this.actor);
        this.sideEffectsRetryStrategy = new AbortableRetryStrategy(this.actor);
        this.updateStateRetryStrategy = new RecoverableRetryStrategy(this.actor);
        this.shouldProcessNext = booleanSupplier;
        int partitionId = this.logStream.getPartitionId();
        this.typedEvent = new TypedEventImpl(partitionId);
        this.responseWriter = new TypedResponseWriterImpl(processingContext.getWriters().response(), partitionId);
        this.metrics = new StreamProcessorMetrics(partitionId);
        this.onProcessedListener = processingContext.getOnProcessedListener();
        this.onSkippedListener = processingContext.getOnSkippedListener();
    }

    /** Notifies the skipped listener about the current record and schedules the next read. */
    private void skipRecord() {
        notifySkippedListener(this.currentEvent);
        this.actor.submit(this::readNextEvent);
        this.metrics.eventSkipped();
    }

    /**
     * Attempts to read and handle the next record.
     *
     * <p>Resets the error-retry bookkeeping first. While an error record is still pending
     * ({@code onErrorHandling}), the commit position is fetched asynchronously and reading only
     * resumes once it has reached the position of that error record; if it has not, this call
     * does nothing further. NOTE(review): presumably the owner invokes this method again when the
     * commit position advances - confirm against the caller.
     */
    public void readNextEvent() {
        if (this.onErrorRetries > 0) {
            this.onErrorHandlingLoop = false;
            this.onErrorRetries = 0;
        }
        if (this.onErrorHandling) {
            this.logStream.getCommitPositionAsync().onComplete((commitPosition, error) -> {
                if (error != null) {
                    LOG.error("Error on retrieving commit position", error);
                } else if (commitPosition.longValue() >= this.errorRecordPosition) {
                    LOG.info(LOG_ERROR_EVENT_COMMITTED);
                    this.onErrorHandling = false;
                    tryToReadNextEvent();
                }
            });
        } else {
            tryToReadNextEvent();
        }
    }

    /**
     * Reads the next record if processing is allowed, the reader has one, and no record is
     * currently in flight; the record is then processed or skipped depending on the event filter.
     */
    private void tryToReadNextEvent() {
        if (this.shouldProcessNext.getAsBoolean() && this.logStreamReader.hasNext() && this.currentProcessor == null) {
            this.currentEvent = (LoggedEvent) this.logStreamReader.next();
            if (this.eventFilter.applies(this.currentEvent)) {
                processEvent(this.currentEvent);
            } else {
                skipRecord();
            }
        }
    }

    /**
     * Processes a single record: selects a processor, runs it in a transaction and starts the
     * write step. Records without a processor, and migrated non-command records, are skipped.
     *
     * @param loggedEvent the record to process
     */
    private void processEvent(LoggedEvent loggedEvent) {
        this.metadata.reset();
        loggedEvent.readMetadata(this.metadata);
        this.currentProcessor = chooseNextProcessor(loggedEvent);
        if (this.currentProcessor == null) {
            skipRecord();
            return;
        }
        this.processingStartTime = ActorClock.currentTimeMillis();
        try {
            this.typedEvent.wrap(loggedEvent, this.metadata, this.recordValues.readRecordValue(loggedEvent, this.metadata.getValueType()));
            if (MigratedStreamProcessors.isMigrated(this.typedEvent) && this.typedEvent.getRecordType() != RecordType.COMMAND) {
                // Clear the in-flight marker, otherwise tryToReadNextEvent() would never read again.
                this.currentProcessor = null;
                skipRecord();
            } else {
                this.metrics.processingLatency(this.metadata.getRecordType(), loggedEvent.getTimestamp(), this.processingStartTime);
                processInTransaction(this.typedEvent);
                this.metrics.eventProcessed();
                writeEvent();
            }
        } catch (RecoverableException recoverableException) {
            // Must be caught before the generic Exception handler below: recoverable failures are
            // retried on the same record instead of being rejected and blacklisted.
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING, loggedEvent, recoverableException);
            this.actor.runDelayed(PROCESSING_RETRY_DELAY, () -> processEvent(this.currentEvent));
        } catch (Exception e) {
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT, loggedEvent, e);
            onError(e, this::writeEvent);
        }
    }

    /**
     * Looks up the processor registered for the record type, value type and intent currently held
     * in {@code metadata}.
     *
     * @param loggedEvent the record being processed; only used for logging
     * @return the processor returned by the processor map, or {@code null} if the lookup threw
     */
    private TypedRecordProcessor<?> chooseNextProcessor(LoggedEvent loggedEvent) {
        TypedRecordProcessor<?> processor = null;
        try {
            processor = this.recordProcessorMap.get(this.metadata.getRecordType(), this.metadata.getValueType(), this.metadata.getIntent().value());
        } catch (Exception e) {
            LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, loggedEvent, e);
        }
        return processor;
    }

    /**
     * Runs the current processor for the given record inside the current database transaction.
     * Blacklisted records are not handed to the processor, but are still marked as processed.
     *
     * @param typedEventImpl the wrapped record to process
     * @throws Exception whatever the transaction or the processor throws
     */
    private void processInTransaction(TypedEventImpl typedEventImpl) throws Exception {
        this.zeebeDbTransaction = this.transactionContext.getCurrentTransaction();
        this.zeebeDbTransaction.run(() -> {
            long position = typedEventImpl.getPosition();
            resetOutput(position);
            // Default side-effect producer; the processor may replace it via setSideEffectProducer.
            this.sideEffectProducer = this.responseWriter;
            if (!this.zeebeState.getBlackListState().isOnBlacklist(typedEventImpl)) {
                this.currentProcessor.processRecord(position, typedEventImpl, this.responseWriter, this.logStreamWriter, this::setSideEffectProducer);
            }
            this.lastProcessedPositionState.markAsProcessed(position);
        });
    }

    /**
     * Resets the response and stream writers and configures the stream writer's source context.
     *
     * @param j position of the record that is about to be processed
     */
    private void resetOutput(long j) {
        this.responseWriter.reset();
        this.logStreamWriter.reset();
        this.logStreamWriter.configureSourceContext(j);
    }

    /**
     * Replaces the side-effect producer that is flushed after the state has been updated.
     *
     * @param sideEffectProducer the producer to flush for the current record
     */
    public void setSideEffectProducer(SideEffectProducer sideEffectProducer) {
        this.sideEffectProducer = sideEffectProducer;
    }

    /**
     * Handles a processing failure: rolls back the current transaction (with retries), records
     * the error in a new transaction and then runs the given continuation. If recording the error
     * itself throws, this method is invoked again with the new exception.
     *
     * @param th the failure that is being handled
     * @param runnable the step to continue with once the error has been recorded
     */
    private void onError(Throwable th, Runnable runnable) {
        this.onErrorRetries++;
        if (this.onErrorRetries > 1) {
            this.onErrorHandlingLoop = true;
        }
        this.actor.runOnCompletion(this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.rollback();
            return true;
        }, this.abortCondition), (rolledBack, rollbackError) -> {
            if (rollbackError != null) {
                // Logged only; error handling still proceeds below.
                LOG.error(ERROR_MESSAGE_ROLLBACK_ABORTED, this.currentEvent, rollbackError);
            }
            try {
                errorHandlingInTransaction(th);
                this.onErrorHandling = true;
                runnable.run();
            } catch (Exception e) {
                onError(e, runnable);
            }
        });
    }

    /**
     * In a fresh transaction: writes a rejection for commands, initialises the error record,
     * tries to blacklist the record and appends the error record as a follow-up event.
     *
     * @param th the failure to record
     * @throws Exception whatever the transaction throws
     */
    private void errorHandlingInTransaction(Throwable th) throws Exception {
        this.zeebeDbTransaction = this.transactionContext.getCurrentTransaction();
        this.zeebeDbTransaction.run(() -> {
            long position = this.typedEvent.getPosition();
            resetOutput(position);
            writeRejectionOnCommand(th);
            this.errorRecord.initErrorRecord(th, position);
            // The callback stores the key it is given on the error record.
            this.zeebeState.getBlackListState().tryToBlacklist(this.typedEvent, this.errorRecord::setWorkflowInstanceKey);
            this.logStreamWriter.appendFollowUpEvent(this.typedEvent.getKey(), ErrorIntent.CREATED, this.errorRecord);
        });
    }

    /**
     * Logs the failure and, if the current record is a command, appends a
     * {@link RejectionType#PROCESSING_ERROR} rejection to both the stream and the response writer.
     *
     * @param th the failure that occurred while processing the current record
     */
    private void writeRejectionOnCommand(Throwable th) {
        String errorMessage = String.format(PROCESSING_ERROR_MESSAGE, this.typedEvent, th.getMessage());
        LOG.error(errorMessage, th);
        if (this.typedEvent.getRecordType() == RecordType.COMMAND) {
            this.logStreamWriter.appendRejection(this.typedEvent, RejectionType.PROCESSING_ERROR, errorMessage);
            this.responseWriter.writeRejectionOnCommand(this.typedEvent, RejectionType.PROCESSING_ERROR, errorMessage);
        }
    }

    /**
     * Flushes the stream writer with retries. A non-negative flush result counts as success and
     * a positive one is remembered as the written position. On success the state is updated; on
     * failure error handling is started with this step as continuation.
     */
    private void writeEvent() {
        this.actor.runOnCompletion(this.writeRetryStrategy.runWithRetry(() -> {
            long position = this.logStreamWriter.flush();
            if (position > 0) {
                this.writtenEventPosition = position;
            }
            return position >= 0;
        }, this.abortCondition), (written, error) -> {
            if (error != null) {
                LOG.error(ERROR_MESSAGE_WRITE_EVENT_ABORTED, this.currentEvent, error);
                onError(error, this::writeEvent);
            } else {
                updateState();
                this.metrics.eventWritten();
            }
        });
    }

    /**
     * Commits the current transaction with retries and records the processed/written positions.
     * During error handling the written position is also remembered as the error record position.
     * On success the side effects are executed; on failure error handling is started with this
     * step as continuation.
     */
    private void updateState() {
        this.actor.runOnCompletion(this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.commit();
            if (this.onErrorHandling) {
                this.errorRecordPosition = this.writtenEventPosition;
                this.logStream.getCommitPositionAsync().onComplete((commitPosition, error) -> {
                    if (error == null) {
                        LOG.info(LOG_ERROR_EVENT_WRITTEN, this.errorRecordPosition, commitPosition);
                    }
                });
            }
            this.lastSuccessfulProcessedEventPosition = this.currentEvent.getPosition();
            this.metrics.setLastProcessedPosition(this.lastSuccessfulProcessedEventPosition);
            this.lastWrittenEventPosition = this.writtenEventPosition;
            return true;
        }, this.abortCondition), (updated, error) -> {
            if (error == null) {
                executeSideEffects();
            } else {
                LOG.error(ERROR_MESSAGE_UPDATE_STATE_FAILED, this.currentEvent, error);
                onError(error, this::updateState);
            }
        });
    }

    /**
     * Flushes the side-effect producer with retries, then - whether or not that failed - notifies
     * the processed listener, records the processing duration, clears the in-flight processor and
     * schedules the next read.
     */
    private void executeSideEffects() {
        // The bound method reference captures (and null-checks) the producer at this point.
        this.actor.runOnCompletion(this.sideEffectsRetryStrategy.runWithRetry(this.sideEffectProducer::flush, this.abortCondition), (flushed, error) -> {
            if (error != null) {
                LOG.error(ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED, this.currentEvent, error);
            }
            notifyProcessedListener(this.typedEvent);
            this.metrics.processingDuration(this.metadata.getRecordType(), this.processingStartTime, ActorClock.currentTimeMillis());
            this.currentProcessor = null;
            this.actor.submit(this::readNextEvent);
        });
    }

    /**
     * Invokes the processed listener; exceptions thrown by the listener are logged and swallowed.
     *
     * @param typedRecord the record that has been processed
     */
    private void notifyProcessedListener(TypedRecord typedRecord) {
        try {
            this.onProcessedListener.accept(typedRecord);
        } catch (Exception e) {
            LOG.error(NOTIFY_PROCESSED_LISTENER_ERROR_MESSAGE, typedRecord, e);
        }
    }

    /**
     * Invokes the skipped listener; exceptions thrown by the listener are logged and swallowed.
     *
     * @param loggedEvent the record that has been skipped
     */
    private void notifySkippedListener(LoggedEvent loggedEvent) {
        try {
            this.onSkippedListener.accept(loggedEvent);
        } catch (Exception e) {
            LOG.error(NOTIFY_SKIPPED_LISTENER_ERROR_MESSAGE, loggedEvent, e);
        }
    }

    /**
     * @return the position of the last record whose state update was committed, the value passed
     *     to {@link #startProcessing(long)} if nothing was processed since, or {@code -1}
     */
    public long getLastSuccessfulProcessedEventPosition() {
        return this.lastSuccessfulProcessedEventPosition;
    }

    /**
     * @return the written position recorded at the last successful state update, or {@code -1}
     *     if none was recorded yet
     */
    public long getLastWrittenEventPosition() {
        return this.lastWrittenEventPosition;
    }

    /**
     * @return {@code false} while error handling has been entered more than once without a new
     *     read in between, {@code true} otherwise
     */
    public boolean isMakingProgress() {
        return !this.onErrorHandlingLoop;
    }

    /**
     * Starts reading records. The given position seeds the last processed position if none has
     * been recorded yet.
     *
     * @param j initial value for the last successfully processed position
     */
    public void startProcessing(long j) {
        if (this.lastSuccessfulProcessedEventPosition == -1) {
            this.lastSuccessfulProcessedEventPosition = j;
        }
        this.actor.submit(this::readNextEvent);
    }
}
