package com.snowplowanalytics.snowplow.sources.kafka;

import cats.Applicative$;
import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Sync;
import cats.effect.package$;
import cats.implicits$;
import cats.kernel.Eq;
import cats.kernel.Monoid;
import cats.kernel.Semigroup;
import cats.syntax.ParallelTraversableOps1$;
import com.snowplowanalytics.snowplow.sources.internal.Checkpointer;
import com.snowplowanalytics.snowplow.sources.internal.LowLevelEvents;
import com.snowplowanalytics.snowplow.sources.internal.LowLevelSource;
import com.snowplowanalytics.snowplow.sources.internal.LowLevelSource$;
import com.snowplowanalytics.snowplow.sources.kafka.KafkaSource;
import fs2.Chunk;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$NestedStreamOps$;
import fs2.compat.NotGiven$;
import fs2.kafka.CommittableConsumerRecord;
import fs2.kafka.ConsumerSettings;
import fs2.kafka.ConsumerSettings$;
import fs2.kafka.GenericDeserializer;
import fs2.kafka.GenericDeserializer$;
import fs2.kafka.KafkaConsumer$;
import fs2.kafka.Timestamp;
import fs2.kafka.Value;
import fs2.kafka.consumer.MkConsumer$;
import java.nio.ByteBuffer;
import java.time.Instant;
import org.apache.kafka.common.TopicPartition;
import org.typelevel.log4cats.Logger$;
import org.typelevel.log4cats.SelfAwareStructuredLogger;
import org.typelevel.log4cats.slf4j.Slf4jLogger$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaSource.scala */
/* loaded from: input_file:com/snowplowanalytics/snowplow/sources/kafka/KafkaSource$.class */
public final class KafkaSource$ {
    /**
     * Singleton instance of this Scala companion object. It is not assigned
     * here; the private constructor stores {@code this} into the field.
     */
    public static KafkaSource$ MODULE$;

    static {
        // The result is discarded on purpose: the constructor itself publishes
        // the new instance through MODULE$.
        new KafkaSource$();
    }

    /**
     * Obtains an SLF4J-backed structured logger registered under the
     * {@code KafkaSource} class name.
     *
     * @param sync the {@code Sync} evidence handed to the logger factory
     * @return the logger produced by {@code Slf4jLogger.getLogger}
     */
    private <F> SelfAwareStructuredLogger<F> logger(Sync<F> sync) {
        final String loggerName = "com.snowplowanalytics.snowplow.sources.kafka.KafkaSource";
        return Slf4jLogger$.MODULE$.getLogger(sync, loggerName);
    }

    /**
     * Builds the Kafka source: creates the low-level source for the given
     * configuration and passes it to {@code LowLevelSource.toSourceAndAck}.
     *
     * @param kafkaSourceConfig the Kafka source configuration
     * @param async             the {@code Async} evidence for the effect type
     * @return the value returned by {@code toSourceAndAck}, cast to {@code F}
     */
    public <F> F build(KafkaSourceConfig kafkaSourceConfig, Async<F> async) {
        LowLevelSource<F, KafkaSource.KafkaCheckpoints<F>> source = lowLevel(kafkaSourceConfig, async);
        return (F) LowLevelSource$.MODULE$.toSourceAndAck(source, async);
    }

    /**
     * Creates the {@code LowLevelSource} for Kafka as an anonymous class that
     * captures the supplied configuration and {@code Async} evidence.
     *
     * @param kafkaSourceConfig the configuration forwarded to the Kafka stream
     * @param async             the {@code Async} evidence forwarded to both the
     *                          checkpointer and the stream
     * @return a low-level source whose {@code checkpointer()} and {@code stream()}
     *         delegate to the corresponding factory methods of this object
     */
    private <F> LowLevelSource<F, KafkaSource.KafkaCheckpoints<F>> lowLevel(final KafkaSourceConfig kafkaSourceConfig, final Async<F> async) {
        return new LowLevelSource<F, KafkaSource.KafkaCheckpoints<F>>(async, kafkaSourceConfig) { // from class: com.snowplowanalytics.snowplow.sources.kafka.KafkaSource$$anon$1
            // Captured copies of the enclosing method's arguments (synthesized by the compiler).
            private final Async evidence$3$1;
            private final KafkaSourceConfig config$1;

            /** Delegates to the Kafka checkpointer factory, passing the captured evidence. */
            public Checkpointer<F, KafkaSource.KafkaCheckpoints<F>> checkpointer() {
                return KafkaSource$.MODULE$.com$snowplowanalytics$snowplow$sources$kafka$KafkaSource$$kafkaCheckpointer(this.evidence$3$1);
            }

            /** Delegates to the Kafka stream factory, passing the captured config and evidence. */
            public Stream<F, Stream<F, LowLevelEvents<KafkaSource.KafkaCheckpoints<F>>>> stream() {
                return KafkaSource$.MODULE$.com$snowplowanalytics$snowplow$sources$kafka$KafkaSource$$kafkaStream(this.config$1, this.evidence$3$1);
            }

            {
                this.evidence$3$1 = async;
                this.config$1 = kafkaSourceConfig;
            }
        };
    }

    /**
     * Creates a {@code Semigroup} for {@code OffsetAndCommit} values whose
     * {@code combine} keeps the argument with the greater offset.
     *
     * <p>Only {@code combine} carries logic of its own. Every other method in
     * the anonymous class forwards to the matching static helper on
     * {@code Semigroup}.
     *
     * @return a new semigroup instance
     */
    public <F> Semigroup<KafkaSource.OffsetAndCommit<F>> com$snowplowanalytics$snowplow$sources$kafka$KafkaSource$$offsetAndCommitSemigroup() {
        return new Semigroup<KafkaSource.OffsetAndCommit<F>>() { // from class: com.snowplowanalytics.snowplow.sources.kafka.KafkaSource$$anon$2
            // ---- Forwarders to the Semigroup static helpers ----

            public double combine$mcD$sp(double d, double d2) {
                return Semigroup.combine$mcD$sp$(this, d, d2);
            }

            public float combine$mcF$sp(float f, float f2) {
                return Semigroup.combine$mcF$sp$(this, f, f2);
            }

            public int combine$mcI$sp(int i, int i2) {
                return Semigroup.combine$mcI$sp$(this, i, i2);
            }

            public long combine$mcJ$sp(long j, long j2) {
                return Semigroup.combine$mcJ$sp$(this, j, j2);
            }

            public Object combineN(Object obj, int i) {
                return Semigroup.combineN$(this, obj, i);
            }

            public double combineN$mcD$sp(double d, int i) {
                return Semigroup.combineN$mcD$sp$(this, d, i);
            }

            public float combineN$mcF$sp(float f, int i) {
                return Semigroup.combineN$mcF$sp$(this, f, i);
            }

            public int combineN$mcI$sp(int i, int i2) {
                return Semigroup.combineN$mcI$sp$(this, i, i2);
            }

            public long combineN$mcJ$sp(long j, int i) {
                return Semigroup.combineN$mcJ$sp$(this, j, i);
            }

            public Object repeatedCombineN(Object obj, int i) {
                return Semigroup.repeatedCombineN$(this, obj, i);
            }

            public double repeatedCombineN$mcD$sp(double d, int i) {
                return Semigroup.repeatedCombineN$mcD$sp$(this, d, i);
            }

            public float repeatedCombineN$mcF$sp(float f, int i) {
                return Semigroup.repeatedCombineN$mcF$sp$(this, f, i);
            }

            public int repeatedCombineN$mcI$sp(int i, int i2) {
                return Semigroup.repeatedCombineN$mcI$sp$(this, i, i2);
            }

            public long repeatedCombineN$mcJ$sp(long j, int i) {
                return Semigroup.repeatedCombineN$mcJ$sp$(this, j, i);
            }

            public Option<KafkaSource.OffsetAndCommit<F>> combineAllOption(TraversableOnce<KafkaSource.OffsetAndCommit<F>> traversableOnce) {
                return Semigroup.combineAllOption$(this, traversableOnce);
            }

            public Semigroup<KafkaSource.OffsetAndCommit<F>> reverse() {
                return Semigroup.reverse$(this);
            }

            public Semigroup<Object> reverse$mcD$sp() {
                return Semigroup.reverse$mcD$sp$(this);
            }

            public Semigroup<Object> reverse$mcF$sp() {
                return Semigroup.reverse$mcF$sp$(this);
            }

            public Semigroup<Object> reverse$mcI$sp() {
                return Semigroup.reverse$mcI$sp$(this);
            }

            public Semigroup<Object> reverse$mcJ$sp() {
                return Semigroup.reverse$mcJ$sp$(this);
            }

            public Semigroup intercalate(Object obj) {
                return Semigroup.intercalate$(this, obj);
            }

            public Semigroup<Object> intercalate$mcD$sp(double d) {
                return Semigroup.intercalate$mcD$sp$(this, d);
            }

            public Semigroup<Object> intercalate$mcF$sp(float f) {
                return Semigroup.intercalate$mcF$sp$(this, f);
            }

            public Semigroup<Object> intercalate$mcI$sp(int i) {
                return Semigroup.intercalate$mcI$sp$(this, i);
            }

            public Semigroup<Object> intercalate$mcJ$sp(long j) {
                return Semigroup.intercalate$mcJ$sp$(this, j);
            }

            /**
             * Returns the argument with the strictly greater offset; when the
             * offsets are equal the second argument is returned.
             */
            public KafkaSource.OffsetAndCommit<F> combine(KafkaSource.OffsetAndCommit<F> offsetAndCommit, KafkaSource.OffsetAndCommit<F> offsetAndCommit2) {
                return offsetAndCommit.offset() > offsetAndCommit2.offset() ? offsetAndCommit : offsetAndCommit2;
            }

            {
                Semigroup.$init$(this);
            }
        };
    }

    /**
     * Creates the {@code Checkpointer} for Kafka checkpoints.
     *
     * <p>The members with logic of their own are {@code combine}, the empty
     * value, {@code ack} and {@code nack}. Every other method forwards to the
     * matching static helper on {@code Monoid} or {@code Semigroup}.
     *
     * @param async the {@code Async} evidence used when acking and nacking
     * @return a new checkpointer instance
     */
    public <F> Checkpointer<F, KafkaSource.KafkaCheckpoints<F>> com$snowplowanalytics$snowplow$sources$kafka$KafkaSource$$kafkaCheckpointer(final Async<F> async) {
        return new Checkpointer<F, KafkaSource.KafkaCheckpoints<F>>(async) { // from class: com.snowplowanalytics.snowplow.sources.kafka.KafkaSource$$anon$3
            // Identity element: checkpoints wrapping an empty map (set in the initializer below).
            private final KafkaSource.KafkaCheckpoints<F> empty;
            // Captured copy of the enclosing method's `async` argument.
            private final Async evidence$4$1;

            // ---- Forwarders to the Monoid static helpers ----

            public boolean isEmpty(Object obj, Eq eq) {
                return Monoid.isEmpty$(this, obj, eq);
            }

            public Object combineN(Object obj, int i) {
                return Monoid.combineN$(this, obj, i);
            }

            public Object combineAll(TraversableOnce traversableOnce) {
                return Monoid.combineAll$(this, traversableOnce);
            }

            public Option<KafkaSource.KafkaCheckpoints<F>> combineAllOption(TraversableOnce<KafkaSource.KafkaCheckpoints<F>> traversableOnce) {
                return Monoid.combineAllOption$(this, traversableOnce);
            }

            /* renamed from: reverse, reason: merged with bridge method [inline-methods] */
            public Monoid<KafkaSource.KafkaCheckpoints<F>> m3reverse() {
                return Monoid.reverse$(this);
            }

            // ---- Forwarders to the Semigroup static helpers ----

            public double combine$mcD$sp(double d, double d2) {
                return Semigroup.combine$mcD$sp$(this, d, d2);
            }

            public float combine$mcF$sp(float f, float f2) {
                return Semigroup.combine$mcF$sp$(this, f, f2);
            }

            public int combine$mcI$sp(int i, int i2) {
                return Semigroup.combine$mcI$sp$(this, i, i2);
            }

            public long combine$mcJ$sp(long j, long j2) {
                return Semigroup.combine$mcJ$sp$(this, j, j2);
            }

            public double combineN$mcD$sp(double d, int i) {
                return Semigroup.combineN$mcD$sp$(this, d, i);
            }

            public float combineN$mcF$sp(float f, int i) {
                return Semigroup.combineN$mcF$sp$(this, f, i);
            }

            public int combineN$mcI$sp(int i, int i2) {
                return Semigroup.combineN$mcI$sp$(this, i, i2);
            }

            public long combineN$mcJ$sp(long j, int i) {
                return Semigroup.combineN$mcJ$sp$(this, j, i);
            }

            public Object repeatedCombineN(Object obj, int i) {
                return Semigroup.repeatedCombineN$(this, obj, i);
            }

            public double repeatedCombineN$mcD$sp(double d, int i) {
                return Semigroup.repeatedCombineN$mcD$sp$(this, d, i);
            }

            public float repeatedCombineN$mcF$sp(float f, int i) {
                return Semigroup.repeatedCombineN$mcF$sp$(this, f, i);
            }

            public int repeatedCombineN$mcI$sp(int i, int i2) {
                return Semigroup.repeatedCombineN$mcI$sp$(this, i, i2);
            }

            public long repeatedCombineN$mcJ$sp(long j, int i) {
                return Semigroup.repeatedCombineN$mcJ$sp$(this, j, i);
            }

            public Semigroup<Object> reverse$mcD$sp() {
                return Semigroup.reverse$mcD$sp$(this);
            }

            public Semigroup<Object> reverse$mcF$sp() {
                return Semigroup.reverse$mcF$sp$(this);
            }

            public Semigroup<Object> reverse$mcI$sp() {
                return Semigroup.reverse$mcI$sp$(this);
            }

            public Semigroup<Object> reverse$mcJ$sp() {
                return Semigroup.reverse$mcJ$sp$(this);
            }

            public Semigroup intercalate(Object obj) {
                return Semigroup.intercalate$(this, obj);
            }

            public Semigroup<Object> intercalate$mcD$sp(double d) {
                return Semigroup.intercalate$mcD$sp$(this, d);
            }

            public Semigroup<Object> intercalate$mcF$sp(float f) {
                return Semigroup.intercalate$mcF$sp$(this, f);
            }

            public Semigroup<Object> intercalate$mcI$sp(int i) {
                return Semigroup.intercalate$mcI$sp$(this, i);
            }

            public Semigroup<Object> intercalate$mcJ$sp(long j) {
                return Semigroup.intercalate$mcJ$sp$(this, j);
            }

            /**
             * Merges the two per-partition maps with the {@code |+|} operator,
             * using a map monoid built from the offset-and-commit semigroup of
             * the enclosing object, and wraps the result in new checkpoints.
             */
            public KafkaSource.KafkaCheckpoints<F> combine(KafkaSource.KafkaCheckpoints<F> kafkaCheckpoints, KafkaSource.KafkaCheckpoints<F> kafkaCheckpoints2) {
                return new KafkaSource.KafkaCheckpoints<>((Map) implicits$.MODULE$.catsSyntaxSemigroup(kafkaCheckpoints.byPartition(), implicits$.MODULE$.catsKernelStdMonoidForMap(KafkaSource$.MODULE$.com$snowplowanalytics$snowplow$sources$kafka$KafkaSource$$offsetAndCommitSemigroup())).$bar$plus$bar(kafkaCheckpoints2.byPartition()));
            }

            /* renamed from: empty, reason: merged with bridge method [inline-methods] */
            public KafkaSource.KafkaCheckpoints<F> m4empty() {
                return this.empty;
            }

            /**
             * Runs the {@code commit()} effect of every entry in the per-partition
             * map via {@code parTraverse} and discards the results.
             */
            public F ack(KafkaSource.KafkaCheckpoints<F> kafkaCheckpoints) {
                return (F) implicits$.MODULE$.toFunctorOps(ParallelTraversableOps1$.MODULE$.parTraverse$extension(implicits$.MODULE$.catsSyntaxParallelTraverse1(kafkaCheckpoints.byPartition().values().toList(), implicits$.MODULE$.catsStdInstancesForList()), offsetAndCommit -> {
                    return offsetAndCommit.commit();
                }, implicits$.MODULE$.catsStdInstancesForList(), cats.effect.implicits$.MODULE$.parallelForGenSpawn(this.evidence$4$1)), this.evidence$4$1).void();
            }

            /** Returns the unit effect; the given checkpoints are not used. */
            public F nack(KafkaSource.KafkaCheckpoints<F> kafkaCheckpoints) {
                return (F) Applicative$.MODULE$.apply(this.evidence$4$1).unit();
            }

            {
                // The captured evidence is assigned before the trait initializers run.
                this.evidence$4$1 = async;
                Semigroup.$init$(this);
                Monoid.$init$(this);
                this.empty = new KafkaSource.KafkaCheckpoints<>(Predef$.MODULE$.Map().empty());
            }
        };
    }

    /**
     * Builds the outer stream of per-assignment event streams.
     *
     * <p>A Kafka consumer stream is created from the consumer settings and
     * subscribed to the configured topic. Each partition map emitted by the
     * consumer is first passed through {@code logWhenNoPartitions}, and the
     * maps that remain are turned into an inner stream by
     * {@code joinPartitions}.
     *
     * @param kafkaSourceConfig supplies the topic name and the consumer settings
     * @param async             the {@code Async} evidence for the effect type
     * @return a stream whose elements are themselves streams of low-level events
     */
    public <F> Stream<F, Stream<F, LowLevelEvents<KafkaSource.KafkaCheckpoints<F>>>> com$snowplowanalytics$snowplow$sources$kafka$KafkaSource$$kafkaStream(KafkaSourceConfig kafkaSourceConfig, Async<F> async) {
        return KafkaConsumer$.MODULE$
                .stream(consumerSettings(kafkaSourceConfig, async), async, MkConsumer$.MODULE$.mkConsumerForSync(async))
                .evalTap(consumer -> consumer.subscribeTo(kafkaSourceConfig.topicName(), Predef$.MODULE$.wrapRefArray(new String[0])))
                .flatMap(
                        subscribed -> subscribed.partitionsMapStream()
                                .evalMapFilter(assignment -> MODULE$.logWhenNoPartitions(assignment, async))
                                .map(assignment -> MODULE$.joinPartitions(assignment, async)),
                        NotGiven$.MODULE$.default());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Turns one partition assignment into a single stream of low-level events.
     *
     * <p>Each partition's record stream is processed chunk by chunk. An empty
     * chunk emits nothing. A non-empty chunk emits one {@code LowLevelEvents}
     * holding:
     * <ul>
     *   <li>the record values of the chunk;</li>
     *   <li>checkpoints keyed by the partition number, carrying the offset and
     *       commit action of the chunk's last record;</li>
     *   <li>the smallest record timestamp in the chunk as an {@code Instant},
     *       or {@code None} when no record has a timestamp.</li>
     * </ul>
     * The per-partition streams are joined with {@code parJoinUnbounded}. An
     * info message is logged before the joined stream starts and another when
     * it is finalized.
     *
     * @param map   the assigned partitions and their record streams
     * @param async the {@code Async} evidence for the effect type
     * @return the joined stream of low-level events for the whole assignment
     */
    public <F> Stream<F, LowLevelEvents<KafkaSource.KafkaCheckpoints<F>>> joinPartitions(Map<TopicPartition, Stream<F, CommittableConsumerRecord<F, byte[], ByteBuffer>>> map, Async<F> async) {
        Seq seq = (Seq) map.toSeq().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            return ((Stream) tuple2._2()).chunks().flatMap(chunk -> {
                Some last = chunk.last();
                if (!(last instanceof Some)) {
                    if (None$.MODULE$.equals(last)) {
                        // Empty chunk: there is no last record to checkpoint, so emit nothing.
                        return Stream$.MODULE$.empty();
                    }
                    throw new MatchError(last);
                }
                CommittableConsumerRecord committableConsumerRecord = (CommittableConsumerRecord) last.value();
                Chunk map2 = chunk.map(committableConsumerRecord2 -> {
                    return (ByteBuffer) committableConsumerRecord2.record().value();
                });
                // Checkpoint only the last record of the chunk, keyed by partition number.
                KafkaSource.KafkaCheckpoints kafkaCheckpoints = new KafkaSource.KafkaCheckpoints(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(topicPartition.partition())), new KafkaSource.OffsetAndCommit(committableConsumerRecord.record().offset(), committableConsumerRecord.offset().commit()))})));
                // Timestamp preference per record: log-append time, then create time, then unknown time.
                Iterator flatMap = chunk.iterator().flatMap(committableConsumerRecord3 -> {
                    Timestamp timestamp = committableConsumerRecord3.record().timestamp();
                    return Option$.MODULE$.option2Iterable(timestamp.logAppendTime().orElse(() -> {
                        return timestamp.createTime();
                    }).orElse(() -> {
                        return timestamp.unknownTime();
                    }));
                });
                return Stream$.MODULE$.emit(new LowLevelEvents(map2, kafkaCheckpoints, flatMap.isEmpty() ? None$.MODULE$ : new Some(Instant.ofEpochMilli(BoxesRunTime.unboxToLong(flatMap.min(implicits$.MODULE$.catsKernelOrderingForOrder(implicits$.MODULE$.catsKernelStdOrderForLong())))))));
            }, NotGiven$.MODULE$.default());
        }, Seq$.MODULE$.canBuildFrom());
        String formatForLog = formatForLog(map.keys());
        return Stream$.MODULE$.eval(Logger$.MODULE$.apply(logger(async)).info(() -> {
            // Fixed the "Processsing" typo; the capacity hint matches the corrected 23-character prefix.
            return new StringBuilder(23).append("Processing partitions: ").append(formatForLog).toString();
        })).drain().$plus$plus(() -> {
            return Stream$NestedStreamOps$.MODULE$.parJoinUnbounded$extension(Stream$.MODULE$.NestedStreamOps(Stream$.MODULE$.emits(seq)), async).onFinalize(Logger$.MODULE$.apply(MODULE$.logger(async)).info(() -> {
                return new StringBuilder(35).append("Stopping processing of partitions: ").append(formatForLog).toString();
            }), async);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Filters out empty partition assignments, logging when one is seen.
     *
     * @param map  the current partition assignment
     * @param sync the {@code Sync} evidence for the effect type
     * @return an effect yielding {@code None} after logging an info message
     *         when {@code map} is empty, otherwise a pure effect yielding
     *         {@code Some(map)}
     */
    public <F> F logWhenNoPartitions(Map<TopicPartition, Stream<F, CommittableConsumerRecord<F, byte[], ByteBuffer>>> map, Sync<F> sync) {
        if (map.isEmpty()) {
            return (F) implicits$.MODULE$.toFunctorOps(
                    Logger$.MODULE$.apply(logger(sync)).info(() -> "No partitions are currently assigned to this processor"),
                    sync).as(None$.MODULE$);
        }
        return (F) package$.MODULE$.Sync().apply(sync).pure(new Some(map));
    }

    /**
     * Renders a set of topic partitions for log output.
     *
     * <p>Each partition is rendered as {@code topic-partition}; the rendered
     * strings are sorted with the cats string ordering and joined with commas.
     *
     * @param iterable the topic partitions to render
     * @return the sorted, comma-separated labels
     */
    public String formatForLog(Iterable<TopicPartition> iterable) {
        // Step 1: one "topic-partition" label per entry.
        TraversableOnce labels = (TraversableOnce) iterable.map(
                tp -> new StringBuilder(1).append(tp.topic()).append("-").append(tp.partition()).toString(),
                Iterable$.MODULE$.canBuildFrom());
        // Step 2: sort the labels.
        TraversableOnce ordered = (TraversableOnce) labels.toSeq().sorted(
                implicits$.MODULE$.catsKernelOrderingForOrder(implicits$.MODULE$.catsKernelStdOrderForString()));
        // Step 3: join into a single line.
        return ordered.mkString(",");
    }

    /**
     * Provides a deserializer that wraps the raw record bytes in a
     * {@code ByteBuffer} via {@code ByteBuffer.wrap}, lifted into a pure
     * {@code Resource}.
     *
     * @param sync the {@code Sync} evidence for the effect type
     * @return a resource holding the byte-buffer deserializer
     */
    private <F> Resource<F, GenericDeserializer<Value, F, ByteBuffer>> byteBufferDeserializer(Sync<F> sync) {
        return package$.MODULE$.Resource().pure(
                fs2.kafka.package$.MODULE$.Deserializer().lift(
                        bytes -> package$.MODULE$.Sync().apply(sync).pure(ByteBuffer.wrap(bytes)),
                        sync));
    }

    /**
     * Assembles the fs2-kafka consumer settings for the source.
     *
     * <p>Keys use the identity deserializer and values use the byte-buffer
     * deserializer. Bootstrap servers and extra consumer properties come from
     * the configuration, and auto-commit is turned off.
     *
     * @param kafkaSourceConfig supplies the bootstrap servers and consumer properties
     * @param async             the {@code Async} evidence for the effect type
     * @return the configured consumer settings
     */
    private <F> ConsumerSettings<F, byte[], ByteBuffer> consumerSettings(KafkaSourceConfig kafkaSourceConfig, Async<F> async) {
        return ConsumerSettings$.MODULE$
                .apply(GenericDeserializer$.MODULE$.resource(GenericDeserializer$.MODULE$.identity(async)), byteBufferDeserializer(async))
                .withBootstrapServers(kafkaSourceConfig.bootstrapServers())
                .withProperties(kafkaSourceConfig.consumerConf())
                .withEnableAutoCommit(false);
    }

    /**
     * Private constructor: stores the newly created instance in
     * {@code MODULE$}. It is invoked from the static initializer of this class.
     */
    private KafkaSource$() {
        MODULE$ = this;
    }
}
