package dev.profunktor.redis4cats.pubsub.internals;

import cats.Applicative$;
import cats.effect.kernel.Async;
import cats.effect.kernel.Ref;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Resource$;
import cats.effect.kernel.Sync$;
import cats.effect.kernel.implicits$;
import cats.effect.kernel.syntax.MonadCancelOps_$;
import cats.syntax.FlatMapOps$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.data;
import dev.profunktor.redis4cats.effect.FutureLift;
import dev.profunktor.redis4cats.effect.FutureLift$;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.effect.RedisExecutor;
import dev.profunktor.redis4cats.pubsub.SubscribeCommands;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$OptionStreamOps$;
import fs2.concurrent.Topic;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import scala.Function1;
import scala.Option;
import scala.collection.immutable.Map;
import scala.util.NotGiven$;

/* compiled from: Subscriber.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/pubsub/internals/Subscriber.class */
public class Subscriber<F, K, V> implements SubscribeCommands<Stream, K, V> {
    private final Ref<F, Map<K, Topic<F, Option<V>>>> state;
    private final StatefulRedisPubSubConnection<K, V> subConnection;
    private final Async<F> evidence$1;
    private final FutureLift<F> evidence$2;
    private final Log<F> evidence$3;
    private final RedisExecutor<F> evidence$4;

    public Subscriber(Ref<F, Map<K, Topic<F, Option<V>>>> ref, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, Async<F> async, FutureLift<F> futureLift, Log<F> log, RedisExecutor<F> redisExecutor) {
        this.state = ref;
        this.subConnection = statefulRedisPubSubConnection;
        this.evidence$1 = async;
        this.evidence$2 = futureLift;
        this.evidence$3 = log;
        this.evidence$4 = redisExecutor;
    }

    @Override // dev.profunktor.redis4cats.pubsub.SubscribeCommands
    /* renamed from: subscribe, reason: merged with bridge method [inline-methods] */
    public Stream subscribe2(Object obj) {
        return Stream$.MODULE$.resource((Resource) FlatMapOps$.MODULE$.$greater$greater$eq$extension((Resource) package$all$.MODULE$.catsSyntaxFlatMapOps(Resource$.MODULE$.eval(this.state.get()), Resource$.MODULE$.catsEffectAsyncForResource(this.evidence$1)), (Function1) PubSubInternals$.MODULE$.apply(this.state, this.subConnection, this.evidence$1, this.evidence$3).apply(new data.RedisChannel(obj)), Resource$.MODULE$.catsEffectAsyncForResource(this.evidence$1)), this.evidence$1).evalTap(topic -> {
            return FutureLift$.MODULE$.apply(this.evidence$2).lift(Sync$.MODULE$.apply(this.evidence$1).delay(() -> {
                return r2.subscribe$$anonfun$1$$anonfun$1(r3);
            }), this.evidence$4);
        }, this.evidence$1).flatMap(topic2 -> {
            return Stream$OptionStreamOps$.MODULE$.unNone$extension(Stream$.MODULE$.OptionStreamOps(topic2.subscribe(500)));
        }, NotGiven$.MODULE$.value());
    }

    @Override // dev.profunktor.redis4cats.pubsub.SubscribeCommands
    /* renamed from: unsubscribe, reason: merged with bridge method [inline-methods] */
    public Stream unsubscribe2(Object obj) {
        return Stream$.MODULE$.eval(MonadCancelOps_$.MODULE$.guarantee$extension(implicits$.MODULE$.monadCancelOps_(package$all$.MODULE$.toFunctorOps(FutureLift$.MODULE$.apply(this.evidence$2).lift(Sync$.MODULE$.apply(this.evidence$1).delay(() -> {
            return r5.unsubscribe$$anonfun$1(r6);
        }), this.evidence$4), this.evidence$1).void()), package$all$.MODULE$.toFlatMapOps(this.state.get(), this.evidence$1).flatMap(map -> {
            return package$all$.MODULE$.catsSyntaxApply(map.get(obj).fold(this::unsubscribe$$anonfun$2$$anonfun$1, topic -> {
                return package$all$.MODULE$.toFunctorOps(topic.publish1(package$all$.MODULE$.none()), this.evidence$1).void();
            }), this.evidence$1).$times$greater(this.state.update(map -> {
                return map.$minus(obj);
            }));
        }), this.evidence$1));
    }

    private final RedisFuture subscribe$$anonfun$1$$anonfun$1(Object obj) {
        return this.subConnection.async().subscribe(new Object[]{obj});
    }

    private final RedisFuture unsubscribe$$anonfun$1(Object obj) {
        return this.subConnection.async().unsubscribe(new Object[]{obj});
    }

    private final Object unsubscribe$$anonfun$2$$anonfun$1() {
        return Applicative$.MODULE$.apply(this.evidence$1).unit();
    }
}
