package com.ocadotechnology.pass4s.connectors.activemq;

import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy$;
import akka.stream.QueueOfferResult;
import akka.stream.QueueOfferResult$Dropped$;
import akka.stream.QueueOfferResult$Enqueued$;
import akka.stream.QueueOfferResult$QueueClosed$;
import akka.stream.SourceShape;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.SinkQueueWithCancel;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Resource$ExitCase$Canceled$;
import cats.effect.kernel.Resource$ExitCase$Succeeded$;
import cats.effect.package$;
import cats.implicits$;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.ApplicativeIdOps$;
import cats.syntax.FlatMapOps$;
import cats.syntax.OptionIdOps$;
import com.ocadotechnology.pass4s.connectors.activemq.taps;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$OptionStreamOps$;
import scala.Function1;
import scala.MatchError;
import scala.Tuple2;
import scala.Tuple3;
import scala.runtime.BoxedUnit;

/* compiled from: taps.scala */
/* loaded from: input_file:com/ocadotechnology/pass4s/connectors/activemq/taps$.class */
/**
 * Singleton (see {@link #MODULE$}) with the helpers that expose Akka Streams graphs as fs2
 * streams and pipes.
 *
 * <p>A source graph is run into a sink queue that is then pulled from an fs2 stream; a flow graph
 * is additionally fed from an fs2 stream through a source queue created with
 * {@code OverflowStrategy.backpressure}.
 */
public final class taps$ {
    public static final taps$ MODULE$ = new taps$();

    /**
     * Wraps an Akka source graph in a {@code taps.AkkaSourceDsl}.
     *
     * @param graph the source graph to wrap
     * @return a new wrapper around {@code graph}
     */
    public <A, M> taps.AkkaSourceDsl<A, M> AkkaSourceDsl(Graph<SourceShape<A>, M> graph) {
        final taps.AkkaSourceDsl<A, M> wrapped = new taps.AkkaSourceDsl<>(graph);
        return wrapped;
    }

    /**
     * Wraps an Akka flow graph in a {@code taps.AkkaFlowDsl}.
     *
     * @param graph the flow graph to wrap
     * @return a new wrapper around {@code graph}
     */
    public <A, B, M> taps.AkkaFlowDsl<A, B, M> AkkaFlowDsl(Graph<FlowShape<A, B>, M> graph) {
        final taps.AkkaFlowDsl<A, B, M> wrapped = new taps.AkkaFlowDsl<>(graph);
        return wrapped;
    }

    /**
     * Builds an fs2 stream that emits the elements produced by an Akka source graph.
     *
     * <p>Inside a delayed effect the graph is run into a queue sink using {@code materializer};
     * the graph's own materialized value is handed to {@code function1}, and the sink queue is
     * then drained by {@code subscriberStream}.
     *
     * @param graph the Akka source graph to run
     * @param function1 callback that receives the graph's materialized value
     * @param materializer materializer used to run the graph
     * @param async effect instance for {@code F}
     * @return a stream backed by the materialized sink queue
     */
    public <F, A, M> Stream<F, A> com$ocadotechnology$pass4s$connectors$activemq$taps$$akkaSourceToFs2Stream(Graph<SourceShape<A>, M> graph, Function1<M, BoxedUnit> function1, Materializer materializer, Async<F> async) {
        return Stream$.MODULE$.force(async.delay(() -> {
            Tuple2 materialized = (Tuple2) Source$.MODULE$.fromGraph(graph).toMat(Sink$.MODULE$.queue(), Keep$.MODULE$.both()).run(materializer);
            if (materialized == null) {
                throw new MatchError(materialized);
            }
            // Keep.both yields (graph's materialized value, sink queue).
            Object matValue = materialized._1();
            SinkQueueWithCancel sinkQueue = (SinkQueueWithCancel) materialized._2();
            function1.apply(matValue);
            return MODULE$.subscriberStream(sinkQueue, async);
        }));
    }

    /**
     * Builds an fs2 pipe that routes its input stream through an Akka flow graph.
     *
     * <p>For each input stream, a delayed effect runs {@code Source.queue -> graph -> Sink.queue}
     * using {@code materializer}. The flow's materialized value is handed to {@code function1};
     * the input stream is then published into the source queue while the sink queue is drained
     * (see {@code transformerStream}).
     *
     * @param graph the Akka flow graph to run
     * @param function1 callback that receives the flow's materialized value
     * @param materializer materializer used to run the graph
     * @param async effect instance for {@code F}
     * @return a function from the input stream to the stream of the flow's output
     */
    public <F, A, B, M> Function1<Stream<F, A>, Stream<F, B>> com$ocadotechnology$pass4s$connectors$activemq$taps$$akkaFlowToFs2Pipe(Graph<FlowShape<A, B>, M> graph, Function1<M, BoxedUnit> function1, Materializer materializer, Async<F> async) {
        return stream -> Stream$.MODULE$.force(async.delay(() -> {
            Source queueSource = Source$.MODULE$.queue(0, OverflowStrategy$.MODULE$.backpressure());
            Tuple2 materialized = (Tuple2) queueSource.viaMat(graph, Keep$.MODULE$.both()).toMat(Sink$.MODULE$.queue(), Keep$.MODULE$.both()).run(materializer);
            if (materialized == null) {
                throw new MatchError(materialized);
            }
            // Shape of the result: ((source queue, flow's materialized value), sink queue).
            Tuple2 upstream = (Tuple2) materialized._1();
            SinkQueueWithCancel sinkQueue = (SinkQueueWithCancel) materialized._2();
            if (upstream == null) {
                throw new MatchError(materialized);
            }
            SourceQueueWithComplete sourceQueue = (SourceQueueWithComplete) upstream._1();
            Object matValue = upstream._2();
            function1.apply(matValue);
            return MODULE$.transformerStream(sinkQueue, sourceQueue, stream, async);
        }));
    }

    /**
     * Combines draining of the sink queue with publishing of {@code stream} into the source
     * queue: the subscriber stream is returned with the publisher stream attached via
     * {@code concurrently}.
     *
     * @param sinkQueueWithCancel queue the flow's output is pulled from
     * @param sourceQueueWithComplete queue the input elements are offered to
     * @param stream input elements
     * @param async effect instance for {@code F}
     * @return the stream of elements pulled from the sink queue
     */
    private <F, A, B> Stream<F, B> transformerStream(SinkQueueWithCancel<B> sinkQueueWithCancel, SourceQueueWithComplete<A> sourceQueueWithComplete, Stream<F, A> stream, Async<F> async) {
        Stream<F, B> subscriber = subscriberStream(sinkQueueWithCancel, async);
        Stream<F, BoxedUnit> publisher = publisherStream(sourceQueueWithComplete, stream, async);
        return subscriber.concurrently(publisher, async);
    }

    /**
     * Stream that offers every element of {@code stream} to the source queue.
     *
     * <p>The input is interrupted by the (attempted) {@code watchCompletion$1} effect, each
     * element is passed to {@code publish$1}, and the optional results go through
     * {@code unNoneTerminate}. On finalization the queue is completed for the succeeded and
     * canceled exit cases and failed with the reported error for the errored exit case.
     *
     * @param sourceQueueWithComplete queue the elements are offered to
     * @param stream elements to publish
     * @param async effect instance for {@code F}
     * @return the publishing stream
     */
    private <F, A> Stream<F, BoxedUnit> publisherStream(SourceQueueWithComplete<A> sourceQueueWithComplete, Stream<F, A> stream, Async<F> async) {
        return Stream$OptionStreamOps$.MODULE$
                .unNoneTerminate$extension(
                        Stream$.MODULE$.OptionStreamOps(
                                stream
                                        .interruptWhen(
                                                ApplicativeErrorOps$.MODULE$.attempt$extension(
                                                        implicits$.MODULE$.catsSyntaxApplicativeError(
                                                                watchCompletion$1(async, sourceQueueWithComplete), async),
                                                        async))
                                        .evalMap(element -> publish$1(element, async, sourceQueueWithComplete))))
                .onFinalizeCase(exitCase -> {
                    if (Resource$ExitCase$Succeeded$.MODULE$.equals(exitCase) || Resource$ExitCase$Canceled$.MODULE$.equals(exitCase)) {
                        return complete$1(async, sourceQueueWithComplete);
                    }
                    if (exitCase instanceof Resource.ExitCase.Errored) {
                        return fail$1(((Resource.ExitCase.Errored) exitCase).e(), async, sourceQueueWithComplete);
                    }
                    throw new MatchError(exitCase);
                }, async);
    }

    /**
     * Stream that repeatedly pulls from the sink queue.
     *
     * <p>Each step evaluates {@code pull()} through {@code fromFuture}; the optional results go
     * through {@code unNoneTerminate}. When the stream is finalized the queue is cancelled.
     *
     * @param sinkQueueWithCancel queue to pull from
     * @param async effect instance for {@code F}
     * @return the stream of pulled elements
     */
    private <F, A> Stream<F, A> subscriberStream(SinkQueueWithCancel<A> sinkQueueWithCancel, Async<F> async) {
        Object pullNext = package$.MODULE$.Async().apply(async).fromFuture(async.delay(() -> sinkQueueWithCancel.pull()));
        return Stream$OptionStreamOps$.MODULE$
                .unNoneTerminate$extension(Stream$.MODULE$.OptionStreamOps(Stream$.MODULE$.repeatEval(pullNext)))
                .onFinalize(async.delay(() -> {
                    sinkQueueWithCancel.cancel();
                }), async);
    }

    /**
     * Offers one element to the source queue and translates the offer result.
     *
     * <ul>
     *   <li>{@code Enqueued}: a pure {@code Some(unit)}</li>
     *   <li>{@code Failure}: the effect fails with the reported cause</li>
     *   <li>{@code QueueClosed}: a pure {@code None}</li>
     *   <li>{@code Dropped}: the effect fails with an {@link IllegalStateException}</li>
     * </ul>
     *
     * <p>The resulting effect is passed through {@code recover} with the partial function
     * {@code taps$$anonfun$publish$1$1}, whose definition is not part of this class.
     * Decompiler note: the access modifier of this method was changed from private.
     *
     * @param obj element to offer
     * @param async effect instance
     * @param sourceQueueWithComplete queue the element is offered to
     * @return the effect describing the offer
     */
    public static final Object publish$1(Object obj, Async async, SourceQueueWithComplete sourceQueueWithComplete) {
        return ApplicativeErrorOps$.MODULE$.recover$extension(
                implicits$.MODULE$.catsSyntaxApplicativeError(
                        implicits$.MODULE$.toFlatMapOps(
                                package$.MODULE$.Async().apply(async).fromFuture(
                                        async.delay(() -> sourceQueueWithComplete.offer(obj))),
                                async)
                                .flatMap(offerResult -> {
                                    if (QueueOfferResult$Enqueued$.MODULE$.equals(offerResult)) {
                                        return ApplicativeIdOps$.MODULE$.pure$extension(implicits$.MODULE$.catsSyntaxApplicativeId(OptionIdOps$.MODULE$.some$extension(implicits$.MODULE$.catsSyntaxOptionId(BoxedUnit.UNIT))), async);
                                    }
                                    if (offerResult instanceof QueueOfferResult.Failure) {
                                        return async.raiseError(((QueueOfferResult.Failure) offerResult).cause());
                                    }
                                    if (QueueOfferResult$QueueClosed$.MODULE$.equals(offerResult)) {
                                        return ApplicativeIdOps$.MODULE$.pure$extension(implicits$.MODULE$.catsSyntaxApplicativeId(implicits$.MODULE$.none()), async);
                                    }
                                    if (QueueOfferResult$Dropped$.MODULE$.equals(offerResult)) {
                                        return async.raiseError(new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure"));
                                    }
                                    throw new MatchError(offerResult);
                                }),
                        async),
                new taps$$anonfun$publish$1$1(),
                async);
    }

    /**
     * Effect that waits for the future returned by the queue's {@code watchCompletion()} and
     * discards its value.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param async effect instance
     * @param sourceQueueWithComplete queue whose completion is awaited
     * @return the unit-valued effect
     */
    public static final Object watchCompletion$1(Async async, SourceQueueWithComplete sourceQueueWithComplete) {
        return implicits$.MODULE$
                .toFunctorOps(
                        package$.MODULE$.Async().apply(async).fromFuture(
                                async.delay(() -> sourceQueueWithComplete.watchCompletion())),
                        async)
                .void();
    }

    /**
     * Effect that fails the source queue with {@code th} and then awaits
     * {@code watchCompletion$1}.
     *
     * @param th error to fail the queue with
     * @param async effect instance
     * @param sourceQueueWithComplete queue to fail
     * @return the sequenced effect
     */
    private static final Object fail$1(Throwable th, Async async, SourceQueueWithComplete sourceQueueWithComplete) {
        return FlatMapOps$.MODULE$.$greater$greater$extension(
                implicits$.MODULE$.catsSyntaxFlatMapOps(
                        async.delay(() -> {
                            sourceQueueWithComplete.fail(th);
                        }),
                        async),
                () -> watchCompletion$1(async, sourceQueueWithComplete),
                async);
    }

    /**
     * Effect that completes the source queue and then awaits {@code watchCompletion$1}.
     *
     * @param async effect instance
     * @param sourceQueueWithComplete queue to complete
     * @return the sequenced effect
     */
    private static final Object complete$1(Async async, SourceQueueWithComplete sourceQueueWithComplete) {
        return FlatMapOps$.MODULE$.$greater$greater$extension(
                implicits$.MODULE$.catsSyntaxFlatMapOps(
                        async.delay(() -> {
                            sourceQueueWithComplete.complete();
                        }),
                        async),
                () -> watchCompletion$1(async, sourceQueueWithComplete),
                async);
    }

    /** Not instantiable from outside; the only instance is {@link #MODULE$}. */
    private taps$() {
    }
}
