package com.snowplowanalytics.snowplow.sinks.kafka;

import cats.Monad;
import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.implicits$;
import cats.syntax.FlattenOps$;
import com.snowplowanalytics.snowplow.sinks.ListOfList;
import com.snowplowanalytics.snowplow.sinks.ListOfList$;
import com.snowplowanalytics.snowplow.sinks.Sink;
import com.snowplowanalytics.snowplow.sinks.Sink$;
import com.snowplowanalytics.snowplow.sinks.Sinkable;
import fs2.kafka.GenericSerializer$;
import fs2.kafka.Header$;
import fs2.kafka.HeaderSerializer$;
import fs2.kafka.Headers$;
import fs2.kafka.KafkaProducer;
import fs2.kafka.KafkaProducer$;
import fs2.kafka.KafkaProducer$ProducerPartiallyApplied$;
import fs2.kafka.ProducerRecord;
import fs2.kafka.ProducerRecord$;
import fs2.kafka.ProducerSettings$;
import fs2.kafka.producer.MkProducer$;
import java.util.UUID;
import scala.MatchError;
import scala.collection.Iterable;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.List;

/* compiled from: KafkaSink.scala */
/* loaded from: input_file:com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink$.class */
/**
 * Singleton module class (decompiled from {@code KafkaSink.scala}) that builds {@link Sink}
 * instances which publish through an fs2-kafka {@link KafkaProducer}.
 *
 * <p>All access goes through {@link #MODULE$}; the class cannot be instantiated from outside.
 */
public final class KafkaSink$ {
    /** The single instance; assigned by the private constructor during static initialisation. */
    public static KafkaSink$ MODULE$;

    static {
        // Constructing the instance stores it in MODULE$ as a side effect of the constructor.
        new KafkaSink$();
    }

    private KafkaSink$() {
        MODULE$ = this;
    }

    /**
     * Creates a resource-managed sink.
     *
     * <p>Producer settings are built with a string key serializer and an identity value
     * serializer, then given the bootstrap servers and extra producer properties from the
     * configuration. The producer obtained from those settings is wrapped by
     * {@link #fromFs2Producer}.
     *
     * @param kafkaSinkConfig supplies bootstrap servers, producer properties and (later) the topic
     * @param async effect-type evidence used for the serializers and the producer
     * @return a resource yielding the sink
     */
    public <F> Resource<F, Sink<F>> resource(KafkaSinkConfig kafkaSinkConfig, Async<F> async) {
        return KafkaProducer$ProducerPartiallyApplied$.MODULE$
                .resource$extension(
                        KafkaProducer$.MODULE$.apply(),
                        ProducerSettings$.MODULE$
                                .apply(
                                        GenericSerializer$.MODULE$.resource(GenericSerializer$.MODULE$.string(async)),
                                        GenericSerializer$.MODULE$.resource(GenericSerializer$.MODULE$.identity(async)))
                                .withBootstrapServers(kafkaSinkConfig.bootstrapServers())
                                .withProperties(kafkaSinkConfig.producerConf()),
                        async,
                        MkProducer$.MODULE$.mkProducerForSync(async))
                .map(producer -> MODULE$.fromFs2Producer(kafkaSinkConfig, producer, async));
    }

    /**
     * Wraps an existing producer in a {@link Sink}. Each batch handed to the sink is unwrapped
     * from its {@link ListOfList} and forwarded to {@link #$anonfun$fromFs2Producer$1}.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param kafkaSinkConfig configuration passed on to record construction
     * @param kafkaProducer producer that receives the records
     * @param monad effect-type evidence used when sequencing the produce call
     * @return the sink backed by {@code kafkaProducer}
     */
    public <F> Sink<F> fromFs2Producer(KafkaSinkConfig kafkaSinkConfig, KafkaProducer<F, String, byte[]> kafkaProducer, Monad<F> monad) {
        return Sink$.MODULE$.apply(batch -> $anonfun$fromFs2Producer$1(
                kafkaSinkConfig,
                kafkaProducer,
                monad,
                ((ListOfList) batch).com$snowplowanalytics$snowplow$sinks$ListOfList$$value()));
    }

    /**
     * Converts one {@link Sinkable} into a producer record.
     *
     * <p>The topic comes from the configuration, the key is the sinkable's partition key or a
     * freshly generated random UUID string when no key is present, and the value is the
     * sinkable's bytes. Every attribute pair becomes a string-serialized header.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param kafkaSinkConfig supplies the topic name
     * @param sinkable the item to convert
     * @return the record, with headers attached
     */
    public ProducerRecord<String, byte[]> toProducerRecord(KafkaSinkConfig kafkaSinkConfig, Sinkable sinkable) {
        return ProducerRecord$.MODULE$
                .apply(
                        kafkaSinkConfig.topicName(),
                        sinkable.partitionKey().getOrElse(() -> UUID.randomUUID().toString()),
                        sinkable.bytes())
                .withHeaders(Headers$.MODULE$.fromIterable((Iterable) sinkable.attributes().map(entry -> {
                    if (entry != null) {
                        return Header$.MODULE$.apply((String) entry._1(), (String) entry._2(), HeaderSerializer$.MODULE$.string());
                    }
                    // A null pair cannot be destructured; mirror the pattern-match failure.
                    throw new MatchError(entry);
                }, Iterable$.MODULE$.canBuildFrom())));
    }

    /**
     * Body of the sink function: copies the batch into a chunk, maps every element through
     * {@link #toProducerRecord}, hands the chunk to the producer, flattens the nested effect
     * returned by {@code produce} and discards its value.
     *
     * @param kafkaSinkConfig configuration used for record construction
     * @param kafkaProducer producer receiving the records
     * @param monad effect-type evidence for flatten / void
     * @param list the unwrapped batch contents
     * @return the effect with its value discarded
     */
    public static final /* synthetic */ Object $anonfun$fromFs2Producer$1(KafkaSinkConfig kafkaSinkConfig, KafkaProducer kafkaProducer, Monad monad, List list) {
        return implicits$.MODULE$
                .toFunctorOps(
                        FlattenOps$.MODULE$.flatten$extension(
                                implicits$.MODULE$.catsSyntaxFlatten(
                                        kafkaProducer.produce(
                                                ListOfList$.MODULE$
                                                        .copyToChunk$extension(list)
                                                        .map(item -> MODULE$.toProducerRecord(kafkaSinkConfig, item))),
                                        monad),
                                monad),
                        monad)
                .void();
    }
}
