package com.mercateo.sqs.utils.message.handling;

import com.amazonaws.services.sqs.AmazonSQS;
import com.mercateo.sqs.utils.queue.Queue;
import com.mercateo.sqs.utils.queue.QueueName;
import com.mercateo.sqs.utils.visibility.VisibilityTimeoutExtenderFactory;
import java.lang.Thread;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.class */
public class LongRunningMessageHandlerIntegrationTest {

    @Mock
    private AmazonSQS sqsClient;

    @Mock
    private FinishedMessageCallback<InputObject, String> finishedMessageCallback;

    @Mock
    private ErrorHandlingStrategy<InputObject> errorHandlingStrategy;
    private LongRunningMessageHandler<InputObject, String> uut;
    private MessageHandlingRunnableFactory messageHandlingRunnableFactory = new MessageHandlingRunnableFactory();
    private MessageWorkerWithHeaders<InputObject, String> worker = new TestWorkerWithHeaders();

    @Spy
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest$InputObject.class */
    /**
     * Message payload that lets a test observe and steer the simulated work.
     *
     * <p>{@link #start()} and {@link #stop()} are called by the test worker, {@link #stop()} is
     * additionally called directly by some tests, and both flags are polled through Awaitility.
     * Because writers and readers may run on different threads, the flags are {@code volatile}
     * so that every write is visible to the polling side.
     */
    public class InputObject {
        // volatile: written and polled without any other synchronization
        private volatile boolean isRunning;
        private volatile boolean isFinished;

        private InputObject() {
            this.isRunning = false;
            this.isFinished = false;
        }

        /** Marks the payload as currently being worked on. */
        void start() {
            this.isRunning = true;
        }

        /** Marks the payload as no longer running and as finished. */
        void stop() {
            this.isRunning = false;
            this.isFinished = true;
        }

        /** @return {@code true} after {@link #start()} and before {@link #stop()} */
        public boolean isRunning() {
            return this.isRunning;
        }

        /** @return {@code true} once {@link #stop()} has been called */
        public boolean isFinished() {
            return this.isFinished;
        }
    }

    /* loaded from: input_file:com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest$TestWorkerWithHeaders.class */
    /**
     * Worker used by the handler under test. It flags the payload as running, waits (via
     * Awaitility, so bounded by Awaitility's timeout) until the payload reports finished,
     * then calls {@code stop()} on it and returns {@code "done"}.
     */
    private class TestWorkerWithHeaders implements MessageWorkerWithHeaders<InputObject, String> {
        private TestWorkerWithHeaders() {
        }

        /**
         * Simulates work on the given payload.
         *
         * @param inputObject payload whose flags are driven by this worker
         * @param messageHeaders headers of the message; not used here
         * @return the constant {@code "done"}
         * @throws Exception declared by the signature; the wait itself fails if the payload
         *         never becomes finished
         */
        public String work(InputObject inputObject, MessageHeaders messageHeaders) throws Exception {
            inputObject.start();
            // Block until some other party marks the payload as finished.
            Awaitility.await().until(() -> inputObject.isFinished());
            inputObject.stop();
            return "done";
        }
    }

    /**
     * Initializes the annotated mocks and the spy, then builds the handler under test against a
     * queue named {@code "queueName"} whose {@code "VisibilityTimeout"} attribute is {@code "10"}.
     *
     * @throws Exception declared by the signature
     */
    @BeforeEach
    public void setUp() throws Exception {
        MockitoAnnotations.openMocks(this);

        HashMap<String, String> queueAttributes = new HashMap<>();
        queueAttributes.put("VisibilityTimeout", "10");
        QueueName queueName = new QueueName("queueName");
        Queue queue = new Queue(queueName, "queueUrl", queueAttributes);

        VisibilityTimeoutExtenderFactory extenderFactory = new VisibilityTimeoutExtenderFactory(sqsClient);

        // NOTE(review): 4 and 2 presumably are the message and thread limits the tests rely on
        // (two running, callers block from the second message, sixth is rejected) - confirm
        // against the LongRunningMessageHandler constructor.
        uut = new LongRunningMessageHandler<>(
                scheduledExecutorService,
                4,
                2,
                messageHandlingRunnableFactory,
                extenderFactory,
                worker,
                queue,
                finishedMessageCallback,
                Duration.ofMillis(1L),
                Duration.ZERO,
                errorHandlingStrategy);
    }

    /**
     * Hands a single message to the handler from a separate thread and expects that thread to
     * terminate, the payload to be reported as running, and exactly {@code "messageId1"} to be
     * tracked as in processing.
     */
    @Test
    public void testHandleMessage_processesOneMessageAndReturns() {
        Message<InputObject> message = createMessage(1);

        Thread caller = new Thread(() -> uut.handleMessage(message));
        caller.start();

        // The call must return, i.e. the calling thread ends.
        Awaitility.await().until(() -> !caller.isAlive());
        Awaitility.await().until(() -> message.getPayload().isRunning());

        Assertions.assertThat(uut.getMessagesInProcessing().getBackingSet())
                .containsOnly("messageId1");
    }

    /**
     * Hands over two messages one after the other. The first caller is expected to terminate,
     * the second caller is expected to end up in {@link Thread.State#WAITING}; both payloads
     * must be running and both ids tracked as in processing.
     */
    @Test
    public void testHandleMessage_processesTwoMessagesAndBlocks() {
        Message<InputObject> first = createMessage(1);
        Message<InputObject> second = createMessage(2);

        Thread firstCaller = new Thread(() -> uut.handleMessage(first));
        firstCaller.start();
        Awaitility.await().until(() -> !firstCaller.isAlive());

        Thread secondCaller = new Thread(() -> uut.handleMessage(second));
        secondCaller.start();
        // The second caller does not return but parks inside the handler.
        Awaitility.await().until(() -> secondCaller.getState() == Thread.State.WAITING);

        Awaitility.await().until(() -> first.getPayload().isRunning());
        Awaitility.await().until(() -> second.getPayload().isRunning());

        Assertions.assertThat(uut.getMessagesInProcessing().getBackingSet())
                .containsOnly("messageId1", "messageId2");
    }

    /**
     * Starts two messages and waits until both are running, then hands over two more. The two
     * additional callers are expected to be in {@link Thread.State#WAITING} with their payloads
     * not running, all four ids must be tracked as in processing, and the spied scheduler must
     * have received four {@code scheduleAtFixedRate} calls.
     */
    @Test
    public void testHandleMessage_processesFourMessagesAndFillsQueue() {
        Message<InputObject> first = createMessage(1);
        Message<InputObject> second = createMessage(2);
        Message<InputObject> third = createMessage(3);
        Message<InputObject> fourth = createMessage(4);

        new Thread(() -> uut.handleMessage(first)).start();
        new Thread(() -> uut.handleMessage(second)).start();
        Awaitility.await().until(() -> first.getPayload().isRunning());
        Awaitility.await().until(() -> second.getPayload().isRunning());

        Thread thirdCaller = new Thread(() -> uut.handleMessage(third));
        thirdCaller.start();
        Thread fourthCaller = new Thread(() -> uut.handleMessage(fourth));
        fourthCaller.start();
        Awaitility.await().until(() -> thirdCaller.getState() == Thread.State.WAITING);
        Awaitility.await().until(() -> fourthCaller.getState() == Thread.State.WAITING);

        // Messages three and four are accepted but must not have started working.
        Assert.assertFalse(third.getPayload().isRunning());
        Assert.assertFalse(fourth.getPayload().isRunning());
        Assertions.assertThat(uut.getMessagesInProcessing().getBackingSet())
                .containsOnly("messageId1", "messageId2", "messageId3", "messageId4");
        Mockito.verify(scheduledExecutorService, Mockito.times(4))
                .scheduleAtFixedRate(
                        ArgumentMatchers.any(),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any());
    }

    /**
     * Starts two messages, waits until both are running, and hands over three more whose
     * callers are expected to reach {@link Thread.State#WAITING}. Handing over a sixth message
     * on the test thread must throw, and only the first five ids may be tracked afterwards.
     */
    @Test
    public void testHandleMessage_processesSixMessageAndCrashes() {
        Message<InputObject> first = createMessage(1);
        Message<InputObject> second = createMessage(2);
        Message<InputObject> third = createMessage(3);
        Message<InputObject> fourth = createMessage(4);
        Message<InputObject> fifth = createMessage(5);
        Message<InputObject> sixth = createMessage(6);

        new Thread(() -> uut.handleMessage(first)).start();
        new Thread(() -> uut.handleMessage(second)).start();
        Awaitility.await().until(() -> first.getPayload().isRunning());
        Awaitility.await().until(() -> second.getPayload().isRunning());

        Thread thirdCaller = new Thread(() -> uut.handleMessage(third));
        thirdCaller.start();
        Thread fourthCaller = new Thread(() -> uut.handleMessage(fourth));
        fourthCaller.start();
        Thread fifthCaller = new Thread(() -> uut.handleMessage(fifth));
        fifthCaller.start();
        Awaitility.await().until(() -> thirdCaller.getState() == Thread.State.WAITING);
        Awaitility.await().until(() -> fourthCaller.getState() == Thread.State.WAITING);
        Awaitility.await().until(() -> fifthCaller.getState() == Thread.State.WAITING);

        // The sixth message is handed over synchronously and has to be rejected.
        Assertions.assertThatThrownBy(() -> uut.handleMessage(sixth));
        Assertions.assertThat(uut.getMessagesInProcessing().getBackingSet())
                .containsOnly("messageId1", "messageId2", "messageId3", "messageId4", "messageId5");
    }

    /**
     * Hands over two distinct message objects that carry the same message id. Both callers are
     * expected to terminate, only the first payload may be worked on, only
     * {@code "messageId1"} may be tracked, and the spied scheduler must have received exactly
     * one {@code scheduleAtFixedRate} call.
     */
    @Test
    public void testHandleMessage_performsDeduplication() {
        Message<InputObject> original = createMessage(1);
        Message<InputObject> duplicate = createMessage(1);

        Thread originalCaller = new Thread(() -> uut.handleMessage(original));
        originalCaller.start();
        Awaitility.await().until(() -> !originalCaller.isAlive());

        Thread duplicateCaller = new Thread(() -> uut.handleMessage(duplicate));
        duplicateCaller.start();
        Awaitility.await().until(() -> !duplicateCaller.isAlive());

        // The worker sets the running flag asynchronously, so a returned caller does not
        // guarantee the flag is already set; wait for it instead of asserting immediately.
        Awaitility.await().until(() -> original.getPayload().isRunning());
        Assert.assertFalse(duplicate.getPayload().isRunning());
        Assertions.assertThat(uut.getMessagesInProcessing().getBackingSet())
                .containsOnly("messageId1");
        Mockito.verify(scheduledExecutorService)
                .scheduleAtFixedRate(
                        ArgumentMatchers.any(),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any());
    }

    /**
     * Starts messages one and two, hands over message three, and then finishes message two.
     * Both later callers are expected to be in {@link Thread.State#WAITING}, message three must
     * start running, and only ids one and three may remain tracked as in processing.
     */
    @Test
    public void testHandleMessage_startsQueuedProcess() {
        Message<InputObject> first = createMessage(1);
        Message<InputObject> second = createMessage(2);
        Message<InputObject> third = createMessage(3);

        new Thread(() -> uut.handleMessage(first)).start();
        Awaitility.await().until(() -> first.getPayload().isRunning());

        Thread secondCaller = new Thread(() -> uut.handleMessage(second));
        secondCaller.start();
        Awaitility.await().until(() -> second.getPayload().isRunning());

        Thread thirdCaller = new Thread(() -> uut.handleMessage(third));
        thirdCaller.start();

        // Finish message two so that the work for message three can be picked up.
        second.getPayload().stop();

        Awaitility.await().until(() -> secondCaller.getState() == Thread.State.WAITING);
        Awaitility.await().until(() -> thirdCaller.getState() == Thread.State.WAITING);

        // The worker flips the running flag asynchronously; waiting avoids a race between the
        // caller reaching WAITING and the worker actually starting on message three.
        Awaitility.await().until(() -> third.getPayload().isRunning());
        Assertions.assertThat(uut.getMessagesInProcessing().getBackingSet())
                .containsOnly("messageId1", "messageId3");
    }

    /**
     * Hands over two messages so that the second caller ends up in
     * {@link Thread.State#WAITING}, then finishes the first message. The second caller is
     * expected to terminate afterwards and only {@code "messageId2"} may remain tracked.
     */
    @Test
    public void testHandleMessage_resumesWaitingThreads() {
        Message<InputObject> first = createMessage(1);
        Message<InputObject> second = createMessage(2);

        Thread firstCaller = new Thread(() -> uut.handleMessage(first));
        firstCaller.start();
        Awaitility.await().until(() -> !firstCaller.isAlive());

        Thread secondCaller = new Thread(() -> uut.handleMessage(second));
        secondCaller.start();
        Awaitility.await().until(() -> secondCaller.getState() == Thread.State.WAITING);

        // Finishing the first message should release the parked second caller.
        first.getPayload().stop();
        Awaitility.await().until(() -> !secondCaller.isAlive());

        Assertions.assertThat(uut.getMessagesInProcessing().getBackingSet())
                .containsOnly("messageId2");
    }

    /**
     * Builds a message with a fresh {@link InputObject} payload and the headers
     * {@code "MessageId" = "messageId" + i} and {@code "ReceiptHandle" = "receiptHandle" + i}.
     *
     * @param i number appended to the message id and receipt handle
     * @return the new message
     */
    private Message<InputObject> createMessage(int i) {
        HashMap<String, Object> headerValues = new HashMap<>();
        headerValues.put("MessageId", "messageId" + i);
        headerValues.put("ReceiptHandle", "receiptHandle" + i);
        MessageHeaders headers = new MessageHeaders(headerValues);
        return new GenericMessage<>(new InputObject(), headers);
    }
}
