package de.otto.edison.eventsourcing.kinesis;

import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/* loaded from: input_file:de/otto/edison/eventsourcing/kinesis/KinesisShard.class */
/**
 * A single shard of a Kinesis stream.
 *
 * <p>Obtains shard iterators from the {@link KinesisClient} and polls the shard for records,
 * handing every record to a caller-supplied consumer until a caller-supplied stop condition
 * fires or the polling thread is interrupted.
 */
public class KinesisShard {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShard.class);

    /** Sequence number value that, like {@code null}, makes reading start at TRIM_HORIZON. */
    private static final String HORIZON_SEQUENCE_NUMBER = "0";

    /** Pause in milliseconds between two polls while the stop condition has not fired. */
    private static final long POLL_PAUSE_MILLIS = 100L;

    private final String shardId;
    private final KinesisClient kinesisClient;
    private final KinesisStream kinesisStream;

    /**
     * Creates a shard handle.
     *
     * @param str           the id of the shard
     * @param kinesisStream the stream this shard belongs to; used for its stream name
     * @param kinesisClient the client used to request shard iterators and records
     */
    public KinesisShard(String str, KinesisStream kinesisStream, KinesisClient kinesisClient) {
        this.shardId = str;
        this.kinesisStream = kinesisStream;
        this.kinesisClient = kinesisClient;
    }

    /**
     * Returns the id of this shard.
     *
     * @return the shard id passed to the constructor
     */
    public String getShardId() {
        return shardId;
    }

    /**
     * Requests a shard iterator positioned after the given sequence number.
     *
     * <p>If the client rejects the request with an {@link InvalidArgumentException}, the error is
     * logged (including the exception) and a second request is made that starts at TRIM_HORIZON.
     *
     * @param str the sequence number to continue after; {@code null} or {@code "0"} starts at
     *            TRIM_HORIZON
     * @return an iterator wrapping the shard iterator returned by the client
     */
    public KinesisShardIterator retrieveIterator(String str) {
        GetShardIteratorResponse response;
        try {
            response = kinesisClient.getShardIterator(buildIteratorShardRequest(str));
        } catch (InvalidArgumentException e) {
            // Pass the exception as the last argument so the cause and stack trace are not lost.
            LOG.error("invalidShardSequenceNumber in Snapshot {}/{} - reading from HORIZON",
                    kinesisStream.getStreamName(), shardId, e);
            response = kinesisClient.getShardIterator(buildIteratorShardRequest(HORIZON_SEQUENCE_NUMBER));
        }
        return new KinesisShardIterator(kinesisClient, response.shardIterator());
    }

    /**
     * Builds the shard-iterator request for this shard and stream.
     *
     * @param sequenceNumber {@code null} or {@code "0"} for TRIM_HORIZON, otherwise the sequence
     *                       number after which reading should continue
     * @return the request to pass to the client
     */
    private GetShardIteratorRequest buildIteratorShardRequest(String sequenceNumber) {
        GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
                .shardId(shardId)
                .streamName(kinesisStream.getStreamName());
        if (sequenceNumber == null || sequenceNumber.equals(HORIZON_SEQUENCE_NUMBER)) {
            request.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
        } else {
            request.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
            request.startingSequenceNumber(sequenceNumber);
        }
        return (GetShardIteratorRequest) request.build();
    }

    /**
     * Polls this shard and feeds every record to {@code biConsumer} until the stop condition
     * holds or the thread is interrupted while pausing between polls.
     *
     * <p>For every poll the stop condition is first evaluated with {@code (null, null)}. If the
     * poll returned records, each record is passed to the consumer and then to the stop
     * condition together with the response's millis-behind-latest value; the result for the last
     * record of the batch replaces the earlier result. All records of a batch are always
     * consumed.
     *
     * @param str        the sequence number to continue after; {@code null} or {@code "0"} starts
     *                   at TRIM_HORIZON
     * @param biFunction the stop condition; must return non-null and must accept {@code null}
     *                   for both arguments
     * @param biConsumer receives the millis-behind-latest value and each record
     * @return the shard id together with the sequence number of the last consumed record, or
     *         {@code str} if no record was consumed
     */
    public ShardPosition consumeRecordsAndReturnLastSeqNumber(String str, BiFunction<Long, Record, Boolean> biFunction, BiConsumer<Long, Record> biConsumer) {
        LOG.info("Reading from stream {}, shard {} with starting sequence number {}",
                kinesisStream.getStreamName(), shardId, str);
        KinesisShardIterator iterator = retrieveIterator(str);
        String lastSequenceNumber = str;
        boolean stop;
        do {
            GetRecordsResponse response = iterator.next();
            stop = biFunction.apply(null, null);
            if (!isEmptyStream(response)) {
                Long millisBehindLatest = response.millisBehindLatest();
                for (Record record : response.records()) {
                    biConsumer.accept(millisBehindLatest, record);
                    stop = biFunction.apply(millisBehindLatest, record);
                    lastSequenceNumber = record.sequenceNumber();
                }
                // The lag is only needed for logging here; a missing value must not abort
                // consumption with a NullPointerException, so it is logged as zero.
                long lagMillis = millisBehindLatest == null ? 0L : millisBehindLatest;
                logInfo(kinesisStream.getStreamName(), response, Duration.ofMillis(lagMillis));
            }
            if (!stop) {
                // An interrupt during the pause ends the loop as well.
                stop = waitABit();
            }
        } while (!stop);
        LOG.info("Terminating event source for stream {}", kinesisStream.getStreamName());
        return new ShardPosition(shardId, lastSequenceNumber);
    }

    /**
     * Logs how many records a poll returned and how far behind the latest record it was.
     *
     * @param streamName   name of the stream for the log message
     * @param response     the poll result whose record count is logged
     * @param behindLatest the lag, rendered as days / hours / minutes / seconds
     */
    private void logInfo(String streamName, GetRecordsResponse response, Duration behindLatest) {
        String lag = String.format("%s days %s hrs %s min %s sec",
                behindLatest.toDays(),
                behindLatest.toHours() % 24,
                behindLatest.toMinutes() % 60,
                behindLatest.getSeconds() % 60);
        LOG.info("Consumed {} records from kinesis {}; behind latest: {}",
                response.records().size(), streamName, lag);
    }

    /**
     * Sleeps briefly between two polls.
     *
     * @return {@code true} if the thread was interrupted while sleeping (the caller should stop),
     *         {@code false} otherwise
     */
    private boolean waitABit() {
        try {
            Thread.sleep(POLL_PAUSE_MILLIS);
            return false;
        } catch (InterruptedException e) {
            LOG.warn("Thread got interrupted");
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            return true;
        }
    }

    /**
     * Tells whether a poll returned no records.
     *
     * @param response the poll result
     * @return {@code true} if the response's record list is empty
     */
    private boolean isEmptyStream(GetRecordsResponse response) {
        return response.records().isEmpty();
    }
}
