package com.ocadotechnology.pass4s.connectors.activemq;

import akka.actor.ActorSystem;
import akka.stream.Materializer$;
import akka.stream.alpakka.jms.Destination;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsConsumerSettings$;
import akka.stream.alpakka.jms.TxEnvelope;
import akka.stream.alpakka.jms.scaladsl.JmsConsumer$;
import akka.stream.scaladsl.RestartSource$;
import cats.ApplicativeError;
import cats.effect.kernel.Async;
import cats.effect.kernel.Sync;
import cats.implicits$;
import cats.package$ApplicativeThrow$;
import cats.syntax.ApplicativeIdOps$;
import cats.syntax.OptionIdOps$;
import com.ocadotechnology.pass4s.connectors.activemq.Jms;
import com.ocadotechnology.pass4s.connectors.activemq.JmsSource;
import com.ocadotechnology.pass4s.connectors.activemq.taps;
import com.ocadotechnology.pass4s.core.CommittableMessage;
import com.ocadotechnology.pass4s.core.CommittableMessage$;
import com.ocadotechnology.pass4s.core.Message;
import com.ocadotechnology.pass4s.core.Source;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$OptionStreamOps$;
import fs2.compat.NotGiven$;
import javax.jms.ConnectionFactory;
import javax.jms.TextMessage;
import org.typelevel.log4cats.Logger;
import org.typelevel.log4cats.Logger$;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.util.Try$;

/* compiled from: consumer.scala */
/* loaded from: input_file:com/ocadotechnology/pass4s/connectors/activemq/consumer$.class */
public final class consumer$ {
    public static final consumer$ MODULE$ = new consumer$();

    public <F> Stream<F, CommittableMessage<F>> consumeAndReconnectOnErrors(ConnectionFactory connectionFactory, Source<?> source, Async<F> async, Logger<F> logger, ActorSystem actorSystem) {
        return Stream$.MODULE$.eval(extractJmsSource(source, async)).map(jmsSource -> {
            if (jmsSource == null) {
                throw new MatchError(jmsSource);
            }
            String name = jmsSource.name();
            Jms.Type sourceType = jmsSource.sourceType();
            JmsSource.JmsSourceSettings jmsSourceSettings = jmsSource.settings();
            return new Tuple2(jmsSource, JmsConsumerSettings$.MODULE$.apply(actorSystem, connectionFactory).withAckTimeout(jmsSourceSettings.messageProcessingTimeout().$plus(new package.DurationInt(package$.MODULE$.DurationInt(1)).second()).$times(1.2d)).withSessionCount(jmsSourceSettings.parallelSessions()).withFailStreamOnAckTimeout(true).withDestination((Destination) common$.MODULE$.toAlpakkaDestination().apply(name, sourceType)));
        }).flatMap(tuple2 -> {
            if (tuple2 != null) {
                JmsSource jmsSource2 = (JmsSource) tuple2._1();
                JmsConsumerSettings jmsConsumerSettings = (JmsConsumerSettings) tuple2._2();
                if (jmsSource2 != null) {
                    taps.AkkaSourceDsl AkkaSourceDsl = taps$.MODULE$.AkkaSourceDsl(RestartSource$.MODULE$.withBackoff(jmsSource2.settings().restartSettings().toAkka(), () -> {
                        return JmsConsumer$.MODULE$.txSource(jmsConsumerSettings).named(MODULE$.getClass().getSimpleName());
                    }));
                    return AkkaSourceDsl.toStream(AkkaSourceDsl.toStream$default$1(), async, Materializer$.MODULE$.matFromSystem(actorSystem)).flatMap(txEnvelope -> {
                        return Stream$OptionStreamOps$.MODULE$.unNone$extension(Stream$.MODULE$.OptionStreamOps(Stream$.MODULE$.eval(MODULE$.toCommittableMessage(txEnvelope, async, logger))));
                    }, NotGiven$.MODULE$.default());
                }
            }
            throw new MatchError(tuple2);
        }, NotGiven$.MODULE$.default());
    }

    private <F> F extractJmsSource(Source<?> source, ApplicativeError<F, Throwable> applicativeError) {
        Object raiseError;
        if (source instanceof JmsSource) {
            raiseError = ApplicativeIdOps$.MODULE$.pure$extension(implicits$.MODULE$.catsSyntaxApplicativeId((JmsSource) source), applicativeError);
        } else {
            raiseError = package$ApplicativeThrow$.MODULE$.apply(applicativeError).raiseError(new UnsupportedOperationException(new StringBuilder(43).append("JmsConnector does not support destination: ").append(source).toString()));
        }
        return (F) raiseError;
    }

    private <F> F toCommittableMessage(TxEnvelope txEnvelope, Sync<F> sync, Logger<F> logger) {
        Object $times$greater;
        Object delay = cats.effect.package$.MODULE$.Sync().apply(sync).delay(() -> {
            txEnvelope.commit();
        });
        Object delay2 = cats.effect.package$.MODULE$.Sync().apply(sync).delay(() -> {
            txEnvelope.rollback();
        });
        TextMessage message = txEnvelope.message();
        if (message instanceof TextMessage) {
            TextMessage textMessage = message;
            $times$greater = ApplicativeIdOps$.MODULE$.pure$extension(implicits$.MODULE$.catsSyntaxApplicativeId(OptionIdOps$.MODULE$.some$extension(implicits$.MODULE$.catsSyntaxOptionId(CommittableMessage$.MODULE$.instance(new Message.Payload(textMessage.getText(), getHeaders(textMessage)), delay, rollbackCause -> {
                return delay2;
            }, sync)))), sync);
        } else {
            $times$greater = implicits$.MODULE$.catsSyntaxApply(Logger$.MODULE$.apply(logger).warn(() -> {
                return new StringBuilder(68).append("JmsConnector supports only TextMessages. Ignoring received message: ").append(message).toString();
            }), sync).$times$greater(implicits$.MODULE$.toFunctorOps(delay2, sync).as(None$.MODULE$));
        }
        return (F) $times$greater;
    }

    private Map<String, String> getHeaders(javax.jms.Message message) {
        return (Map) Try$.MODULE$.apply(() -> {
            return CollectionConverters$.MODULE$.IteratorHasAsScala(message.getPropertyNames().asIterator()).asScala().map(str -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), message.getStringProperty(str));
            }).toMap($less$colon$less$.MODULE$.refl());
        }).getOrElse(() -> {
            return Predef$.MODULE$.Map().empty();
        });
    }

    private consumer$() {
    }
}
