package pro.chenggang.project.reactive.mybatis.support.r2dbc.executor;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import java.util.function.Function;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.session.RowBounds;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.MybatisReactiveContextManager;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.connection.ConnectionCloseHolder;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.delegate.R2dbcMybatisConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:pro/chenggang/project/reactive/mybatis/support/r2dbc/executor/AbstractReactiveMybatisExecutor.class */
public abstract class AbstractReactiveMybatisExecutor implements ReactiveMybatisExecutor {
    private static final Log log = LogFactory.getLog(AbstractReactiveMybatisExecutor.class);
    protected final R2dbcMybatisConfiguration configuration;
    protected final ConnectionFactory connectionFactory;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractReactiveMybatisExecutor(R2dbcMybatisConfiguration r2dbcMybatisConfiguration, ConnectionFactory connectionFactory) {
        this.configuration = r2dbcMybatisConfiguration;
        this.connectionFactory = connectionFactory;
    }

    @Override // pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.ReactiveMybatisExecutor
    public Mono<Integer> update(MappedStatement mappedStatement, Object obj) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setDirty();
            return inConnection(this.connectionFactory, connection -> {
                return doUpdateWithConnection(connection, mappedStatement, obj);
            });
        });
    }

    @Override // pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.ReactiveMybatisExecutor
    public <E> Flux<E> query(MappedStatement mappedStatement, Object obj, RowBounds rowBounds) {
        return inConnectionMany(this.connectionFactory, connection -> {
            return doQueryWithConnection(connection, mappedStatement, obj, rowBounds);
        });
    }

    @Override // pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.ReactiveMybatisExecutor
    public Mono<Void> commit(boolean z) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setForceCommit(reactiveExecutorContext.isDirty() || z);
            return Mono.justOrEmpty(reactiveExecutorContext.getConnection()).flatMap(connection -> {
                return Mono.from(connection.close());
            }).then(Mono.defer(() -> {
                reactiveExecutorContext.resetDirty();
                return Mono.empty();
            }));
        });
    }

    @Override // pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.ReactiveMybatisExecutor
    public Mono<Void> rollback(boolean z) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setForceRollback(reactiveExecutorContext.isDirty() || z);
            return Mono.justOrEmpty(reactiveExecutorContext.getConnection()).flatMap(connection -> {
                return Mono.from(connection.close());
            }).then(Mono.defer(() -> {
                reactiveExecutorContext.resetDirty();
                return Mono.empty();
            }));
        });
    }

    @Override // pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.ReactiveMybatisExecutor
    public Mono<Void> close(boolean z) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setForceRollback(z);
            reactiveExecutorContext.setRequireClosed(true);
            return Mono.justOrEmpty(reactiveExecutorContext.getConnection()).flatMap(connection -> {
                return Mono.from(connection.close());
            }).then(Mono.defer(() -> {
                reactiveExecutorContext.resetDirty();
                return Mono.empty();
            }));
        });
    }

    protected abstract Mono<Integer> doUpdateWithConnection(Connection connection, MappedStatement mappedStatement, Object obj);

    protected abstract <E> Flux<E> doQueryWithConnection(Connection connection, MappedStatement mappedStatement, Object obj, RowBounds rowBounds);

    protected <T> Mono<T> inConnection(ConnectionFactory connectionFactory, Function<Connection, Mono<T>> function) {
        return Mono.usingWhen(MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            return Mono.from(connectionFactory.create()).doOnNext(connection -> {
                log.debug("Execute Statement With Mono,Get Connection [" + connection + "] From Connection Factory ");
            });
        }).map(connection -> {
            return new ConnectionCloseHolder(connection, this::closeConnection);
        }), connectionCloseHolder -> {
            return (Mono) function.apply(connectionCloseHolder.getTarget());
        }, (v0) -> {
            return v0.close();
        }, (connectionCloseHolder2, th) -> {
            return connectionCloseHolder2.close();
        }, (v0) -> {
            return v0.close();
        });
    }

    protected <T> Flux<T> inConnectionMany(ConnectionFactory connectionFactory, Function<Connection, Flux<T>> function) {
        return Flux.usingWhen(MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            return Mono.from(connectionFactory.create()).doOnNext(connection -> {
                log.debug("Execute Statement With Flux,Get Connection [" + connection + "] From Connection Factory ");
            });
        }).map(connection -> {
            return new ConnectionCloseHolder(connection, this::closeConnection);
        }), connectionCloseHolder -> {
            return (Flux) function.apply(connectionCloseHolder.getTarget());
        }, (v0) -> {
            return v0.close();
        }, (connectionCloseHolder2, th) -> {
            return connectionCloseHolder2.close();
        }, (v0) -> {
            return v0.close();
        });
    }

    protected Mono<Void> closeConnection(Connection connection) {
        return Mono.from(connection.close()).onErrorResume(th -> {
            return Mono.from(connection.close());
        });
    }
}
