package dev.naoh.lettucef.streams;

import cats.effect.kernel.Async;
import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Resource;
import cats.effect.package$;
import cats.effect.std.Dispatcher;
import cats.syntax.FlatMapOps$;
import cats.syntax.package$flatMap$;
import cats.syntax.package$functor$;
import dev.naoh.lettucef.api.models.pubsub.RedisPushed;
import dev.naoh.lettucef.api.models.pubsub.RedisPushed$Message$;
import dev.naoh.lettucef.api.models.pubsub.RedisPushed$PMessage$;
import dev.naoh.lettucef.core.RedisPubSubF;
import dev.naoh.lettucef.streams.ManagedPubSubF;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.Stream;
import fs2.concurrent.Channel;
import fs2.concurrent.SignallingRef;
import fs2.concurrent.SignallingRef$;
import io.lettuce.core.pubsub.RedisPubSubListener;
import java.io.Serializable;
import scala.Function1;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;

/* compiled from: ManagedPubSubF.scala */
/* loaded from: input_file:dev/naoh/lettucef/streams/ManagedPubSubF$.class */
public final class ManagedPubSubF$ implements Serializable {
    public static final ManagedPubSubF$State$ State = null;
    public static final ManagedPubSubF$ MODULE$ = new ManagedPubSubF$();

    private ManagedPubSubF$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(ManagedPubSubF$.class);
    }

    public <F, K, V> Resource<F, ManagedPubSubF<F, K, V>> create(Resource<F, RedisPubSubF<F, K, V>> resource, Dispatcher<F> dispatcher, Async<F> async) {
        return resource.flatMap(redisPubSubF -> {
            return package$.MODULE$.Resource().eval(SignallingRef$.MODULE$.of(ManagedPubSubF$State$.MODULE$.zero(), async)).flatMap(signallingRef -> {
                return package$.MODULE$.Resource().eval(SignallingRef$.MODULE$.of(ManagedPubSubF$State$.MODULE$.zero(), async)).flatMap(signallingRef -> {
                    return new ManagedPubSubF(redisPubSubF, dispatcher, signallingRef, signallingRef, async).dev$naoh$lettucef$streams$ManagedPubSubF$$init().map(managedPubSubF -> {
                        return managedPubSubF;
                    });
                });
            });
        });
    }

    public <F, K, V> RedisPubSubListener<K, V> dev$naoh$lettucef$streams$ManagedPubSubF$$$messageSender(final Set<K> set, final Channel<F, RedisPushed.Message<K, V>> channel, final Dispatcher<F> dispatcher) {
        return new ManagedPubSubF.VoidListener<K, V>(set, channel, dispatcher) { // from class: dev.naoh.lettucef.streams.ManagedPubSubF$$anon$2
            private final Set target$3;
            private final Channel ch$2;
            private final Dispatcher d$4;

            {
                this.target$3 = set;
                this.ch$2 = channel;
                this.d$4 = dispatcher;
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void message(Object obj, Object obj2, Object obj3) {
                message(obj, obj2, obj3);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void subscribed(Object obj, long j) {
                subscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void psubscribed(Object obj, long j) {
                psubscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void unsubscribed(Object obj, long j) {
                unsubscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void punsubscribed(Object obj, long j) {
                punsubscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public void message(Object obj, Object obj2) {
                if (this.target$3.contains(obj)) {
                    this.d$4.unsafeRunSync(this.ch$2.send(RedisPushed$Message$.MODULE$.apply(obj, obj2)));
                }
            }
        };
    }

    public <F, K, V> RedisPubSubListener<K, V> dev$naoh$lettucef$streams$ManagedPubSubF$$$pmessageSender(final Set<K> set, final Channel<F, RedisPushed.PMessage<K, V>> channel, final Dispatcher<F> dispatcher) {
        return new ManagedPubSubF.VoidListener<K, V>(set, channel, dispatcher) { // from class: dev.naoh.lettucef.streams.ManagedPubSubF$$anon$3
            private final Set target$4;
            private final Channel ch$3;
            private final Dispatcher d$5;

            {
                this.target$4 = set;
                this.ch$3 = channel;
                this.d$5 = dispatcher;
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void message(Object obj, Object obj2) {
                message(obj, obj2);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void subscribed(Object obj, long j) {
                subscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void psubscribed(Object obj, long j) {
                psubscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void unsubscribed(Object obj, long j) {
                unsubscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public /* bridge */ /* synthetic */ void punsubscribed(Object obj, long j) {
                punsubscribed(obj, j);
            }

            @Override // dev.naoh.lettucef.streams.ManagedPubSubF.VoidListener
            public void message(Object obj, Object obj2, Object obj3) {
                if (this.target$4.contains(obj)) {
                    this.d$5.unsafeRunSync(this.ch$3.send(RedisPushed$PMessage$.MODULE$.apply(obj, obj2, obj3)));
                }
            }
        };
    }

    public <F, K, O> Resource<F, Stream<F, O>> dev$naoh$lettucef$streams$ManagedPubSubF$$$stream(Set<K> set, SignallingRef<F, Map<K, Tuple2<Object, Object>>> signallingRef, Function1<Seq<K>, Object> function1, Function1<Seq<K>, Object> function12, Resource<F, Stream<F, O>> resource, Async<F> async) {
        return package$.MODULE$.Resource().eval(package$.MODULE$.Async().apply(async).memoize(package$functor$.MODULE$.toFunctorOps(update1$1(signallingRef, async, ManagedPubSubF$State$.MODULE$.unsubscribe1(set), function12), async).void())).flatMap(obj -> {
            return package$.MODULE$.Resource().make(FlatMapOps$.MODULE$.$greater$greater$extension(package$flatMap$.MODULE$.catsSyntaxFlatMapOps(update1$1(signallingRef, async, ManagedPubSubF$State$.MODULE$.subscribe1(set), function1), async), () -> {
                return r3.stream$$anonfun$1$$anonfun$1(r4, r5, r6);
            }, async), boxedUnit -> {
                return obj;
            }, async).flatMap(boxedUnit2 -> {
                return resource.map(stream -> {
                    return stream.onFinalize(obj, async);
                });
            });
        });
    }

    private <F, K> Object await(Set<K> set, SignallingRef<F, Map<K, Tuple2<Object, Object>>> signallingRef, GenConcurrent<F, Throwable> genConcurrent) {
        Stream discrete = signallingRef.discrete();
        return discrete.takeWhile(map -> {
            return !set.forall(obj -> {
                return map.get(obj).exists(tuple2 -> {
                    return BoxesRunTime.unboxToInt(tuple2._1()) == ManagedPubSubF$State$.MODULE$.Subscribed();
                });
            });
        }, discrete.takeWhile$default$2()).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genConcurrent))).drain();
    }

    private final Object update1$1(SignallingRef signallingRef, Async async, Function1 function1, Function1 function12) {
        return package$flatMap$.MODULE$.toFlatMapOps(signallingRef.modify(function1), async).flatMap(seq -> {
            return seq.isEmpty() ? package$.MODULE$.Async().apply(async).unit() : function12.apply(seq);
        });
    }

    private final Object stream$$anonfun$1$$anonfun$1(Set set, SignallingRef signallingRef, Async async) {
        return await(set, signallingRef, async);
    }
}
