package de.otto.synapse.endpoint.receiver.aws;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.InterceptorChain;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.info.MessageReceiverNotification;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.logging.LogHelper;
import java.time.Clock;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import software.amazon.awssdk.services.kinesis.KinesisClient;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/aws/KinesisMessageLogReceiverEndpoint.class */
/**
 * Message-log receiver endpoint that reads a Kinesis stream through a {@link KinesisMessageLogReader}.
 *
 * <p>Every shard response delivered by the reader is run through the endpoint's interceptor chain and
 * handed to its message dispatcher. Progress and failures are published as
 * {@link MessageReceiverNotification}s / receiver status events.
 */
public class KinesisMessageLogReceiverEndpoint extends AbstractMessageLogReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisMessageLogReceiverEndpoint.class);
    private final KinesisMessageLogReader kinesisMessageLogReader;
    private final ApplicationEventPublisher eventPublisher;

    /**
     * Consumer applied to every {@link KinesisShardResponse}: intercepts and dispatches the contained
     * messages, records the shard's duration-behind and publishes a RUNNING notification.
     */
    private static class KinesisShardResponseConsumer implements Consumer<KinesisShardResponse> {
        private final AtomicReference<ChannelDurationBehind> channelDurationBehind;
        private final InterceptorChain interceptorChain;
        private final MessageDispatcher messageDispatcher;
        private final ApplicationEventPublisher eventPublisher;

        /**
         * @param list                      shard names used to initialise the duration-behind as "unknown"
         * @param interceptorChain          chain applied to every message before it is dispatched
         * @param messageDispatcher         receiver of the intercepted messages
         * @param applicationEventPublisher publisher for RUNNING notifications; may be {@code null},
         *                                  in which case no notification is published
         */
        private KinesisShardResponseConsumer(List<String> list, InterceptorChain interceptorChain, MessageDispatcher messageDispatcher, ApplicationEventPublisher applicationEventPublisher) {
            this.channelDurationBehind = new AtomicReference<>();
            this.interceptorChain = interceptorChain;
            this.messageDispatcher = messageDispatcher;
            this.eventPublisher = applicationEventPublisher;
            this.channelDurationBehind.set(ChannelDurationBehind.unknown(list));
        }

        /**
         * Processes one shard response.
         *
         * <p>A failure while intercepting or dispatching a single message is logged and does not
         * prevent the remaining messages of the response from being processed.
         *
         * @param kinesisShardResponse the response read from a single shard
         */
        @Override // java.util.function.Consumer
        public void accept(KinesisShardResponse kinesisShardResponse) {
            kinesisShardResponse.getMessages().forEach(message -> {
                try {
                    // Dispatch the message returned by the interceptor chain, not the raw input;
                    // otherwise transformations applied by interceptors would be lost.
                    dispatchIfNotFiltered(this.interceptorChain.intercept(message), this.messageDispatcher::accept);
                } catch (Exception e) {
                    KinesisMessageLogReceiverEndpoint.LOG.error("Error processing message: " + e.getMessage(), e);
                }
            });
            // Replace only this shard's entry; entries of other shards are copied over unchanged.
            this.channelDurationBehind.updateAndGet(current -> {
                return ChannelDurationBehind.copyOf(current).with(kinesisShardResponse.getShardName(), kinesisShardResponse.getDurationBehind()).build();
            });
            if (this.eventPublisher != null) {
                this.eventPublisher.publishEvent(MessageReceiverNotification.builder().withChannelName(kinesisShardResponse.getChannelName()).withChannelDurationBehind(this.channelDurationBehind.get()).withStatus(MessageReceiverStatus.RUNNING).withMessage("Reading from kinesis shard.").build());
            }
        }

        /**
         * Hands {@code interceptedMessage} to {@code dispatcher} unless it is {@code null}; a
         * {@code null} result of the interceptor chain is treated as "do not dispatch".
         */
        private static <T> void dispatchIfNotFiltered(T interceptedMessage, Consumer<? super T> dispatcher) {
            if (interceptedMessage != null) {
                dispatcher.accept(interceptedMessage);
            }
        }
    }

    /**
     * Creates an endpoint that uses {@link Clock#systemDefaultZone()}.
     *
     * @param str                       channel name; passed to the superclass and to the Kinesis reader
     * @param kinesisClient             client handed to the {@link KinesisMessageLogReader}
     * @param objectMapper              passed to the superclass
     * @param applicationEventPublisher publisher used for receiver notifications
     */
    public KinesisMessageLogReceiverEndpoint(String str, KinesisClient kinesisClient, ObjectMapper objectMapper, ApplicationEventPublisher applicationEventPublisher) {
        this(str, kinesisClient, objectMapper, applicationEventPublisher, Clock.systemDefaultZone());
    }

    /**
     * Creates an endpoint with an explicit clock.
     *
     * @param str                       channel name; passed to the superclass and to the Kinesis reader
     * @param kinesisClient             client handed to the {@link KinesisMessageLogReader}
     * @param objectMapper              passed to the superclass
     * @param applicationEventPublisher publisher used for receiver notifications
     * @param clock                     clock handed to the {@link KinesisMessageLogReader}
     */
    public KinesisMessageLogReceiverEndpoint(String str, KinesisClient kinesisClient, ObjectMapper objectMapper, ApplicationEventPublisher applicationEventPublisher, Clock clock) {
        super(str, objectMapper, applicationEventPublisher);
        this.eventPublisher = applicationEventPublisher;
        this.kinesisMessageLogReader = new KinesisMessageLogReader(str, kinesisClient, clock);
    }

    /**
     * Consumes the channel by delegating to the {@link KinesisMessageLogReader}.
     *
     * <p>Publishes STARTING before and STARTED after the open shards have been fetched, and FINISHED
     * once the returned future completes normally. If fetching the shards, starting the reader or the
     * consumption itself fails, the failure is logged, FAILED is published and {@link #stop()} is called.
     *
     * @param channelPosition position handed to the reader as starting point
     * @param instant         instant handed to the reader as the point until which to consume
     * @return a future yielding the channel position returned by the reader; it completes
     *         exceptionally with a {@link RuntimeException} wrapping the cause if consumption fails
     * @throws RuntimeException wrapping any exception thrown synchronously while starting consumption
     */
    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition channelPosition, @Nonnull Instant instant) {
        try {
            publishEvent(MessageReceiverStatus.STARTING, "Consuming messages from Kinesis.", null);
            long startedAtMillis = System.currentTimeMillis();
            List<String> openShards = this.kinesisMessageLogReader.getOpenShards();
            publishEvent(MessageReceiverStatus.STARTED, "Received shards from Kinesis.", null);
            KinesisShardResponseConsumer responseConsumer = new KinesisShardResponseConsumer(openShards, getInterceptorChain(), getMessageDispatcher(), this.eventPublisher);
            return this.kinesisMessageLogReader.consumeUntil(channelPosition, instant, responseConsumer).exceptionally(th -> {
                LOG.error("Failed to consume from Kinesis stream {}: {}", getChannelName(), th.getMessage());
                publishEvent(MessageReceiverStatus.FAILED, "Failed to consume messages from Kinesis: " + th.getMessage(), null);
                // Stop the reader before propagating the failure to the caller.
                stop();
                // Rethrowing keeps the resulting future failed, so the FINISHED stage below is skipped.
                throw new RuntimeException(th.getMessage(), th);
            }).thenApply(finalPosition -> {
                LogHelper.info(LOG, ImmutableMap.of("runtime", Long.valueOf(System.currentTimeMillis() - startedAtMillis)), "Consume events from Kinesis", (Object[]) null);
                publishEvent(MessageReceiverStatus.FINISHED, "Finished consuming messages from Kinesis", null);
                return finalPosition;
            });
        } catch (Exception e) {
            LOG.error("Failed to consume from Kinesis stream {}: {}", getChannelName(), e.getMessage());
            publishEvent(MessageReceiverStatus.FAILED, "Failed to consume messages from Kinesis: " + e.getMessage(), null);
            stop();
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Logs the stop signal and forwards it to the {@link KinesisMessageLogReader}. */
    public void stop() {
        LOG.info("Channel {} received stop signal.", getChannelName());
        this.kinesisMessageLogReader.stop();
    }

    /** Exposes the shard readers currently held by the {@link KinesisMessageLogReader}; intended for tests. */
    @VisibleForTesting
    List<KinesisShardReader> getCurrentKinesisShards() {
        return this.kinesisMessageLogReader.getCurrentKinesisShards();
    }
}
