package org.novelfs.streaming.kafka.consumer;

import cats.Functor;
import cats.effect.ConcurrentEffect;
import cats.effect.Sync;
import cats.effect.Sync$;
import cats.effect.Timer;
import cats.effect.concurrent.MVar;
import cats.effect.concurrent.MVar$;
import cats.implicits$;
import cats.syntax.ApplicativeErrorOps$;
import fs2.Chunk$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$PureOps$;
import fs2.concurrent.Queue;
import fs2.concurrent.Queue$;
import fs2.concurrent.Signal;
import fs2.concurrent.SignallingRef;
import fs2.concurrent.SignallingRef$;
import fs2.internal.FreeC;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.novelfs.streaming.kafka.TopicPartition;
import org.novelfs.streaming.kafka.consumer.KafkaOffsetCommitSettings;
import org.novelfs.streaming.kafka.interpreter.ThinKafkaConsumerClient$;
import org.novelfs.streaming.kafka.utils.package$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.UninitializedFieldError;
import scala.collection.immutable.Map;
import scala.collection.immutable.Vector;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaConsumer.scala */
/* loaded from: input_file:org/novelfs/streaming/kafka/consumer/KafkaConsumer$.class */
/**
 * Singleton module class backing the Scala {@code KafkaConsumer} object.
 *
 * <p>Exposes stream stages (as {@code Function1} over the underlying {@code FreeC} of an fs2
 * {@code Stream}) for tracking offsets, periodically committing them, deserializing records,
 * and the {@link #apply} entry point that wires those stages together.
 *
 * <p>The {@code $anonfun$...} and {@code subscribe$1} members are compiler-generated lambda
 * bodies; they are kept with their original names and signatures.
 */
public final class KafkaConsumer$ {
    /** The single instance; assigned by the private constructor, which the static initializer runs. */
    public static KafkaConsumer$ MODULE$;
    private final Logger log;
    /** Set to {@code true} by the constructor once {@link #log} has been assigned. */
    private volatile boolean bitmap$init$0;

    static {
        new KafkaConsumer$();
    }

    /**
     * Returns the class logger.
     *
     * @throws UninitializedFieldError if called before the constructor has assigned the logger
     */
    private Logger log() {
        if (!this.bitmap$init$0) {
            throw new UninitializedFieldError("Uninitialized field: /home/travis/build/TheInnerLight/streaming-kafka/src/main/scala/org/novelfs/streaming/kafka/consumer/KafkaConsumer.scala: 20");
        }
        return this.log;
    }

    /**
     * Builds a stage that pairs each record with a running map of
     * {@code topicPartition -> OffsetMetadata(offset)}, starting from an empty map.
     *
     * @return a function over the stream's underlying {@code FreeC}
     */
    public <F, K, V> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> accumulateOffsetMetadata() {
        return obj -> {
            return new Stream($anonfun$accumulateOffsetMetadata$1(((Stream) obj).fs2$Stream$$free()));
        };
    }

    /**
     * Builds a stage that runs the input through {@link #accumulateOffsetMetadata()}, enqueues each
     * accumulated offset map onto {@code queue}, and emits only the record half of each pair.
     *
     * @param queue   queue that receives the accumulated offset map for every record
     * @param functor type-class instance passed to {@code evalTap}
     * @return a function over the stream's underlying {@code FreeC}
     */
    public <F, K, V> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> publishOffsetsToQueue(Queue<F, Map<TopicPartition, OffsetMetadata>> queue, Functor<F> functor) {
        return obj -> {
            return new Stream($anonfun$publishOffsetsToQueue$1(queue, functor, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    /**
     * Builds a stream that, at a fixed rate, reads the held value of the queue (initially an empty
     * map) and commits it through the subscription stored in {@code mVar}.
     *
     * <p>If a commit fails, the error is logged and {@code signallingRef} is set to {@code true}.
     *
     * @param finiteDuration   period passed to {@code fixedRate}
     * @param mVar             holder of the consumer subscription; accessed via {@code locked}
     * @param signallingRef    flag set to {@code true} when a commit fails
     * @param queue            source of offset maps
     * @param concurrentEffect effect type-class instance
     * @param timer            timer driving {@code fixedRate}
     * @param executionContext not referenced by this method's visible body
     * @return the underlying {@code FreeC} of the commit stream
     */
    public <F, K, V> FreeC<?, BoxedUnit> commitOffsetsFromQueueEvery(FiniteDuration finiteDuration, MVar<F, KafkaConsumerSubscription<K, V>> mVar, SignallingRef<F, Object> signallingRef, Queue<F, Map<TopicPartition, OffsetMetadata>> queue, ConcurrentEffect<F> concurrentEffect, Timer<F> timer, ExecutionContext executionContext) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.hold$extension(queue.dequeue(), Predef$.MODULE$.Map().empty(), concurrentEffect), signal -> {
            return new Stream($anonfun$commitOffsetsFromQueueEvery$1(finiteDuration, mVar, signallingRef, concurrentEffect, timer, signal));
        });
    }

    /**
     * Builds a stage that deserializes the raw {@code byte[]} key and value of every record.
     *
     * @param deserializer  key deserializer
     * @param deserializer2 value deserializer
     * @param sync          effect type-class instance used to suspend the deserialization
     * @return a function over the stream's underlying {@code FreeC}
     */
    public <F, K, V> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> deserializer(Deserializer<K> deserializer, Deserializer<V> deserializer2, Sync<F> sync) {
        return obj -> {
            return new Stream($anonfun$deserializer$1(deserializer, deserializer2, sync, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    /**
     * Builds a stage that applies the commit policy from {@code kafkaConsumerConfig}: for
     * {@code AutoCommit} settings, offsets are tracked and committed periodically alongside the
     * record stream; for any other setting the input stream is returned unchanged.
     *
     * @param mVar                holder of the consumer subscription
     * @param kafkaConsumerConfig configuration supplying {@code commitOffsetSettings()}
     * @param concurrentEffect    effect type-class instance
     * @param timer               timer for periodic commits
     * @param executionContext    forwarded to {@link #commitOffsetsFromQueueEvery}
     * @return a function over the stream's underlying {@code FreeC}
     */
    public <F, K, V> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> applyCommitPolicy(MVar<F, KafkaConsumerSubscription<byte[], byte[]>> mVar, KafkaConsumerConfig<K, V> kafkaConsumerConfig, ConcurrentEffect<F> concurrentEffect, Timer<F> timer, ExecutionContext executionContext) {
        return obj -> {
            return new Stream($anonfun$applyCommitPolicy$1(mVar, kafkaConsumerConfig, concurrentEffect, timer, executionContext, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    /**
     * Builds the consumer stream: subscribes using a copy of the config whose last two fields are
     * replaced by {@code ByteArrayDeserializer}s, polls repeatedly, applies the commit policy and
     * finally deserializes with the config's own key/value deserializers.
     *
     * <p>The subscription is acquired through {@code Stream.bracket}; the release action takes the
     * subscription out of the {@code MVar} and passes it to {@code KafkaConsumerSubscription.cleanup}.
     *
     * @param kafkaConsumerConfig consumer configuration
     * @param concurrentEffect    effect type-class instance
     * @param timer               timer for periodic commits
     * @param executionContext    forwarded to the commit-policy stage
     * @return the underlying {@code FreeC} of the consumer stream
     */
    public <F, K, V> FreeC<?, BoxedUnit> apply(KafkaConsumerConfig<K, V> kafkaConsumerConfig, ConcurrentEffect<F> concurrentEffect, Timer<F> timer, ExecutionContext executionContext) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.bracket(subscribe$1(concurrentEffect, kafkaConsumerConfig.copy(kafkaConsumerConfig.copy$default$1(), kafkaConsumerConfig.copy$default$2(), kafkaConsumerConfig.copy$default$3(), kafkaConsumerConfig.copy$default$4(), kafkaConsumerConfig.copy$default$5(), kafkaConsumerConfig.copy$default$6(), kafkaConsumerConfig.copy$default$7(), kafkaConsumerConfig.copy$default$8(), new ByteArrayDeserializer(), new ByteArrayDeserializer())), mVar -> {
            return implicits$.MODULE$.toFlatMapOps(mVar.take(), concurrentEffect).flatMap(kafkaConsumerSubscription -> {
                return KafkaConsumerSubscription$.MODULE$.cleanup(kafkaConsumerSubscription, concurrentEffect);
            });
        }), mVar2 -> {
            return new Stream($anonfun$apply$5(kafkaConsumerConfig, concurrentEffect, timer, executionContext, mVar2));
        });
    }

    /** Lambda body of {@link #accumulateOffsetMetadata()}: scans records into a partition-to-offset map. */
    public static final /* synthetic */ FreeC $anonfun$accumulateOffsetMetadata$1(FreeC freeC) {
        return Stream$.MODULE$.zipWithScan1$extension(freeC, Predef$.MODULE$.Map().empty(), (map, consumerRecord) -> {
            // Add (or overwrite) this record's partition entry with its offset.
            return map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(consumerRecord.topicPartition()), new OffsetMetadata(consumerRecord.offset())));
        });
    }

    /** Lambda body of {@link #publishOffsetsToQueue}: enqueue the offset map, then keep only the record. */
    public static final /* synthetic */ FreeC $anonfun$publishOffsetsToQueue$1(Queue queue, Functor functor, FreeC freeC) {
        return Stream$.MODULE$.map$extension(Stream$.MODULE$.evalTap$extension(Stream$.MODULE$.through$extension(freeC, MODULE$.accumulateOffsetMetadata()), tuple2 -> {
            // Second tuple element is the accumulated offset map.
            if (tuple2 != null) {
                return queue.enqueue1((Map) tuple2._2());
            }
            throw new MatchError(tuple2);
        }, functor), tuple22 -> {
            // First tuple element is the original record.
            if (tuple22 != null) {
                return (ConsumerRecord) tuple22._1();
            }
            throw new MatchError(tuple22);
        });
    }

    /** Compiler-generated no-op; invoked by the final {@code map} of the commit stream. */
    public static final /* synthetic */ void $anonfun$commitOffsetsFromQueueEvery$6(BoxedUnit boxedUnit) {
    }

    /**
     * Lambda body of {@link #commitOffsetsFromQueueEvery}: on every tick reads the current offset
     * map from {@code signal} and commits it while holding the subscription via {@code locked}.
     */
    public static final /* synthetic */ FreeC $anonfun$commitOffsetsFromQueueEvery$1(FiniteDuration finiteDuration, MVar mVar, SignallingRef signallingRef, ConcurrentEffect concurrentEffect, Timer timer, Signal signal) {
        return Stream$.MODULE$.map$extension(Stream$.MODULE$.evalMap$extension(Stream$.MODULE$.evalMap$extension(Stream$.MODULE$.fixedRate(finiteDuration, timer), boxedUnit -> {
            return signal.get();
        }), map -> {
            return ApplicativeErrorOps$.MODULE$.handleErrorWith$extension(implicits$.MODULE$.catsSyntaxApplicativeError(package$.MODULE$.MVarOps(mVar, concurrentEffect).locked(kafkaConsumerSubscription -> {
                return ThinKafkaConsumerClient$.MODULE$.apply(concurrentEffect).commitOffsetMap(map, kafkaConsumerSubscription);
            }), concurrentEffect), th -> {
                // Commit failed: log it and raise the flag. applyCommitPolicy passes this same
                // ref to interruptWhen on the record stream.
                MODULE$.log().error("Error during offset commit", th);
                return signallingRef.set(BoxesRunTime.boxToBoolean(true));
            }, concurrentEffect);
        }), boxedUnit2 -> {
            $anonfun$commitOffsetsFromQueueEvery$6(boxedUnit2);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Lambda body of {@link #deserializer}: for each record, suspends (via {@code delay}) a copy of
     * the record whose key and value are produced by the given deserializers, using the record's
     * topic name.
     */
    public static final /* synthetic */ FreeC $anonfun$deserializer$1(Deserializer deserializer, Deserializer deserializer2, Sync sync, FreeC freeC) {
        return Stream$.MODULE$.evalMap$extension(freeC, consumerRecord -> {
            // NOTE(review): the result is wrapped with `attempt`, so presumably a deserialization
            // failure is emitted as a value rather than failing the stream - confirm against cats.
            return ApplicativeErrorOps$.MODULE$.attempt$extension(implicits$.MODULE$.catsSyntaxApplicativeError(Sync$.MODULE$.apply(sync).delay(() -> {
                return consumerRecord.copy(consumerRecord.copy$default$1(), consumerRecord.copy$default$2(), deserializer.deserialize(consumerRecord.topicPartition().topic(), (byte[]) consumerRecord.key()), deserializer2.deserialize(consumerRecord.topicPartition().topic(), (byte[]) consumerRecord.value()), consumerRecord.copy$default$5(), consumerRecord.copy$default$6(), consumerRecord.copy$default$7(), consumerRecord.copy$default$8());
            }), sync), sync);
        });
    }

    /**
     * Auto-commit wiring, innermost step: the record stream publishes offsets to {@code queue} and
     * is interrupted when {@code signallingRef} fires, while the periodic commit stream runs
     * concurrently with it.
     */
    public static final /* synthetic */ FreeC $anonfun$applyCommitPolicy$3(MVar mVar, ConcurrentEffect concurrentEffect, Timer timer, ExecutionContext executionContext, FiniteDuration finiteDuration, FreeC freeC, Queue queue, SignallingRef signallingRef) {
        return Stream$.MODULE$.map$extension(Stream$.MODULE$.concurrently$extension(Stream$.MODULE$.interruptWhen$extension2(Stream$.MODULE$.through$extension(freeC, MODULE$.publishOffsetsToQueue(queue, concurrentEffect)), signallingRef, concurrentEffect), MODULE$.commitOffsetsFromQueueEvery(finiteDuration, mVar, signallingRef, queue, concurrentEffect, timer, executionContext), concurrentEffect), consumerRecord -> {
            return consumerRecord;
        });
    }

    /** Auto-commit wiring, middle step: creates the {@code SignallingRef} initialised to {@code false}. */
    public static final /* synthetic */ FreeC $anonfun$applyCommitPolicy$2(MVar mVar, ConcurrentEffect concurrentEffect, Timer timer, ExecutionContext executionContext, FiniteDuration finiteDuration, FreeC freeC, Queue queue) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(SignallingRef$.MODULE$.apply(BoxesRunTime.boxToBoolean(false), concurrentEffect)), signallingRef -> {
            return new Stream($anonfun$applyCommitPolicy$3(mVar, concurrentEffect, timer, executionContext, finiteDuration, freeC, queue, signallingRef));
        });
    }

    /**
     * Lambda body of {@link #applyCommitPolicy}: returns {@code freeC} untouched unless the
     * configured commit settings are {@code AutoCommit}, in which case an unbounded queue is
     * created and the auto-commit wiring is built around the stream.
     */
    public static final /* synthetic */ FreeC $anonfun$applyCommitPolicy$1(MVar mVar, KafkaConsumerConfig kafkaConsumerConfig, ConcurrentEffect concurrentEffect, Timer timer, ExecutionContext executionContext, FreeC freeC) {
        KafkaOffsetCommitSettings commitOffsetSettings = kafkaConsumerConfig.commitOffsetSettings();
        if (!(commitOffsetSettings instanceof KafkaOffsetCommitSettings.AutoCommit)) {
            // Any non-auto-commit policy: pass the stream through unchanged.
            return freeC;
        }
        FiniteDuration timeBetweenCommits = ((KafkaOffsetCommitSettings.AutoCommit) commitOffsetSettings).timeBetweenCommits();
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(Queue$.MODULE$.unbounded(concurrentEffect)), queue -> {
            return new Stream($anonfun$applyCommitPolicy$2(mVar, concurrentEffect, timer, executionContext, timeBetweenCommits, freeC, queue));
        });
    }

    /** Creates a subscription from the config and stores it in a new {@code MVar}. */
    private static final Object subscribe$1(ConcurrentEffect concurrentEffect, KafkaConsumerConfig kafkaConsumerConfig) {
        return implicits$.MODULE$.toFlatMapOps(KafkaConsumerSubscription$.MODULE$.apply(kafkaConsumerConfig, concurrentEffect), concurrentEffect).flatMap(kafkaConsumerSubscription -> {
            return implicits$.MODULE$.toFunctorOps(MVar$.MODULE$.of(kafkaConsumerSubscription, concurrentEffect), concurrentEffect).map(mVar -> {
                return mVar;
            });
        });
    }

    /** Turns one polled {@code Vector} of records into a stream emitting those records as a chunk. */
    public static final /* synthetic */ FreeC $anonfun$apply$7(Vector vector) {
        return Stream$.MODULE$.map$extension(Stream$PureOps$.MODULE$.covary$extension(Stream$.MODULE$.PureOps(Stream$.MODULE$.chunk(Chunk$.MODULE$.vector(vector)))), consumerRecord -> {
            return consumerRecord;
        });
    }

    /**
     * Use-phase of the bracket in {@link #apply}: repeatedly polls through the subscription held in
     * {@code mVar} (via {@code locked}), flattens each polled batch, then applies the commit policy
     * followed by deserialization.
     */
    public static final /* synthetic */ FreeC $anonfun$apply$5(KafkaConsumerConfig kafkaConsumerConfig, ConcurrentEffect concurrentEffect, Timer timer, ExecutionContext executionContext, MVar mVar) {
        return Stream$.MODULE$.through$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.scope$extension(Stream$.MODULE$.repeatEval(package$.MODULE$.MVarOps(mVar, concurrentEffect).locked(kafkaConsumerSubscription -> {
            return ThinKafkaConsumerClient$.MODULE$.apply(concurrentEffect).poll(kafkaConsumerConfig.pollTimeout(), kafkaConsumerSubscription);
        }))), vector -> {
            return new Stream($anonfun$apply$7(vector));
        }), MODULE$.applyCommitPolicy(mVar, kafkaConsumerConfig, concurrentEffect, timer, executionContext)), MODULE$.deserializer(kafkaConsumerConfig.keyDeserializer(), kafkaConsumerConfig.valueDeserializer(), concurrentEffect));
    }

    /** Publishes this instance as {@link #MODULE$}, creates the logger and marks it initialised. */
    private KafkaConsumer$() {
        MODULE$ = this;
        this.log = LoggerFactory.getLogger(getClass());
        this.bitmap$init$0 = true;
    }
}
