package events.dewdrop.read.readmodel.stream;

import events.dewdrop.aggregate.AggregateRoot;
import events.dewdrop.read.readmodel.ReadModel;
import events.dewdrop.structure.StreamNameGenerator;
import events.dewdrop.structure.api.Event;
import events.dewdrop.structure.datastore.StreamStore;
import events.dewdrop.structure.read.Direction;
import events.dewdrop.structure.serialize.EventSerializer;
import events.dewdrop.utils.EventHandlerUtils;
import events.dewdrop.utils.StreamUtils;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:events/dewdrop/read/readmodel/stream/StreamFactory.class */
/**
 * Builds {@link StreamDetails} descriptions and wraps them, together with the configured
 * {@link StreamStore} and {@link EventSerializer}, into {@link Stream} instances.
 *
 * <p>Three sources are supported: a {@code @Stream} annotation declared on a read model, an
 * {@link AggregateRoot} with its id, and a single event class with a consumer.
 *
 * <p>The collaborators are assigned once in the constructor and never reassigned.
 */
public class StreamFactory {

    @Generated
    private static final Logger log = LogManager.getLogger(StreamFactory.class);
    private final StreamNameGenerator streamNameGenerator;
    private final StreamStore streamStore;
    private final EventSerializer eventSerializer;

    /**
     * Creates a factory backed by the given collaborators.
     *
     * @param streamStore the store handed to every {@link Stream} this factory creates
     * @param eventSerializer the serializer handed to every {@link Stream} this factory creates
     * @param streamNameGenerator the name generator placed into every {@link StreamDetails}
     * @throws NullPointerException if any argument is {@code null}
     */
    public StreamFactory(StreamStore streamStore, EventSerializer eventSerializer, StreamNameGenerator streamNameGenerator) {
        this.streamStore = Objects.requireNonNull(streamStore, "streamStore is required");
        this.eventSerializer = Objects.requireNonNull(eventSerializer, "eventSerializer is required");
        this.streamNameGenerator = Objects.requireNonNull(streamNameGenerator, "StreamNameGenerator is required");
    }

    /**
     * Builds the {@link StreamDetails} for a {@code @Stream} annotation found on a read model.
     *
     * <p>When the read model has no in-memory cache processor, a method matching the stream must be
     * discoverable through {@link StreamUtils#getStreamStartPositionMethod}; otherwise the stream
     * cannot be created.
     *
     * @param stream the stream annotation supplying name, stream type, direction and subscribed flag
     * @param readModel the read model supplying the event handler and handled message types
     * @param <T> the event type consumed by the read model's handler
     * @return the stream details assembled from the annotation and the read model
     * @throws NullPointerException if {@code stream}, {@code readModel} or the read model's handler is {@code null}
     * @throws IllegalStateException if no in-memory cache processor is set and no start position method is found
     */
    <T extends Event> StreamDetails fromStreamAnnotation(events.dewdrop.read.readmodel.annotation.Stream stream, ReadModel<T> readModel) {
        // Validate the arguments before doing any work on the read model.
        Objects.requireNonNull(stream, "StreamAnnotation is required");
        Objects.requireNonNull(readModel, "ReadModel is required");

        List<Class<? extends Event>> eventHandlers = EventHandlerUtils.getEventHandlers(readModel);
        Consumer<T> handler = Objects.requireNonNull(readModel.handler(), "EventHandler is required");
        Optional<Method> startPositionMethod = resolveStartPositionMethod(stream, readModel);

        return StreamDetails.builder()
            .streamType(stream.streamType())
            .direction(stream.direction())
            .eventHandler(handler)
            .streamNameGenerator(this.streamNameGenerator)
            .messageTypes(eventHandlers)
            .name(stream.name())
            .subscribed(Boolean.valueOf(stream.subscribed()))
            .startPositionMethod(startPositionMethod)
            .create();
    }

    /**
     * Looks up the start position method for the stream, but only when the read model has no
     * in-memory cache processor. Logs and throws when the lookup is required and finds nothing.
     */
    private <T extends Event> Optional<Method> resolveStartPositionMethod(events.dewdrop.read.readmodel.annotation.Stream stream, ReadModel<T> readModel) {
        if (!readModel.getInMemoryCacheProcessor().isEmpty()) {
            // A cache processor is present, so no start position method is looked up.
            return Optional.empty();
        }
        Optional<Method> startPositionMethod = StreamUtils.getStreamStartPositionMethod(stream, readModel);
        if (startPositionMethod.isEmpty()) {
            String simpleName = readModel.getReadModelWrapper().getOriginalReadModelClass().getSimpleName();
            log.error("Unable to create a valid stream for the ReadModel: {} - @Stream(name={}, streamType={}) - Create a method decorated with @StreamStartPosition(name = {}, streamType = {}) with the same name and streamType for the stream, which is required if the inMemoryCacheProcessor is not set. This should return a long which is your last position for that stream.", simpleName, stream.name(), stream.streamType(), stream.name(), stream.streamType());
            throw new IllegalStateException(String.format("Unable to create a valid stream for the ReadModel: %s - @Stream(name=%s, streamType=%s) - Create a method decorated with @StreamStartPosition(name = %s, streamType = %s) with the same name and streamType for the stream, which is required if the inMemoryCacheProcessor is not set.  This should return a long which is your last position for that stream.", simpleName, stream.name(), stream.streamType(), stream.name(), stream.streamType()));
        }
        return startPositionMethod;
    }

    /**
     * Builds the {@link StreamDetails} for an aggregate root: stream type
     * {@link StreamType#AGGREGATE}, direction {@link Direction#FORWARD}.
     *
     * @param aggregateRoot the aggregate root placed into the details
     * @param uuid the id placed into the details
     * @return the assembled stream details
     */
    StreamDetails fromAggregateRoot(AggregateRoot aggregateRoot, UUID uuid) {
        return StreamDetails.builder()
            .streamType(StreamType.AGGREGATE)
            .direction(Direction.FORWARD)
            .aggregateRoot(aggregateRoot)
            .streamNameGenerator(this.streamNameGenerator)
            .id(uuid)
            .create();
    }

    /**
     * Builds the {@link StreamDetails} for a single event class: stream type
     * {@link StreamType#EVENT}, direction {@link Direction#FORWARD}, subscribed, named after the
     * class's simple name and using {@link SubscriptionStartStrategy#START_END_ONLY}.
     *
     * @param consumer the handler placed into the details
     * @param cls the event class; it is the only message type and its simple name is the stream name
     * @param <T> unused; retained so the method signature stays unchanged
     * @return the assembled stream details
     * @throws NullPointerException if {@code cls} is {@code null}
     */
    <T extends Event> StreamDetails fromEvent(Consumer<Event> consumer, Class<? extends Event> cls) {
        return StreamDetails.builder()
            .streamType(StreamType.EVENT)
            .direction(Direction.FORWARD)
            .eventHandler(consumer)
            .streamNameGenerator(this.streamNameGenerator)
            .messageTypes(List.of(cls))
            .name(cls.getSimpleName())
            .subscribed(true)
            .subscriptionStartStrategy(SubscriptionStartStrategy.START_END_ONLY)
            .create();
    }

    /**
     * Creates a {@link Stream} for the given aggregate root and id.
     *
     * @param aggregateRoot the aggregate root the stream is built for
     * @param uuid the id used in the stream details
     * @return a new stream using this factory's store and serializer
     */
    public Stream constructStreamFromAggregateRoot(AggregateRoot aggregateRoot, UUID uuid) {
        return new Stream(fromAggregateRoot(aggregateRoot, uuid), this.streamStore, this.eventSerializer);
    }

    /**
     * Creates a {@link Stream} for a {@code @Stream} annotation declared on a read model.
     *
     * @param stream the stream annotation
     * @param readModel the read model owning the annotation
     * @param <T> the event type consumed by the read model's handler
     * @return a new stream using this factory's store and serializer
     * @throws NullPointerException if {@code stream}, {@code readModel} or the read model's handler is {@code null}
     * @throws IllegalStateException if no in-memory cache processor is set and no start position method is found
     */
    public <T extends Event> Stream constructStreamFromStream(events.dewdrop.read.readmodel.annotation.Stream stream, ReadModel<T> readModel) {
        return new Stream(fromStreamAnnotation(stream, readModel), this.streamStore, this.eventSerializer);
    }

    /**
     * Creates a {@link Stream} for a single event class.
     *
     * @param consumer the handler for the event; declared as a raw type so existing callers keep compiling
     * @param cls the event class the stream is built for
     * @return a new stream using this factory's store and serializer
     * @throws NullPointerException if {@code cls} is {@code null}
     */
    // The raw Consumer parameter is part of the existing public signature; the unchecked
    // conversion to Consumer<Event> happens at the fromEvent call below.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Stream constructStreamForEvent(Consumer consumer, Class<? extends Event> cls) {
        return new Stream(fromEvent(consumer, cls), this.streamStore, this.eventSerializer);
    }
}
