package com.ocadotechnology.pass4s.connectors.activemq;

import akka.actor.ActorSystem;
import akka.stream.Materializer$;
import akka.stream.alpakka.jms.Destination;
import akka.stream.alpakka.jms.JmsEnvelope;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings$;
import akka.stream.alpakka.jms.JmsTextMessage$;
import akka.stream.alpakka.jms.scaladsl.JmsProducer$;
import cats.ApplicativeError;
import cats.effect.implicits$;
import cats.effect.kernel.Async;
import cats.effect.kernel.Deferred;
import cats.effect.kernel.Deferred$;
import cats.effect.kernel.DeferredSink;
import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Ref;
import cats.effect.kernel.Ref$;
import cats.effect.kernel.Ref$Make$;
import cats.effect.kernel.Resource;
import cats.effect.kernel.syntax.GenSpawnOps$;
import cats.effect.package$;
import cats.effect.std.Queue$;
import cats.effect.std.Semaphore;
import cats.effect.std.Semaphore$;
import cats.package$ApplicativeThrow$;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.ApplicativeIdOps$;
import cats.syntax.MonadErrorRethrowOps$;
import com.ocadotechnology.pass4s.connectors.activemq.taps;
import com.ocadotechnology.pass4s.core.Message;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.Stream;
import fs2.Stream$;
import fs2.compat.NotGiven$;
import javax.jms.ConnectionFactory;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.Nil$;
import scala.runtime.BoxedUnit;
import scala.util.Either;

/* compiled from: producer.scala */
/* loaded from: input_file:com/ocadotechnology/pass4s/connectors/activemq/producer$.class */
public final class producer$ {
    public static final producer$ MODULE$ = new producer$();

    public <F> Resource<F, Function1<Message<?>, F>> createMessageProducer(ConnectionFactory connectionFactory, int i, Async<F> async, ActorSystem actorSystem) {
        return package$.MODULE$.Resource().eval(Queue$.MODULE$.bounded(i, async)).flatMap(queue -> {
            return GenSpawnOps$.MODULE$.background$extension(implicits$.MODULE$.genSpawnOps(Stream$.MODULE$.fromQueueUnterminated(queue, 1, async).through(MODULE$.sendMessageAndCompletePromise(connectionFactory, async, actorSystem)).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(async))).drain(), async), async).map(obj -> {
                return MODULE$.enqueueAndWaitForPromise(jmsEnvelope -> {
                    return queue.offer(jmsEnvelope);
                }, async);
            });
        });
    }

    public <F> int createMessageProducer$default$2() {
        return 100;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <F> Function1<Message<?>, F> enqueueAndWaitForPromise(Function1<JmsEnvelope<Deferred<F, Either<Throwable, BoxedUnit>>>, F> function1, GenConcurrent<F, Throwable> genConcurrent) {
        return message -> {
            return cats.implicits$.MODULE$.toFlatMapOps(MODULE$.extractJmsDestination(message.destination(), genConcurrent), genConcurrent).flatMap(jmsDestination -> {
                return cats.implicits$.MODULE$.toFlatMapOps(Deferred$.MODULE$.apply(genConcurrent), genConcurrent).flatMap(deferred -> {
                    return cats.implicits$.MODULE$.toFlatMapOps(function1.apply(JmsTextMessage$.MODULE$.apply(message.payload().text(), deferred).withProperties(message.payload().metadata()).to((Destination) common$.MODULE$.toAlpakkaDestination().apply(jmsDestination.name(), jmsDestination.destinationType()))), genConcurrent).flatMap(boxedUnit -> {
                        return MonadErrorRethrowOps$.MODULE$.rethrow$extension(cats.implicits$.MODULE$.catsSyntaxMonadErrorRethrow(deferred.get(), genConcurrent), genConcurrent);
                    });
                });
            });
        };
    }

    private <F> Function1<Stream<F, JmsEnvelope<Deferred<F, Either<Throwable, BoxedUnit>>>>, Stream<F, BoxedUnit>> sendMessageAndCompletePromise(ConnectionFactory connectionFactory, Async<F> async, ActorSystem actorSystem) {
        return stream -> {
            return Stream$.MODULE$.eval(cats.implicits$.MODULE$.catsSyntaxTuple2Semigroupal(new Tuple2(Ref$.MODULE$.of(Predef$.MODULE$.Set().empty(), Ref$Make$.MODULE$.concurrentInstance(async)), Semaphore$.MODULE$.apply(1L, async))).tupled(async, async)).flatMap(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Ref ref = (Ref) tuple2._1();
                Semaphore semaphore = (Semaphore) tuple2._2();
                JmsProducerSettings withTopic = JmsProducerSettings$.MODULE$.apply(actorSystem, connectionFactory).withTopic("Pass4s.Default");
                taps.AkkaFlowDsl AkkaFlowDsl = taps$.MODULE$.AkkaFlowDsl(JmsProducer$.MODULE$.flexiFlow(withTopic).named(MODULE$.getClass().getSimpleName()));
                return stream.evalTap(jmsEnvelope -> {
                    return addMessageToRef$1(jmsEnvelope, semaphore, ref, async);
                }, async).through(AkkaFlowDsl.toPipe(AkkaFlowDsl.toPipe$default$1(), async, Materializer$.MODULE$.matFromSystem(actorSystem))).attempts(Stream$.MODULE$.constant(withTopic.connectionRetrySettings().initialRetry(), Stream$.MODULE$.constant$default$2()), async).evalMap(either -> {
                    return either.fold(th -> {
                        return failAllAndCleanRef$1(th, semaphore, ref, async);
                    }, jmsEnvelope2 -> {
                        return completeMessageAndRemoveFromRef$1(jmsEnvelope2, semaphore, async, ref);
                    });
                });
            }, NotGiven$.MODULE$.default());
        };
    }

    private <F> F extractJmsDestination(com.ocadotechnology.pass4s.core.Destination<?> destination, ApplicativeError<F, Throwable> applicativeError) {
        Object raiseError;
        if (destination instanceof JmsDestination) {
            raiseError = ApplicativeIdOps$.MODULE$.pure$extension(cats.implicits$.MODULE$.catsSyntaxApplicativeId((JmsDestination) destination), applicativeError);
        } else {
            raiseError = package$ApplicativeThrow$.MODULE$.apply(applicativeError).raiseError(new UnsupportedOperationException(new StringBuilder(43).append("JmsConnector does not support destination: ").append(destination).toString()));
        }
        return (F) raiseError;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Object addMessageToRef$1(JmsEnvelope jmsEnvelope, Semaphore semaphore, Ref ref, Async async) {
        return semaphore.permit().surround(ref.update(set -> {
            return set.$plus(jmsEnvelope);
        }), async);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Object completeMessageAndRemoveFromRef$1(JmsEnvelope jmsEnvelope, Semaphore semaphore, Async async, Ref ref) {
        return semaphore.permit().surround(cats.implicits$.MODULE$.catsSyntaxApply(ApplicativeErrorOps$.MODULE$.attempt$extension(cats.implicits$.MODULE$.catsSyntaxApplicativeError(((DeferredSink) jmsEnvelope.passThrough()).complete(scala.package$.MODULE$.Right().apply(BoxedUnit.UNIT)), async), async), async).$times$greater(ref.update(set -> {
            return set.$minus(jmsEnvelope);
        })), async);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Object failAllAndCleanRef$1(Throwable th, Semaphore semaphore, Ref ref, Async async) {
        return semaphore.permit().surround(cats.implicits$.MODULE$.catsSyntaxApply(cats.implicits$.MODULE$.toFlatMapOps(ref.get(), async).flatMap(set -> {
            return cats.implicits$.MODULE$.toTraverseOps(set.toList(), cats.implicits$.MODULE$.catsStdInstancesForList()).traverse(jmsEnvelope -> {
                return ApplicativeErrorOps$.MODULE$.attempt$extension(cats.implicits$.MODULE$.catsSyntaxApplicativeError(((DeferredSink) jmsEnvelope.passThrough()).complete(scala.package$.MODULE$.Left().apply(th)), async), async);
            }, async);
        }), async).$times$greater(ref.set(Predef$.MODULE$.Set().apply(Nil$.MODULE$))), async);
    }

    private producer$() {
    }
}
