package events.dewdrop.read.readmodel.stream.subscription;

import events.dewdrop.read.readmodel.stream.NameAndPosition;
import events.dewdrop.read.readmodel.stream.Stream;
import events.dewdrop.read.readmodel.stream.StreamListener;
import events.dewdrop.read.readmodel.stream.StreamReader;
import events.dewdrop.structure.api.Event;
import events.dewdrop.structure.read.Handler;
import events.dewdrop.structure.subscribe.EventProcessor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:events/dewdrop/read/readmodel/stream/subscription/Subscription.class */
public class Subscription<T extends Event> {

    @Generated
    private static final Logger log = LogManager.getLogger(Subscription.class);
    protected final StreamListener<T> listener;
    private final List<Class<? extends Event>> messageTypes;
    private final Handler<T> handler;
    private final Map<Class<?>, List<EventProcessor<T>>> handlers = new ConcurrentHashMap();
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

    Subscription(Handler<T> handler, List<Class<? extends Event>> list, StreamListener<T> streamListener) {
        this.messageTypes = list;
        this.handler = handler;
        this.listener = streamListener;
        registerHandlers();
    }

    public static Subscription getInstance(Stream stream) {
        return new Subscription(stream, stream.getStreamDetails().getMessageTypes(), StreamListener.getInstance(stream.getStreamStore(), stream.getEventSerializer()));
    }

    void registerToMessageType(EventProcessor<T> eventProcessor, Class<?> cls) {
        if (getHandlesFor(cls).stream().anyMatch(eventProcessor2 -> {
            return eventProcessor2.isSame(cls, eventProcessor);
        })) {
            return;
        }
        synchronized (this.handlers) {
            this.handlers.computeIfAbsent(cls, cls2 -> {
                return new ArrayList();
            });
            this.handlers.get(cls).add(eventProcessor);
        }
    }

    List<EventProcessor<T>> getHandlesFor(Class<?> cls) {
        Objects.requireNonNull(cls, "Type is required");
        synchronized (this.handlers) {
            if (this.handlers.containsKey(cls)) {
                return new ArrayList(this.handlers.get(cls));
            }
            return new ArrayList();
        }
    }

    public void registerHandlers() {
        EventProcessor<T> eventProcessor = new EventProcessor<>(this.handler, getMessageTypes());
        Iterator<Class<? extends Event>> it = getMessageTypes().iterator();
        while (it.hasNext()) {
            registerToMessageType(eventProcessor, it.next());
        }
    }

    public void publish(T t) {
        Objects.requireNonNull(t, "event is required");
        log.debug("Publishing event:{}, handlers: {}", t.getClass().getSimpleName(), Integer.valueOf(this.handlers.size()));
        getHandlesFor(t.getClass()).forEach(eventProcessor -> {
            eventProcessor.process(t);
        });
    }

    public boolean subscribeByNameAndPosition(StreamReader streamReader) {
        NameAndPosition nameAndPosition = streamReader.nameAndPosition();
        if (!streamReader.isStreamExists()) {
            return false;
        }
        boolean start = this.listener.start(nameAndPosition.getStreamName(), nameAndPosition.getPosition(), this);
        if (start) {
            log.info("Completed subscription to stream: {} from position:{}", nameAndPosition.getStreamName(), nameAndPosition.getPosition());
        }
        return start;
    }

    public void pollForCompletion(StreamReader streamReader) {
        CompletableFuture<NameAndPosition> completableFuture = new CompletableFuture<>();
        schedule(streamReader, completableFuture, () -> {
            NameAndPosition nameAndPosition = streamReader.nameAndPosition();
            if (nameAndPosition.isComplete()) {
                log.info("Finally discovered stream: {}", nameAndPosition.getStreamName());
                completableFuture.complete(nameAndPosition);
            }
            if (streamReader.isStreamExists()) {
                return;
            }
            log.info("Stream: {} still not found", streamReader.getStreamName());
        });
    }

    void schedule(StreamReader streamReader, CompletableFuture<NameAndPosition> completableFuture, Runnable runnable) {
        ScheduledFuture<?> scheduleAtFixedRate = this.executorService.scheduleAtFixedRate(runnable, 0L, 1L, TimeUnit.SECONDS);
        completableFuture.thenApply(nameAndPosition -> {
            subscribeByNameAndPosition(streamReader);
            return true;
        });
        completableFuture.whenComplete((nameAndPosition2, th) -> {
            scheduleAtFixedRate.cancel(false);
        });
    }

    @Generated
    public Map<Class<?>, List<EventProcessor<T>>> getHandlers() {
        return this.handlers;
    }

    @Generated
    public StreamListener<T> getListener() {
        return this.listener;
    }

    @Generated
    public List<Class<? extends Event>> getMessageTypes() {
        return this.messageTypes;
    }

    @Generated
    public Handler<T> getHandler() {
        return this.handler;
    }

    @Generated
    public ScheduledExecutorService getExecutorService() {
        return this.executorService;
    }

    @Generated
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof Subscription)) {
            return false;
        }
        Subscription subscription = (Subscription) obj;
        if (!subscription.canEqual(this)) {
            return false;
        }
        Map<Class<?>, List<EventProcessor<T>>> handlers = getHandlers();
        Map<Class<?>, List<EventProcessor<T>>> handlers2 = subscription.getHandlers();
        if (handlers == null) {
            if (handlers2 != null) {
                return false;
            }
        } else if (!handlers.equals(handlers2)) {
            return false;
        }
        StreamListener<T> listener = getListener();
        StreamListener<T> listener2 = subscription.getListener();
        if (listener == null) {
            if (listener2 != null) {
                return false;
            }
        } else if (!listener.equals(listener2)) {
            return false;
        }
        List<Class<? extends Event>> messageTypes = getMessageTypes();
        List<Class<? extends Event>> messageTypes2 = subscription.getMessageTypes();
        if (messageTypes == null) {
            if (messageTypes2 != null) {
                return false;
            }
        } else if (!messageTypes.equals(messageTypes2)) {
            return false;
        }
        Handler<T> handler = getHandler();
        Handler<T> handler2 = subscription.getHandler();
        if (handler == null) {
            if (handler2 != null) {
                return false;
            }
        } else if (!handler.equals(handler2)) {
            return false;
        }
        ScheduledExecutorService executorService = getExecutorService();
        ScheduledExecutorService executorService2 = subscription.getExecutorService();
        return executorService == null ? executorService2 == null : executorService.equals(executorService2);
    }

    @Generated
    protected boolean canEqual(Object obj) {
        return obj instanceof Subscription;
    }

    @Generated
    public int hashCode() {
        Map<Class<?>, List<EventProcessor<T>>> handlers = getHandlers();
        int hashCode = (1 * 59) + (handlers == null ? 43 : handlers.hashCode());
        StreamListener<T> listener = getListener();
        int hashCode2 = (hashCode * 59) + (listener == null ? 43 : listener.hashCode());
        List<Class<? extends Event>> messageTypes = getMessageTypes();
        int hashCode3 = (hashCode2 * 59) + (messageTypes == null ? 43 : messageTypes.hashCode());
        Handler<T> handler = getHandler();
        int hashCode4 = (hashCode3 * 59) + (handler == null ? 43 : handler.hashCode());
        ScheduledExecutorService executorService = getExecutorService();
        return (hashCode4 * 59) + (executorService == null ? 43 : executorService.hashCode());
    }

    @Generated
    public String toString() {
        return "Subscription(handlers=" + getHandlers() + ", listener=" + getListener() + ", messageTypes=" + getMessageTypes() + ", handler=" + getHandler() + ", executorService=" + getExecutorService() + ")";
    }
}
