package de.otto.synapse.endpoint.receiver.aws;

import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.logging.LogHelper;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/* loaded from: input_file:de/otto/synapse/endpoint/receiver/aws/KinesisShardIterator.class */
public class KinesisShardIterator {
    public static final String POISON_SHARD_ITER = "__synapse__poison__iter";
    private static final int RETRY_MAX_ATTEMPTS = 16;
    private static final int RETRY_BACK_OFF_POLICY_INITIAL_INTERVAL = 1000;
    private static final int RETRY_BACK_OFF_POLICY_MAX_INTERVAL = 64000;
    private static final double RETRY_BACK_OFF_POLICY_MULTIPLIER = 2.0d;
    private final KinesisClient kinesisClient;
    private final String channelName;
    private String id;
    private ShardPosition shardPosition;
    private final int fetchRecordLimit;
    private final RetryTemplate retryTemplate;
    private final AtomicBoolean stopSignal;
    private static final Logger LOG = LoggerFactory.getLogger(KinesisShardIterator.class);
    public static final Integer FETCH_RECORDS_LIMIT = 10000;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: de.otto.synapse.endpoint.receiver.aws.KinesisShardIterator$1, reason: invalid class name */
    /* loaded from: input_file:de/otto/synapse/endpoint/receiver/aws/KinesisShardIterator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$de$otto$synapse$channel$StartFrom = new int[StartFrom.values().length];

        static {
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.HORIZON.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.POSITION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.AT_POSITION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$de$otto$synapse$channel$StartFrom[StartFrom.TIMESTAMP.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:de/otto/synapse/endpoint/receiver/aws/KinesisShardIterator$LogRetryListener.class */
    public class LogRetryListener extends RetryListenerSupport {
        LogRetryListener() {
        }

        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            LogHelper.warn(KinesisShardIterator.LOG, ImmutableMap.of("retryCount", Integer.valueOf(retryContext.getRetryCount()), "errorMessage", Strings.nullToEmpty(th.getMessage())), "fail to iterate on shard", (Object[]) null);
        }
    }

    public KinesisShardIterator(@Nonnull KinesisClient kinesisClient, @Nonnull String str, @Nonnull ShardPosition shardPosition) {
        this(kinesisClient, str, shardPosition, FETCH_RECORDS_LIMIT.intValue());
    }

    public KinesisShardIterator(@Nonnull KinesisClient kinesisClient, @Nonnull String str, @Nonnull ShardPosition shardPosition, int i) {
        this.stopSignal = new AtomicBoolean(false);
        this.kinesisClient = kinesisClient;
        this.fetchRecordLimit = i;
        this.retryTemplate = createRetryTemplate();
        this.channelName = str;
        this.shardPosition = shardPosition;
        this.id = kinesisClient.getShardIterator(buildIteratorShardRequest(shardPosition)).shardIterator();
    }

    public String getId() {
        return this.id;
    }

    @Nonnull
    public ShardPosition getShardPosition() {
        return this.shardPosition;
    }

    public int getFetchRecordLimit() {
        return this.fetchRecordLimit;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isPoison() {
        return this.id.equals(POISON_SHARD_ITER);
    }

    public void stop() {
        this.stopSignal.set(true);
    }

    public KinesisShardResponse next() {
        try {
            Stopwatch createStarted = Stopwatch.createStarted();
            return new KinesisShardResponse(this.channelName, this.shardPosition, (GetRecordsResponse) this.retryTemplate.execute(retryContext -> {
                if (this.stopSignal.get()) {
                    retryContext.setExhaustedOnly();
                }
                return tryNext();
            }), createStarted.elapsed(TimeUnit.MILLISECONDS));
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    private GetShardIteratorRequest buildIteratorShardRequest(ShardPosition shardPosition) {
        GetShardIteratorRequest.Builder streamName = GetShardIteratorRequest.builder().shardId(shardPosition.shardName()).streamName(this.channelName);
        switch (AnonymousClass1.$SwitchMap$de$otto$synapse$channel$StartFrom[shardPosition.startFrom().ordinal()]) {
            case 1:
                streamName.shardIteratorType(ShardIteratorType.TRIM_HORIZON);
                break;
            case 2:
                streamName.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
                streamName.startingSequenceNumber(shardPosition.position());
                break;
            case 3:
                streamName.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
                streamName.startingSequenceNumber(shardPosition.position());
                break;
            case 4:
                streamName.shardIteratorType(ShardIteratorType.AT_TIMESTAMP).timestamp(shardPosition.timestamp());
                break;
        }
        return (GetShardIteratorRequest) streamName.build();
    }

    private GetRecordsResponse tryNext() {
        GetRecordsResponse records = this.kinesisClient.getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(this.id).limit(Integer.valueOf(this.fetchRecordLimit)).build());
        this.id = records.nextShardIterator();
        LOG.debug("next() with id " + this.id + " returned " + records.records().size() + " records");
        if (!records.records().isEmpty()) {
            this.shardPosition = ShardPosition.fromPosition(this.shardPosition.shardName(), ((Record) records.records().get(records.records().size() - 1)).sequenceNumber());
        }
        return records;
    }

    private RetryTemplate createRetryTemplate() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(RETRY_MAX_ATTEMPTS, ImmutableMap.of(KinesisException.class, true, SdkClientException.class, true));
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(1000L);
        exponentialBackOffPolicy.setMaxInterval(64000L);
        exponentialBackOffPolicy.setMultiplier(RETRY_BACK_OFF_POLICY_MULTIPLIER);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.registerListener(new LogRetryListener());
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
        return retryTemplate;
    }
}
