package io.zeebe.engine.util;

import io.zeebe.engine.processor.CommandResponseWriter;
import io.zeebe.engine.processor.ReadonlyProcessingContext;
import io.zeebe.engine.processor.RecordValues;
import io.zeebe.engine.processor.StreamProcessorLifecycleAware;
import io.zeebe.engine.processor.TypedEventImpl;
import io.zeebe.engine.processor.workflow.EngineProcessors;
import io.zeebe.engine.processor.workflow.deployment.distribute.DeploymentDistributor;
import io.zeebe.engine.processor.workflow.deployment.distribute.PendingDeploymentDistribution;
import io.zeebe.engine.processor.workflow.message.command.PartitionCommandSender;
import io.zeebe.engine.processor.workflow.message.command.SubscriptionCommandMessageHandler;
import io.zeebe.engine.processor.workflow.message.command.SubscriptionCommandSender;
import io.zeebe.engine.processor.workflow.multiinstance.MultiInstanceSubProcessTest;
import io.zeebe.engine.state.DefaultZeebeDbFactory;
import io.zeebe.engine.state.ZeebeState;
import io.zeebe.engine.util.client.DeploymentClient;
import io.zeebe.engine.util.client.IncidentClient;
import io.zeebe.engine.util.client.JobActivationClient;
import io.zeebe.engine.util.client.JobClient;
import io.zeebe.engine.util.client.PublishMessageClient;
import io.zeebe.engine.util.client.VariableClient;
import io.zeebe.engine.util.client.WorkflowInstanceClient;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.intent.DeploymentIntent;
import io.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.protocol.record.value.JobRecordValue;
import io.zeebe.test.util.TestUtil;
import io.zeebe.test.util.record.RecordingExporter;
import io.zeebe.test.util.record.RecordingExporterTestWatcher;
import io.zeebe.util.buffer.BufferWriter;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.clock.ControlledActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/engine/util/EngineRule.class */
public final class EngineRule extends ExternalResource {
    private static final int PARTITION_ID = 1;
    private static final RecordingExporter RECORDING_EXPORTER = new RecordingExporter();
    private final StreamProcessorRule environmentRule;
    private final RecordingExporterTestWatcher recordingExporterTestWatcher;
    private final int partitionCount;
    private final boolean explicitStart;
    private Consumer<String> jobsAvailableCallback;
    private final Int2ObjectHashMap<SubscriptionCommandMessageHandler> subscriptionHandlers;
    private ExecutorService subscriptionHandlerExecutor;

    /* loaded from: input_file:io/zeebe/engine/util/EngineRule$DeploymentDistributionImpl.class */
    private final class DeploymentDistributionImpl implements DeploymentDistributor {
        private final Map<Long, PendingDeploymentDistribution> pendingDeployments = new HashMap();

        private DeploymentDistributionImpl() {
        }

        public ActorFuture<Void> pushDeployment(long j, long j2, DirectBuffer directBuffer) {
            this.pendingDeployments.put(Long.valueOf(j), new PendingDeploymentDistribution(directBuffer, j2, EngineRule.this.partitionCount));
            EngineRule.this.forEachPartition(num -> {
                if (num.intValue() == 1) {
                    return;
                }
                DeploymentRecord deploymentRecord = new DeploymentRecord();
                deploymentRecord.wrap(directBuffer);
                new Thread(() -> {
                    EngineRule.this.environmentRule.writeCommandOnPartition(num.intValue(), j, DeploymentIntent.CREATE, deploymentRecord);
                }).start();
            });
            return CompletableActorFuture.completed((Object) null);
        }

        public PendingDeploymentDistribution removePendingDeployment(long j) {
            return this.pendingDeployments.remove(Long.valueOf(j));
        }
    }

    /* loaded from: input_file:io/zeebe/engine/util/EngineRule$PartitionCommandSenderImpl.class */
    private class PartitionCommandSenderImpl implements PartitionCommandSender {
        private PartitionCommandSenderImpl() {
        }

        public boolean sendCommand(int i, BufferWriter bufferWriter) {
            byte[] bArr = new byte[bufferWriter.getLength()];
            bufferWriter.write(new UnsafeBuffer(bArr), 0);
            ((SubscriptionCommandMessageHandler) EngineRule.this.subscriptionHandlers.get(i)).apply(bArr);
            return true;
        }
    }

    /* loaded from: input_file:io/zeebe/engine/util/EngineRule$ProcessingExporterTransistor.class */
    private static class ProcessingExporterTransistor implements StreamProcessorLifecycleAware {
        private final RecordValues recordValues = new RecordValues();
        private final RecordMetadata metadata = new RecordMetadata();
        private LogStreamReader logStreamReader;
        private TypedEventImpl typedEvent;

        private ProcessingExporterTransistor() {
        }

        public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
            this.typedEvent = new TypedEventImpl(readonlyProcessingContext.getLogStream().getPartitionId());
            ActorCondition onCondition = readonlyProcessingContext.getActor().onCondition("on-commit", this::onNewEventCommitted);
            LogStream logStream = readonlyProcessingContext.getLogStream();
            logStream.registerOnCommitPositionUpdatedCondition(onCondition);
            logStream.newLogStreamReader().onComplete((logStreamReader, th) -> {
                if (th == null) {
                    this.logStreamReader = logStreamReader;
                    onNewEventCommitted();
                }
            });
        }

        private void onNewEventCommitted() {
            if (this.logStreamReader == null) {
                return;
            }
            while (this.logStreamReader.hasNext()) {
                LoggedEvent loggedEvent = (LoggedEvent) this.logStreamReader.next();
                this.metadata.reset();
                loggedEvent.readMetadata(this.metadata);
                this.typedEvent.wrap(loggedEvent, this.metadata, this.recordValues.readRecordValue(loggedEvent, this.metadata.getValueType()));
                EngineRule.RECORDING_EXPORTER.export(this.typedEvent);
            }
        }
    }

    private EngineRule(int i) {
        this(i, false);
    }

    private EngineRule(int i, boolean z) {
        this.recordingExporterTestWatcher = new RecordingExporterTestWatcher();
        this.jobsAvailableCallback = str -> {
        };
        this.subscriptionHandlers = new Int2ObjectHashMap<>();
        this.partitionCount = i;
        this.explicitStart = z;
        this.environmentRule = new StreamProcessorRule(1, i, DefaultZeebeDbFactory.DEFAULT_DB_FACTORY);
    }

    public static EngineRule singlePartition() {
        return new EngineRule(1);
    }

    public static EngineRule multiplePartition(int i) {
        return new EngineRule(i);
    }

    public static EngineRule explicitStart() {
        return new EngineRule(1, true);
    }

    public Statement apply(Statement statement, Description description) {
        return this.environmentRule.apply(super.apply(this.recordingExporterTestWatcher.apply(statement, description), description), description);
    }

    protected void before() {
        this.subscriptionHandlerExecutor = Executors.newSingleThreadExecutor();
        if (this.explicitStart) {
            return;
        }
        startProcessors();
    }

    protected void after() {
        this.subscriptionHandlerExecutor.shutdown();
        this.subscriptionHandlers.clear();
    }

    public void start() {
        startProcessors();
    }

    public void stop() {
        StreamProcessorRule streamProcessorRule = this.environmentRule;
        Objects.requireNonNull(streamProcessorRule);
        forEachPartition((v1) -> {
            r1.closeStreamProcessor(v1);
        });
    }

    public EngineRule withJobsAvailableCallback(Consumer<String> consumer) {
        this.jobsAvailableCallback = consumer;
        return this;
    }

    private void startProcessors() {
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(new byte[deploymentRecord.getLength()]);
        deploymentRecord.write(unsafeBuffer, 0);
        Mockito.when(((PendingDeploymentDistribution) Mockito.mock(PendingDeploymentDistribution.class)).getDeployment()).thenReturn(unsafeBuffer);
        forEachPartition(num -> {
            this.environmentRule.startTypedStreamProcessor(num.intValue(), processingContext -> {
                return EngineProcessors.createEngineProcessors(processingContext, this.partitionCount, new SubscriptionCommandSender(num.intValue(), new PartitionCommandSenderImpl()), new DeploymentDistributionImpl(), (j, i) -> {
                }, this.jobsAvailableCallback).withListener(new ProcessingExporterTransistor());
            });
            Int2ObjectHashMap<SubscriptionCommandMessageHandler> int2ObjectHashMap = this.subscriptionHandlers;
            ExecutorService executorService = this.subscriptionHandlerExecutor;
            Objects.requireNonNull(executorService);
            Consumer consumer = executorService::submit;
            StreamProcessorRule streamProcessorRule = this.environmentRule;
            Objects.requireNonNull(streamProcessorRule);
            int2ObjectHashMap.put(num, new SubscriptionCommandMessageHandler(consumer, streamProcessorRule::getLogStreamRecordWriter));
        });
    }

    public void forEachPartition(Consumer<Integer> consumer) {
        int i = 1;
        for (int i2 = 0; i2 < this.partitionCount; i2++) {
            int i3 = i;
            i++;
            consumer.accept(Integer.valueOf(i3));
        }
    }

    public void increaseTime(Duration duration) {
        this.environmentRule.getClock().addTime(duration);
    }

    public void reprocess() {
        forEachPartition(num -> {
            try {
                this.environmentRule.closeStreamProcessor(num.intValue());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        int size = RecordingExporter.getRecords().size();
        RecordingExporter.reset();
        startProcessors();
        TestUtil.waitUntil(() -> {
            return RecordingExporter.getRecords().size() >= size;
        }, "Failed to reprocess all events, only re-exported %d but expected %d", new Object[]{Integer.valueOf(RecordingExporter.getRecords().size()), Integer.valueOf(size)});
    }

    public List<Integer> getPartitionIds() {
        return (List) IntStream.range(1, 1 + this.partitionCount).boxed().collect(Collectors.toList());
    }

    public ControlledActorClock getClock() {
        return this.environmentRule.getClock();
    }

    public ZeebeState getZeebeState() {
        return this.environmentRule.getZeebeState();
    }

    public DeploymentClient deployment() {
        return new DeploymentClient(this.environmentRule, this::forEachPartition);
    }

    public WorkflowInstanceClient workflowInstance() {
        return new WorkflowInstanceClient(this.environmentRule);
    }

    public PublishMessageClient message() {
        return new PublishMessageClient(this.environmentRule, this.partitionCount);
    }

    public VariableClient variables() {
        return new VariableClient(this.environmentRule);
    }

    public JobActivationClient jobs() {
        return new JobActivationClient(this.environmentRule);
    }

    public JobClient job() {
        return new JobClient(this.environmentRule);
    }

    public IncidentClient incident() {
        return new IncidentClient(this.environmentRule);
    }

    public Record<JobRecordValue> createJob(String str, String str2) {
        deployment().withXmlResource(str2, Bpmn.createExecutableProcess(str2).startEvent("start").serviceTask(MultiInstanceSubProcessTest.TASK_ELEMENT_ID, serviceTaskBuilder -> {
            serviceTaskBuilder.zeebeJobType(str).done();
        }).endEvent("end").done()).deploy();
        long create = workflowInstance().ofBpmnProcessId(str2).create();
        return (Record) RecordingExporter.jobRecords(JobIntent.CREATED).withType(str).filter(record -> {
            return record.getValue().getWorkflowInstanceKey() == create;
        }).getFirst();
    }

    public void writeRecords(RecordToWrite... recordToWriteArr) {
        this.environmentRule.writeBatch(recordToWriteArr);
    }

    public CommandResponseWriter getCommandResponseWriter() {
        return this.environmentRule.getCommandResponseWriter();
    }
}
