package com.snowplowanalytics.snowplow.sources.internal;

import cats.effect.kernel.Async;
import cats.effect.kernel.Deferred;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Resource$ExitCase$Canceled$;
import cats.effect.kernel.Resource$ExitCase$Succeeded$;
import cats.effect.kernel.Sync;
import cats.effect.kernel.syntax.GenSpawnOps$;
import cats.effect.package$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.implicits$;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.ApplicativeIdOps$;
import cats.syntax.ApplyOps$;
import fs2.Stream;
import fs2.Stream$;
import fs2.compat.NotGiven$;
import org.typelevel.log4cats.Logger$;
import org.typelevel.log4cats.SelfAwareStructuredLogger;
import org.typelevel.log4cats.slf4j.Slf4jLogger$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;

/* compiled from: CleanCancellation.scala */
/* loaded from: input_file:com/snowplowanalytics/snowplow/sources/internal/CleanCancellation$.class */
/**
 * Singleton holder for the stream-wrapping logic of {@code CleanCancellation}.
 *
 * <p>The public entry point is {@link #apply}: it takes a "sink" function (a stream
 * transformation that never emits) and returns a new transformation that feeds the sink
 * through an intermediate queue. A shared {@code Deferred} is used as a signal: it is completed
 * by the second function passed to {@code Stream.bracket} around the sink, and it is awaited
 * both by the foreground stream and by {@link #terminateStream}.
 */
public final class CleanCancellation$ {
    /** The single instance of this class; all lambdas below call back through it. */
    public static final CleanCancellation$ MODULE$ = new CleanCancellation$();

    /**
     * Builds the slf4j-backed logger used for every message emitted by this class.
     *
     * @param sync effect type-class instance handed to the logger factory
     * @return a logger named {@code com.snowplowanalytics.snowplow.sources.internal.CleanCancellation}
     */
    private <F> SelfAwareStructuredLogger<F> logger(Sync<F> sync) {
        return Slf4jLogger$.MODULE$.getLogger(sync, "com.snowplowanalytics.snowplow.sources.internal.CleanCancellation");
    }

    /**
     * Wraps a sink so that the source and the sink are decoupled by a queue.
     *
     * <p>For every stream the returned function is applied to, a fresh synchronous queue and a
     * fresh {@code Deferred} are created, and the work is delegated to {@link #impl}.
     *
     * @param function1 the sink: consumes a stream of {@code A} and emits nothing
     * @param async     effect type-class instance used to create the queue and the deferred
     * @return a function from the source stream to the combined, non-emitting stream
     */
    public <F, A> Function1<Stream<F, A>, Stream<F, Nothing$>> apply(Function1<Stream<F, A>, Stream<F, Nothing$>> function1, Async<F> async) {
        return stream -> {
            return Stream$.MODULE$.eval(Queue$.MODULE$.synchronous(async)).flatMap(queue -> {
                return Stream$.MODULE$.eval(package$.MODULE$.Deferred().apply(async)).flatMap(deferred -> {
                    return MODULE$.impl(stream, function1, queue, deferred, async);
                }, NotGiven$.MODULE$.default());
            }, NotGiven$.MODULE$.default());
        };
    }

    /**
     * Assembles the three cooperating streams.
     *
     * <ol>
     *   <li><b>Foreground</b>: evaluates {@code deferred.get()} and drains the result. Its
     *       finalizer does nothing on success, calls {@link #terminateStream} on cancellation,
     *       and on error calls {@link #terminateStream} while logging (rather than propagating)
     *       any failure of that call.</li>
     *   <li><b>Sink</b> (first {@code concurrently}): a {@code Stream.bracket} over a pure unit
     *       whose second function completes {@code deferred}, followed by the queue contents
     *       (read with {@code fromQueueNoneTerminated(queue, 1, ...)}) piped through
     *       {@code function1}. Its finalizer only logs the exit case.</li>
     *   <li><b>Source</b> (second {@code concurrently}): offers every element of {@code stream}
     *       to the queue wrapped in {@code Some}. Its finalizer logs the exit case and then:
     *       on success calls {@link #closeQueue}; on cancellation does nothing further;
     *       on error calls {@link #terminateStream}.</li>
     * </ol>
     *
     * @param stream    the source of events
     * @param function1 the sink applied to the dequeued events
     * @param queue     hand-off queue between source and sink; {@code None} marks the end
     * @param deferred  signal completed by the bracket around the sink
     * @param async     effect type-class instance
     * @return the combined stream, which emits nothing
     */
    /* JADX INFO: Access modifiers changed from: private */
    public <F, A> Stream<F, Nothing$> impl(Stream<F, A> stream, Function1<Stream<F, A>, Stream<F, Nothing$>> function1, Queue<F, Option<A>> queue, Deferred<F, BoxedUnit> deferred, Async<F> async) {
        return Stream$.MODULE$.eval(deferred.get()).onFinalizeCase(exitCase -> {
            if (Resource$ExitCase$Succeeded$.MODULE$.equals(exitCase)) {
                return package$.MODULE$.Sync().apply(async).unit();
            }
            if (Resource$ExitCase$Canceled$.MODULE$.equals(exitCase)) {
                return MODULE$.terminateStream(queue, deferred, async);
            }
            if (exitCase instanceof Resource.ExitCase.Errored) {
                // Still attempt termination, but only log if that attempt itself fails.
                return ApplicativeErrorOps$.MODULE$.handleErrorWith$extension(implicits$.MODULE$.catsSyntaxApplicativeError(MODULE$.terminateStream(queue, deferred, async), async), th -> {
                    return Logger$.MODULE$.apply(MODULE$.logger(async)).error(th, () -> {
                        return "Error when terminating the sink and checkpoint";
                    });
                }, async);
            }
            throw new MatchError(exitCase);
        }, async).drain().concurrently(Stream$.MODULE$.bracket(ApplicativeIdOps$.MODULE$.pure$extension(implicits$.MODULE$.catsSyntaxApplicativeId(BoxedUnit.UNIT), async), boxedUnit -> {
            // Completing the deferred is what unblocks every deferred.get() in this class.
            return implicits$.MODULE$.toFunctorOps(deferred.complete(BoxedUnit.UNIT), async).void();
        }).$greater$greater(() -> {
            return Stream$.MODULE$.fromQueueNoneTerminated(queue, 1, async).through(function1).onFinalizeCase(exitCase2 -> {
                if (Resource$ExitCase$Succeeded$.MODULE$.equals(exitCase2)) {
                    return Logger$.MODULE$.apply(MODULE$.logger(async)).info(() -> {
                        return "Completed sinking and checkpointing events";
                    });
                }
                if (Resource$ExitCase$Canceled$.MODULE$.equals(exitCase2)) {
                    return Logger$.MODULE$.apply(MODULE$.logger(async)).info(() -> {
                        return "Sinking and checkpointing was cancelled";
                    });
                }
                if (!(exitCase2 instanceof Resource.ExitCase.Errored)) {
                    throw new MatchError(exitCase2);
                }
                return Logger$.MODULE$.apply(MODULE$.logger(async)).error(((Resource.ExitCase.Errored) exitCase2).e(), () -> {
                    return "Error on sinking and checkpointing events";
                });
            }, async);
        }, NotGiven$.MODULE$.default()), async).concurrently(stream.evalMap(obj -> {
            return queue.offer(new Some(obj));
        }).onFinalizeCase(exitCase2 -> {
            if (Resource$ExitCase$Succeeded$.MODULE$.equals(exitCase2)) {
                // Source ended normally: log, then enqueue the terminating None.
                return ApplyOps$.MODULE$.$times$greater$extension(implicits$.MODULE$.catsSyntaxApplyOps(Logger$.MODULE$.apply(MODULE$.logger(async)).debug(() -> {
                    return "Reached the end of the source of events";
                })), MODULE$.closeQueue(queue, async), async);
            }
            if (Resource$ExitCase$Canceled$.MODULE$.equals(exitCase2)) {
                // Cancellation is only logged here; the queue is not closed from this branch.
                return Logger$.MODULE$.apply(MODULE$.logger(async)).info(() -> {
                    return "Source of events was cancelled";
                });
            }
            if (!(exitCase2 instanceof Resource.ExitCase.Errored)) {
                throw new MatchError(exitCase2);
            }
            return ApplyOps$.MODULE$.$times$greater$extension(implicits$.MODULE$.catsSyntaxApplyOps(Logger$.MODULE$.apply(MODULE$.logger(async)).error(((Resource.ExitCase.Errored) exitCase2).e(), () -> {
                return "Error in the source of events";
            })), MODULE$.terminateStream(queue, deferred, async), async);
        }, async), async);
    }

    /**
     * Enqueues the terminating {@code None} without waiting for it to be accepted.
     *
     * <p>The {@code queue.offer(None)} effect is started as a separate fiber and the resulting
     * fiber handle is discarded, so the returned effect does not wait for the offer itself.
     *
     * @param queue the hand-off queue to terminate
     * @param async effect type-class instance
     * @return an effect yielding unit once the offer has been started
     */
    private <F, A> F closeQueue(Queue<F, Option<A>> queue, Async<F> async) {
        return (F) implicits$.MODULE$.toFunctorOps(GenSpawnOps$.MODULE$.start$extension(cats.effect.implicits$.MODULE$.genSpawnOps(queue.offer(None$.MODULE$), async), async), async).void();
    }

    /**
     * Requests termination of the sink and waits for the shared signal.
     *
     * <p>Runs three steps in sequence: logs a warning, calls {@link #closeQueue}, then waits on
     * {@code deferred.get()}.
     *
     * @param queue    the hand-off queue to terminate
     * @param deferred signal to wait on after the queue has been closed
     * @param async    effect type-class instance
     * @return an effect that completes when {@code deferred} has been completed
     */
    private <F, A> F terminateStream(Queue<F, Option<A>> queue, Deferred<F, BoxedUnit> deferred, Async<F> async) {
        return (F) implicits$.MODULE$.toFlatMapOps(Logger$.MODULE$.apply(logger(async)).warn(() -> {
            return "Requesting event processor to terminate cleanly...";
        }), async).flatMap(boxedUnit -> {
            // The inner parameter needs its own name: Java rejects a nested lambda parameter
            // that redeclares a name already in scope from the enclosing lambda.
            return implicits$.MODULE$.toFlatMapOps(MODULE$.closeQueue(queue, async), async).flatMap(queueClosed -> {
                return deferred.get();
            });
        });
    }

    /** Private: the only instance is {@link #MODULE$}. */
    private CleanCancellation$() {
    }
}
