package de.otto.synapse.eventsource;

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.logging.LogHelper;
import de.otto.synapse.messagestore.MessageStore;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/* loaded from: input_file:de/otto/synapse/eventsource/DefaultEventSource.class */
public class DefaultEventSource extends AbstractEventSource {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultEventSource.class);
    private static final int LOG_MESSAGE_COUNTER_EVERY_NTH_MESSAGE = 100000;
    private final MessageStore messageStore;
    private final Marker marker;

    public DefaultEventSource(@Nonnull MessageStore messageStore, @Nonnull MessageLogReceiverEndpoint messageLogReceiverEndpoint) {
        super(messageLogReceiverEndpoint);
        this.messageStore = messageStore;
        this.marker = null;
    }

    public DefaultEventSource(@Nonnull MessageStore messageStore, @Nonnull MessageLogReceiverEndpoint messageLogReceiverEndpoint, @Nonnull Marker marker) {
        super(messageLogReceiverEndpoint);
        this.messageStore = messageStore;
        this.marker = marker;
    }

    @Override // de.otto.synapse.eventsource.EventSource
    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull Predicate<ShardResponse> predicate) {
        return consumeMessageStore().thenCompose(channelPosition -> {
            return getMessageLogReceiverEndpoint().consumeUntil(channelPosition, predicate);
        }).handle((BiFunction<? super U, Throwable, ? extends U>) (channelPosition2, th) -> {
            if (th != null) {
                LOG.error(this.marker, "Failed to start consuming from EventSource {}: {}. Closing MessageStore.", new Object[]{getChannelName(), th.getMessage(), th});
            }
            try {
                this.messageStore.close();
            } catch (Exception e) {
                LOG.error(this.marker, "Unable to close() MessageStore: " + e.getMessage(), e);
            }
            return channelPosition2;
        });
    }

    private CompletableFuture<ChannelPosition> consumeMessageStore() {
        int i = 1;
        if (this.messageStore.isCompacting()) {
            i = Runtime.getRuntime().availableProcessors();
        }
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(new CustomizableThreadFactory("synapse-messagestore-dispatcher-"));
        Semaphore semaphore = new Semaphore(i);
        String channelName = getChannelName();
        LOG.info(this.marker, "Starting to read message store for channel '{}'.", channelName);
        Instant now = Instant.now();
        AtomicLong atomicLong = new AtomicLong();
        long currentTimeMillis = System.currentTimeMillis();
        AtomicLong atomicLong2 = new AtomicLong(System.currentTimeMillis());
        Map copyOfContextMap = MDC.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            if (copyOfContextMap != null) {
                MDC.setContextMap(copyOfContextMap);
            }
            this.messageStore.stream().filter(messageStoreEntry -> {
                return messageStoreEntry.getChannelName().equals(channelName);
            }).map((v0) -> {
                return v0.getTextMessage();
            }).map(textMessage -> {
                return getMessageLogReceiverEndpoint().intercept(textMessage);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).forEach(textMessage2 -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    LOG.error(this.marker, e.getMessage(), e);
                }
                newCachedThreadPool.execute(() -> {
                    if (copyOfContextMap != null) {
                        MDC.setContextMap(copyOfContextMap);
                    }
                    try {
                        getMessageLogReceiverEndpoint().getMessageDispatcher().accept(textMessage2);
                        long andIncrement = atomicLong.getAndIncrement();
                        if (andIncrement > 0 && andIncrement % 100000 == 0) {
                            LOG.info(this.marker, "Consumed {} messages ({} per second) from message store for channel '{}'", new Object[]{Long.valueOf(andIncrement), String.format("%.2f", Double.valueOf(LogHelper.calculateMessagesPerSecond(atomicLong2.getAndSet(System.currentTimeMillis()), 100000L))), channelName});
                        }
                        semaphore.release();
                    } catch (Throwable th) {
                        long andIncrement2 = atomicLong.getAndIncrement();
                        if (andIncrement2 > 0 && andIncrement2 % 100000 == 0) {
                            LOG.info(this.marker, "Consumed {} messages ({} per second) from message store for channel '{}'", new Object[]{Long.valueOf(andIncrement2), String.format("%.2f", Double.valueOf(LogHelper.calculateMessagesPerSecond(atomicLong2.getAndSet(System.currentTimeMillis()), 100000L))), channelName});
                        }
                        semaphore.release();
                        throw th;
                    }
                });
            });
            newCachedThreadPool.shutdown();
            try {
                newCachedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
                LOG.info(this.marker, "Consumed a total of {} messages from message store for channel '{}', totalMessagesPerSecond={}", new Object[]{Long.valueOf(atomicLong.get()), channelName, String.format("%.2f", Double.valueOf(LogHelper.calculateMessagesPerSecond(currentTimeMillis, atomicLong.get())))});
            } catch (InterruptedException e) {
                LOG.error(this.marker, e.getMessage(), e);
            }
            LOG.info(this.marker, "Finished reading message store for channel '{}'. Duration was {}.", channelName, Duration.between(now, Instant.now()));
            return this.messageStore.getLatestChannelPosition(channelName);
        }, Executors.newSingleThreadExecutor(new CustomizableThreadFactory("synapse-eventsource-")));
    }
}
