package de.otto.synapse.endpoint.receiver.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ChannelResponse;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.EndpointType;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.message.TextMessage;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:de/otto/synapse/endpoint/receiver/kafka/KafkaRecordsConsumer.class */
public class KafkaRecordsConsumer implements Function<ConsumerRecords<String, String>, ChannelResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsConsumer.class);
    public static final Duration UNKNOWN_DURATION_BEHIND = Duration.ofMillis(Long.MAX_VALUE);
    private final String channelName;
    private final MessageInterceptorRegistry interceptorRegistry;
    private final MessageDispatcher messageDispatcher;
    private final Supplier<Set<String>> currentShardsSupplier;
    private final ChannelDurationBehindHandler durationBehindHandler;
    private final KafkaDecoder decoder;
    private ChannelPosition currentChannelPosition;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaRecordsConsumer(String str, ChannelPosition channelPosition, MessageInterceptorRegistry messageInterceptorRegistry, MessageDispatcher messageDispatcher, ChannelDurationBehindHandler channelDurationBehindHandler, Supplier<Set<String>> supplier, KafkaDecoder kafkaDecoder) {
        this.channelName = str;
        this.currentChannelPosition = channelPosition;
        this.interceptorRegistry = messageInterceptorRegistry;
        this.messageDispatcher = messageDispatcher;
        this.currentShardsSupplier = supplier;
        this.durationBehindHandler = channelDurationBehindHandler;
        this.decoder = kafkaDecoder;
    }

    @Override // java.util.function.Function
    public ChannelResponse apply(ConsumerRecords<String, String> consumerRecords) {
        HashMap newHashMap = Maps.newHashMap();
        HashMap newHashMap2 = Maps.newHashMap();
        consumerRecords.forEach(consumerRecord -> {
            try {
                String str = "" + consumerRecord.partition();
                TextMessage apply = this.decoder.apply((ConsumerRecord<String, String>) consumerRecord);
                LOG.debug("Processing message " + apply.getKey());
                TextMessage intercept = this.interceptorRegistry.getInterceptorChain(this.channelName, EndpointType.RECEIVER).intercept(apply);
                newHashMap2.put(str, toShardPosition(consumerRecord));
                if (intercept != null) {
                    this.messageDispatcher.accept(intercept);
                    newHashMap.compute(str, (str2, builder) -> {
                        return builder != null ? builder.add(intercept) : ImmutableList.builder().add(intercept);
                    });
                } else {
                    LOG.debug("Message {} dropped by interceptor", apply.getKey());
                }
            } catch (Exception e) {
                LOG.error("Error processing message: " + e.getMessage(), e);
            }
        });
        ImmutableMap<String, Duration> updateAndGetDurationBehind = updateAndGetDurationBehind(consumerRecords);
        updateCurrentChannelPosition(newHashMap2.values());
        return ChannelResponse.channelResponse(this.channelName, (ImmutableList) currentShardPositions().stream().map(shardPosition -> {
            String shardName = shardPosition.shardName();
            return ShardResponse.shardResponse(shardPosition, (Duration) updateAndGetDurationBehind.getOrDefault(shardName, UNKNOWN_DURATION_BEHIND), ((ImmutableList.Builder) newHashMap.getOrDefault(shardName, ImmutableList.builder())).build());
        }).collect(ImmutableList.toImmutableList()));
    }

    private ShardPosition toShardPosition(ConsumerRecord<String, String> consumerRecord) {
        return ShardPosition.fromPositionAndTimestamp("" + consumerRecord.partition(), "" + consumerRecord.offset(), Instant.ofEpochMilli(consumerRecord.timestamp()));
    }

    private ImmutableList<ShardPosition> currentShardPositions() {
        return (ImmutableList) this.currentShardsSupplier.get().stream().map(str -> {
            return this.currentChannelPosition.shard(str);
        }).collect(ImmutableList.toImmutableList());
    }

    private void updateCurrentChannelPosition(Collection<ShardPosition> collection) {
        collection.forEach(shardPosition -> {
            this.currentChannelPosition = ChannelPosition.merge(this.currentChannelPosition, shardPosition);
        });
    }

    private ImmutableMap<String, Duration> updateAndGetDurationBehind(ConsumerRecords<String, String> consumerRecords) {
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) Iterables.getLast(consumerRecords.records(topicPartition));
            this.durationBehindHandler.update(topicPartition, consumerRecord.offset(), Instant.ofEpochMilli(consumerRecord.timestamp()));
        }
        return this.durationBehindHandler.getChannelDurationBehind().getShardDurationsBehind();
    }
}
