package de.tschumacher.sqsservice.consumer;

import com.amazonaws.services.sqs.model.Message;
import de.tschumacher.sqsservice.SQSQueue;
import de.tschumacher.sqsservice.supplier.SQSMessageFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/tschumacher/sqsservice/consumer/SQSMessageReceiver.class */
/**
 * Polls an {@link SQSQueue} from a fixed pool of worker threads and passes every received
 * message to an {@link SQSMessageHandler}.
 *
 * <p>For each received message the body is converted with the {@link SQSMessageFactory} and
 * handed to the handler. If that succeeds the message is deleted from the queue; if it throws,
 * the error is logged and the message's visibility timeout is set to {@link #RETRY_SECONDS}.
 *
 * <p>Polling starts as soon as the instance is constructed.
 *
 * @param <F> type of the object the factory creates from a message body
 */
public class SQSMessageReceiver<F> {
    private static final Logger logger = LoggerFactory.getLogger(SQSMessageReceiver.class);

    /** Number of pool threads, and number of polling tasks submitted by {@link #start()}. */
    private static final int WORKER_COUNT = 5;

    /** Not referenced by this class; kept because it is visible to subclasses. */
    protected static final int MAX_RETRYS = 5;

    /** Visibility timeout applied to a message whose processing failed (presumably seconds). */
    protected static final int RETRY_SECONDS = 120;

    private final SQSMessageHandler<F> handler;
    private final Runnable worker;
    final SQSMessageFactory<F> factory;

    /**
     * Loop condition of the workers. Written by the thread calling {@link #start()} or
     * {@link #stop()} and read by the pool threads, hence {@code volatile}: without it a worker
     * is not guaranteed to ever observe the write made by {@code stop()}.
     */
    private volatile boolean running = false;

    // NOTE(review): the pool is never shut down, so its threads outlive stop(). Shutting it down
    // in stop() would make a subsequent start() fail, so that is left for a dedicated change.
    private final ExecutorService executorService = Executors.newFixedThreadPool(WORKER_COUNT);

    /**
     * Creates the receiver and immediately starts polling.
     *
     * @param sQSQueue queue to poll, delete from and change message visibility on
     * @param sQSMessageHandler handler invoked for every received message
     * @param sQSMessageFactory factory converting a message body into an {@code F}
     */
    public SQSMessageReceiver(SQSQueue sQSQueue, SQSMessageHandler<F> sQSMessageHandler, SQSMessageFactory<F> sQSMessageFactory) {
        this.handler = sQSMessageHandler;
        this.worker = newWorker(sQSQueue);
        this.factory = sQSMessageFactory;
        start();
    }

    /**
     * Marks the receiver as running and submits {@link #WORKER_COUNT} polling tasks to the pool.
     *
     * <p>Each call submits a further set of tasks; tasks that do not get a pool thread wait in
     * the executor's queue.
     */
    public void start() {
        this.running = true;
        for (int i = 0; i < WORKER_COUNT; i++) {
            this.executorService.submit(this.worker);
        }
    }

    /**
     * Asks the workers to stop. A worker leaves its loop the next time it checks the flag, i.e.
     * after its current receive/handle cycle has finished. This method does not wait for that.
     *
     * @throws InterruptedException never thrown by this implementation; declared so existing
     *     callers and overriders keep compiling
     */
    public void stop() throws InterruptedException {
        this.running = false;
    }

    /**
     * Builds the polling task shared by all pool threads.
     *
     * @param sQSQueue queue the task polls
     * @return a task that keeps receiving and processing messages while {@code running} is true
     */
    private Runnable newWorker(final SQSQueue sQSQueue) {
        return new Runnable() {
            @Override
            public void run() {
                while (running) {
                    try {
                        Message receiveMessage = sQSQueue.receiveMessage();
                        if (receiveMessage != null) {
                            process(sQSQueue, receiveMessage);
                        }
                    } catch (Throwable th2) {
                        // Reached when receiving fails or when the failure handling in process()
                        // itself throws; the worker logs and keeps polling.
                        logger.error("could not handle message", th2);
                    }
                }
            }
        };
    }

    /**
     * Hands one message to the handler and deletes it on success; on any failure logs the error
     * and sets the message's visibility timeout to {@link #RETRY_SECONDS}.
     */
    private void process(SQSQueue sQSQueue, Message message) {
        try {
            this.handler.receivedMessage(sQSQueue, this.factory.createMessage(message.getBody()));
            sQSQueue.deleteMessage(message.getReceiptHandle());
        } catch (Throwable th) {
            logger.error("could not process message", th);
            sQSQueue.changeMessageVisibility(message.getReceiptHandle(), RETRY_SECONDS);
        }
    }

    /**
     * Requests the workers to stop when this object is finalized.
     *
     * <p>NOTE(review): the polling task is an inner class instance and therefore references this
     * receiver, so this hook probably never runs while workers are active; call {@link #stop()}
     * explicitly instead of relying on it.
     */
    @Override
    protected void finalize() throws Throwable {
        stop();
        super.finalize();
    }
}
