package fs2.io.internal;

import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Ref;
import cats.effect.kernel.Resource;
import cats.effect.kernel.syntax.EffectResourceOps$;
import cats.effect.kernel.syntax.GenSpawnOps$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.effect.std.Semaphore;
import cats.effect.std.Semaphore$;
import cats.effect.syntax.package$all$;
import fs2.Chunk;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.Pull$;
import fs2.Pull$StreamPullOps$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import java.io.Serializable;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Option;
import scala.runtime.ModuleSerializationProxy;
import scala.util.Either;
import scala.util.NotGiven$;

/* compiled from: SuspendedStream.scala */
/* loaded from: input_file:fs2/io/internal/SuspendedStream$.class */
/**
 * Singleton holder (exposed through {@link #MODULE$}) for the factory that builds
 * {@code SuspendedStream} instances. This is decompiler output for the Scala source
 * {@code SuspendedStream.scala}; the {@code $}-suffixed names and {@code MODULE$}
 * indirections are artifacts of that compilation, not hand-written Java.
 */
public final class SuspendedStream$ implements Serializable {
    /** The single instance of this class; every call site in this file goes through it. */
    public static final SuspendedStream$ MODULE$ = new SuspendedStream$();

    /** Private so that {@link #MODULE$} is the only instance created from this file. */
    private SuspendedStream$() {
    }

    /**
     * Serialization hook: substitutes a {@code ModuleSerializationProxy} keyed on this class
     * for the instance being serialized.
     * NOTE(review): presumably this makes deserialization resolve back to {@link #MODULE$};
     * confirm against the Scala runtime documentation.
     *
     * @return a {@code ModuleSerializationProxy} for {@code SuspendedStream$.class}
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(SuspendedStream$.class);
    }

    /**
     * Builds a {@code Resource} that yields a {@code SuspendedStream} backed by {@code stream}.
     *
     * <p>The resource is assembled in four chained steps, each nested inside the previous
     * step's {@code flatMap}:
     * <ol>
     *   <li>a queue obtained from {@code Queue.synchronous};</li>
     *   <li>a background effect that feeds {@code stream}'s chunks into that queue;</li>
     *   <li>a {@code Ref} initialised with a stream that reads back from the queue;</li>
     *   <li>a {@code Semaphore} created with a single permit.</li>
     * </ol>
     * The resulting {@code SuspendedStream} captures the queue, the ref and the semaphore.
     *
     * @param stream        the source stream whose chunks are enqueued in the background
     * @param genConcurrent the {@code GenConcurrent} instance passed to every queue, ref,
     *                      semaphore, background and compile call below
     * @return a resource producing the {@code SuspendedStream}
     */
    public <F, O> Resource<F, SuspendedStream<F, O>> apply(Stream<F, O> stream, GenConcurrent<F, Throwable> genConcurrent) {
        // Step 1: create the queue and lift the creating effect into a Resource.
        return EffectResourceOps$.MODULE$.toResource$extension(package$all$.MODULE$.effectResourceOps(Queue$.MODULE$.synchronous(genConcurrent))).flatMap(queue -> {
            // Step 2: run the producer via `background`: chunks of `stream` are wrapped by
            // `attempt`, enqueued with `enqueueNoneTerminated`, and the whole thing is drained.
            // The value produced by the background resource (`obj`) is not used below.
            return GenSpawnOps$.MODULE$.background$extension(package$all$.MODULE$.genSpawnOps(stream.chunks().attempt().enqueueNoneTerminated(queue).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genConcurrent))).drain(), genConcurrent), genConcurrent).flatMap(obj -> {
                // Step 3: the Ref holds the "currently suspended" stream; it starts out as a
                // stream reading from the queue.
                return EffectResourceOps$.MODULE$.toResource$extension(package$all$.MODULE$.effectResourceOps(genConcurrent.ref(fs2$io$internal$SuspendedStream$$$streamFromQueue(queue, genConcurrent)))).flatMap(ref -> {
                    // Step 4: a semaphore with exactly one permit, acquired by both
                    // stream() and getAndUpdate() below.
                    return EffectResourceOps$.MODULE$.toResource$extension(package$all$.MODULE$.effectResourceOps(Semaphore$.MODULE$.apply(1L, genConcurrent))).map(semaphore -> {
                        return new SuspendedStream<F, O>(genConcurrent, queue, ref, semaphore) { // from class: fs2.io.internal.SuspendedStream$$anon$1
                            // Captured values from the enclosing lambdas (decompiler-generated fields).
                            private final GenConcurrent F$1;
                            private final Queue queue$1;
                            private final Ref suspended$1;
                            private final Semaphore semaphore$1;

                            {
                                this.F$1 = genConcurrent;
                                this.queue$1 = queue;
                                this.suspended$1 = ref;
                                this.semaphore$1 = semaphore;
                            }

                            /**
                             * Returns a stream that first acquires the semaphore permit (as a
                             * stream resource), then reads the stream currently stored in the
                             * ref and flattens into it. On finalization the ref is overwritten
                             * with a new stream built from the queue.
                             */
                            @Override // fs2.io.internal.SuspendedStream
                            public Stream stream() {
                                return Stream$.MODULE$.resource(this.semaphore$1.permit(), this.F$1).flatMap(boxedUnit -> {
                                    // onFinalize stores a fresh queue-backed stream, replacing whatever
                                    // an earlier getAndUpdate() call may have stored in the ref.
                                    return Stream$.MODULE$.eval(this.suspended$1.get()).flatten($less$colon$less$.MODULE$.refl()).onFinalize(this.suspended$1.set(SuspendedStream$.MODULE$.fs2$io$internal$SuspendedStream$$$streamFromQueue(this.queue$1, this.F$1)), this.F$1);
                                }, NotGiven$.MODULE$.value());
                            }

                            /**
                             * Returns a stream that acquires the semaphore permit, reads the
                             * stream currently stored in the ref, passes it to {@code function1}
                             * (used as the continuation of a {@code Pull.flatMap}), and stores
                             * the stream produced by that pull back into the ref.
                             *
                             * @param function1 maps the currently stored stream to a pull whose
                             *                  result is the stream to store next
                             */
                            @Override // fs2.io.internal.SuspendedStream
                            public Stream getAndUpdate(Function1 function1) {
                                return Stream$.MODULE$.resource(this.semaphore$1.permit(), this.F$1).flatMap(boxedUnit -> {
                                    return Pull$StreamPullOps$.MODULE$.stream$extension(Pull$.MODULE$.StreamPullOps(Pull$.MODULE$.eval(this.suspended$1.get()).flatMap(function1).flatMap(stream2 -> {
                                        // stream2 is the result of function1's pull; it becomes the
                                        // new content of the ref.
                                        return Pull$.MODULE$.eval(this.suspended$1.set(stream2)).void();
                                    })));
                                }, NotGiven$.MODULE$.value());
                            }
                        };
                    });
                });
            });
        });
    }

    /**
     * Builds the consumer-side stream for {@code queue}: {@code fromQueueNoneTerminated}
     * followed by {@code rethrow} and {@code unchunks}. The queue's element type is
     * {@code Option<Either<Throwable, Chunk<O>>>}, matching what the producer in
     * {@link #apply} enqueues ({@code chunks().attempt().enqueueNoneTerminated}).
     *
     * <p>Called from {@link #apply} and from the anonymous {@code SuspendedStream}'s
     * {@code stream()} finalizer. NOTE(review): the mangled public name is presumably how
     * the Scala compiler exposes a private member to that anonymous class; confirm before
     * treating it as public API.
     *
     * @param queue         the queue to read from
     * @param genConcurrent the {@code GenConcurrent} instance used for the queue read and
     *                      for raising errors in {@code rethrow}
     * @return a stream of {@code O} read from the queue
     */
    public <F, O> Stream<F, O> fs2$io$internal$SuspendedStream$$$streamFromQueue(Queue<F, Option<Either<Throwable, Chunk<O>>>> queue, GenConcurrent<F, Throwable> genConcurrent) {
        return Stream$.MODULE$.fromQueueNoneTerminated(queue, Stream$.MODULE$.fromQueueNoneTerminated$default$2(), genConcurrent).rethrow($less$colon$less$.MODULE$.refl(), RaiseThrowable$.MODULE$.fromApplicativeError(genConcurrent)).unchunks($less$colon$less$.MODULE$.refl());
    }
}
