package org.apache.spark.sql.mlsql.sources;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap$;
import org.apache.spark.sql.execution.streaming.LongOffset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.BinlogOffset;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.ExecutorBinlogServer;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.ReportBinlogSocketServerHostAndPort;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.StreamSourceProvider;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering$Int$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import tech.mlsql.common.utils.hdfs.HDFSOperator$;
import tech.mlsql.common.utils.path.PathFun$;

/* compiled from: MLSQLBinLogDataSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\t4A!\u0001\u0002\u0001\u001f\t)R\nT*R\u0019\nKg\u000eT8h\t\u0006$\u0018mU8ve\u000e,'BA\u0002\u0005\u0003\u001d\u0019x.\u001e:dKNT!!\u0002\u0004\u0002\u000b5d7/\u001d7\u000b\u0005\u001dA\u0011aA:rY*\u0011\u0011BC\u0001\u0006gB\f'o\u001b\u0006\u0003\u00171\ta!\u00199bG\",'\"A\u0007\u0002\u0007=\u0014xm\u0001\u0001\u0014\u000b\u0001\u0001bc\u0007\u0010\u0011\u0005E!R\"\u0001\n\u000b\u0003M\tQa]2bY\u0006L!!\u0006\n\u0003\r\u0005s\u0017PU3g!\t9\u0012$D\u0001\u0019\u0015\t\u0019a!\u0003\u0002\u001b1\t!2\u000b\u001e:fC6\u001cv.\u001e:dKB\u0013xN^5eKJ\u0004\"a\u0006\u000f\n\u0005uA\"A\u0005#bi\u0006\u001cv.\u001e:dKJ+w-[:uKJ\u0004\"a\b\u0012\u000e\u0003\u0001R!!\t\u0005\u0002\u0011%tG/\u001a:oC2L!a\t\u0011\u0003\u000f1{wmZ5oO\")Q\u0005\u0001C\u0001M\u00051A(\u001b8jiz\"\u0012a\n\t\u0003Q\u0001i\u0011A\u0001\u0005\u0006U\u0001!\teK\u0001\rg>,(oY3TG\",W.\u0019\u000b\u0006Yq\u0012u)\u0013\t\u0005#5zc'\u0003\u0002/%\t1A+\u001e9mKJ\u0002\"\u0001M\u001a\u000f\u0005E\t\u0014B\u0001\u001a\u0013\u0003\u0019\u0001&/\u001a3fM&\u0011A'\u000e\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005I\u0012\u0002CA\u001c;\u001b\u0005A$BA\u001d\u0007\u0003\u0015!\u0018\u0010]3t\u0013\tY\u0004H\u0001\u0006TiJ,8\r\u001e+za\u0016DQ!P\u0015A\u0002y\n!b]9m\u0007>tG/\u001a=u!\ty\u0004)D\u0001\u0007\u0013\t\teA\u0001\u0006T#2\u001buN\u001c;fqRDQaQ\u0015A\u0002\u0011\u000baa]2iK6\f\u0007cA\tFm%\u0011aI\u0005\u0002\u0007\u001fB$\u0018n\u001c8\t\u000b!K\u0003\u0019A\u0018\u0002\u0019A\u0014xN^5eKJt\u0015-\\3\t\u000b)K\u0003\u0019A&\u0002\u0015A\f'/Y7fi\u0016\u00148\u000f\u0005\u00031\u0019>z\u0013BA'6\u0005\ri\u0015\r\u001d\u0005\u0006\u001f\u0002!\t\u0005U\u0001\rGJ,\u0017\r^3T_V\u00148-\u001a\u000b\u0007#fSF,\u00180\u0011\u0005I;V\"A*\u000b\u0005Q+\u0016!C:ue\u0016\fW.\u001b8h\u0015\t1f!A\u0005fq\u0016\u001cW\u000f^5p]&\u0011\u0001l\u0015\u0002\u0007'>,(oY3\t\u000bur\u0005\u0019\u0001 \t\u000bms\u0005\u0019A\u0018\u0002\u00195,G/\u00193bi\u0006\u0004\u0016\r\u001e5\t\u000b\rs\u0005\u0019\u0001#\t\u000b!s\u0005\u0019A\u0018\t\u000b)s\u0005\u0019A&\t\u000b\u0001\u0004A\u0011I1\u0002\u0013MDwN\u001d;OC6,G#A\u0018")
/* loaded from: input_file:org/apache/spark/sql/mlsql/sources/MLSQLBinLogDataSource.class */
public class MLSQLBinLogDataSource implements StreamSourceProvider, DataSourceRegister, Logging {
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.class.initializeLogIfNecessary(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2(this);
    }

    public Tuple2<String, StructType> sourceSchema(SQLContext sQLContext, Option<StructType> option, String str, Map<String, String> map) {
        Predef$.MODULE$.require(option.isEmpty(), new MLSQLBinLogDataSource$$anonfun$sourceSchema$1(this));
        return new Tuple2<>(shortName(), StructType$.MODULE$.apply(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new StructField[]{new StructField("value", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())}))));
    }

    /* JADX WARN: Type inference failed for: r0v118, types: [org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource$$anon$4] */
    public Source createSource(SQLContext sQLContext, String str, Option<StructType> option, String str2, Map<String, String> map) {
        Option option2;
        Option option3;
        Some some;
        final String mkString;
        final SparkSession sparkSession = sQLContext.sparkSession();
        final String sessionLocalTimeZone = sparkSession.sessionState().conf().sessionLocalTimeZone();
        final String str3 = (String) map.apply("host");
        final int i = new StringOps(Predef$.MODULE$.augmentString((String) map.apply("port"))).toInt();
        final String str4 = (String) map.apply("userName");
        final String str5 = (String) map.apply("password");
        Option option4 = map.get("bingLogNamePrefix");
        final Option option5 = map.get("databaseNamePattern");
        final Option option6 = map.get("tableNamePattern");
        Some some2 = map.get("startingOffsets");
        if (some2 instanceof Some) {
            option3 = Option$.MODULE$.apply((String) some2.x());
        } else {
            if (!None$.MODULE$.equals(some2)) {
                throw new MatchError(some2);
            }
            Tuple2 tuple2 = new Tuple2(map.get("binlogIndex"), map.get("binlogFileOffset"));
            if (tuple2 != null) {
                Some some3 = (Option) tuple2._1();
                Some some4 = (Option) tuple2._2();
                if (some3 instanceof Some) {
                    String str6 = (String) some3.x();
                    if (some4 instanceof Some) {
                        option2 = Option$.MODULE$.apply(BoxesRunTime.boxToLong(new BinlogOffset(new StringOps(Predef$.MODULE$.augmentString(str6)).toLong(), new StringOps(Predef$.MODULE$.augmentString((String) some4.x())).toLong()).offset()).toString());
                        option3 = option2;
                    }
                }
            }
            if (tuple2 != null) {
                Some some5 = (Option) tuple2._1();
                Option option7 = (Option) tuple2._2();
                if (some5 instanceof Some) {
                    String str7 = (String) some5.x();
                    if (None$.MODULE$.equals(option7)) {
                        option2 = Option$.MODULE$.apply(BoxesRunTime.boxToLong(new BinlogOffset(new StringOps(Predef$.MODULE$.augmentString(str7)).toLong(), 4L).offset()).toString());
                        option3 = option2;
                    }
                }
            }
            option2 = None$.MODULE$;
            option3 = option2;
        }
        Some map2 = option3.map(new MLSQLBinLogDataSource$$anonfun$2(this));
        map.get("startingOffsets").map(new MLSQLBinLogDataSource$$anonfun$createSource$1(this));
        try {
            if (map2 instanceof Some) {
                Predef$.MODULE$.assert(BoxesRunTime.boxToLong(((LongOffset) map2.x()).offset()).toString().length() >= 14, new MLSQLBinLogDataSource$$anonfun$createSource$2(this));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                if (!None$.MODULE$.equals(map2)) {
                    throw new MatchError(map2);
                }
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            mkString = Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(new StringOps(Predef$.MODULE$.augmentString(str)).stripSuffix("/").split("/")).dropRight(2)).mkString("/");
            some = Option$.MODULE$.apply(new LongOffset(getOffsetFromCk$1(mkString)));
        } catch (Exception e) {
            logError(new MLSQLBinLogDataSource$$anonfun$6(this, e), e);
            some = None$.MODULE$;
        }
        Some some6 = some;
        Some some7 = some6.isDefined() ? some6 : map2;
        Predef$.MODULE$.assert(some7.isDefined() == option4.isDefined(), new MLSQLBinLogDataSource$$anonfun$createSource$3(this));
        Option map3 = some7.map(new MLSQLBinLogDataSource$$anonfun$7(this));
        final Option map4 = map3.map(new MLSQLBinLogDataSource$$anonfun$8(this, option4));
        final Option map5 = map3.map(new MLSQLBinLogDataSource$$anonfun$9(this));
        AtomicReference atomicReference = new AtomicReference();
        TempSocketServerInDriver tempSocketServerInDriver = new TempSocketServerInDriver(atomicReference);
        final String host = tempSocketServerInDriver.host();
        final int port = tempSocketServerInDriver.port();
        final long j = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("maxBinlogQueueSize", new MLSQLBinLogDataSource$$anonfun$10(this)))).toLong();
        final String uuid = UUID.randomUUID().toString();
        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();
        ((IterableLike) map.filter(new MLSQLBinLogDataSource$$anonfun$createSource$4(this))).foreach(new MLSQLBinLogDataSource$$anonfun$createSource$5(this, hadoopConfiguration));
        final SerializableConfiguration serializableConfiguration = new SerializableConfiguration(hadoopConfiguration);
        final CaseInsensitiveMap apply = CaseInsensitiveMap$.MODULE$.apply(((Map) ((TraversableLike) map.filter(new MLSQLBinLogDataSource$$anonfun$11(this))).map(new MLSQLBinLogDataSource$$anonfun$12(this), Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        new Thread(this, sparkSession, sessionLocalTimeZone, str3, i, str4, str5, option5, option6, mkString, map4, map5, host, port, j, uuid, serializableConfiguration, apply) { // from class: org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource$$anon$4
            private final /* synthetic */ MLSQLBinLogDataSource $outer;
            private final SparkSession spark$1;
            private final String timezoneID$1;
            private final String bingLogHost$1;
            private final int bingLogPort$1;
            private final String bingLogUserName$1;
            private final String bingLogPassword$1;
            private final Option databaseNamePattern$1;
            private final Option tableNamePattern$1;
            private final String checkPointDir$1;
            private final Option binlogFilename$1;
            private final Option binlogPos$1;
            private final String tempSocketServerHost$1;
            private final int tempSocketServerPort$1;
            private final long maxBinlogQueueSize$1;
            private final String binlogServerId$1;
            private final SerializableConfiguration broadcastedHadoopConf$1;
            private final CaseInsensitiveMap binaryLogClientParameters$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                this.$outer.org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1(this.spark$1, this.timezoneID$1, this.bingLogHost$1, this.bingLogPort$1, this.bingLogUserName$1, this.bingLogPassword$1, this.databaseNamePattern$1, this.tableNamePattern$1, this.checkPointDir$1, this.binlogFilename$1, this.binlogPos$1, this.tempSocketServerHost$1, this.tempSocketServerPort$1, this.maxBinlogQueueSize$1, this.binlogServerId$1, this.broadcastedHadoopConf$1, this.binaryLogClientParameters$1);
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super("launch-binlog-socket-server-in-spark-job");
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.spark$1 = sparkSession;
                this.timezoneID$1 = sessionLocalTimeZone;
                this.bingLogHost$1 = str3;
                this.bingLogPort$1 = i;
                this.bingLogUserName$1 = str4;
                this.bingLogPassword$1 = str5;
                this.databaseNamePattern$1 = option5;
                this.tableNamePattern$1 = option6;
                this.checkPointDir$1 = mkString;
                this.binlogFilename$1 = map4;
                this.binlogPos$1 = map5;
                this.tempSocketServerHost$1 = host;
                this.tempSocketServerPort$1 = port;
                this.maxBinlogQueueSize$1 = j;
                this.binlogServerId$1 = uuid;
                this.broadcastedHadoopConf$1 = serializableConfiguration;
                this.binaryLogClientParameters$1 = apply;
                setDaemon(true);
            }
        }.start();
        int i2 = 60;
        while (atomicReference.get() == null) {
            Thread.sleep(1000L);
            i2--;
        }
        if (atomicReference.get() == null) {
            throw new RuntimeException("start BinLogSocketServerInExecutor fail");
        }
        ReportBinlogSocketServerHostAndPort reportBinlogSocketServerHostAndPort = (ReportBinlogSocketServerHostAndPort) atomicReference.get();
        return new MLSQLBinLogSource(new ExecutorBinlogServer(reportBinlogSocketServerHostAndPort.host(), reportBinlogSocketServerHostAndPort.port()), sQLContext.sparkSession(), str, some7, map.$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("binlogServerId"), uuid)}))));
    }

    public String shortName() {
        return "mysql-binglog";
    }

    private final long getOffsetFromCk$1(String str) {
        return new StringOps(Predef$.MODULE$.augmentString((String) Predef$.MODULE$.refArrayOps(HDFSOperator$.MODULE$.readFile(((Path) ((Tuple2) ((TraversableLike) ((SeqLike) ((TraversableLike) HDFSOperator$.MODULE$.listFiles(PathFun$.MODULE$.apply(str).add("offsets").toPath()).filterNot(new MLSQLBinLogDataSource$$anonfun$3(this))).map(new MLSQLBinLogDataSource$$anonfun$4(this), Seq$.MODULE$.canBuildFrom())).sortBy(new MLSQLBinLogDataSource$$anonfun$5(this), Ordering$Int$.MODULE$)).last())._2()).toString()).split("\n")).last())).toLong();
    }

    public final ExecutorBinlogServer[] org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1(SparkSession sparkSession, String str, String str2, int i, String str3, String str4, Option option, Option option2, String str5, Option option3, Option option4, String str6, int i2, long j, String str7, SerializableConfiguration serializableConfiguration, CaseInsensitiveMap caseInsensitiveMap) {
        sparkSession.sparkContext().setJobGroup(str7, new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"binlog server (", ":", ")"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str2, BoxesRunTime.boxToInteger(i)})), true);
        return (ExecutorBinlogServer[]) sparkSession.sparkContext().parallelize(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"launch-binlog-socket-server"})), 1, ClassTag$.MODULE$.apply(String.class)).map(new MLSQLBinLogDataSource$$anonfun$org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1$1(this, str, str2, i, str3, str4, option, option2, str5, option3, option4, str6, i2, j, serializableConfiguration, caseInsensitiveMap), ClassTag$.MODULE$.apply(ExecutorBinlogServer.class)).collect();
    }

    public MLSQLBinLogDataSource() {
        Logging.class.$init$(this);
    }
}
