package io.codemonastery.dropwizard.kinesis.producer;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.codemonastery.dropwizard.kinesis.EventEncoder;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/codemonastery/dropwizard/kinesis/producer/BufferedProducer.class */
/**
 * A {@link Producer} that accumulates request entries in a {@link PutRecordsBuffer} and
 * delivers them to Kinesis in batches through {@code AmazonKinesis.putRecords}.
 *
 * <p>Batches handed back by the buffer from {@link #send} or {@link #flush()} are delivered
 * asynchronously on the delivery executor; {@link #stop()} delivers whatever is left on the
 * calling thread.
 *
 * @param <E> the event type accepted by this producer
 */
public final class BufferedProducer<E> extends Producer<E> {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedProducer.class);
    private final AmazonKinesis kinesis;
    private final String streamName;
    private final ExecutorService deliveryExecutor;
    private final BufferedProducerMetrics bufferedMetrics;
    private final PutRecordsBuffer buffer;

    /**
     * Creates a buffered producer.
     *
     * @param amazonKinesis            client used to put records; must not be null
     * @param str                      name of the target stream; must not be null or empty
     * @param function                 passed through to {@link Producer}
     * @param eventEncoder             passed through to {@link Producer}
     * @param i                        maximum buffer size handed to {@link PutRecordsBuffer}; must be positive
     * @param scheduledExecutorService executor on which batches are delivered; must not be null
     * @param bufferedProducerMetrics  metrics sink, shared with {@link Producer} and used for buffer accounting
     * @throws NullPointerException     if {@code amazonKinesis} or {@code scheduledExecutorService} is null
     * @throws IllegalArgumentException if {@code str} is null/empty or {@code i} is not positive
     */
    public BufferedProducer(AmazonKinesis amazonKinesis, String str, Function<E, String> function, EventEncoder<E> eventEncoder, int i, ScheduledExecutorService scheduledExecutorService, BufferedProducerMetrics bufferedProducerMetrics) {
        super(function, eventEncoder, bufferedProducerMetrics);
        Preconditions.checkNotNull(amazonKinesis, "kinesis cannot be null");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "must have a stream name");
        Preconditions.checkArgument(i > 0, "maxBufferSize must be positive");
        Preconditions.checkNotNull(scheduledExecutorService, "must have a delivery executor");
        this.kinesis = amazonKinesis;
        this.streamName = str;
        this.deliveryExecutor = scheduledExecutorService;
        this.bufferedMetrics = bufferedProducerMetrics;
        this.buffer = new PutRecordsBuffer(i);
    }

    /**
     * Drains the buffer and, if anything was buffered, submits the drained batch to the
     * delivery executor. Never throws: any exception is logged and swallowed.
     */
    public void flush() {
        try {
            List<PutRecordsRequestEntry> drained = this.buffer.drain();
            this.bufferedMetrics.bufferRemove(drained.size());
            if (!drained.isEmpty()) {
                this.deliveryExecutor.submit(() -> putRecords(drained));
            }
        } catch (Exception e) {
            LOG.error("unexpected error while flushing", e);
        }
    }

    /**
     * Stops the underlying producer and synchronously delivers any entries still buffered.
     *
     * @throws Exception if {@code super.stop()} fails
     */
    @Override
    public void stop() throws Exception {
        // The final delivery stays inside the monitor, as before; what else locks on the
        // buffer is not visible from this class.
        synchronized (this.buffer) {
            super.stop();
            List<PutRecordsRequestEntry> remaining = this.buffer.drain();
            // Keep buffer accounting consistent with flush() and send().
            this.bufferedMetrics.bufferRemove(remaining.size());
            // A PutRecords request needs at least one record, so skip the call when
            // nothing is left instead of issuing a request that can only fail.
            if (!remaining.isEmpty()) {
                putRecords(remaining);
            }
        }
    }

    /**
     * Adds one entry to the buffer. When the buffer hands back a batch (a non-null result
     * from {@code add}, presumably because the buffer reached its maximum size), that
     * batch is submitted to the delivery executor.
     *
     * @param putRecordsRequestEntry the entry to buffer
     */
    @Override
    protected void send(PutRecordsRequestEntry putRecordsRequestEntry) {
        List<PutRecordsRequestEntry> batch = this.buffer.add(putRecordsRequestEntry);
        if (batch == null) {
            this.bufferedMetrics.bufferPut(1);
            return;
        }
        this.bufferedMetrics.bufferRemove(batch.size());
        this.bufferedMetrics.bufferPut(1);
        this.deliveryExecutor.submit(() -> putRecords(batch));
    }

    /**
     * Puts one batch to the stream and reports the outcome to the metrics exactly once.
     * The call is timed via {@code metrics.time()}. If anything throws an
     * {@link Exception} (the put itself or closing the timer), the error is logged and the
     * whole batch is reported as failed.
     *
     * @param list the entries to put
     */
    private void putRecords(List<PutRecordsRequestEntry> list) {
        final int size = list.size();
        // Treat the whole batch as failed until Kinesis reports otherwise.
        int failed = size;
        try (Closeable timer = this.metrics.time()) {
            PutRecordsResult result = this.kinesis.putRecords(
                    new PutRecordsRequest().withRecords(list).withStreamName(this.streamName));
            failed = result.getFailedRecordCount().intValue();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Put {} records to stream {}, {} failed",
                        result.getRecords().size(), this.streamName, failed);
            }
        } catch (Exception e) {
            LOG.error("Unexpected error while putting records", e);
            failed = size;
        } finally {
            // Single reporting point: previously a failure was counted twice.
            this.metrics.sent(size - failed, failed);
        }
    }
}
