package de.otto.synapse.endpoint.sender;

import de.otto.synapse.endpoint.AbstractMessageEndpoint;
import de.otto.synapse.endpoint.EndpointType;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.message.Message;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.translator.MessageFormat;
import de.otto.synapse.translator.MessageTranslator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.class */
public abstract class AbstractMessageSenderEndpoint extends AbstractMessageEndpoint implements MessageSenderEndpoint {
    public static final int BATCH_SENDER_TIMEOUT = 500;
    private final MessageTranslator<TextMessage> messageTranslator;

    public AbstractMessageSenderEndpoint(@Nonnull String str, @Nonnull MessageInterceptorRegistry messageInterceptorRegistry, @Nonnull MessageTranslator<TextMessage> messageTranslator) {
        super(str, messageInterceptorRegistry);
        this.messageTranslator = messageTranslator;
    }

    @Override // de.otto.synapse.endpoint.sender.MessageSender
    public final CompletableFuture<Void> send(@Nonnull Message<?> message) {
        TextMessage intercept = intercept(this.messageTranslator.apply(message));
        return intercept != null ? doSend(intercept) : CompletableFuture.completedFuture(null);
    }

    @Override // de.otto.synapse.endpoint.sender.MessageSender
    public final CompletableFuture<Void> sendBatch(@Nonnull Stream<? extends Message<?>> stream) {
        MessageTranslator<TextMessage> messageTranslator = this.messageTranslator;
        Objects.requireNonNull(messageTranslator);
        return doSendBatch(stream.map(messageTranslator::apply).map(this::intercept).filter((v0) -> {
            return Objects.nonNull(v0);
        }));
    }

    @Override // de.otto.synapse.endpoint.MessageEndpoint
    @Nonnull
    public final EndpointType getEndpointType() {
        return EndpointType.SENDER;
    }

    protected CompletableFuture<Void> doSendBatch(@Nonnull Stream<TextMessage> stream) {
        return CompletableFuture.allOf((CompletableFuture[]) stream.map(this::doSend).toArray(i -> {
            return new CompletableFuture[i];
        }));
    }

    protected abstract CompletableFuture<Void> doSend(@Nonnull TextMessage textMessage);

    public MessageFormat getMessageFormat() {
        return MessageFormat.defaultMessageFormat();
    }
}
