package blended.streams.transaction;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.FanInShape2;
import akka.stream.FanOutShape2;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.KillSwitch;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.scaladsl.Broadcast$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.GraphDSL;
import akka.stream.scaladsl.GraphDSL$;
import akka.stream.scaladsl.GraphDSL$Implicits$;
import akka.stream.scaladsl.Merge$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Zip$;
import blended.jms.utils.IdAwareConnectionFactory;
import blended.jms.utils.JmsDestination;
import blended.jms.utils.JmsTopic;
import blended.streams.FlowHeaderConfig;
import blended.streams.FlowProcessor$;
import blended.streams.jms.JmsConsumerSettings;
import blended.streams.jms.JmsProducerSettings;
import blended.streams.jms.JmsProducerSettings$;
import blended.streams.jms.JmsStreamSupport;
import blended.streams.message.FlowEnvelope;
import blended.streams.message.FlowEnvelopeLogger;
import blended.streams.message.FlowEnvelopeLogger$;
import blended.streams.processor.Collector;
import blended.util.logging.LogLevel$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: FlowTransactionStream.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005%b\u0001\u0002\n\u0014\u0001iA\u0001b\n\u0001\u0003\u0002\u0003\u0006I\u0001\u000b\u0005\te\u0001\u0011\t\u0011)A\u0005g!Aq\u0007\u0001B\u0001B\u0003%\u0001\b\u0003\u0005=\u0001\t\u0005\t\u0015!\u0003>\u0011!\u0019\u0005A!A!\u0002\u0017!\u0005\"\u0002'\u0001\t\u0003i\u0005bB+\u0001\u0005\u0004%IA\u0016\u0005\u0007M\u0002\u0001\u000b\u0011B,\t\u000f\u001d\u0004!\u0019!C\u0005Q\"1a\u000f\u0001Q\u0001\n%Dqa\u001e\u0001C\u0002\u0013%\u0001\u0010C\u0004\u0002\n\u0001\u0001\u000b\u0011B=\t\u0013\u0005-\u0001A1A\u0005\n\u00055\u0001\u0002CA\u000b\u0001\u0001\u0006I!a\u0004\t\u0013\u0005]\u0001A1A\u0005\n\u0005e\u0001\u0002CA\u0012\u0001\u0001\u0006I!a\u0007\t\u000f\u0005\u0015\u0002\u0001\"\u0001\u0002(\t)b\t\\8x)J\fgn]1di&|gn\u0015;sK\u0006l'B\u0001\u000b\u0016\u0003-!(/\u00198tC\u000e$\u0018n\u001c8\u000b\u0005Y9\u0012aB:ue\u0016\fWn\u001d\u0006\u00021\u00059!\r\\3oI\u0016$7\u0001A\n\u0004\u0001m\t\u0003C\u0001\u000f \u001b\u0005i\"\"\u0001\u0010\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0001j\"AB!osJ+g\r\u0005\u0002#K5\t1E\u0003\u0002%+\u0005\u0019!.\\:\n\u0005\u0019\u001a#\u0001\u0005&ngN#(/Z1n'V\u0004\bo\u001c:u\u0003)Ig\u000e^3s]\u0006d7I\u001a\t\u00049%Z\u0013B\u0001\u0016\u001e\u0005\u0019y\u0005\u000f^5p]B\u0011A\u0006M\u0007\u0002[)\u0011afL\u0001\u0006kRLGn\u001d\u0006\u0003I]I!!M\u0017\u00031%#\u0017i^1sK\u000e{gN\\3di&|gNR1di>\u0014\u00180A\u0005iK\u0006$WM]\"gOB\u0011A'N\u0007\u0002+%\u0011a'\u0006\u0002\u0011\r2|w\u000fS3bI\u0016\u00148i\u001c8gS\u001e\fA\u0001^'heB\u0011\u0011HO\u0007\u0002'%\u00111h\u0005\u0002\u0017\r2|w\u000f\u0016:b]N\f7\r^5p]6\u000bg.Y4fe\u0006a1\u000f\u001e:fC6dunZ4feB\u0011a(Q\u0007\u0002\u007f)\u0011\u0001)F\u0001\b[\u0016\u001c8/Y4f\u0013\t\u0011uH\u0001\nGY><XI\u001c<fY>\u0004X\rT8hO\u0016\u0014\u0018AB:zgR,W\u000e\u0005\u0002F\u00156\taI\u0003\u0002H\u0011\u0006)\u0011m\u0019;pe*\t\u0011*\u0001\u0003bW.\f\u0017BA&G\u0005-\t5\r^8s'f\u001cH/Z7\u0002\rqJg.\u001b;?)\u0015q\u0015KU*U)\ty\u0005\u
000b\u0005\u0002:\u0001!)1I\u0002a\u0002\t\")qE\u0002a\u0001Q!)!G\u0002a\u0001g!)qG\u0002a\u0001q!)AH\u0002a\u0001{\u0005YQ\u000f\u001d3bi\u0016,e/\u001a8u+\u00059\u0006\u0003\u0002\u000fY5vK!!W\u000f\u0003\u0013\u0019+hn\u0019;j_:\f\u0004C\u0001 \\\u0013\tavH\u0001\u0007GY><XI\u001c<fY>\u0004X\rE\u0002_C\u000el\u0011a\u0018\u0006\u0003Av\tA!\u001e;jY&\u0011!m\u0018\u0002\u0004)JL\bCA\u001de\u0013\t)7C\u0001\u000bGY><HK]1og\u0006\u001cG/[8o\u000bZ,g\u000e^\u0001\rkB$\u0017\r^3Fm\u0016tG\u000fI\u0001\u000eY><WI^3oiR{'*\\:\u0016\u0003%\u0004B\u0001\b-,UB)1\u000e\u001d.[e6\tAN\u0003\u0002n]\u0006A1oY1mC\u0012\u001cHN\u0003\u0002p\u0011\u000611\u000f\u001e:fC6L!!\u001d7\u0003\t\u0019cwn\u001e\t\u0003gRl\u0011\u0001S\u0005\u0003k\"\u0013qAT8u+N,G-\u0001\bm_\u001e,e/\u001a8u)>TUn\u001d\u0011\u0002#I,7m\u001c:e)J\fgn]1di&|g.F\u0001z!\u0011Q80 :\u000e\u00039L!\u0001 8\u0003\u000b\u001d\u0013\u0018\r\u001d5\u0011\u000bitX,!\u0001\n\u0005}t'!\u0003$m_^\u001c\u0006.\u00199f!\u0011q\u0016-a\u0001\u0011\u0007e\n)!C\u0002\u0002\bM\u0011qB\u00127poR\u0013\u0018M\\:bGRLwN\\\u0001\u0013e\u0016\u001cwN\u001d3Ue\u0006t7/Y2uS>t\u0007%\u0001\bm_\u001e$&/\u00198tC\u000e$\u0018n\u001c8\u0016\u0005\u0005=\u0001#\u0002>|\u0003#\u0011\bC\u0002>\u007f\u0003\u0003\t\u0019\u0002E\u0002_Cj\u000bq\u0002\\8h)J\fgn]1di&|g\u000eI\u0001\rGJ,\u0017\r^3SKN,H\u000e^\u000b\u0003\u00037\u0001R\u0001\b-\u0002\u001ei\u0003b\u0001HA\u00105\u0006M\u0011bAA\u0011;\t1A+\u001e9mKJ\nQb\u0019:fCR,'+Z:vYR\u0004\u0013!\u00022vS2$G#\u00016")
/* loaded from: input_file:blended/streams/transaction/FlowTransactionStream.class */
public class FlowTransactionStream implements JmsStreamSupport {
    // Optional JMS connection factory; when defined, build() inserts a "logToJms" producer stage ahead of the event update.
    private final Option<IdAwareConnectionFactory> internalCf;
    // Header configuration: supplies the prefix of the transaction topic name and is passed to the envelope/event/transaction converters.
    private final FlowHeaderConfig headerCfg;
    // Transaction manager; every successfully parsed event is applied through updateTransaction.
    private final FlowTransactionManager tMgr;
    // Logger used for per-envelope trace messages, MDC-based transaction logging and the JMS producer settings.
    private final FlowEnvelopeLogger streamLogger;
    // Actor system handed to jmsProducer when the "logToJms" flow is created.
    private final ActorSystem system;
    // Converts an incoming envelope into a transaction event, capturing conversion errors in a Try.
    private final Function1<FlowEnvelope, Try<FlowTransactionEvent>> updateEvent;
    // Creates, for a given connection factory, the JMS producer flow that publishes envelopes to "<prefix>.topic.transactions".
    private final Function1<IdAwareConnectionFactory, Flow<FlowEnvelope, FlowEnvelope, NotUsed>> logEventToJms;
    // Sub-graph that applies successful events to the transaction manager and passes failures through unchanged.
    private final Graph<FlowShape<Try<FlowTransactionEvent>, Try<FlowTransaction>>, NotUsed> recordTransaction;
    // Sub-graph that logs each updated transaction and converts it into an envelope.
    private final Graph<FlowShape<Try<FlowTransaction>, Try<FlowEnvelope>>, NotUsed> logTransaction;
    // Merges the original envelope with the outcome of the transaction branch into the envelope emitted by build().
    private final Function1<Tuple2<FlowEnvelope, Try<FlowEnvelope>>, FlowEnvelope> createResult;

    /**
     * Forwards to the default implementation inherited from {@link JmsStreamSupport};
     * all arguments are passed through unchanged.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Try<KillSwitch> processMessages(Flow<FlowEnvelope, FlowEnvelope, ?> flow, FiniteDuration finiteDuration, Seq<FlowEnvelope> seq, ActorSystem actorSystem) {
        return JmsStreamSupport.super.processMessages(flow, finiteDuration, seq, actorSystem);
    }

    /**
     * Forwards to the default implementation inherited from {@link JmsStreamSupport};
     * all arguments are passed through unchanged.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Try<KillSwitch> sendMessages(JmsProducerSettings jmsProducerSettings, FlowEnvelopeLogger flowEnvelopeLogger, FiniteDuration finiteDuration, Seq<FlowEnvelope> seq, ActorSystem actorSystem) {
        return JmsStreamSupport.super.sendMessages(jmsProducerSettings, flowEnvelopeLogger, finiteDuration, seq, actorSystem);
    }

    /**
     * Forwards to the default implementation inherited from {@link JmsStreamSupport};
     * all arguments are passed through unchanged and in the same order.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Collector<FlowEnvelope> receiveMessages(FlowHeaderConfig flowHeaderConfig, IdAwareConnectionFactory idAwareConnectionFactory, JmsDestination jmsDestination, FlowEnvelopeLogger flowEnvelopeLogger, Integer num, Option<FiniteDuration> option, Option<String> option2, Option<Function1<Seq<FlowEnvelope>, Object>> option3, Option<FiniteDuration> option4, FiniteDuration finiteDuration, ActorSystem actorSystem) {
        return JmsStreamSupport.super.receiveMessages(flowHeaderConfig, idAwareConnectionFactory, jmsDestination, flowEnvelopeLogger, num, option, option2, option3, option4, finiteDuration, actorSystem);
    }

    /**
     * Scala default-argument accessor for the fifth parameter of {@code receiveMessages};
     * returns whatever default {@link JmsStreamSupport} defines.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Integer receiveMessages$default$5() {
        return JmsStreamSupport.super.receiveMessages$default$5();
    }

    /**
     * Scala default-argument accessor for the sixth parameter of {@code receiveMessages};
     * returns whatever default {@link JmsStreamSupport} defines.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Option<FiniteDuration> receiveMessages$default$6() {
        return JmsStreamSupport.super.receiveMessages$default$6();
    }

    /**
     * Scala default-argument accessor for the seventh parameter of {@code receiveMessages};
     * returns whatever default {@link JmsStreamSupport} defines.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Option<String> receiveMessages$default$7() {
        return JmsStreamSupport.super.receiveMessages$default$7();
    }

    /**
     * Scala default-argument accessor for the eighth parameter of {@code receiveMessages};
     * returns whatever default {@link JmsStreamSupport} defines.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Option<Function1<Seq<FlowEnvelope>, Object>> receiveMessages$default$8() {
        return JmsStreamSupport.super.receiveMessages$default$8();
    }

    /**
     * Forwards to the default implementation inherited from {@link JmsStreamSupport};
     * all arguments are passed through unchanged.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Flow<FlowEnvelope, FlowEnvelope, NotUsed> jmsProducer(String str, JmsProducerSettings jmsProducerSettings, boolean z, ActorSystem actorSystem) {
        return JmsStreamSupport.super.jmsProducer(str, jmsProducerSettings, z, actorSystem);
    }

    /**
     * Forwards to the default implementation inherited from {@link JmsStreamSupport};
     * all arguments are passed through unchanged.
     *
     * <p>The {@code JmsStreamSupport.super} qualifier is essential: an unqualified call
     * resolves to this override and recurses until the stack overflows.
     */
    @Override // blended.streams.jms.JmsStreamSupport
    public Source<FlowEnvelope, NotUsed> jmsConsumer(String str, JmsConsumerSettings jmsConsumerSettings, Option<FiniteDuration> option, ActorSystem actorSystem) {
        return JmsStreamSupport.super.jmsConsumer(str, jmsConsumerSettings, option, actorSystem);
    }

    /** Returns the envelope-to-event conversion function held in the {@code updateEvent} field. */
    private Function1<FlowEnvelope, Try<FlowTransactionEvent>> updateEvent() {
        return updateEvent;
    }

    /** Returns the factory held in the {@code logEventToJms} field, which maps a connection factory to a JMS producer flow. */
    private Function1<IdAwareConnectionFactory, Flow<FlowEnvelope, FlowEnvelope, NotUsed>> logEventToJms() {
        return logEventToJms;
    }

    /** Returns the sub-graph held in the {@code recordTransaction} field. */
    private Graph<FlowShape<Try<FlowTransactionEvent>, Try<FlowTransaction>>, NotUsed> recordTransaction() {
        return recordTransaction;
    }

    /** Returns the sub-graph held in the {@code logTransaction} field. */
    private Graph<FlowShape<Try<FlowTransaction>, Try<FlowEnvelope>>, NotUsed> logTransaction() {
        return logTransaction;
    }

    /** Returns the result-merging function held in the {@code createResult} field. */
    private Function1<Tuple2<FlowEnvelope, Try<FlowEnvelope>>, FlowEnvelope> createResult() {
        return createResult;
    }

    /**
     * Assembles the transaction-tracking flow.
     *
     * <p>Every incoming envelope is broadcast to two branches:
     * <ul>
     *   <li>branch 0 carries the untouched envelope straight to the first input of a zip stage;</li>
     *   <li>branch 1 runs {@code updateEvent -> recordTransaction -> logTransaction} and feeds the
     *       second input of the zip stage. When {@code internalCf} is defined, a "logToJms"
     *       producer stage is placed in front of {@code updateEvent}.</li>
     * </ul>
     * The zipped pair is turned into the emitted envelope by {@code createResult}.
     *
     * @return the flow mapping each incoming envelope to its transaction result envelope
     * @throws MatchError if {@code internalCf} is neither {@code None} nor a {@code Some} (e.g. {@code null})
     */
    // Shapes are deliberately left raw: the Scala GraphDSL operators are resolved on their erased signatures.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Flow<FlowEnvelope, FlowEnvelope, NotUsed> build() {
        return Flow$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(builder -> {
            FlowShape updateStage = builder.add(Flow$.MODULE$.fromFunction(this.updateEvent()).named("updateEvent"));
            FlowShape recordStage = builder.add(Flow$.MODULE$.fromGraph(this.recordTransaction()).named("recordTransaction"));
            FlowShape logStage = builder.add(Flow$.MODULE$.fromGraph(this.logTransaction()).named("logTransaction"));
            UniformFanOutShape broadcast = builder.add(Broadcast$.MODULE$.apply(2, Broadcast$.MODULE$.apply$default$2()));
            FanInShape2 zip = builder.add(Zip$.MODULE$.apply());

            // Branch 0: keep the original envelope so its ack handling can be restored on the result.
            GraphDSL$Implicits$.MODULE$.port2flow(broadcast.out(0), builder).$tilde$greater(zip.in0(), builder);

            // Branch 1: transaction processing, optionally preceded by publishing the event to JMS.
            Option<IdAwareConnectionFactory> connectionFactory = this.internalCf;
            if (None$.MODULE$.equals(connectionFactory)) {
                GraphDSL$Implicits$.MODULE$.port2flow(broadcast.out(1), builder).$tilde$greater(updateStage, builder).$tilde$greater(recordStage, builder).$tilde$greater(logStage, builder);
            } else if (connectionFactory instanceof Some) {
                IdAwareConnectionFactory cf = ((Some<IdAwareConnectionFactory>) connectionFactory).value();
                FlowShape jmsStage = builder.add(this.logEventToJms().apply(cf).named("logToJms"));
                GraphDSL$Implicits$.MODULE$.port2flow(broadcast.out(1), builder).$tilde$greater(jmsStage, builder).$tilde$greater(updateStage, builder).$tilde$greater(recordStage, builder).$tilde$greater(logStage, builder);
            } else {
                throw new MatchError(connectionFactory);
            }
            GraphDSL$Implicits$.MODULE$.port2flow(logStage.out(), builder).$tilde$greater(zip.in1(), builder);

            FlowShape resultStage = builder.add(Flow$.MODULE$.fromFunction(this.createResult()).named("result"));
            GraphDSL$Implicits$.MODULE$.port2flow(zip.out(), builder).$tilde$greater(resultStage.in(), builder);
            return new FlowShape(broadcast.in(), resultStage.out());
        }));
    }

    /**
     * Stores the collaborators and creates the processing steps that {@link #build()} wires together.
     *
     * @param option                 optional connection factory; when defined, {@code build()} publishes each
     *                               incoming envelope to the transaction topic before updating the transaction
     * @param flowHeaderConfig       header configuration used for the topic prefix and the envelope converters
     * @param flowTransactionManager manager that applies transaction events
     * @param flowEnvelopeLogger     logger for envelope and transaction log output
     * @param actorSystem            actor system passed to the JMS producer
     */
    public FlowTransactionStream(Option<IdAwareConnectionFactory> option, FlowHeaderConfig flowHeaderConfig, FlowTransactionManager flowTransactionManager, FlowEnvelopeLogger flowEnvelopeLogger, ActorSystem actorSystem) {
        this.internalCf = option;
        this.headerCfg = flowHeaderConfig;
        this.tMgr = flowTransactionManager;
        this.streamLogger = flowEnvelopeLogger;
        this.system = actorSystem;
        JmsStreamSupport.$init$(this);
        // Try.apply captures anything thrown while parsing, so a bad envelope becomes a Failure element.
        this.updateEvent = envelope -> Try$.MODULE$.apply(() -> this.parseTransactionEvent(envelope));
        this.logEventToJms = this::transactionTopicProducer;
        this.recordTransaction = this.buildRecordTransactionGraph();
        this.logTransaction = this.buildLogTransactionGraph();
        this.createResult = this::mergeResult;
    }

    /** Converts an envelope into a transaction event and trace-logs it; throws if the conversion failed. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // converter is applied through its erased Function1 signature
    private FlowTransactionEvent parseTransactionEvent(FlowEnvelope envelope) {
        FlowTransactionEvent event = (FlowTransactionEvent) ((Try) ((Function1) FlowTransactionEvent$.MODULE$.envelope2event().apply(this.headerCfg)).apply(envelope)).get();
        this.streamLogger.logEnv(envelope, LogLevel$.MODULE$.Trace(), () -> {
            return "Received transaction event [" + event.transactionId() + "][" + event.state() + "]";
        }, this.streamLogger.logEnv$default$4());
        return event;
    }

    /** Creates the "logToJms" producer flow that publishes envelopes to "&lt;prefix&gt;.topic.transactions". */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Some keeps the destination argument independent of Option's variance
    private Flow<FlowEnvelope, FlowEnvelope, NotUsed> transactionTopicProducer(IdAwareConnectionFactory connectionFactory) {
        JmsProducerSettings settings = new JmsProducerSettings(
            this.streamLogger,
            envelope -> {
                return LogLevel$.MODULE$.Debug();
            },
            this.headerCfg,
            JmsProducerSettings$.MODULE$.apply$default$4(),
            connectionFactory,
            JmsProducerSettings$.MODULE$.apply$default$6(),
            new Some(new JmsTopic(this.headerCfg.prefix() + ".topic.transactions")),
            JmsProducerSettings$.MODULE$.apply$default$8(),
            JmsProducerSettings$.MODULE$.apply$default$9(),
            JmsProducerSettings$.MODULE$.apply$default$10(),
            JmsProducerSettings$.MODULE$.apply$default$11(),
            JmsProducerSettings$.MODULE$.apply$default$12(),
            JmsProducerSettings$.MODULE$.apply$default$13(),
            true);
        return this.jmsProducer("logToJms", settings, false, this.system);
    }

    /** Builds the sub-graph that applies successful events to the transaction manager and forwards failures as-is. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // shapes are left raw: GraphDSL operators are resolved on erased signatures
    private Graph<FlowShape<Try<FlowTransactionEvent>, Try<FlowTransaction>>, NotUsed> buildRecordTransactionGraph() {
        return GraphDSL$.MODULE$.create(builder -> {
            FlowShape updateStage = builder.add(Flow$.MODULE$.fromFunction((FlowTransactionEvent event) -> {
                return this.tMgr.updateTransaction(event);
            }));
            // out0 receives the elements for which the predicate holds (successes), out1 the rest.
            FanOutShape2 split = builder.add(FlowProcessor$.MODULE$.partition((Try<FlowTransactionEvent> attempt) -> {
                return BoxesRunTime.boxToBoolean(attempt.isSuccess());
            }));
            FlowShape unwrapStage = builder.add(Flow$.MODULE$.fromFunction((Try<FlowTransactionEvent> attempt) -> {
                return attempt.get();
            }));
            GraphDSL$Implicits$.MODULE$.port2flow(split.out0(), builder).$tilde$greater(unwrapStage, builder).$tilde$greater(updateStage, builder);

            // Re-wrap the failure so the element type matches the output of the update stage.
            FlowShape failureStage = builder.add(Flow$.MODULE$.fromFunction((Try<FlowTransactionEvent> attempt) -> {
                return new Failure<FlowTransaction>(attempt.failed().get());
            }));
            GraphDSL$Implicits$.MODULE$.port2flow(split.out1(), builder).$tilde$greater(failureStage, builder);

            UniformFanInShape merge = builder.add(Merge$.MODULE$.apply(2, Merge$.MODULE$.apply$default$2()));
            GraphDSL$Implicits$.MODULE$.port2flow(updateStage.out(), builder).$tilde$greater(merge.in(0), builder);
            GraphDSL$Implicits$.MODULE$.port2flow(failureStage.out(), builder).$tilde$greater(merge.in(1), builder);
            return new FlowShape(split.in(), merge.out());
        });
    }

    /** Builds the single-stage sub-graph that logs each transaction and converts it into an envelope. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // shapes are left raw: GraphDSL operators are resolved on erased signatures
    private Graph<FlowShape<Try<FlowTransaction>, Try<FlowEnvelope>>, NotUsed> buildLogTransactionGraph() {
        return GraphDSL$.MODULE$.create(builder -> {
            FlowShape stage = builder.add(Flow$.MODULE$.fromFunction((Try<FlowTransaction> attempt) -> {
                return this.logAndConvertTransaction(attempt);
            }));
            return new FlowShape(stage.in(), stage.out());
        });
    }

    /**
     * Logs a successfully updated transaction and converts it into an envelope; a failure is
     * passed on with the same exception.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // converter is applied through its erased Function1 signature
    private Try<FlowEnvelope> logAndConvertTransaction(Try<FlowTransaction> attempt) {
        if (attempt instanceof Success) {
            FlowTransaction transaction = ((Success<FlowTransaction>) attempt).value();
            // A first update that arrives in a state other than "started" additionally logs a
            // synthetic copy of the transaction in state "started" with an empty worklist.
            if (!FlowTransactionStateStarted$.MODULE$.equals(transaction.state()) && transaction.first()) {
                FlowTransaction started = transaction.copy(transaction.copy$default$1(), transaction.copy$default$2(), transaction.copy$default$3(), transaction.copy$default$4(), transaction.copy$default$5(), Predef$.MODULE$.Map().empty(), FlowTransactionStateStarted$.MODULE$);
                this.streamLogger.underlying().infoMdc(this.mdcFor(started), () -> {
                    return started.toString();
                });
            }
            this.logTransactionByState(transaction);
            return new Success<>((FlowEnvelope) ((Function1) FlowTransaction$.MODULE$.transaction2envelope().apply(this.headerCfg)).apply(transaction));
        }
        if (attempt instanceof Failure) {
            return new Failure<>(((Failure<FlowTransaction>) attempt).exception());
        }
        throw new MatchError(attempt);
    }

    /** Logs the transaction at info (started/completed), warn (failed) or debug (any other state). */
    @SuppressWarnings("unchecked") // raw MDC map is handed to the logger
    private void logTransactionByState(FlowTransaction transaction) {
        FlowTransactionState state = transaction.state();
        if (FlowTransactionStateStarted$.MODULE$.equals(state) || FlowTransactionStateCompleted$.MODULE$.equals(state)) {
            this.streamLogger.underlying().infoMdc(this.mdcFor(transaction), () -> {
                return transaction.toString();
            });
        } else if (FlowTransactionStateFailed$.MODULE$.equals(state)) {
            this.streamLogger.underlying().warnMdc(this.mdcFor(transaction), () -> {
                return transaction.toString();
            });
        } else {
            this.streamLogger.underlying().debugMdc(this.mdcFor(transaction), () -> {
                return transaction.toString();
            });
        }
    }

    /** Builds the MDC map for a transaction from the configured MDC prefix and the transaction's creation properties. */
    @SuppressWarnings("rawtypes") // the map's type arguments are not visible here
    private Map mdcFor(FlowTransaction transaction) {
        return (Map) FlowEnvelopeLogger$.MODULE$.mdcMap((String) FlowEnvelopeLogger$.MODULE$.mdcPrefix().apply(this.headerCfg), transaction.creationProps());
    }

    /**
     * Combines the original envelope with the outcome of the transaction branch.
     *
     * <p>On success the transaction envelope takes over the original's ack handler and
     * requires-acknowledge flag and has its exception cleared; on failure the original envelope
     * is returned with the exception attached.
     */
    private FlowEnvelope mergeResult(Tuple2<FlowEnvelope, Try<FlowEnvelope>> pair) {
        if (pair == null) {
            throw new MatchError(pair);
        }
        FlowEnvelope original = pair._1();
        Try<FlowEnvelope> outcome = pair._2();
        if (outcome instanceof Success) {
            FlowEnvelope processed = ((Success<FlowEnvelope>) outcome).value();
            this.streamLogger.logEnv(processed, LogLevel$.MODULE$.Trace(), () -> {
                return "Successfully processed transaction event [" + processed.id() + "]";
            }, this.streamLogger.logEnv$default$4());
            return processed.withAckHandler(original.getAckHandler()).withRequiresAcknowledge(original.requiresAcknowledge()).clearException();
        }
        if (outcome instanceof Failure) {
            Throwable cause = ((Failure<FlowEnvelope>) outcome).exception();
            this.streamLogger.logEnv(original, LogLevel$.MODULE$.Trace(), () -> {
                return "Failed to process transaction event [" + original.id() + "]";
            }, this.streamLogger.logEnv$default$4());
            return original.withException(cause);
        }
        throw new MatchError(outcome);
    }
}
