package org.apache.flink.connector.jdbc.xa;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.xa.XaFacade;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/connector/jdbc/xa/XaFacadeImpl.class */
public class XaFacadeImpl implements XaFacade {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImpl.class);
    private static final Set<Integer> TRANSIENT_ERR_CODES = new HashSet(Arrays.asList(107, -7));
    private static final Set<Integer> HEUR_ERR_CODES = new HashSet(Arrays.asList(6, 7, 8, 5));
    private static final int MAX_RECOVER_CALLS = 100;
    private final Supplier<XADataSource> dataSourceSupplier;
    private final Integer timeoutSec;
    private transient XAResource xaResource;
    private transient Connection connection;
    private transient XAConnection xaConnection;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/XaFacadeImpl$Command.class */
    public static class Command<T> {
        private final String name;
        private final Optional<Xid> xid;
        private final Callable<T> callable;
        private final Function<XAException, Optional<T>> recover;

        static Command<Object> fromRunnable(String str, Xid xid, ThrowingRunnable<XAException> throwingRunnable) {
            return fromRunnable(str, xid, throwingRunnable, xAException -> {
                throw XaFacadeImpl.wrapException(str, Optional.of(xid), xAException);
            });
        }

        static Command<Object> fromRunnableRecoverByWarn(String str, Xid xid, ThrowingRunnable<XAException> throwingRunnable, Function<XAException, Optional<String>> function) {
            return fromRunnable(str, xid, throwingRunnable, xAException -> {
                XaFacadeImpl.LOG.warn(XaFacadeImpl.formatErrorMessage(str, Optional.of(xid), Optional.of(Integer.valueOf(xAException.errorCode)), (String) ((Optional) function.apply(xAException)).orElseThrow(() -> {
                    return XaFacadeImpl.wrapException(str, Optional.of(xid), xAException);
                })));
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static Command<Object> fromRunnable(String str, Xid xid, ThrowingRunnable<XAException> throwingRunnable, Consumer<XAException> consumer) {
            return new Command<>(str, (Optional<Xid>) Optional.of(xid), () -> {
                throwingRunnable.run();
                return null;
            }, xAException -> {
                consumer.accept(xAException);
                return Optional.of("");
            });
        }

        private Command(String str, Optional<Xid> optional, Callable<T> callable) {
            this(str, optional, callable, xAException -> {
                return Optional.empty();
            });
        }

        private Command(String str, Optional<Xid> optional, Callable<T> callable, Function<XAException, Optional<T>> function) {
            this.name = str;
            this.xid = optional;
            this.callable = callable;
            this.recover = function;
        }
    }

    @VisibleForTesting
    static XaFacadeImpl fromXaDataSource(XADataSource xADataSource) {
        return new XaFacadeImpl(() -> {
            return xADataSource;
        }, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public XaFacadeImpl(Supplier<XADataSource> supplier, Integer num) {
        this.dataSourceSupplier = (Supplier) Preconditions.checkNotNull(supplier);
        this.timeoutSec = num;
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public void open() throws SQLException {
        Preconditions.checkState(!isOpen(), "already connected");
        this.xaConnection = this.dataSourceSupplier.get().getXAConnection();
        this.xaResource = this.xaConnection.getXAResource();
        if (this.timeoutSec != null) {
            try {
                this.xaResource.setTransactionTimeout(this.timeoutSec.intValue());
            } catch (XAException e) {
                throw new SQLException((Throwable) e);
            }
        }
        this.connection = this.xaConnection.getConnection();
        this.connection.setReadOnly(false);
        this.connection.setAutoCommit(false);
        Preconditions.checkState(!this.connection.getAutoCommit());
    }

    @Override // java.lang.AutoCloseable
    public void close() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
        try {
            this.xaConnection.close();
        } catch (SQLException e) {
            LOG.warn("unable to close XA connection", e);
        }
        this.xaResource = null;
    }

    @Override // org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider
    public Connection getConnection() {
        Preconditions.checkNotNull(this.connection);
        return this.connection;
    }

    @Override // org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider
    public boolean isConnectionValid() throws SQLException {
        return isOpen() && this.connection.isValid(this.connection.getNetworkTimeout());
    }

    @Override // org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider
    public Connection getOrEstablishConnection() throws SQLException {
        if (!isOpen()) {
            open();
        }
        return this.connection;
    }

    @Override // org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider
    public void closeConnection() {
        try {
            close();
        } catch (SQLException e) {
            LOG.warn("Connection close failed.", e);
        }
    }

    @Override // org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider
    public Connection reestablishConnection() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public void start(Xid xid) {
        execute(Command.fromRunnable("start", xid, () -> {
            this.xaResource.start(xid, 0);
        }));
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public void endAndPrepare(Xid xid) {
        execute(Command.fromRunnable("end", xid, () -> {
            this.xaResource.end(xid, 67108864);
        }));
        int intValue = ((Integer) execute(new Command("prepare", Optional.of(xid), () -> {
            return Integer.valueOf(this.xaResource.prepare(xid));
        }))).intValue();
        if (intValue == 3) {
            throw new XaFacade.EmptyXaTransactionException(xid);
        }
        if (intValue != 0) {
            throw new FlinkRuntimeException(formatErrorMessage("prepare", Optional.of(xid), Optional.empty(), "response: " + intValue));
        }
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public void failAndRollback(Xid xid) {
        execute(Command.fromRunnable("end (fail)", xid, () -> {
            this.xaResource.end(xid, 536870912);
            this.xaResource.rollback(xid);
        }, xAException -> {
            if (xAException.errorCode >= MAX_RECOVER_CALLS) {
                rollback(xid);
            } else {
                LOG.warn(formatErrorMessage("end (fail)", Optional.of(xid), Optional.of(Integer.valueOf(xAException.errorCode)), new String[0]));
            }
        }));
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public void commit(Xid xid, boolean z) {
        execute(Command.fromRunnableRecoverByWarn("commit", xid, () -> {
            this.xaResource.commit(xid, false);
        }, xAException -> {
            return buildCommitErrorDesc(xAException, z);
        }));
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public void rollback(Xid xid) {
        execute(Command.fromRunnableRecoverByWarn("rollback", xid, () -> {
            this.xaResource.rollback(xid);
        }, this::buildRollbackErrorDesc));
    }

    private void forget(Xid xid) {
        execute(Command.fromRunnableRecoverByWarn("forget", xid, () -> {
            this.xaResource.forget(xid);
        }, xAException -> {
            return Optional.of("manual cleanup may be required");
        }));
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public Collection<Xid> recover() {
        return (Collection) execute(new Command("recover", Optional.empty(), () -> {
            List<Xid> recover = recover(16777216);
            int i = 0;
            while (recover.addAll(recover(0))) {
                try {
                    Preconditions.checkState(i < MAX_RECOVER_CALLS, "too many xa_recover() calls");
                    i++;
                } finally {
                    recover(8388608);
                }
            }
            return recover;
        }));
    }

    @Override // org.apache.flink.connector.jdbc.xa.XaFacade
    public boolean isOpen() {
        return this.xaResource != null;
    }

    private List<Xid> recover(int i) throws XAException {
        return Arrays.asList(this.xaResource.recover(i));
    }

    private <T> T execute(Command<T> command) throws FlinkRuntimeException {
        Preconditions.checkState(isOpen(), "not connected");
        LOG.debug("{}, xid={}", ((Command) command).name, ((Command) command).xid);
        try {
            T t = (T) ((Command) command).callable.call();
            LOG.trace("{} succeeded , xid={}", ((Command) command).name, ((Command) command).xid);
            return t;
        } catch (FlinkRuntimeException e) {
            throw e;
        } catch (XAException e2) {
            if (HEUR_ERR_CODES.contains(Integer.valueOf(e2.errorCode))) {
                ((Command) command).xid.ifPresent(this::forget);
            }
            return (T) ((Optional) ((Command) command).recover.apply(e2)).orElseThrow(() -> {
                return wrapException(command.name, command.xid, e2);
            });
        } catch (Exception e3) {
            throw wrapException(((Command) command).name, ((Command) command).xid, e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static FlinkRuntimeException wrapException(String str, Optional<Xid> optional, Exception exc) {
        if (!(exc instanceof XAException)) {
            throw new FlinkRuntimeException(formatErrorMessage(str, optional, Optional.empty(), exc.getMessage()), exc);
        }
        XAException xAException = (XAException) exc;
        if (TRANSIENT_ERR_CODES.contains(Integer.valueOf(xAException.errorCode))) {
            throw new XaFacade.TransientXaException(xAException);
        }
        throw new FlinkRuntimeException(formatErrorMessage(str, optional, Optional.of(Integer.valueOf(xAException.errorCode)), xAException.getMessage()));
    }

    private Optional<String> buildCommitErrorDesc(XAException xAException, boolean z) {
        return xAException.errorCode == 7 ? Optional.of("transaction was heuristically committed earlier") : (z && xAException.errorCode == -4) ? Optional.of("transaction is unknown to RM (ignoring)") : Optional.empty();
    }

    private Optional<String> buildRollbackErrorDesc(XAException xAException) {
        return xAException.errorCode == 6 ? Optional.of("transaction was already heuristically rolled back") : xAException.errorCode >= MAX_RECOVER_CALLS ? Optional.of("transaction was already marked for rollback") : Optional.empty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String formatErrorMessage(String str, Optional<Xid> optional, Optional<Integer> optional2, String... strArr) {
        Object[] objArr = new Object[4];
        objArr[0] = str;
        objArr[1] = optional.map(xid -> {
            return " XA transaction, xid: " + xid;
        }).orElse("");
        objArr[2] = optional2.map(num -> {
            return String.format(", error %d: %s", num, descError(num.intValue()));
        }).orElse("");
        objArr[3] = (strArr == null || strArr.length == 0) ? "" : ". " + Arrays.toString(strArr);
        return String.format("unable to %s%s%s%s", objArr);
    }

    private static String descError(int i) {
        switch (i) {
            case -9:
                return "the resource manager is doing work outside the global transaction";
            case -8:
                return "Xid given as an argument is already known to the resource manager";
            case -7:
                return "the resource manager has failed and is not available";
            case -6:
                return "protocol error";
            case -5:
                return "invalid arguments were passed";
            case -4:
                return "Xid is not valid";
            case -3:
                return "resource manager error has occurred";
            case -2:
                return "an asynchronous operation is outstanding";
            case JdbcExecutionOptions.DEFAULT_MAX_RETRY_TIMES /* 3 */:
                return "the transaction branch was read-only, and has already been committed";
            case 4:
                return "the method invoked returned without having any effect, and that it may be invoked again";
            case 5:
                return "heuristic mixed decision was made";
            case 6:
                return "heuristic rollback decision was made";
            case 7:
                return "heuristic commit decision was made";
            case 8:
                return "heuristic decision may have been made";
            case 9:
                return "the transaction resumption must happen where the suspension occurred";
            case MAX_RECOVER_CALLS /* 100 */:
                return "rollback happened for an unspecified reason";
            case 101:
                return "rollback happened due to a communications failure";
            case 102:
                return "rollback happened because deadlock was detected";
            case 103:
                return "rollback happened because an internal integrity check failed";
            case 104:
                return "rollback happened for some reason not fitting any of the other rollback error codes";
            case 105:
                return "rollback happened due to a protocol error in the resource manager";
            case 106:
                return "rollback happened because of a timeout";
            case 107:
                return "rollback happened due to a transient failure";
            default:
                return "";
        }
    }
}
