package ai.starlake.utils.kafka;

import ai.starlake.config.Settings;
import ai.starlake.schema.model.Mode;
import ai.starlake.schema.model.Mode$;
import ai.starlake.schema.model.Mode$FILE$;
import ai.starlake.schema.model.Mode$STREAM$;
import ai.starlake.utils.FileLock;
import ai.starlake.utils.YamlSerializer$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: KafkaClient.scala */
@ScalaSignature(bytes = "\u0006\u0001\tEc\u0001\u0002\u0010 \u0001!B\u0001\"\u0014\u0001\u0003\u0002\u0003\u0006IA\u0014\u0005\tA\u0002\u0011\t\u0011)A\u0006C\")Q\r\u0001C\u0001M\"9A\u000e\u0001b\u0001\n\u0003i\u0007B\u0002<\u0001A\u0003%a\u000eC\u0004x\u0001\t\u0007I\u0011\u0001=\t\u000f\u0005%\u0001\u0001)A\u0005s\"I\u00111\u0002\u0001C\u0002\u0013\u0005\u0011Q\u0002\u0005\t\u0003+\u0001\u0001\u0015!\u0003\u0002\u0010!I\u0011q\u0003\u0001C\u0002\u0013\u0005\u0011\u0011\u0004\u0005\t\u0003O\u0001\u0001\u0015!\u0003\u0002\u001c!Q\u0011\u0011\u0006\u0001\t\u0006\u0004%\t!a\u000b\t\u000f\u0005}\u0002\u0001\"\u0001\u0002B!9\u0011\u0011\n\u0001\u0005\u0002\u0005-\u0003bBA)\u0001\u0011\u0005\u00111\u000b\u0005\b\u0003G\u0002A\u0011AA3\u0011\u001d\t\t\n\u0001C\u0005\u0003'Cq!!.\u0001\t\u0013\t9\fC\u0004\u0002>\u0002!I!a0\t\u000f\u0005\r\u0007\u0001\"\u0001\u0002F\"9\u0011\u0011\u001b\u0001\u0005\u0002\u0005M\u0007bBAp\u0001\u0011%\u0011\u0011\u001d\u0005\b\u0003W\u0004A\u0011BAw\u0011\u001d\tI\u0010\u0001C\u0005\u0003wDq!a@\u0001\t\u0003\u0011\t\u0001C\u0004\u0003\u0006\u0001!\tAa\u0002\t\u000f\t=\u0001\u0001\"\u0001\u0003\u0012!9!q\b\u0001\u0005\u0002\t\u0005\u0003b\u0002B$\u0001\u0011\u0005!\u0011\n\u0002\f\u0017\u000647.Y\"mS\u0016tGO\u0003\u0002!C\u0005)1.\u00194lC*\u0011!eI\u0001\u0006kRLGn\u001d\u0006\u0003I\u0015\n\u0001b\u001d;be2\f7.\u001a\u0006\u0002M\u0005\u0011\u0011-[\u0002\u0001'\u0015\u0001\u0011fL\u001dF!\tQS&D\u0001,\u0015\u0005a\u0013!B:dC2\f\u0017B\u0001\u0018,\u0005\u0019\te.\u001f*fMB\u0011\u0001gN\u0007\u0002c)\u0011!gM\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003iU\n\u0001\u0002^=qKN\fg-\u001a\u0006\u0002m\u0005\u00191m\\7\n\u0005a\n$!D*ue&\u001cG\u000fT8hO&tw\r\u0005\u0002;\u00076\t1H\u0003\u0002={\u0005\u00191/\u001d7\u000b\u0005yz\u0014!B:qCJ\\'B\u0001!B\u0003\u0019\t\u0007/Y2iK*\t!)A\u0002pe\u001eL!\u0001R\u001e\u0003\u001d\u0011\u000bG/Y:fi2{wmZ5oOB\u0011aiS\u0007\u0002\u000f*\u0011\u0001*S\u0001\u0005Y\u0006twMC\u0001K\u0003\u0011Q\u
0017M^1\n\u00051;%!D!vi>\u001cEn\\:fC\ndW-A\u0006lC\u001a\\\u0017mQ8oM&<\u0007CA(^\u001d\t\u0001&L\u0004\u0002R1:\u0011!k\u0016\b\u0003'Zk\u0011\u0001\u0016\u0006\u0003+\u001e\na\u0001\u0010:p_Rt\u0014\"\u0001\u0014\n\u0005\u0011*\u0013BA-$\u0003\u0019\u0019wN\u001c4jO&\u00111\fX\u0001\t'\u0016$H/\u001b8hg*\u0011\u0011lI\u0005\u0003=~\u00131bS1gW\u0006\u001cuN\u001c4jO*\u00111\fX\u0001\tg\u0016$H/\u001b8hgB\u0011!mY\u0007\u00029&\u0011A\r\u0018\u0002\t'\u0016$H/\u001b8hg\u00061A(\u001b8jiz\"\"aZ6\u0015\u0005!T\u0007CA5\u0001\u001b\u0005y\u0002\"\u00021\u0004\u0001\b\t\u0007\"B'\u0004\u0001\u0004q\u0015\u0001E2p[\u0016$xJ\u001a4tKR\u001cXj\u001c3f+\u0005q\u0007CA8u\u001b\u0005\u0001(BA9s\u0003\u0015iw\u000eZ3m\u0015\t\u00198%\u0001\u0004tG\",W.Y\u0005\u0003kB\u0014A!T8eK\u0006\t2m\\7fi>3gm]3ug6{G-\u001a\u0011\u0002\u001bM,'O^3s\u001fB$\u0018n\u001c8t+\u0005I\bC\u0002>\u007f\u0003\u0007\t\u0019A\u0004\u0002|yB\u00111kK\u0005\u0003{.\na\u0001\u0015:fI\u00164\u0017bA@\u0002\u0002\t\u0019Q*\u00199\u000b\u0005u\\\u0003c\u0001>\u0002\u0006%!\u0011qAA\u0001\u0005\u0019\u0019FO]5oO\u0006q1/\u001a:wKJ|\u0005\u000f^5p]N\u0004\u0013AE2p[\u0016$xJ\u001a4tKR\u001c8i\u001c8gS\u001e,\"!a\u0004\u0011\u0007=\u000b\t\"C\u0002\u0002\u0014}\u0013\u0001cS1gW\u0006$v\u000e]5d\u0007>tg-[4\u0002'\r|W.\u001a;PM\u001a\u001cX\r^:D_:4\u0017n\u001a\u0011\u0002/M,'O^3s\u001fB$\u0018n\u001c8t!J|\u0007/\u001a:uS\u0016\u001cXCAA\u000e!\u0011\ti\"a\t\u000e\u0005\u0005}!bAA\u0011\u0013\u0006!Q\u000f^5m\u0013\u0011\t)#a\b\u0003\u0015A\u0013x\u000e]3si&,7/\u0001\rtKJ4XM](qi&|gn\u001d)s_B,'\u000f^5fg\u0002\naa\u00197jK:$XCAA\u0017!\u0011\ty#a\u000f\u000e\u0005\u0005E\"\u0002BA\u001a\u0003k\tQ!\u00193nS:TA!a\u000e\u0002:\u000591\r\\5f]R\u001c(B\u0001\u0011@\u0013\u0011\ti$!\r\u0003\u0017\u0005#W.\u001b8DY&,g\u000e^\u0001\u0006G2|7/\u001a\u000b\u0003\u0003\u0007\u00022AKA#\u0013\r\t9e\u000b\u0002\u0005+:LG/A\u0006eK2,G/\u001a+pa&\u001cG\u0003BA\"\u0003\u001bBq!a\u0014\u000f\u0001\u0004\t\u0019!A\u0005u_BL7
MT1nK\u000692M]3bi\u0016$v\u000e]5d\u0013\u001atu\u000e\u001e)sKN,g\u000e\u001e\u000b\u0007\u0003\u0007\n)&a\u0018\t\u000f\u0005]s\u00021\u0001\u0002Z\u0005)Ao\u001c9jGB!\u0011qFA.\u0013\u0011\ti&!\r\u0003\u00119+w\u000fV8qS\u000eDa!!\u0019\u0010\u0001\u0004I\u0018\u0001B2p]\u001a\fq\u0002^8qS\u000e,e\u000eZ(gMN,Go\u001d\u000b\u0007\u0003O\nY)!$\u0011\r\u0005%\u00141OA=\u001d\u0011\tY'a\u001c\u000f\u0007M\u000bi'C\u0001-\u0013\r\t\thK\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t)(a\u001e\u0003\t1K7\u000f\u001e\u0006\u0004\u0003cZ\u0003c\u0002\u0016\u0002|\u0005}\u0014QQ\u0005\u0004\u0003{Z#A\u0002+va2,'\u0007E\u0002+\u0003\u0003K1!a!,\u0005\rIe\u000e\u001e\t\u0004U\u0005\u001d\u0015bAAEW\t!Aj\u001c8h\u0011\u001d\ty\u0005\u0005a\u0001\u0003\u0007Aa!a$\u0011\u0001\u0004I\u0018!D1dG\u0016\u001c8o\u00149uS>t7/A\tfqR\u0014\u0018m\u0019;QCJ$\u0018\u000e^5p]N$b!!&\u0002&\u0006\u001d\u0006CBAL\u0003g\nIJD\u0002+\u0003_\u0002B!a'\u0002\"6\u0011\u0011Q\u0014\u0006\u0005\u0003?\u000bI$\u0001\u0004d_6lwN\\\u0005\u0005\u0003G\u000biJ\u0001\bU_BL7\rU1si&$\u0018n\u001c8\t\u000f\u0005=\u0013\u00031\u0001\u0002\u0004!9\u0011\u0011V\tA\u0002\u0005-\u0016\u0001C2p]N,X.\u001a:\u0011\u0011\u00055\u0016\u0011WA\u0002\u0003\u0007i!!a,\u000b\t\u0005%\u0016QG\u0005\u0005\u0003g\u000byKA\u0007LC\u001a\\\u0017mQ8ogVlWM]\u0001\f]\u0016<8i\u001c8tk6,'\u000f\u0006\u0004\u0002,\u0006e\u00161\u0018\u0005\b\u0003\u001f\u0012\u0002\u0019AA\u0002\u0011\u0019\tyI\u0005a\u0001s\u0006Q!-^5mIB\u0013x\u000e]:\u0015\t\u0005m\u0011\u0011\u0019\u0005\u0007\u0003\u001f\u001b\u0002\u0019A=\u0002!Q|\u0007/[2TCZ,wJ\u001a4tKR\u001cH\u0003CA\"\u0003\u000f\fY-!4\t\u000f\u0005%G\u00031\u0001\u0002\u0004\u0005yAo\u001c9jG\u000e{gNZ5h\u001d\u0006lW\r\u0003\u0004\u0002\u0010R\u0001\r!\u001f\u0005\b\u0003\u001f$\u0002\u0019AA4\u0003\u001dygMZ:fiN\fA#\u00193nS:$v\u000e]5d!\u0006\u0014H/\u001b;j_:\u001cH\u0003BAk\u0003;\u0004b!!\u001b\u0002t\u0005]\u0007\u0003BAN\u00033LA!a7\u0002\u001e\n\u0011Bk\u001c9jGB\u000b'\u000f^5uS>t\u
0017J\u001c4p\u0011\u001d\ty%\u0006a\u0001\u0003\u0007\tQ\u0004^8qS\u000e\u001cUO\u001d:f]R|eMZ:fiN4%o\\7TiJ,\u0017-\u001c\u000b\u0005\u0003G\fI\u000fE\u0003+\u0003K\f9'C\u0002\u0002h.\u0012aa\u00149uS>t\u0007bBAe-\u0001\u0007\u00111A\u0001\u0011G>lW\r^(gMN,Go\u001d'pG.$B!a<\u0002xB!\u0011\u0011_Az\u001b\u0005\t\u0013bAA{C\tAa)\u001b7f\u0019>\u001c7\u000eC\u0004\u0002J^\u0001\r!a\u0001\u00027Q|\u0007/[2DkJ\u0014XM\u001c;PM\u001a\u001cX\r^:Ge>lg)\u001b7f)\u0011\t\u0019/!@\t\u000f\u0005%\u0007\u00041\u0001\u0002\u0004\u0005\u0019Bo\u001c9jG\u000e+(O]3oi>3gm]3ugR!\u00111\u001dB\u0002\u0011\u001d\tI-\u0007a\u0001\u0003\u0007\tQb\u001c4gg\u0016$8/Q:Kg>tGC\u0002B\u0005\u0005\u0017\u0011i\u0001E\u0003+\u0003K\f\u0019\u0001C\u0004\u0002Pi\u0001\r!a\u0001\t\u000f\u0005='\u00041\u0001\u0002h\u0005\t2m\u001c8tk6,Gk\u001c9jG\n\u000bGo\u00195\u0015\u0011\tM!\u0011\u0007B\u001a\u0005{\u0001rAKA>\u0005+\t9\u0007\u0005\u0003\u0003\u0018\t-b\u0002\u0002B\r\u0005SqAAa\u0007\u0003(9!!Q\u0004B\u0013\u001d\u0011\u0011yBa\t\u000f\u0007M\u0013\t#C\u0001C\u0013\t\u0001\u0015)\u0003\u0002?\u007f%\u0011A(P\u0005\u0004\u0003cZ\u0014\u0002\u0002B\u0017\u0005_\u0011\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0007\u0005E4\bC\u0004\u0002Jn\u0001\r!a\u0001\t\u000f\tU2\u00041\u0001\u00038\u000591/Z:tS>t\u0007c\u0001\u001e\u0003:%\u0019!1H\u001e\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\t\re[\u0002\u0019AA\b\u0003U\u0019wN\\:v[\u0016$v\u000e]5d'R\u0014X-Y7j]\u001e$bA!\u0006\u0003D\t\u0015\u0003b\u0002B\u001b9\u0001\u0007!q\u0007\u0005\u00073r\u0001\r!a\u0004\u0002\u0017MLgn\u001b+p)>\u0004\u0018n\u0019\u000b\u0007\u0003\u0007\u0012YE!\u0014\t\rek\u0002\u0019AA\b\u0011\u001d\u0011y%\ba\u0001\u0005+\t!\u0001\u001a4")
/* loaded from: input_file:ai/starlake/utils/kafka/KafkaClient.class */
public class KafkaClient implements StrictLogging, DatasetLogging, AutoCloseable {
    // Kafka admin client; created on first use by client$lzycompute(), not by the constructor.
    private AdminClient client;
    // Application settings handed to the constructor.
    private final Settings settings;
    // Where consumed offsets are persisted (STREAM or FILE); resolved in the constructor.
    private final Mode cometOffsetsMode;
    // Kafka server options taken from the KafkaConfig passed to the constructor.
    private final Map<String, String> serverOptions;
    // Configuration of the "comet_offsets" topic looked up in the constructor.
    private final Settings.KafkaTopicConfig cometOffsetsConfig;
    // java.util.Properties copy of serverOptions, used to create the admin client.
    private final Properties serverOptionsProperties;
    // Logger assigned through the StrictLogging setter below.
    private final Logger logger;
    // True once `client` has been created; read without a lock in client().
    private volatile boolean bitmap$0;

    /**
     * Wraps {@code dataset} in the logging helper provided by {@link DatasetLogging}.
     *
     * @param dataset the dataset to wrap
     * @return the helper produced by the trait's own implementation
     */
    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        // Delegate explicitly to the trait implementation: an unqualified
        // DatasetHelper(dataset) resolves to this very method and recurses forever.
        return DatasetLogging.super.DatasetHelper(dataset);
    }

    /** Returns the logger used for this client's log statements. */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the logger instance on this client.
     * NOTE(review): presumably invoked by {@code StrictLogging.$init$(this)} from the
     * constructor to initialise the trait's {@code logger} value — confirm.
     *
     * @param logger the logger to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /** Returns the offsets persistence mode resolved when this client was constructed. */
    public Mode cometOffsetsMode() {
        return cometOffsetsMode;
    }

    /** Returns the Kafka server options taken from the Kafka configuration. */
    public Map<String, String> serverOptions() {
        return serverOptions;
    }

    /** Returns the configuration of the "comet_offsets" topic. */
    public Settings.KafkaTopicConfig cometOffsetsConfig() {
        return cometOffsetsConfig;
    }

    /** Returns the server options as {@link Properties}, as filled in by the constructor. */
    public Properties serverOptionsProperties() {
        return serverOptionsProperties;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [ai.starlake.utils.kafka.KafkaClient] */
    private AdminClient client$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.client = AdminClient.create(serverOptionsProperties());
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.client;
    }

    /**
     * Returns the Kafka admin client, creating it on the first call.
     *
     * @return the shared admin client
     */
    public AdminClient client() {
        if (this.bitmap$0) {
            return this.client;
        }
        return client$lzycompute();
    }

    /**
     * Closes the admin client if one was created.
     *
     * <p>Nothing happens when the client was never requested: going through
     * {@link #client()} here would build a brand-new admin client only to close it.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.bitmap$0) {
            this.client.close();
        }
    }

    /**
     * Deletes the topic named {@code str} when it exists on the cluster.
     *
     * <p>The list of existing topics is logged at info level. The deletion request is
     * submitted without waiting for its result.
     *
     * @param str name of the topic to delete
     */
    public void deleteTopic(String str) {
        // List once so the logged topics are exactly the ones the existence check uses.
        Set<String> topicNames = client().listTopics().names().get();
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info(String.join("\n", topicNames));
        }
        if (!topicNames.contains(str)) {
            return;
        }
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Deleting topic {}", str);
        }
        client().deleteTopics(Collections.singleton(str));
    }

    /**
     * Creates {@code newTopic} with the given topic-level configuration unless a topic
     * of that name is already listed, and waits for the creation to finish.
     *
     * @param newTopic definition of the topic to create
     * @param map      topic configuration entries applied to {@code newTopic}
     */
    public void createTopicIfNotPresent(NewTopic newTopic, Map<String, String> map) {
        Set<String> existingTopics = (Set) client().listTopics().names().get();
        if (!existingTopics.contains(newTopic.name())) {
            java.util.Map<String, String> topicConfigs =
                    (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava();
            client().createTopics(Collections.singleton(newTopic.configs(topicConfigs))).all().get();
        }
    }

    /**
     * Reads the end position of every partition of a topic.
     *
     * <p>A short-lived consumer is assigned all partitions of the topic, moved to the end
     * of each one and asked for its position. The consumer is always closed, including
     * when one of these calls fails.
     *
     * @param str name of the topic to inspect
     * @param map consumer access options for that topic
     * @return one (partition, end offset) pair per partition
     * @throws RuntimeException any failure raised by the consumer, after it has been logged
     */
    public List<Tuple2<Object, Object>> topicEndOffsets(String str, Map<String, String> map) {
        try (KafkaConsumer<String, String> consumer = newConsumer(str, map)) {
            List<TopicPartition> partitions = extractPartitions(str, consumer);
            Collection<TopicPartition> assigned =
                    (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava();
            consumer.assign(assigned);
            consumer.seekToEnd(assigned);
            return (List) partitions.map(
                    topicPartition -> new Tuple2.mcIJ.sp(topicPartition.partition(), consumer.position(topicPartition)),
                    List$.MODULE$.canBuildFrom());
        } catch (RuntimeException e) {
            // Report through the logger rather than stderr, then let the caller see the failure.
            logger().underlying().error("Failed to read end offsets of topic {}", str, e);
            throw e;
        }
    }

    /**
     * Lists the partitions of a topic as seen by the given consumer.
     *
     * @param str           topic name
     * @param kafkaConsumer consumer used to query partition metadata
     * @return one {@link TopicPartition} per partition reported for the topic
     */
    private List<TopicPartition> extractPartitions(String str, KafkaConsumer<String, String> kafkaConsumer) {
        TraversableLike partitionInfos =
                (TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(kafkaConsumer.partitionsFor(str)).asScala();
        TraversableOnce topicPartitions = (TraversableOnce) partitionInfos.map(
                info -> new TopicPartition(str, info.partition()),
                Buffer$.MODULE$.canBuildFrom());
        return topicPartitions.toList();
    }

    /**
     * Builds a consumer from the given access options, logging each option at info level.
     *
     * @param str topic name, used only in the log output
     * @param map consumer access options
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> newConsumer(String str, Map<String, String> map) {
        Properties consumerProps = buildProps(map);
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("access options for topic {} ==>", str);
            ((IterableLike) JavaConverters$.MODULE$.propertiesAsScalaMapConverter(consumerProps).asScala()).foreach(entry -> {
                $anonfun$newConsumer$1(this, entry);
                return BoxedUnit.UNIT;
            });
        }
        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * Copies a Scala option map into a fresh {@link Properties} object.
     *
     * @param map options to copy
     * @return properties holding every entry of {@code map}
     */
    private Properties buildProps(Map<String, String> map) {
        Properties props = new Properties();
        map.foreach(entry -> props.put(entry._1(), entry._2()));
        return props;
    }

    /**
     * Persists consumed offsets for a topic configuration.
     *
     * <p>In STREAM mode one record per partition is sent to the offsets topic, keyed
     * {@code <str>/<partition>} with the offset as its value. The producer is closed even
     * when a send fails.
     *
     * @param str  topic configuration name the offsets belong to
     * @param map  producer access options
     * @param list (partition, offset) pairs to store
     * @throws IllegalStateException if the offsets mode is neither STREAM nor FILE
     */
    public void topicSaveOffsets(String str, Map<String, String> map, List<Tuple2<Object, Object>> list) {
        Mode mode = cometOffsetsMode();
        if (Mode$STREAM$.MODULE$.equals(mode)) {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(buildProps(map))) {
                String offsetsTopic = cometOffsetsConfig().topicName();
                list.foreach(offset -> {
                    if (offset == null) {
                        throw new MatchError(offset);
                    }
                    String key = str + "/" + offset._1$mcI$sp();
                    String value = String.valueOf(offset._2$mcJ$sp());
                    return producer.send(new ProducerRecord<>(offsetsTopic, key, value));
                });
            }
            return;
        }
        if (!Mode$FILE$.MODULE$.equals(mode)) {
            throw new IllegalStateException("Should never happen");
        }
        // NOTE(review): in FILE mode this only builds the lock object and writes nothing.
        // The write performed under the lock looks lost in decompilation — restore it from
        // the Scala source (KafkaClient.scala) rather than guessing the storage API here.
        FileLock cometOffsetsLock = cometOffsetsLock(str);
    }

    /**
     * Describes a topic through the admin client and returns its partitions.
     *
     * @param str topic name
     * @return partition information for every partition of the topic
     */
    public List<TopicPartitionInfo> adminTopicPartitions(String str) {
        java.util.Map descriptions =
                (java.util.Map) client().describeTopics(Collections.singleton(str)).allTopicNames().get();
        TopicDescription description = (TopicDescription) descriptions.get(str);
        return ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(description.partitions()).asScala()).toList();
    }

    /**
     * Reads the stored offsets of a topic configuration from the offsets topic.
     *
     * <p>The offsets topic is read from the beginning until a poll returns no records.
     * For each record key ({@code <name>/<partition>}) the last value seen wins. The
     * consumer is closed before the result is assembled, including on failure.
     *
     * @param str topic configuration name whose offsets are wanted
     * @return the (partition, offset) pairs stored under {@code str}, or empty if none
     */
    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromStream(String str) {
        Properties properties = buildProps(cometOffsetsConfig().allAccessOptions(this.settings.comet().kafka().sparkServerOptions()));
        scala.collection.mutable.Map empty = Map$.MODULE$.empty();
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            List<TopicPartition> extractPartitions = extractPartitions(cometOffsetsConfig().topicName(), kafkaConsumer);
            Collection<TopicPartition> assigned =
                    (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(extractPartitions).asJava();
            kafkaConsumer.assign(assigned);
            kafkaConsumer.seekToBeginning(assigned);
            ConsumerRecords<String, String> batch = kafkaConsumer.poll(Duration.ofMillis(100L));
            // Stop at the first empty (or null) batch, exactly as before.
            while (batch != null && !batch.isEmpty()) {
                ((IterableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(batch.records(cometOffsetsConfig().topicName())).asScala()).foreach(consumerRecord -> {
                    return empty.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(consumerRecord.key()), consumerRecord.value()));
                });
                batch = kafkaConsumer.poll(Duration.ofMillis(100L));
            }
        }
        // Split each key into (name, partition), group by name and keep the entry for `str`.
        return ((TraversableLike) empty.keys().map(str2 -> {
            String[] split = new StringOps(Predef$.MODULE$.augmentString(str2)).split('/');
            return new Tuple3(split[0], split[1], empty.apply(str2));
        }, Iterable$.MODULE$.canBuildFrom())).groupBy(tuple3 -> {
            if (tuple3 != null) {
                return (String) tuple3._1();
            }
            throw new MatchError(tuple3);
        }).mapValues(iterable -> {
            return ((TraversableOnce) iterable.map(tuple32 -> {
                if (tuple32 == null) {
                    throw new MatchError(tuple32);
                }
                return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString((String) tuple32._2())).toInt(), new StringOps(Predef$.MODULE$.augmentString((String) tuple32._3())).toLong());
            }, Iterable$.MODULE$.canBuildFrom())).toList();
        }).get(str);
    }

    /**
     * Builds the file lock guarding the stored offsets of a topic configuration.
     *
     * @param str topic configuration name
     * @return a lock on {@code comet_offsets_<str>.lock} under the configured lock path
     */
    private FileLock cometOffsetsLock(String str) {
        String lockFileName = "comet_offsets_" + str + ".lock";
        Path lockPath = new Path(this.settings.comet().lock().path(), lockFileName);
        return new FileLock(lockPath, this.settings.storageHandler());
    }

    /**
     * Reads the stored offsets of a topic configuration from the offsets file.
     *
     * <p>The file lives at {@code <offsets topic name>/<str>} and holds a list of
     * {@code partition,offset} strings. The read runs while holding the file lock.
     *
     * @param str topic configuration name
     * @return the parsed (partition, offset) pairs, or empty when the file does not exist
     */
    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromFile(String str) {
        FileLock lock = cometOffsetsLock(str);
        return (Option) lock.doExclusively(lock.doExclusively$default$1(), () -> {
            Path offsetsPath = new Path(this.cometOffsetsConfig().topicName(), str);
            if (!this.settings.storageHandler().exists(offsetsPath)) {
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Cannot load comet offsets: {} file does not exist", offsetsPath);
                }
                return None$.MODULE$;
            }
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Loading comet offsets to path {}", offsetsPath);
            }
            List lines = (List) YamlSerializer$.MODULE$.mapper().readValue(this.settings.storageHandler().read(offsetsPath), List.class);
            return new Some((List) lines.map(line -> {
                String[] parts = new StringOps(Predef$.MODULE$.augmentString(line)).split(',');
                int partition = new StringOps(Predef$.MODULE$.augmentString(parts[0])).toInt();
                long offset = new StringOps(Predef$.MODULE$.augmentString(parts[1])).toLong();
                return new Tuple2.mcIJ.sp(partition, offset);
            }, List$.MODULE$.canBuildFrom()));
        });
    }

    /**
     * Returns the stored offsets of a topic configuration, read from the offsets topic
     * in STREAM mode or from the offsets file in FILE mode.
     *
     * @param str topic configuration name
     * @return the stored (partition, offset) pairs, or empty when none are stored
     * @throws IllegalStateException if the offsets mode is neither STREAM nor FILE
     */
    public Option<List<Tuple2<Object, Object>>> topicCurrentOffsets(String str) {
        Mode mode = cometOffsetsMode();
        if (Mode$STREAM$.MODULE$.equals(mode)) {
            return topicCurrentOffsetsFromStream(str);
        }
        if (Mode$FILE$.MODULE$.equals(mode)) {
            return topicCurrentOffsetsFromFile(str);
        }
        // Unchecked, so no throws clause is needed; existing catch (Exception) sites still match.
        throw new IllegalStateException("Should never happen");
    }

    /**
     * Renders offsets as a JSON object of the form
     * {@code {"<topic>":{"<partition>": <offset>,...}}}.
     *
     * @param str  topic name used as the outer key
     * @param list (partition, offset) pairs to render
     * @return the JSON text, or empty when {@code list} is empty
     */
    public Option<String> offsetsAsJson(String str, List<Tuple2<Object, Object>> list) {
        if (list.isEmpty()) {
            return None$.MODULE$;
        }
        TraversableOnce entries = (TraversableOnce) list.map(offset -> {
            if (offset == null) {
                throw new MatchError(offset);
            }
            return "\"" + offset._1$mcI$sp() + "\": " + offset._2$mcJ$sp();
        }, List$.MODULE$.canBuildFrom());
        return new Some("{\"" + str + "\":{" + entries.mkString(",") + "}}");
    }

    /**
     * Loads, as a batch dataset, the records of a topic between the stored offsets and
     * the current end offsets.
     *
     * <p>When no offsets are stored for {@code str}, every partition starts at offset
     * {@code -2}, the value the Spark Kafka source treats as "earliest". The consumer used
     * to discover partitions in that case is closed even if the lookup fails.
     *
     * @param str              topic configuration name used to look up stored offsets
     * @param sparkSession     session used to read from Kafka
     * @param kafkaTopicConfig topic configuration (name, access options, projected fields)
     * @return the loaded dataset together with the end offsets it was read up to
     */
    public Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch(String str, SparkSession sparkSession, Settings.KafkaTopicConfig kafkaTopicConfig) {
        final long earliestOffset = -2;
        String topicName = kafkaTopicConfig.topicName();
        // Computed once and shared by the partition lookup, the end-offset lookup and the reader.
        Map<String, String> accessOptions = kafkaTopicConfig.allAccessOptions(this.settings.comet().kafka().sparkServerOptions());

        List<Tuple2<Object, Object>> startOffsets = (List) topicCurrentOffsets(str).getOrElse(() -> {
            try (KafkaConsumer<String, String> consumer = this.newConsumer(topicName, accessOptions)) {
                return (List) this.extractPartitions(topicName, consumer).map(
                        topicPartition -> new Tuple2.mcIJ.sp(topicPartition.partition(), earliestOffset),
                        List$.MODULE$.canBuildFrom());
            }
        });
        if (logger().underlying().isInfoEnabled()) {
            startOffsets.foreach(offset -> {
                $anonfun$consumeTopicBatch$3(this, str, offset);
                return BoxedUnit.UNIT;
            });
        }

        List<Tuple2<Object, Object>> endOffsets = topicEndOffsets(topicName, accessOptions);
        if (logger().underlying().isInfoEnabled()) {
            endOffsets.foreach(offset -> {
                $anonfun$consumeTopicBatch$4(this, str, offset);
                return BoxedUnit.UNIT;
            });
        }

        Map readerOptions = accessOptions.$plus$plus(new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("startingOffsets"), offsetsAsJson(topicName, startOffsets).getOrElse(() -> {
            return "earliest";
        })), new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("endingOffsets"), offsetsAsJson(topicName, endOffsets).getOrElse(() -> {
            return "latest";
        })), Nil$.MODULE$)));
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("withOffsetsTopicOptions:" + readerOptions.toString());
            logger().underlying().info("settings.comet.kafka.sparkServerOptions:" + this.settings.comet().kafka().sparkServerOptions().toString());
        }

        Dataset selectExpr = sparkSession.read().format("kafka").options(readerOptions).options(this.settings.comet().kafka().sparkServerOptions()).load().selectExpr(kafkaTopicConfig.fields());
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info(DatasetHelper(selectExpr).schemaString());
        }
        return new Tuple2<>(selectExpr, endOffsets);
    }

    /**
     * Opens a streaming read on a Kafka topic and projects the configured fields.
     *
     * @param sparkSession     session used to create the stream
     * @param kafkaTopicConfig topic configuration supplying access options and fields
     * @return the streaming dataset restricted to the configured field expressions
     */
    public Dataset<Row> consumeTopicStreaming(SparkSession sparkSession, Settings.KafkaTopicConfig kafkaTopicConfig) {
        Map<String, String> accessOptions = kafkaTopicConfig.allAccessOptions(this.settings.comet().kafka().sparkServerOptions());
        Dataset<Row> stream = sparkSession.readStream().format("kafka").options(accessOptions).load();
        return stream.selectExpr(kafkaTopicConfig.fields());
    }

    /**
     * Writes a dataset to the configured Kafka topic after projecting the configured fields.
     *
     * @param kafkaTopicConfig topic configuration supplying fields, access options and topic name
     * @param dataset          rows to publish
     */
    public void sinkToTopic(Settings.KafkaTopicConfig kafkaTopicConfig, Dataset<Row> dataset) {
        Dataset<Row> projected = dataset.selectExpr(kafkaTopicConfig.fields());
        projected.write()
                .format("kafka")
                .options(kafkaTopicConfig.allAccessOptions(this.settings.comet().kafka().sparkServerOptions()))
                .option("topic", kafkaTopicConfig.topicName())
                .save();
    }

    /**
     * Logs one consumer option as {@code key=value} at info level.
     * NOTE(review): values are logged verbatim; access options may carry credentials —
     * confirm this is acceptable for the deployed log level.
     *
     * @param kafkaClient client whose logger is used
     * @param tuple2      the (key, value) option entry
     */
    public static final /* synthetic */ void $anonfun$newConsumer$1(KafkaClient kafkaClient, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String key = (String) tuple2._1();
        String value = (String) tuple2._2();
        if (kafkaClient.logger().underlying().isInfoEnabled()) {
            kafkaClient.logger().underlying().info("\t{}={}", key, value);
        }
    }

    /**
     * Logs the start offset of one partition at info level.
     *
     * @param kafkaClient client whose logger is used
     * @param str         topic configuration name shown in the message
     * @param tuple2      the (partition, offset) pair
     */
    public static final /* synthetic */ void $anonfun$consumeTopicBatch$3(KafkaClient kafkaClient, String str, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        int partition = tuple2._1$mcI$sp();
        long offset = tuple2._2$mcJ$sp();
        if (kafkaClient.logger().underlying().isInfoEnabled()) {
            kafkaClient.logger().underlying().info("{} start-offset -> {}:{}", str, partition, offset);
        }
    }

    /**
     * Logs the end offset of one partition at info level.
     *
     * @param kafkaClient client whose logger is used
     * @param str         topic configuration name shown in the message
     * @param tuple2      the (partition, offset) pair
     */
    public static final /* synthetic */ void $anonfun$consumeTopicBatch$4(KafkaClient kafkaClient, String str, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        int partition = tuple2._1$mcI$sp();
        long offset = tuple2._2$mcJ$sp();
        if (kafkaClient.logger().underlying().isInfoEnabled()) {
            kafkaClient.logger().underlying().info("{} end-offset -> {}:{}", str, partition, offset);
        }
    }

    /**
     * Creates a client and prepares the offsets store for the configured mode.
     *
     * <p>The offsets mode comes from the settings and defaults to STREAM. In STREAM mode
     * the offsets topic is created with {@code cleanup.policy=compact} if it is not
     * present. In FILE mode the offsets directory is created if it does not exist.
     *
     * @param kafkaConfig Kafka configuration; must define a "comet_offsets" topic
     * @param settings    application settings
     * @throws IllegalStateException if the offsets mode is neither STREAM nor FILE
     */
    public KafkaClient(Settings.KafkaConfig kafkaConfig, Settings settings) {
        this.settings = settings;
        StrictLogging.$init$(this);
        DatasetLogging.$init$(this);
        this.cometOffsetsMode = (Mode) settings.comet().kafka().cometOffsetsMode()
                .map(str -> Mode$.MODULE$.fromString(str))
                .getOrElse(() -> Mode$STREAM$.MODULE$);
        this.serverOptions = kafkaConfig.serverOptions();
        this.cometOffsetsConfig = (Settings.KafkaTopicConfig) kafkaConfig.topics().apply("comet_offsets");
        this.serverOptionsProperties = new Properties();
        serverOptions().foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return this.serverOptionsProperties().put((String) tuple2._1(), (String) tuple2._2());
        });
        Mode mode = cometOffsetsMode();
        if (Mode$STREAM$.MODULE$.equals(mode)) {
            createTopicIfNotPresent(new NewTopic(cometOffsetsConfig().topicName(), cometOffsetsConfig().partitions(), cometOffsetsConfig().replicationFactor()), (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("cleanup.policy"), "compact")})));
        } else if (Mode$FILE$.MODULE$.equals(mode)) {
            Path offsetsDir = new Path(cometOffsetsConfig().topicName());
            if (!settings.storageHandler().exists(offsetsDir)) {
                settings.storageHandler().mkdirs(offsetsDir);
            }
        } else {
            // Unchecked, so no throws clause is needed; existing catch (Exception) sites still match.
            throw new IllegalStateException("Should never happen");
        }
    }
}
