package io.codemonastery.dropwizard.kinesis.consumer;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import com.google.common.base.Preconditions;
import io.codemonastery.dropwizard.kinesis.EventDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/codemonastery/dropwizard/kinesis/consumer/RecordProcessor.class */
/**
 * Record processor that decodes each Kinesis record with an {@link EventDecoder}
 * and passes the decoded event to an {@link EventConsumer}.
 *
 * <p>Records are handled in the order they are delivered. After a batch, the
 * checkpoint is advanced to the last record that was either consumed
 * successfully or decoded to {@code null}. Handling of a batch stops at the
 * first event the consumer rejects (returns {@code false}) or fails on (throws).
 *
 * @param <E> type of the decoded event
 */
public final class RecordProcessor<E> implements IRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(RecordProcessor.class);
    private final EventDecoder<E> decoder;
    private final EventConsumer<E> processor;

    /**
     * Creates a processor from a decoder and a consumer.
     *
     * @param eventDecoder  turns the raw record data into an event; must not be null
     * @param eventConsumer receives each decoded, non-null event; must not be null
     * @throws NullPointerException if either argument is null
     */
    public RecordProcessor(EventDecoder<E> eventDecoder, EventConsumer<E> eventConsumer) {
        Preconditions.checkNotNull(eventDecoder, "decoder cannot be null");
        Preconditions.checkNotNull(eventConsumer, "processor cannot be null");
        this.decoder = eventDecoder;
        this.processor = eventConsumer;
    }

    /** No per-shard setup is performed. */
    public void initialize(InitializationInput initializationInput) {
    }

    /**
     * Decodes and consumes the records of one batch, then checkpoints the last
     * record that was handled.
     *
     * <ul>
     *   <li>A record whose decoding throws is logged and skipped; it does not
     *       advance the checkpoint candidate by itself.</li>
     *   <li>A record that decodes to {@code null} counts as handled.</li>
     *   <li>The first event the consumer rejects or throws on ends the batch;
     *       that record and everything after it are not checkpointed.</li>
     * </ul>
     *
     * @param processRecordsInput the batch of records and the checkpointer for it
     */
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        Record lastHandled = null;
        for (Record current : processRecordsInput.getRecords()) {
            E event;
            try {
                event = this.decoder.decode(current.getData());
            } catch (Exception e) {
                // Deliberate best effort: an undecodable record is skipped so it
                // cannot block the rest of the batch.
                LOG.error("Unexpected exception decoding event", e);
                continue;
            }
            // Decoding and consuming are kept in separate try scopes so that a
            // failure while consuming can never be mistaken for a decode failure
            // and let the loop run past the failed event.
            if (event != null && !consume(event)) {
                break;
            }
            lastHandled = current;
        }
        if (lastHandled != null) {
            checkpoint(processRecordsInput, lastHandled);
        }
    }

    /** No per-shard teardown is performed. */
    public void shutdown(ShutdownInput shutdownInput) {
    }

    /**
     * Hands one event to the consumer.
     *
     * @return the consumer's result, or {@code false} if the consumer threw
     */
    private boolean consume(E event) {
        try {
            return this.processor.process(event);
        } catch (Exception e) {
            // The event is passed as a placeholder argument rather than
            // concatenated, so its toString() is not invoked by this class
            // inside the catch block.
            LOG.error("Unhandled exception processing event {}", event, e);
            return false;
        }
    }

    /** Checkpoints at the given record; failures are logged, never propagated. */
    private void checkpoint(ProcessRecordsInput processRecordsInput, Record lastHandled) {
        try {
            processRecordsInput.getCheckpointer().checkpoint(lastHandled);
        } catch (ShutdownException e) {
            LOG.debug("Abandoning checkpoint because processor was shutdown");
        } catch (Exception e) {
            LOG.error("Could not checkpoint because of unexpected exception", e);
        }
    }
}
