package io.zeebe.engine.util;

import io.zeebe.db.ZeebeDb;
import io.zeebe.db.ZeebeDbFactory;
import io.zeebe.distributedlog.impl.DefaultDistributedLogstreamService;
import io.zeebe.distributedlog.impl.DistributedLogstreamPartition;
import io.zeebe.engine.processor.AsyncSnapshotDirector;
import io.zeebe.engine.processor.CommandResponseWriter;
import io.zeebe.engine.processor.ReadonlyProcessingContext;
import io.zeebe.engine.processor.StreamProcessor;
import io.zeebe.engine.processor.StreamProcessorLifecycleAware;
import io.zeebe.engine.processor.StreamProcessorServiceNames;
import io.zeebe.engine.processor.TypedEventRegistry;
import io.zeebe.engine.processor.TypedRecordProcessorFactory;
import io.zeebe.engine.processor.TypedRecordProcessors;
import io.zeebe.engine.state.StateStorageFactory;
import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.impl.service.LogStreamServiceNames;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.state.StateSnapshotController;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferWriter;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.agrona.DirectBuffer;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.FieldSetter;

/* loaded from: input_file:io/zeebe/engine/util/TestStreams.class */
public class TestStreams {
    private static final int MAX_SNAPSHOTS = 1;
    private final TemporaryFolder dataDirectory;
    private final AutoCloseableRule closeables;
    private final ServiceContainer serviceContainer;
    private final ActorScheduler actorScheduler;
    private ZeebeDb zeebeDb;
    private AsyncSnapshotDirector asyncSnapshotDirector;
    private static final Duration SNAPSHOT_INTERVAL = Duration.ofMinutes(1);
    private static final Map<Class<?>, ValueType> VALUE_TYPES = new HashMap();
    private final Map<String, LogStream> managedLogs = new HashMap();
    private final Map<String, StateSnapshotController> snapshotControllerMap = new HashMap();
    private final CommandResponseWriter mockCommandResponseWriter = (CommandResponseWriter) Mockito.mock(CommandResponseWriter.class);

    /* loaded from: input_file:io/zeebe/engine/util/TestStreams$FluentLogWriter.class */
    public static class FluentLogWriter {
        protected UnpackedObject value;
        protected LogStream logStream;
        protected RecordMetadata metadata = new RecordMetadata();
        protected long key = -1;
        private long sourceRecordPosition = -1;

        public FluentLogWriter(LogStream logStream) {
            this.logStream = logStream;
            this.metadata.protocolVersion(1);
        }

        public FluentLogWriter intent(Intent intent) {
            this.metadata.intent(intent);
            return this;
        }

        public FluentLogWriter requestId(long j) {
            this.metadata.requestId(j);
            return this;
        }

        public FluentLogWriter sourceRecordPosition(long j) {
            this.sourceRecordPosition = j;
            return this;
        }

        public FluentLogWriter requestStreamId(int i) {
            this.metadata.requestStreamId(i);
            return this;
        }

        public FluentLogWriter recordType(RecordType recordType) {
            this.metadata.recordType(recordType);
            return this;
        }

        public FluentLogWriter key(long j) {
            this.key = j;
            return this;
        }

        public FluentLogWriter event(UnpackedObject unpackedObject) {
            ValueType valueType = (ValueType) TestStreams.VALUE_TYPES.get(unpackedObject.getClass());
            if (valueType == null) {
                throw new RuntimeException("No event type registered for getValue " + unpackedObject.getClass());
            }
            this.metadata.valueType(valueType);
            this.value = unpackedObject;
            return this;
        }

        public long write() {
            LogStreamWriterImpl logStreamWriterImpl = new LogStreamWriterImpl(this.logStream);
            logStreamWriterImpl.sourceRecordPosition(this.sourceRecordPosition);
            if (this.key >= 0) {
                logStreamWriterImpl.key(this.key);
            } else {
                logStreamWriterImpl.keyNull();
            }
            logStreamWriterImpl.metadataWriter(this.metadata);
            logStreamWriterImpl.valueWriter(this.value);
            return ((Long) TestUtil.doRepeatedly(() -> {
                return Long.valueOf(logStreamWriterImpl.tryWrite());
            }).until(l -> {
                return Boolean.valueOf(l.longValue() >= 0);
            })).longValue();
        }
    }

    public TestStreams(TemporaryFolder temporaryFolder, AutoCloseableRule autoCloseableRule, ServiceContainer serviceContainer, ActorScheduler actorScheduler) {
        this.dataDirectory = temporaryFolder;
        this.closeables = autoCloseableRule;
        this.serviceContainer = serviceContainer;
        this.actorScheduler = actorScheduler;
        Mockito.when(this.mockCommandResponseWriter.intent((Intent) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.key(ArgumentMatchers.anyLong())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.partitionId(ArgumentMatchers.anyInt())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.recordType((RecordType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.rejectionType((RejectionType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.rejectionReason((DirectBuffer) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.valueType((ValueType) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(this.mockCommandResponseWriter.valueWriter((BufferWriter) ArgumentMatchers.any())).thenReturn(this.mockCommandResponseWriter);
        Mockito.when(Boolean.valueOf(this.mockCommandResponseWriter.tryWriteResponse(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong()))).thenReturn(true);
    }

    public CommandResponseWriter getMockedResponseWriter() {
        return this.mockCommandResponseWriter;
    }

    public LogStream createLogStream(String str) {
        return createLogStream(str, 0);
    }

    public LogStream createLogStream(String str, int i) {
        File file = null;
        try {
            file = this.dataDirectory.newFolder(new String[]{str, "segments"});
        } catch (IOException e) {
            e.printStackTrace();
        }
        LogStream logStream = (LogStream) Mockito.spy(LogStreams.createFsLogStream(i).logRootPath(file.getAbsolutePath()).serviceContainer(this.serviceContainer).logName(str).deleteOnClose(true).build().join());
        DistributedLogstreamPartition distributedLogstreamPartition = (DistributedLogstreamPartition) Mockito.mock(DistributedLogstreamPartition.class);
        DefaultDistributedLogstreamService defaultDistributedLogstreamService = new DefaultDistributedLogstreamService();
        try {
            FieldSetter.setField(defaultDistributedLogstreamService, DefaultDistributedLogstreamService.class.getDeclaredField("logStream"), logStream);
            FieldSetter.setField(defaultDistributedLogstreamService, DefaultDistributedLogstreamService.class.getDeclaredField("logStorage"), logStream.getLogStorage());
            FieldSetter.setField(defaultDistributedLogstreamService, DefaultDistributedLogstreamService.class.getDeclaredField("currentLeader"), "0");
        } catch (NoSuchFieldException e2) {
            e2.printStackTrace();
        }
        ((DistributedLogstreamPartition) Mockito.doAnswer(invocationOnMock -> {
            Object[] arguments = invocationOnMock.getArguments();
            if (arguments == null || arguments.length <= 1 || arguments[0] == null || arguments[1] == null) {
                return null;
            }
            return CompletableFuture.completedFuture(Long.valueOf(defaultDistributedLogstreamService.append("0", ((Long) arguments[1]).longValue(), (byte[]) arguments[0])));
        }).when(distributedLogstreamPartition)).asyncAppend((byte[]) ArgumentMatchers.any(byte[].class), ArgumentMatchers.anyLong());
        this.serviceContainer.createService(LogStreamServiceNames.distributedLogPartitionServiceName(str), () -> {
            return distributedLogstreamPartition;
        }).install().join();
        logStream.openAppender().join();
        this.managedLogs.put(str, logStream);
        this.closeables.manage(logStream);
        return logStream;
    }

    public LogStream getLogStream(String str) {
        return this.managedLogs.get(str);
    }

    public Stream<LoggedEvent> events(String str) {
        BufferedLogStreamReader bufferedLogStreamReader = new BufferedLogStreamReader(this.managedLogs.get(str));
        this.closeables.manage(bufferedLogStreamReader);
        bufferedLogStreamReader.seekToFirstEvent();
        Iterable iterable = () -> {
            return bufferedLogStreamReader;
        };
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    public FluentLogWriter newRecord(String str) {
        return new FluentLogWriter(getLogStream(str));
    }

    public StateStorageFactory getStateStorageFactory(LogStream logStream) {
        File file;
        try {
            file = this.dataDirectory.newFolder(new String[]{logStream.getLogName(), "state"});
        } catch (IOException e) {
            if (!e.getMessage().contains("exists")) {
                throw new RuntimeException(e);
            }
            file = new File(new File(this.dataDirectory.getRoot(), logStream.getLogName()), "state");
        }
        return new StateStorageFactory(file);
    }

    public StreamProcessor startStreamProcessor(String str, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory) {
        return buildStreamProcessor(getLogStream(str), zeebeDbFactory, typedRecordProcessorFactory, 1, SNAPSHOT_INTERVAL);
    }

    private StreamProcessor buildStreamProcessor(LogStream logStream, ZeebeDbFactory zeebeDbFactory, TypedRecordProcessorFactory typedRecordProcessorFactory, int i, Duration duration) {
        StateSnapshotController stateSnapshotController = (StateSnapshotController) Mockito.spy(new StateSnapshotController(zeebeDbFactory, getStateStorageFactory(logStream).create(), i));
        this.snapshotControllerMap.put(logStream.getLogName(), stateSnapshotController);
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.zeebeDb = stateSnapshotController.openDb();
        StreamProcessor streamProcessor = (StreamProcessor) StreamProcessor.builder().logStream(logStream).zeebeDb(this.zeebeDb).actorScheduler(this.actorScheduler).serviceContainer(this.serviceContainer).commandResponseWriter(this.mockCommandResponseWriter).streamProcessorFactory(processingContext -> {
            TypedRecordProcessors createProcessors = typedRecordProcessorFactory.createProcessors(processingContext);
            createProcessors.withListener(new StreamProcessorLifecycleAware() { // from class: io.zeebe.engine.util.TestStreams.1
                public void onOpen(ReadonlyProcessingContext readonlyProcessingContext) {
                    completableActorFuture.complete((Object) null);
                }
            });
            return createProcessors;
        }).build().join();
        completableActorFuture.join();
        this.asyncSnapshotDirector = new AsyncSnapshotDirector(streamProcessor, stateSnapshotController, logStream, duration);
        this.actorScheduler.submitActor(this.asyncSnapshotDirector);
        return streamProcessor;
    }

    public StateSnapshotController getStateSnapshotController(String str) {
        return this.snapshotControllerMap.get(str);
    }

    public void closeProcessor(String str) throws Exception {
        this.asyncSnapshotDirector.closeAsync().join();
        this.serviceContainer.removeService(StreamProcessorServiceNames.streamProcessorService(str)).join();
        this.zeebeDb.close();
    }

    static {
        TypedEventRegistry.EVENT_REGISTRY.forEach((valueType, cls) -> {
            VALUE_TYPES.put(cls, valueType);
        });
    }
}
