package io.zeebe.engine.processing.streamprocessor;

import io.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.zeebe.engine.state.mutable.MutableZeebeState;
import io.zeebe.engine.util.StreamProcessorRule;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.record.RecordValue;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.health.HealthStatus;
import io.zeebe.util.sched.Actor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest.class */
public class StreamProcessorHealthTest {

    // JUnit rule the tests use to start the stream processor, write records, and reach the
    // clock and the actor scheduler.
    @Rule
    public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();
    // Stream processor under test; assigned by each test when it starts the processor.
    private StreamProcessor streamProcessor;
    // Writer installed into the processing context by getErrorProneStreamProcessor()
    // (an instance of WrappedStreamWriter).
    private TypedStreamWriter mockedLogStreamWriter;
    // While true, WrappedStreamWriter.flush() throws.
    private AtomicBoolean shouldFlushThrowException;
    // Counts the calls to processRecord(...) of the processor registered by
    // getErrorProneStreamProcessor().
    private AtomicInteger invocation;
    // While true, both WrappedStreamWriter.appendFollowUpEvent(...) overloads throw.
    private AtomicBoolean shouldFailErrorHandlingInTransaction;
    // While true, processRecord(...) of the registered processor throws after counting the call.
    private AtomicBoolean shouldProcessingThrowException;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest$HealthStatusCheck.class */
    /**
     * Actor that compares the health status of a {@link StreamProcessor} with an expected value.
     * The comparison is submitted through this actor's {@code actor.call(...)}, so the actor has
     * to be submitted to a scheduler before {@link #hasHealthStatus(HealthStatus)} is used.
     */
    public static final class HealthStatusCheck extends Actor {
        private final StreamProcessor streamProcessor;

        private HealthStatusCheck(StreamProcessor streamProcessor) {
            this.streamProcessor = streamProcessor;
        }

        /**
         * Creates a check for the given stream processor.
         *
         * @param streamProcessor the processor whose health status is inspected
         * @return a new, not yet submitted, check actor
         */
        public static HealthStatusCheck of(StreamProcessor streamProcessor) {
            return new HealthStatusCheck(streamProcessor);
        }

        /**
         * Reports whether the stream processor currently has the given health status.
         *
         * @param healthStatus the status to compare against
         * @return {@code true} if the processor reports exactly {@code healthStatus}
         */
        public boolean hasHealthStatus(HealthStatus healthStatus) {
            // The comparison runs as an actor call; wait at most five seconds for its result.
            final Boolean matches = actor
                .call(() -> streamProcessor.getHealthStatus() == healthStatus)
                .join(5L, TimeUnit.SECONDS);
            return matches;
        }
    }

    /* loaded from: input_file:io/zeebe/engine/processing/streamprocessor/StreamProcessorHealthTest$WrappedStreamWriter.class */
    /**
     * {@link TypedStreamWriter} stub used by these tests. Appending a follow-up event throws while
     * {@code shouldFailErrorHandlingInTransaction} is set, {@link #flush()} throws while
     * {@code shouldFlushThrowException} is set, and every other operation does nothing.
     */
    private final class WrappedStreamWriter implements TypedStreamWriter {
        private WrappedStreamWriter() {
        }

        public void appendRejection(TypedRecord<? extends RecordValue> rejected, RejectionType type, String reason) {
            // no-op
        }

        public void appendRejection(TypedRecord<? extends RecordValue> rejected, RejectionType type, String reason, UnaryOperator<RecordMetadata> modifier) {
            // no-op
        }

        public void configureSourceContext(long position) {
            // no-op
        }

        public void appendFollowUpEvent(long key, Intent intent, RecordValue value) {
            throwIfFollowUpEventsMustFail();
        }

        public void appendFollowUpEvent(long key, Intent intent, RecordValue value, UnaryOperator<RecordMetadata> modifier) {
            throwIfFollowUpEventsMustFail();
        }

        public void appendNewCommand(Intent intent, RecordValue value) {
            // no-op
        }

        public void appendFollowUpCommand(long key, Intent intent, RecordValue value) {
            // no-op
        }

        public void appendFollowUpCommand(long key, Intent intent, RecordValue value, UnaryOperator<RecordMetadata> modifier) {
            // no-op
        }

        public void reset() {
            // no-op
        }

        /**
         * Pretends to flush the written records.
         *
         * @return always {@code 1} when the flush is not configured to fail
         * @throws RuntimeException while {@code shouldFlushThrowException} is set
         */
        public long flush() {
            final boolean mustFail = StreamProcessorHealthTest.this.shouldFlushThrowException.get();
            if (!mustFail) {
                return 1L;
            }
            throw new RuntimeException("Expected failure on flush");
        }

        /** Throws the simulated append failure while the corresponding test flag is set. */
        private void throwIfFollowUpEventsMustFail() {
            if (StreamProcessorHealthTest.this.shouldFailErrorHandlingInTransaction.get()) {
                throw new RuntimeException("Expected failure on append followup event");
            }
        }
    }

    /** Gives every test its own invocation counter and failure flags, all initially off. */
    @Before
    public void before() {
        invocation = new AtomicInteger(0);
        shouldFlushThrowException = new AtomicBoolean(false);
        shouldFailErrorHandlingInTransaction = new AtomicBoolean(false);
        shouldProcessingThrowException = new AtomicBoolean(false);
    }

    /** Switches all simulated failures off once a test has finished. */
    @After
    public void tearDown() {
        shouldFlushThrowException.set(false);
        shouldFailErrorHandlingInTransaction.set(false);
        shouldProcessingThrowException.set(false);
    }

    /** A stream processor started with a mocked record processor eventually reports HEALTHY. */
    @Test
    public void shouldBeHealthyOnStart() {
        // when
        streamProcessor = streamProcessorRule.startTypedStreamProcessor(
            (processors, context) -> processors.onEvent(
                ValueType.PROCESS_INSTANCE,
                ProcessInstanceIntent.ELEMENT_ACTIVATING,
                Mockito.mock(TypedRecordProcessor.class)));

        // then
        TestUtil.waitUntil(() -> streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);
    }

    /**
     * With processing configured to throw and two records written before the processor is
     * started, the processor is expected to turn UNHEALTHY after the health-check tick duration
     * has been added to the clock twice.
     */
    @Test
    public void shouldMarkUnhealthyWhenReprocessingRetryLoop() {
        // given
        shouldProcessingThrowException.set(true);
        final long activatingPosition =
            streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, 1);
        streamProcessorRule.writeProcessInstanceEventWithSource(
            ProcessInstanceIntent.ELEMENT_ACTIVATED, 1, activatingPosition);
        TestUtil.waitUntil(() -> streamProcessorRule
            .events()
            .onlyProcessInstanceRecords()
            .withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED)
            .exists());

        streamProcessor = getErrorProneStreamProcessor();
        final HealthStatusCheck healthStatusCheck = HealthStatusCheck.of(streamProcessor);
        streamProcessorRule.getActorSchedulerRule().submitActor(healthStatusCheck);
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.HEALTHY));
        // the failing processor has to be invoked more than once before the clock is touched
        TestUtil.waitUntil(() -> invocation.get() > 1);

        // when
        streamProcessorRule.getClock().addTime(StreamProcessor.HEALTH_CHECK_TICK_DURATION.multipliedBy(1L));
        // wait for at least one further invocation before advancing the clock a second time
        final int invocationsSoFar = invocation.get();
        TestUtil.waitUntil(() -> invocation.get() > invocationsSoFar);
        streamProcessorRule.getClock().addTime(StreamProcessor.HEALTH_CHECK_TICK_DURATION.multipliedBy(1L));

        // then
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.UNHEALTHY));
    }

    /**
     * Once the processor is healthy, flushing is made to throw and a record is written; the
     * processor is then expected to report UNHEALTHY.
     */
    @Test
    public void shouldMarkUnhealthyWhenOnErrorHandlingWriteEventFails() {
        // given
        streamProcessor = getErrorProneStreamProcessor();
        final HealthStatusCheck healthStatusCheck = HealthStatusCheck.of(streamProcessor);
        streamProcessorRule.getActorSchedulerRule().submitActor(healthStatusCheck);
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.HEALTHY));

        // when
        shouldFlushThrowException.set(true);
        streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, 1);

        // then
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.UNHEALTHY));
    }

    /**
     * Processing itself succeeds but flushing throws; after a record is written the processor is
     * expected to report UNHEALTHY. The status is read directly from the stream processor.
     */
    @Test
    public void shouldMarkUnhealthyWhenProcessingOnWriteEventFails() {
        // given
        streamProcessor = getErrorProneStreamProcessor();
        TestUtil.waitUntil(() -> streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);

        // when
        shouldProcessingThrowException.set(false);
        shouldFlushThrowException.set(true);
        streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, 1);

        // then
        TestUtil.waitUntil(() -> streamProcessor.getHealthStatus() == HealthStatus.UNHEALTHY);
    }

    /**
     * Processing throws and, once the processor is healthy, appending follow-up events throws as
     * well; after a record is written the processor is expected to report UNHEALTHY.
     */
    @Test
    public void shouldMarkUnhealthyWhenExceptionErrorHandlingInTransaction() {
        // given
        shouldProcessingThrowException.set(true);
        streamProcessor = getErrorProneStreamProcessor();
        final HealthStatusCheck healthStatusCheck = HealthStatusCheck.of(streamProcessor);
        streamProcessorRule.getActorSchedulerRule().submitActor(healthStatusCheck);
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.HEALTHY));

        // when
        shouldFailErrorHandlingInTransaction.set(true);
        streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, 1);

        // then
        TestUtil.waitUntil(() -> healthStatusCheck.hasHealthStatus(HealthStatus.UNHEALTHY));
    }

    /**
     * With flushing configured to throw, writing a record is expected to make the processor
     * UNHEALTHY; once flushing no longer throws, the processor is expected to report HEALTHY again.
     */
    @Test
    public void shouldBecomeHealthyWhenErrorIsResolved() {
        // given
        shouldFlushThrowException.set(true);
        streamProcessor = getErrorProneStreamProcessor();
        TestUtil.waitUntil(() -> streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);
        streamProcessorRule.writeProcessInstanceEvent(ProcessInstanceIntent.ELEMENT_ACTIVATING, 1);
        TestUtil.waitUntil(() -> streamProcessor.getHealthStatus() == HealthStatus.UNHEALTHY);

        // when
        shouldFlushThrowException.set(false);

        // then
        TestUtil.waitUntil(() -> streamProcessor.getHealthStatus() == HealthStatus.HEALTHY);
    }

    /**
     * Starts a stream processor whose log stream writer is a {@link WrappedStreamWriter} and whose
     * ELEMENT_ACTIVATING processor counts every invocation and throws while
     * {@code shouldProcessingThrowException} is set.
     *
     * @return the started stream processor, which is also stored in {@code streamProcessor}
     */
    private StreamProcessor getErrorProneStreamProcessor() {
        streamProcessor = streamProcessorRule.startTypedStreamProcessor(context -> {
            final MutableZeebeState state = context.getZeebeState();

            // install the failure-injecting writer before the writers are handed to the processors
            mockedLogStreamWriter = new WrappedStreamWriter();
            context.logStreamWriter(mockedLogStreamWriter);

            final TypedRecordProcessors processors =
                TypedRecordProcessors.processors(state.getKeyGenerator(), context.getWriters());
            final TypedRecordProcessor<UnifiedRecordValue> countingProcessor = new TypedRecordProcessor<UnifiedRecordValue>() {
                public void processRecord(long position, TypedRecord<UnifiedRecordValue> typedRecord, TypedResponseWriter responseWriter, TypedStreamWriter streamWriter, Consumer<SideEffectProducer> sideEffect) {
                    // count first so that failed attempts are visible to the tests as well
                    StreamProcessorHealthTest.this.invocation.incrementAndGet();
                    final boolean mustFail = StreamProcessorHealthTest.this.shouldProcessingThrowException.get();
                    if (mustFail) {
                        throw new RuntimeException("Expected failure on processing");
                    }
                }
            };
            return processors.onEvent(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING, countingProcessor);
        });
        return streamProcessor;
    }
}
