package blended.streams.jms;

import akka.stream.Attributes;
import blended.jms.utils.JmsAckSession;
import blended.jms.utils.JmsAckSession$;
import blended.jms.utils.JmsAckState$;
import blended.jms.utils.JmsDestination;
import blended.streams.jms.JmsAckSourceStage;
import blended.streams.message.AcknowledgeHandler;
import blended.streams.message.FlowEnvelope;
import blended.streams.message.FlowEnvelope$;
import blended.streams.message.FlowMessage;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import scala.Enumeration;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ManifestFactory$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.control.NonFatal$;

/* compiled from: JmsAckSourceStage.scala */
/* loaded from: input_file:blended/streams/jms/JmsAckSourceStage$$anon$1.class */
public final class JmsAckSourceStage$$anon$1 extends SourceStageLogic<JmsAckSession> {
    private Map<String, FlowEnvelope> inflight;
    private Map<String, MessageConsumer> consumer;
    public Option<Object> blended$streams$jms$JmsAckSourceStage$$anon$$nextPoll;
    private final Function1<FlowEnvelope, Option<JmsAcknowledgeHandler>> ackHandler;
    private final Function1<FlowEnvelope, Function1<JmsAcknowledgeHandler, Try<BoxedUnit>>> acknowledge;
    private final JmsDestination dest;
    private final /* synthetic */ JmsAckSourceStage $outer;

    private void addInflight(String str, FlowEnvelope flowEnvelope) {
        this.inflight = this.inflight.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), flowEnvelope));
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
            return new StringBuilder(40).append("Inflight message count of [").append(this.id()).append("] count is [").append(this.inflight.size()).append("]").toString();
        });
    }

    private void removeInflight(String str) {
        this.inflight = this.inflight.filterKeys(str2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$removeInflight$1(str, str2));
        });
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
            return new StringBuilder(40).append("Inflight message count of [").append(this.id()).append("] count is [").append(this.inflight.size()).append("]").toString();
        });
    }

    private void addConsumer(String str, MessageConsumer messageConsumer) {
        this.consumer = this.consumer.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(str), messageConsumer));
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
            return new StringBuilder(26).append("Consumer count of [").append(this.id()).append("] is [").append(this.consumer.size()).append("]").toString();
        });
    }

    private void removeConsumer(String str) {
        this.consumer = this.consumer.filterKeys(str2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$removeConsumer$1(str, str2));
        });
        cancelTimer(new JmsAckSourceStage.Poll(this.$outer, str));
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
            return new StringBuilder(26).append("Consumer count of [").append(this.id()).append("] is [").append(this.consumer.size()).append("]").toString();
        });
    }

    private Function1<FlowEnvelope, Option<JmsAcknowledgeHandler>> ackHandler() {
        return this.ackHandler;
    }

    @Override // blended.streams.jms.JmsStageLogic, blended.streams.jms.JmsConnector
    public void afterSessionClose(JmsAckSession jmsAckSession) {
        removeConsumer(jmsAckSession.sessionId());
    }

    private Function1<FlowEnvelope, Function1<JmsAcknowledgeHandler, Try<BoxedUnit>>> acknowledge() {
        return this.acknowledge;
    }

    public void blended$streams$jms$JmsAckSourceStage$$anon$$ackQueued(String str) {
        this.inflight.get(str).foreach(flowEnvelope -> {
            $anonfun$ackQueued$1(this, flowEnvelope);
            return BoxedUnit.UNIT;
        });
    }

    public void blended$streams$jms$JmsAckSourceStage$$anon$$poll(String str) {
        BoxedUnit boxedUnit;
        Tuple2 receive$1;
        String str2;
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().trace(() -> {
            return new StringBuilder(47).append("Trying to receive message from [").append(this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.jmsDestination().map(jmsDestination -> {
                return jmsDestination.asString();
            })).append("] in session [").append(str).append("]").toString();
        });
        Tuple2 tuple2 = new Tuple2(jmsSessions().get(str), this.consumer.get(str));
        if (tuple2 != null) {
            Some some = (Option) tuple2._1();
            Option option = (Option) tuple2._2();
            if (some instanceof Some) {
                JmsAckSession jmsAckSession = (JmsAckSession) some.value();
                if (option instanceof Some) {
                    try {
                        receive$1 = receive$1(str);
                    } catch (JMSException e) {
                        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().warn(() -> {
                            return new StringBuilder(28).append("Error receiving message : [").append(e.getMessage()).append("]").toString();
                        });
                        closeSession(jmsAckSession);
                        boxedUnit = BoxedUnit.UNIT;
                    }
                    if (receive$1 != null) {
                        Some some2 = (Option) receive$1._1();
                        if (some2 instanceof Some) {
                            Message message = (Message) some2.value();
                            FlowMessage flowMessage = (FlowMessage) ((Try) ((Function1) ((Function1) JmsFlowSupport$.MODULE$.jms2flowMessage().apply(this.$outer.blended$streams$jms$JmsAckSourceStage$$headerConfig())).apply(jmsSettings())).apply(message)).get();
                            Some header = flowMessage.header(this.$outer.blended$streams$jms$JmsAckSourceStage$$headerConfig().headerTransId(), ManifestFactory$.MODULE$.classType(String.class));
                            if (None$.MODULE$.equals(header)) {
                                String uuid = UUID.randomUUID().toString();
                                this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().trace(() -> {
                                    return new StringBuilder(26).append("Created new envelope id [").append(uuid).append("]").toString();
                                });
                                str2 = uuid;
                            } else {
                                if (!(header instanceof Some)) {
                                    throw new MatchError(header);
                                }
                                String str3 = (String) header.value();
                                this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().trace(() -> {
                                    return new StringBuilder(40).append("Reusing transaction id [").append(str3).append("] as envelope id").toString();
                                });
                                str2 = str3;
                            }
                            String str4 = str2;
                            this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().info(() -> {
                                return new StringBuilder(26).append("Message received [").append(str4).append("][").append(this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.jmsDestination().map(jmsDestination -> {
                                    return jmsDestination.asString();
                                })).append("][").append(jmsAckSession.sessionId()).append("] : ").append(flowMessage).toString();
                            });
                            JmsAcknowledgeHandler jmsAcknowledgeHandler = new JmsAcknowledgeHandler(str4, message, jmsAckSession, JmsAcknowledgeHandler$.MODULE$.apply$default$4(), this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log());
                            FlowEnvelope apply = FlowEnvelope$.MODULE$.apply(flowMessage, str4);
                            FlowEnvelope withAckHandler = ((FlowEnvelope) apply.withHeader(this.$outer.blended$streams$jms$JmsAckSourceStage$$headerConfig().headerTransId(), str4, apply.withHeader$default$3()).get()).withRequiresAcknowledge(true).withAckHandler(new Some(jmsAcknowledgeHandler));
                            jmsAckSession.resetAck();
                            addInflight(jmsAckSession.sessionId(), withAckHandler);
                            handleMessage().invoke(withAckHandler);
                            blended$streams$jms$JmsAckSourceStage$$anon$$ackQueued(str);
                            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                            boxedUnit = BoxedUnit.UNIT;
                            return;
                        }
                    }
                    if (receive$1 != null) {
                        Option option2 = (Option) receive$1._1();
                        FiniteDuration finiteDuration = (FiniteDuration) receive$1._2();
                        if (None$.MODULE$.equals(option2)) {
                            this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().trace(() -> {
                                return new StringBuilder(27).append("No message available for [").append(jmsAckSession.sessionId()).append("]").toString();
                            });
                            scheduleOnce(new JmsAckSourceStage.Poll(this.$outer, str), finiteDuration);
                            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                            boxedUnit = BoxedUnit.UNIT;
                            return;
                        }
                    }
                    throw new MatchError(receive$1);
                }
            }
        }
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().trace(() -> {
            return new StringBuilder(39).append("Session or consumer not available in [").append(str).append("]").toString();
        });
        scheduleOnce(new JmsAckSourceStage.Poll(this.$outer, str), new package.DurationInt(package$.MODULE$.DurationInt(100)).millis());
        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
    }

    @Override // blended.streams.jms.JmsConnector
    public Try<JmsAckSession> createSession(Connection connection) {
        try {
            Some jmsDestination = jmsSettings().jmsDestination();
            if (jmsDestination instanceof Some) {
                return new Success(new JmsAckSession(connection, connection.createSession(false, AcknowledgeMode$.MODULE$.ClientAcknowledge().mode()), nextSessionId(), (JmsDestination) jmsDestination.value(), JmsAckSession$.MODULE$.$lessinit$greater$default$5()));
            }
            if (!None$.MODULE$.equals(jmsDestination)) {
                throw new MatchError(jmsDestination);
            }
            String sb = new StringBuilder(42).append("Destination must be set for consumer in [").append(id()).append("]").toString();
            this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().error(() -> {
                return sb;
            });
            throw new IllegalArgumentException(sb);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            Throwable th2 = (Throwable) unapply.get();
            jmsSettings().log().error(() -> {
                return new StringBuilder(32).append("Error creating JMS session : [").append(th2).append(".]").toString();
            });
            handleError().invoke(th2);
            return new Failure(th2);
        }
    }

    @Override // blended.streams.jms.JmsStageLogic, blended.streams.jms.JmsConnector
    public PartialFunction<Object, BoxedUnit> handleTimer() {
        return handleTimer().orElse(new JmsAckSourceStage$$anon$1$$anonfun$handleTimer$1(this));
    }

    private JmsDestination dest() {
        return this.dest;
    }

    @Override // blended.streams.jms.SourceStageLogic
    public void pushMessage(FlowEnvelope flowEnvelope) {
        push(this.$outer.blended$streams$jms$JmsAckSourceStage$$out(), flowEnvelope);
    }

    private void createConsumer(JmsAckSession jmsAckSession) {
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
            return new StringBuilder(72).append("Creating message consumer for session [").append(jmsAckSession.sessionId()).append("], destination [").append(this.dest()).append("] and selector [").append(this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.selector()).append("]").toString();
        });
        Success createConsumer = jmsAckSession.createConsumer(this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.selector(), ec());
        if (createConsumer instanceof Success) {
            addConsumer(jmsAckSession.sessionId(), (MessageConsumer) createConsumer.value());
            scheduleOnce(new JmsAckSourceStage.Poll(this.$outer, jmsAckSession.sessionId()), new package.DurationInt(package$.MODULE$.DurationInt(100)).millis());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!(createConsumer instanceof Failure)) {
            throw new MatchError(createConsumer);
        }
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
            return new StringBuilder(40).append("Failed to create consumer for session [").append(jmsAckSession.sessionId()).append("]").toString();
        });
        closeSession(jmsAckSession);
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    @Override // blended.streams.jms.JmsConnector
    public void onSessionOpened(JmsAckSession jmsAckSession) {
        if (jmsAckSession == null) {
            throw new IllegalArgumentException(new StringBuilder(47).append("Session must be of type JMSAckSession, it is a ").append(jmsAckSession.getClass().getName()).toString());
        }
        createConsumer(jmsAckSession);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public /* synthetic */ JmsAckSourceStage blended$streams$jms$JmsAckSourceStage$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ boolean $anonfun$removeInflight$1(String str, String str2) {
        return str2 != null ? !str2.equals(str) : str != null;
    }

    public static final /* synthetic */ boolean $anonfun$removeConsumer$1(String str, String str2) {
        return str2 != null ? !str2.equals(str) : str != null;
    }

    public static final /* synthetic */ void $anonfun$ackQueued$2(JmsAckSourceStage$$anon$1 jmsAckSourceStage$$anon$1, FlowEnvelope flowEnvelope, JmsAcknowledgeHandler jmsAcknowledgeHandler) {
        ((Try) ((Function1) jmsAckSourceStage$$anon$1.acknowledge().apply(flowEnvelope)).apply(jmsAcknowledgeHandler)).get();
    }

    public static final /* synthetic */ void $anonfun$ackQueued$1(JmsAckSourceStage$$anon$1 jmsAckSourceStage$$anon$1, FlowEnvelope flowEnvelope) {
        ((Option) jmsAckSourceStage$$anon$1.ackHandler().apply(flowEnvelope)).foreach(jmsAcknowledgeHandler -> {
            $anonfun$ackQueued$2(jmsAckSourceStage$$anon$1, flowEnvelope, jmsAcknowledgeHandler);
            return BoxedUnit.UNIT;
        });
    }

    private final Tuple2 receive$1(String str) {
        Tuple2 tuple2;
        Tuple2 tuple22;
        Tuple2 tuple23;
        Tuple2 tuple24;
        Tuple2 tuple25 = new Tuple2(jmsSessions().get(str), this.consumer.get(str));
        if (tuple25 != null) {
            Some some = (Option) tuple25._1();
            Some some2 = (Option) tuple25._2();
            if (some instanceof Some) {
                JmsAckSession jmsAckSession = (JmsAckSession) some.value();
                if (some2 instanceof Some) {
                    MessageConsumer messageConsumer = (MessageConsumer) some2.value();
                    Option apply = this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.receiveTimeout().toMillis() <= 0 ? Option$.MODULE$.apply(messageConsumer.receiveNoWait()) : Option$.MODULE$.apply(messageConsumer.receive(this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.receiveTimeout().toMillis()));
                    if (None$.MODULE$.equals(apply)) {
                        tuple23 = new Tuple2(None$.MODULE$, this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.pollInterval());
                    } else {
                        if (!(apply instanceof Some)) {
                            throw new MatchError(apply);
                        }
                        Message message = (Message) ((Some) apply).value();
                        Some some3 = this.$outer.blended$streams$jms$JmsAckSourceStage$$minMessageDelay;
                        if (some3 instanceof Some) {
                            FiniteDuration finiteDuration = (FiniteDuration) some3.value();
                            if (System.currentTimeMillis() - message.getJMSTimestamp() <= finiteDuration.toMillis()) {
                                this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().trace(() -> {
                                    return "Message has not reached the minimum message delay yet ...";
                                });
                                closeSession(jmsAckSession);
                                this.blended$streams$jms$JmsAckSourceStage$$anon$$nextPoll = new Some(BoxesRunTime.boxToLong(message.getJMSTimestamp() + finiteDuration.toMillis()));
                                tuple24 = new Tuple2(None$.MODULE$, finiteDuration);
                            } else {
                                this.blended$streams$jms$JmsAckSourceStage$$anon$$nextPoll = None$.MODULE$;
                                tuple24 = new Tuple2(new Some(message), this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.pollInterval());
                            }
                            tuple22 = tuple24;
                        } else {
                            if (!None$.MODULE$.equals(some3)) {
                                throw new MatchError(some3);
                            }
                            tuple22 = new Tuple2(apply, this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.pollInterval());
                        }
                        tuple23 = tuple22;
                    }
                    tuple2 = tuple23;
                    return tuple2;
                }
            }
        }
        if (tuple25 == null) {
            throw new MatchError(tuple25);
        }
        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().trace(() -> {
            return new StringBuilder(39).append("Session or consumer not available in [").append(str).append("]").toString();
        });
        tuple2 = new Tuple2(None$.MODULE$, new package.DurationInt(package$.MODULE$.DurationInt(100)).millis());
        return tuple2;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public JmsAckSourceStage$$anon$1(JmsAckSourceStage jmsAckSourceStage, Attributes attributes) {
        super(jmsAckSourceStage.m16shape(), jmsAckSourceStage.blended$streams$jms$JmsAckSourceStage$$out(), jmsAckSourceStage.blended$streams$jms$JmsAckSourceStage$$settings, attributes);
        if (jmsAckSourceStage == null) {
            throw null;
        }
        this.$outer = jmsAckSourceStage;
        this.inflight = Predef$.MODULE$.Map().empty();
        this.consumer = Predef$.MODULE$.Map().empty();
        this.blended$streams$jms$JmsAckSourceStage$$anon$$nextPoll = None$.MODULE$;
        this.ackHandler = flowEnvelope -> {
            None$ none$;
            Some ackHandler = flowEnvelope.ackHandler();
            if (None$.MODULE$.equals(ackHandler)) {
                none$ = None$.MODULE$;
            } else {
                if (ackHandler instanceof Some) {
                    AcknowledgeHandler acknowledgeHandler = (AcknowledgeHandler) ackHandler.value();
                    if (acknowledgeHandler instanceof JmsAcknowledgeHandler) {
                        none$ = new Some((JmsAcknowledgeHandler) acknowledgeHandler);
                    }
                }
                none$ = None$.MODULE$;
            }
            return none$;
        };
        this.acknowledge = flowEnvelope2 -> {
            return jmsAcknowledgeHandler -> {
                return Try$.MODULE$.apply(() -> {
                    BoxedUnit boxedUnit;
                    BoxedUnit boxedUnit2;
                    String sessionId = jmsAcknowledgeHandler.session().sessionId();
                    Enumeration.Value ackState = jmsAcknowledgeHandler.session().ackState();
                    Enumeration.Value Acknowledged = JmsAckState$.MODULE$.Acknowledged();
                    if (Acknowledged != null ? Acknowledged.equals(ackState) : ackState == null) {
                        try {
                            jmsAcknowledgeHandler.jmsMessage().acknowledge();
                            this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
                                return new StringBuilder(47).append("Acknowledged envelope [").append(flowEnvelope2.id()).append("] message for session [").append(sessionId).append("]").toString();
                            });
                            this.removeInflight(sessionId);
                            this.blended$streams$jms$JmsAckSourceStage$$anon$$poll(sessionId);
                            boxedUnit = BoxedUnit.UNIT;
                        } catch (Throwable th) {
                            this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().error(th, () -> {
                                return new StringBuilder(47).append("Failed to acknowledge message [").append(flowEnvelope2.id()).append("] for session [").append(sessionId).append("]").toString();
                            });
                            this.closeSession(jmsAcknowledgeHandler.session());
                            boxedUnit = BoxedUnit.UNIT;
                        }
                        return;
                    }
                    Enumeration.Value Denied = JmsAckState$.MODULE$.Denied();
                    if (Denied != null ? Denied.equals(ackState) : ackState == null) {
                        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().debug(() -> {
                            return new StringBuilder(33).append("Denying message [").append(flowEnvelope2.id()).append("] for session [").append(sessionId).append("]").toString();
                        });
                        this.closeSession(jmsAcknowledgeHandler.session());
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                        return;
                    }
                    Enumeration.Value Pending = JmsAckState$.MODULE$.Pending();
                    if (Pending != null ? !Pending.equals(ackState) : ackState != null) {
                        throw new MatchError(ackState);
                    }
                    if (System.currentTimeMillis() - jmsAcknowledgeHandler.created() > this.jmsSettings().ackTimeout().toMillis()) {
                        this.$outer.blended$streams$jms$JmsAckSourceStage$$settings.log().warn(() -> {
                            return new StringBuilder(50).append("Acknowledge timed out for message [").append(flowEnvelope2.id()).append("] in session [").append(sessionId).append("]").toString();
                        });
                        this.closeSession(jmsAcknowledgeHandler.session());
                        boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        this.scheduleOnce(new JmsAckSourceStage.Ack(this.$outer, sessionId), new package.DurationInt(package$.MODULE$.DurationInt(10)).millis());
                        boxedUnit2 = BoxedUnit.UNIT;
                    }
                });
            };
        };
        Some jmsDestination = jmsSettings().jmsDestination();
        if (jmsDestination instanceof Some) {
            this.dest = (JmsDestination) jmsDestination.value();
        } else {
            if (!None$.MODULE$.equals(jmsDestination)) {
                throw new MatchError(jmsDestination);
            }
            throw new Exception("Destination must be set for Consumer");
        }
    }
}
