package de.otto.synapse.endpoint.sender.aws;

import com.google.common.collect.ImmutableMap;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.message.Message;
import de.otto.synapse.translator.MessageTranslator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SQSAsyncClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/* loaded from: input_file:de/otto/synapse/endpoint/sender/aws/SqsMessageSender.class */
/**
 * Message sender endpoint that publishes messages to an SQS queue through an
 * {@link SQSAsyncClient}.
 *
 * <p>Every outgoing SQS message carries two string message attributes:
 * {@link #MSG_KEY_ATTR} (the key of the message) and {@link #MSG_SENDER_ATTR}
 * (the sender name configured for this endpoint).
 *
 * <p>Requests are issued asynchronously. The outcome is only logged from the
 * completion callback; failures are not propagated to the caller of
 * {@code doSend} / {@code doSendBatch}.
 */
public class SqsMessageSender extends AbstractMessageSenderEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(SqsMessageSender.class);

    /** Name of the SQS message attribute that holds the message key. */
    public static final String MSG_KEY_ATTR = "synapse_msg_key";

    /** Name of the SQS message attribute that holds the sender name. */
    public static final String MSG_SENDER_ATTR = "synapse_msg_sender";

    /** SQS rejects a SendMessageBatch request that contains more entries than this. */
    private static final int MAX_BATCH_SIZE = 10;

    /** SQS data type used for both message attributes. */
    private static final String STRING_DATA_TYPE = "String";

    private final String queueUrl;
    private final String messageSender;
    private final SQSAsyncClient sqsAsyncClient;

    /**
     * Creates a sender for a single SQS queue.
     *
     * @param str               forwarded unchanged to the super constructor
     *                          (presumably the channel name - confirm against
     *                          {@code AbstractMessageSenderEndpoint})
     * @param str2              URL of the SQS queue that messages are sent to
     * @param messageTranslator forwarded unchanged to the super constructor
     * @param sQSAsyncClient    client used to issue the SQS requests
     * @param str3              sender name written to the {@link #MSG_SENDER_ATTR} attribute
     */
    public SqsMessageSender(String str, String str2, MessageTranslator<String> messageTranslator, SQSAsyncClient sQSAsyncClient, String str3) {
        super(str, messageTranslator);
        this.queueUrl = str2;
        this.sqsAsyncClient = sQSAsyncClient;
        this.messageSender = str3;
    }

    /**
     * Sends a single message to the queue and logs the outcome once the
     * asynchronous request completes.
     *
     * @param message the message whose key becomes a message attribute and whose
     *                payload becomes the SQS message body
     */
    protected void doSend(@Nonnull Message<String> message) {
        final SendMessageRequest request = SendMessageRequest.builder()
                .queueUrl(this.queueUrl)
                .messageAttributes(messageAttributesOf(message))
                .messageBody(message.getPayload())
                .build();
        this.sqsAsyncClient.sendMessage(request).whenComplete((response, error) -> {
            if (error != null) {
                // A trailing Throwable is logged by SLF4J with its stack trace.
                LOG.error("Failed to send message {}", message, error);
            }
            if (response != null) {
                LOG.debug("Successfully sent message {}", response);
            }
        });
    }

    /**
     * Sends all messages of the stream using SQS batch requests.
     *
     * <p>The stream is consumed completely and split into chunks of at most
     * {@value #MAX_BATCH_SIZE} messages, because SQS rejects larger batch
     * requests. An empty stream results in no request at all, as SQS also
     * rejects empty batches.
     *
     * @param stream the messages to send
     */
    protected void doSendBatch(@Nonnull Stream<Message<String>> stream) {
        final List<Message<String>> messages = stream.collect(Collectors.toList());
        for (int from = 0; from < messages.size(); from += MAX_BATCH_SIZE) {
            final int to = Math.min(from + MAX_BATCH_SIZE, messages.size());
            sendBatchChunk(messages.subList(from, to), from);
        }
    }

    /**
     * Sends one batch request for the given chunk and logs the outcome.
     *
     * @param chunk        the messages of this request; callers pass at most
     *                     {@value #MAX_BATCH_SIZE} messages
     * @param firstEntryId batch entry id of the first message; ids continue the
     *                     numbering of the overall stream so that a failed entry
     *                     in the log identifies the position of its message
     */
    private void sendBatchChunk(List<Message<String>> chunk, int firstEntryId) {
        final List<SendMessageBatchRequestEntry> entries = new ArrayList<>(chunk.size());
        for (int i = 0; i < chunk.size(); i++) {
            final Message<String> message = chunk.get(i);
            entries.add(SendMessageBatchRequestEntry.builder()
                    .id(String.valueOf(firstEntryId + i))
                    .messageAttributes(messageAttributesOf(message))
                    .messageBody(message.getPayload())
                    .build());
        }
        final SendMessageBatchRequest request = SendMessageBatchRequest.builder()
                .queueUrl(this.queueUrl)
                .entries(entries)
                .build();
        this.sqsAsyncClient.sendMessageBatch(request).whenComplete((response, error) -> {
            if (error != null) {
                LOG.error("Failed to send batch of messages: " + error.getMessage(), error);
            }
            if (response != null) {
                // A batch response can contain successful and failed entries at the same time.
                if (!response.successful().isEmpty()) {
                    LOG.debug("Successfully sent {} messages in a batch", response.successful().size());
                }
                if (!response.failed().isEmpty()) {
                    LOG.error("Failed to sent {} messages in a batch: {}", response.failed().size(), response.failed());
                }
            }
        });
    }

    /**
     * Builds the message attributes shared by single and batch sends, so that
     * both paths always emit the same set of attributes.
     */
    private ImmutableMap<String, MessageAttributeValue> messageAttributesOf(Message<String> message) {
        return ImmutableMap.of(
                MSG_KEY_ATTR, stringAttribute(message.getKey()),
                MSG_SENDER_ATTR, stringAttribute(this.messageSender));
    }

    /** Wraps a value into an SQS message attribute of data type "String". */
    private static MessageAttributeValue stringAttribute(String value) {
        return MessageAttributeValue.builder()
                .dataType(STRING_DATA_TYPE)
                .stringValue(value)
                .build();
    }
}
