package de.otto.edison.eventsourcing.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.otto.edison.eventsourcing.consumer.Event;
import de.otto.edison.eventsourcing.consumer.EventSource;
import de.otto.edison.eventsourcing.consumer.StreamPosition;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import software.amazon.awssdk.services.kinesis.model.Record;

/* loaded from: input_file:de/otto/edison/eventsourcing/kinesis/KinesisEventSource.class */
public class KinesisEventSource<T> implements EventSource<T> {
    private KinesisStream kinesisStream;
    private Function<String, T> deserializer;

    /* loaded from: input_file:de/otto/edison/eventsourcing/kinesis/KinesisEventSource$RecordConsumer.class */
    private class RecordConsumer implements BiConsumer<Long, Record> {
        private final Consumer<Event<T>> consumer;

        RecordConsumer(Consumer<Event<T>> consumer) {
            this.consumer = consumer;
        }

        @Override // java.util.function.BiConsumer
        public void accept(Long l, Record record) {
            this.consumer.accept(KinesisEventSource.this.createEvent(Duration.ofMillis(l.longValue()), record));
        }
    }

    /* loaded from: input_file:de/otto/edison/eventsourcing/kinesis/KinesisEventSource$RecordStopCondition.class */
    private class RecordStopCondition implements BiFunction<Long, Record, Boolean> {
        private final Predicate<Event<T>> stopCondition;

        RecordStopCondition(Predicate<Event<T>> predicate) {
            this.stopCondition = predicate;
        }

        @Override // java.util.function.BiFunction
        public Boolean apply(Long l, Record record) {
            return record == null ? Boolean.valueOf(this.stopCondition.test(null)) : Boolean.valueOf(this.stopCondition.test(KinesisEventSource.this.createEvent(Duration.ofMillis(l.longValue()), record)));
        }
    }

    public KinesisEventSource(Class<T> cls, ObjectMapper objectMapper, KinesisStream kinesisStream) {
        this.deserializer = str -> {
            try {
                return objectMapper.readValue(str, cls);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        this.kinesisStream = kinesisStream;
    }

    public KinesisEventSource(Function<String, T> function, KinesisStream kinesisStream) {
        this.deserializer = function;
        this.kinesisStream = kinesisStream;
    }

    @Override // de.otto.edison.eventsourcing.consumer.EventSource
    public String getStreamName() {
        return this.kinesisStream.getStreamName();
    }

    @Override // de.otto.edison.eventsourcing.consumer.EventSource
    public StreamPosition consumeAll(StreamPosition streamPosition, Predicate<Event<T>> predicate, Consumer<Event<T>> consumer) {
        return StreamPosition.of((Map) ((Stream) this.kinesisStream.retrieveAllOpenShards().stream().parallel()).map(kinesisShard -> {
            return kinesisShard.consumeRecordsAndReturnLastSeqNumber(streamPosition.positionOf(kinesisShard.getShardId()), new RecordStopCondition(predicate), new RecordConsumer(consumer));
        }).collect(Collectors.toMap((v0) -> {
            return v0.getShardId();
        }, (v0) -> {
            return v0.getSequenceNumber();
        })));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Event<T> createEvent(Duration duration, Record record) {
        return KinesisEvent.kinesisEvent(duration, record, byteBuffer -> {
            return this.deserializer.apply(StandardCharsets.UTF_8.decode(record.data()).toString());
        });
    }
}
