package de.otto.edison.eventsourcing.consumer;

import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/* loaded from: input_file:de/otto/edison/eventsourcing/consumer/EventSourceConsumerProcess.class */
public class EventSourceConsumerProcess {
    private static final Logger LOG = LoggerFactory.getLogger(EventSourceConsumerProcess.class);
    private static final String THREAD_NAME_PREFIX = "edison-eventsourcing-consumer-";
    private final ExecutorService executorService;
    private final AtomicBoolean stopThread = new AtomicBoolean(false);
    private final Multimap<EventSource, EventConsumer> eventSourceWithConsumer = LinkedHashMultimap.create();

    public EventSourceConsumerProcess(List<EventSource> list, List<EventConsumer> list2) {
        matchEventConsumersWithEventSourcesByStreamName(list, list2);
        if (this.eventSourceWithConsumer.size() <= 0) {
            this.executorService = null;
        } else {
            this.executorService = Executors.newFixedThreadPool(this.eventSourceWithConsumer.size(), new CustomizableThreadFactory(THREAD_NAME_PREFIX));
        }
    }

    private void matchEventConsumersWithEventSourcesByStreamName(List<EventSource> list, List<EventConsumer> list2) {
        list2.forEach(eventConsumer -> {
            list.stream().filter(eventSource -> {
                return eventSource.getStreamName().equals(eventConsumer.streamName());
            }).findAny().ifPresent(eventSource2 -> {
                this.eventSourceWithConsumer.put(eventSource2, eventConsumer);
            });
        });
    }

    @PostConstruct
    public void init() {
        LOG.info("Initializing EventSourceConsumerProcess...");
        this.eventSourceWithConsumer.keySet().forEach(eventSource -> {
            this.executorService.submit(() -> {
                try {
                    LOG.info("Starting {}...", eventSource.getStreamName());
                    eventSource.consumeAll(obj -> {
                        return this.stopThread.get();
                    }, new DelegateEventConsumer(this.eventSourceWithConsumer.get(eventSource)).consumerFunction());
                } catch (Exception e) {
                    LOG.error("Starting failed: " + e.getMessage(), e);
                }
            });
        });
    }

    @PreDestroy
    public void shutdown() {
        LOG.info("Shutting down...");
        this.stopThread.set(true);
        if (this.executorService != null) {
            try {
                this.executorService.shutdownNow();
                this.executorService.awaitTermination(2L, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("...done.");
    }
}
