package de.otto.synapse.endpoint.sender.aws;

import com.google.common.collect.ImmutableMap;
import de.otto.synapse.logging.LogHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

/* loaded from: input_file:de/otto/synapse/endpoint/sender/aws/RetryPutRecordsKinesisClient.class */
public class RetryPutRecordsKinesisClient {
    private static final Logger LOG = LoggerFactory.getLogger(RetryPutRecordsKinesisClient.class);
    private static final int MAX_RETRY_COUNT = 3;
    private final KinesisClient kinesisClient;
    private final boolean waitBeforeRetry;

    public RetryPutRecordsKinesisClient(KinesisClient kinesisClient) {
        this(kinesisClient, true);
    }

    public RetryPutRecordsKinesisClient(KinesisClient kinesisClient, boolean z) {
        this.kinesisClient = kinesisClient;
        this.waitBeforeRetry = z;
    }

    public void putRecords(Supplier<PutRecordsRequest> supplier) {
        PutRecordsResponse putRecords;
        long currentTimeMillis;
        int i = 0;
        PutRecordsRequest putRecordsRequest = supplier.get();
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= MAX_RETRY_COUNT) {
                return;
            }
            long currentTimeMillis2 = System.currentTimeMillis();
            try {
                putRecords = this.kinesisClient.putRecords(putRecordsRequest);
                currentTimeMillis = System.currentTimeMillis();
            } catch (SdkServiceException | SdkClientException e) {
                LogHelper.warn(LOG, ImmutableMap.of("records", String.valueOf(putRecordsRequest.records()), "recordsSize", String.valueOf(getRecordsSize(putRecordsRequest.records()))), "Failed to write events to Kinesis: %s", new Object[]{e.getMessage()});
                if (i == MAX_RETRY_COUNT) {
                    LOG.error("Failed to write events to Kinesis: {}", e);
                    throw new IllegalStateException(String.format("failed to send records after %s retries", Integer.valueOf(MAX_RETRY_COUNT)));
                }
                putRecordsRequest = supplier.get();
                if (this.waitBeforeRetry) {
                    waitDependingOnRetryStep(i);
                }
            }
            if (putRecords.failedRecordCount().intValue() == 0) {
                LogHelper.trace(LOG, ImmutableMap.of("runtime", Long.valueOf(currentTimeMillis - currentTimeMillis2), "shardName", String.join(",", getShardIds(putRecords.records()))), "Write events to Kinesis", (Object[]) null);
                return;
            }
            LOG.warn("Failed to write events to Kinesis: {}", putRecords.toString());
            putRecordsRequest = (PutRecordsRequest) PutRecordsRequest.builder().records(findFailedRecords(putRecordsRequest, putRecords)).streamName(putRecordsRequest.streamName()).build();
            if (this.waitBeforeRetry) {
                waitDependingOnRetryStep(i);
            }
            if (i == MAX_RETRY_COUNT) {
                LOG.error("Failed to write events to Kinesis: {}", putRecords.toString());
                throw new IllegalStateException(String.format("failed to send records after %s retries", Integer.valueOf(MAX_RETRY_COUNT)));
            }
        }
    }

    private List<String> getShardIds(List<PutRecordsResultEntry> list) {
        return (List) list.stream().map(putRecordsResultEntry -> {
            return putRecordsResultEntry.shardId();
        }).collect(Collectors.toList());
    }

    private long getRecordsSize(List<PutRecordsRequestEntry> list) {
        return list.stream().map(putRecordsRequestEntry -> {
            return Integer.valueOf(putRecordsRequestEntry.data().position());
        }).mapToInt((v0) -> {
            return v0.intValue();
        }).sum();
    }

    private List<PutRecordsRequestEntry> findFailedRecords(PutRecordsRequest putRecordsRequest, PutRecordsResponse putRecordsResponse) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < putRecordsResponse.records().size(); i++) {
            if ("ProvisionedThroughputExceededException".equals(((PutRecordsResultEntry) putRecordsResponse.records().get(i)).errorCode())) {
                arrayList.add(putRecordsRequest.records().get(i));
            }
        }
        return arrayList;
    }

    private void waitDependingOnRetryStep(int i) {
        try {
            Thread.sleep(((long) Math.pow(2.0d, i)) * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
