package org.springframework.integration.jdbc.channel;

import java.util.Optional;
import java.util.concurrent.Executor;
import org.springframework.core.log.LogAccessor;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.class */
public class PostgresSubscribableChannel extends AbstractSubscribableChannel implements PostgresChannelMessageTableSubscriber.Subscription {
    private static final LogAccessor LOGGER = new LogAccessor(PostgresSubscribableChannel.class);
    private static final Optional<?> FALLBACK_STUB = Optional.of(new Object());
    private final JdbcChannelMessageStore jdbcChannelMessageStore;
    private final Object groupId;
    private final PostgresChannelMessageTableSubscriber messageTableSubscriber;
    private TransactionTemplate transactionTemplate;
    private Executor executor;
    private volatile boolean hasHandlers;
    private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();
    private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();
    private ErrorHandler errorHandler = ReflectionUtils::rethrowRuntimeException;

    public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageStore, Object obj, PostgresChannelMessageTableSubscriber postgresChannelMessageTableSubscriber) {
        Assert.notNull(jdbcChannelMessageStore, "A jdbcChannelMessageStore must be provided.");
        Assert.notNull(obj, "A groupId must be set.");
        Assert.notNull(postgresChannelMessageTableSubscriber, "A messageTableSubscriber must be set.");
        this.jdbcChannelMessageStore = jdbcChannelMessageStore;
        this.groupId = obj;
        this.messageTableSubscriber = postgresChannelMessageTableSubscriber;
    }

    public void setDispatcherExecutor(Executor executor) {
        Assert.notNull(executor, "An executor must be provided.");
        this.executor = executor;
    }

    public void setTransactionManager(PlatformTransactionManager platformTransactionManager) {
        Assert.notNull(platformTransactionManager, "A platform transaction manager must be provided.");
        this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        Assert.notNull(retryTemplate, "A retry template must be provided.");
        this.retryTemplate = retryTemplate;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        Assert.notNull(errorHandler, "'errorHandler' must not be null.");
        this.errorHandler = errorHandler;
    }

    public boolean subscribe(MessageHandler messageHandler) {
        boolean subscribe = super.subscribe(messageHandler);
        if (this.dispatcher.getHandlerCount() == 1) {
            this.messageTableSubscriber.subscribe(this);
            this.hasHandlers = true;
            notifyUpdate();
        }
        return subscribe;
    }

    public boolean unsubscribe(MessageHandler messageHandler) {
        boolean unsubscribe = super.unsubscribe(messageHandler);
        if (this.dispatcher.getHandlerCount() == 0) {
            this.messageTableSubscriber.unsubscribe(this);
            this.hasHandlers = false;
        }
        return unsubscribe;
    }

    protected MessageDispatcher getDispatcher() {
        return this.dispatcher;
    }

    protected boolean doSend(Message<?> message, long j) {
        this.jdbcChannelMessageStore.addMessageToGroup(this.groupId, message);
        return true;
    }

    @Override // org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber.Subscription
    public void notifyUpdate() {
        this.executor.execute(() -> {
            do {
            } while (pollAndDispatchMessage().isPresent());
        });
    }

    private Optional<?> pollAndDispatchMessage() {
        try {
            return doPollAndDispatchMessage();
        } catch (Exception e) {
            try {
                this.errorHandler.handleError(e);
            } catch (Exception e2) {
                LOGGER.error(e, "Exception during message dispatch");
            }
            return FALLBACK_STUB;
        }
    }

    private Optional<?> doPollAndDispatchMessage() {
        return this.hasHandlers ? this.transactionTemplate != null ? (Optional) this.retryTemplate.execute(retryContext -> {
            return (Optional) this.transactionTemplate.execute(transactionStatus -> {
                return pollMessage().filter(message -> {
                    if (this.hasHandlers) {
                        return true;
                    }
                    transactionStatus.setRollbackOnly();
                    return false;
                }).map(this::dispatch);
            });
        }) : pollMessage().map(message -> {
            return (Message) this.retryTemplate.execute(retryContext2 -> {
                return dispatch(message);
            });
        }) : Optional.empty();
    }

    private Optional<Message<?>> pollMessage() {
        return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
    }

    private Message<?> dispatch(Message<?> message) {
        this.dispatcher.dispatch(message);
        return message;
    }

    @Override // org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber.Subscription
    public String getRegion() {
        return this.jdbcChannelMessageStore.getRegion();
    }

    @Override // org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber.Subscription
    public Object getGroupId() {
        return this.groupId;
    }
}
