package io.zeebe.engine.processor;

import io.zeebe.engine.state.DefaultZeebeDbFactory;
import io.zeebe.engine.util.RecordStream;
import io.zeebe.engine.util.Records;
import io.zeebe.engine.util.TestStreams;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.util.SynchronousLogStream;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.DeploymentIntent;
import io.zeebe.protocol.record.value.deployment.ResourceType;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:io/zeebe/engine/processor/TypedStreamProcessorTest.class */
public final class TypedStreamProcessorTest {
    private static final String STREAM_NAME = "foo";
    protected SynchronousLogStream stream;
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final AutoCloseableRule closeables = new AutoCloseableRule();
    private final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();

    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(this.tempFolder).around(this.actorSchedulerRule).around(this.closeables);
    private TestStreams streams;
    private KeyGenerator keyGenerator;
    private CommandResponseWriter mockCommandResponseWriter;

    /* loaded from: input_file:io/zeebe/engine/processor/TypedStreamProcessorTest$BatchProcessor.class */
    protected class BatchProcessor implements TypedRecordProcessor<DeploymentRecord> {
        protected BatchProcessor() {
        }

        public void processRecord(TypedRecord<DeploymentRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
            typedStreamWriter.appendNewEvent(TypedStreamProcessorTest.this.keyGenerator.nextKey(), DeploymentIntent.CREATED, typedRecord.getValue());
            typedStreamWriter.flush();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/engine/processor/TypedStreamProcessorTest$ErrorProneProcessor.class */
    public static class ErrorProneProcessor implements TypedRecordProcessor<DeploymentRecord> {
        protected ErrorProneProcessor() {
        }

        public void processRecord(TypedRecord<DeploymentRecord> typedRecord, TypedResponseWriter typedResponseWriter, TypedStreamWriter typedStreamWriter) {
            if (typedRecord.getKey() == 0) {
                throw new RuntimeException("expected");
            }
            typedStreamWriter.appendFollowUpEvent(typedRecord.getKey(), DeploymentIntent.CREATED, typedRecord.getValue());
            typedStreamWriter.flush();
        }
    }

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        this.streams = new TestStreams(this.tempFolder, this.closeables, this.actorSchedulerRule.get());
        this.mockCommandResponseWriter = this.streams.getMockedResponseWriter();
        this.stream = this.streams.createLogStream("foo");
        AtomicLong atomicLong = new AtomicLong();
        this.keyGenerator = () -> {
            return atomicLong.getAndIncrement();
        };
    }

    @Test
    public void shouldWriteSourceEventAndProducerOnBatch() {
        this.streams.startStreamProcessor("foo", DefaultZeebeDbFactory.DEFAULT_DB_FACTORY, processingContext -> {
            return TypedRecordProcessors.processors().onCommand(ValueType.DEPLOYMENT, DeploymentIntent.CREATE, new BatchProcessor());
        });
        Assertions.assertThat(((LoggedEvent) ((Optional) TestUtil.doRepeatedly(() -> {
            return this.streams.events("foo").filter(loggedEvent -> {
                return Records.isEvent(loggedEvent, ValueType.DEPLOYMENT, DeploymentIntent.CREATED);
            }).findFirst();
        }).until(optional -> {
            return Boolean.valueOf(optional.isPresent());
        })).get()).getSourceEventPosition()).isEqualTo(this.streams.newRecord("foo").event(deployment("foo", ResourceType.BPMN_XML)).recordType(RecordType.COMMAND).intent(DeploymentIntent.CREATE).write());
    }

    @Test
    public void shouldSkipFailingEvent() {
        this.streams.startStreamProcessor("foo", DefaultZeebeDbFactory.DEFAULT_DB_FACTORY, processingContext -> {
            return TypedRecordProcessors.processors().onCommand(ValueType.DEPLOYMENT, DeploymentIntent.CREATE, new ErrorProneProcessor());
        });
        AtomicLong atomicLong = new AtomicLong(0L);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Mockito.when(Boolean.valueOf(this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong()))).then(invocationOnMock -> {
            int intValue = ((Integer) invocationOnMock.getArgument(0)).intValue();
            atomicLong.set(((Long) invocationOnMock.getArgument(1)).longValue());
            atomicInteger.set(intValue);
            return true;
        });
        long nextKey = this.keyGenerator.nextKey();
        this.streams.newRecord("foo").event(deployment("foo", ResourceType.BPMN_XML)).recordType(RecordType.COMMAND).intent(DeploymentIntent.CREATE).requestId(255L).requestStreamId(99).key(nextKey).write();
        long write = this.streams.newRecord("foo").event(deployment("foo2", ResourceType.BPMN_XML)).recordType(RecordType.COMMAND).intent(DeploymentIntent.CREATE).key(this.keyGenerator.nextKey()).write();
        LoggedEvent loggedEvent = (LoggedEvent) ((Optional) TestUtil.doRepeatedly(() -> {
            return this.streams.events("foo").filter(loggedEvent2 -> {
                return Records.isEvent(loggedEvent2, ValueType.DEPLOYMENT, DeploymentIntent.CREATED);
            }).findFirst();
        }).until(optional -> {
            return Boolean.valueOf(optional.isPresent());
        })).get();
        Assertions.assertThat(loggedEvent.getKey()).isEqualTo(1L);
        Assertions.assertThat(loggedEvent.getSourceEventPosition()).isEqualTo(write);
        ((CommandResponseWriter) Mockito.verify(this.mockCommandResponseWriter)).tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        Assertions.assertThat(atomicLong.get()).isEqualTo(255L);
        Assertions.assertThat(atomicInteger.get()).isEqualTo(99);
        Record record = (Record) new RecordStream(this.streams.events("foo")).onlyDeploymentRecords().onlyRejections().withIntent(DeploymentIntent.CREATE).getFirst();
        Assertions.assertThat(record.getKey()).isEqualTo(nextKey);
        Assertions.assertThat(record.getRejectionType()).isEqualTo(RejectionType.PROCESSING_ERROR);
    }

    protected DeploymentRecord deployment(String str, ResourceType resourceType) {
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        ((DeploymentResource) deploymentRecord.resources().add()).setResourceType(resourceType).setResource(BufferUtil.wrapString("foo")).setResourceName(BufferUtil.wrapString(str));
        return deploymentRecord;
    }
}
