package com.qubit.pubsub.akka;

import akka.stream.Attributes;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import com.qubit.pubsub.FuturePool$;
import com.qubit.pubsub.Retry$;
import com.qubit.pubsub.akka.attributes.Cpackage;
import com.qubit.pubsub.akka.attributes.package$PubSubClientAttribute$;
import com.qubit.pubsub.akka.attributes.package$PubSubPullTimeoutAttribute$;
import com.qubit.pubsub.akka.attributes.package$PubSubStageBufferSizeAttribute$;
import com.qubit.pubsub.akka.attributes.package$PubSubStageMaxRetriesAttribute$;
import com.qubit.pubsub.akka.attributes.package$PubSubStageRetryJitterAttribute$;
import com.qubit.pubsub.client.PubSubClient;
import com.qubit.pubsub.client.ReceivedPubSubMessage;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.List$;
import scala.collection.immutable.Queue$;
import scala.collection.mutable.Queue;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;

/* compiled from: PubSubSource.scala */
/* loaded from: input_file:com/qubit/pubsub/akka/PubSubSource$$anon$1.class */
public final class PubSubSource$$anon$1 extends TimerGraphStageLogic {
    private final PubSubClient com$qubit$pubsub$akka$PubSubSource$$anon$$client;
    private final int com$qubit$pubsub$akka$PubSubSource$$anon$$maxBufferSize;
    private final FiniteDuration com$qubit$pubsub$akka$PubSubSource$$anon$$pullTimeout;
    private final int maxRetries;
    private final Cpackage.PubSubStageRetryJitterAttribute jitter;
    private final Queue<ReceivedPubSubMessage> com$qubit$pubsub$akka$PubSubSource$$anon$$buffer;
    private int com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize;
    private final Queue<String> ackBuffer;
    private int ackBufferSize;
    private final /* synthetic */ PubSubSource $outer;

    public PubSubClient com$qubit$pubsub$akka$PubSubSource$$anon$$client() {
        return this.com$qubit$pubsub$akka$PubSubSource$$anon$$client;
    }

    public int com$qubit$pubsub$akka$PubSubSource$$anon$$maxBufferSize() {
        return this.com$qubit$pubsub$akka$PubSubSource$$anon$$maxBufferSize;
    }

    public FiniteDuration com$qubit$pubsub$akka$PubSubSource$$anon$$pullTimeout() {
        return this.com$qubit$pubsub$akka$PubSubSource$$anon$$pullTimeout;
    }

    private int maxRetries() {
        return this.maxRetries;
    }

    private Cpackage.PubSubStageRetryJitterAttribute jitter() {
        return this.jitter;
    }

    public Queue<ReceivedPubSubMessage> com$qubit$pubsub$akka$PubSubSource$$anon$$buffer() {
        return this.com$qubit$pubsub$akka$PubSubSource$$anon$$buffer;
    }

    public int com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize() {
        return this.com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize;
    }

    public void com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize_$eq(int i) {
        this.com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize = i;
    }

    private Queue<String> ackBuffer() {
        return this.ackBuffer;
    }

    private int ackBufferSize() {
        return this.ackBufferSize;
    }

    private void ackBufferSize_$eq(int i) {
        this.ackBufferSize = i;
    }

    public void preStart() {
        super/*akka.stream.stage.GraphStageLogic*/.preStart();
        schedulePeriodicallyWithInitialDelay("ack-timer", this.$outer.com$qubit$pubsub$akka$PubSubSource$$ackInterval, this.$outer.com$qubit$pubsub$akka$PubSubSource$$ackInterval);
    }

    public void onTimer(Object obj) {
        if (this.$outer.logger().underlying().isTraceEnabled()) {
            this.$outer.logger().underlying().trace("Buffer size = {}. Ack buffer size = {}", (Object[]) List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{BoxesRunTime.boxToInteger(com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize()).toString(), BoxesRunTime.boxToInteger(ackBufferSize()).toString()})).toArray(ClassTag$.MODULE$.apply(String.class)));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        com$qubit$pubsub$akka$PubSubSource$$anon$$ackConsumedMessages();
    }

    public void com$qubit$pubsub$akka$PubSubSource$$anon$$addToAckQueue(String str) {
        ackBuffer().enqueue(Predef$.MODULE$.wrapRefArray(new String[]{str}));
        ackBufferSize_$eq(ackBufferSize() + 1);
        if (ackBufferSize() >= com$qubit$pubsub$akka$PubSubSource$$anon$$maxBufferSize()) {
            com$qubit$pubsub$akka$PubSubSource$$anon$$ackConsumedMessages();
        }
    }

    public void com$qubit$pubsub$akka$PubSubSource$$anon$$pullNextBatch() {
        Try apply = Retry$.MODULE$.apply(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"pull from ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.$outer.com$qubit$pubsub$akka$PubSubSource$$subscription})), new PubSubSource$$anon$1$$anonfun$1(this), maxRetries(), com$qubit$pubsub$akka$PubSubSource$$anon$$pullTimeout(), jitter().minSeconds(), jitter().maxSeconds());
        if (apply.isSuccess()) {
            com$qubit$pubsub$akka$PubSubSource$$anon$$buffer().enqueue((Seq) apply.get());
            com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize_$eq(com$qubit$pubsub$akka$PubSubSource$$anon$$buffer().size());
            return;
        }
        if (this.$outer.logger().underlying().isErrorEnabled()) {
            this.$outer.logger().underlying().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Pull from ", " failed. Failing stage"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.$outer.com$qubit$pubsub$akka$PubSubSource$$subscription})));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        failStage((Throwable) apply.failed().get());
    }

    public void com$qubit$pubsub$akka$PubSubSource$$anon$$ackConsumedMessages() {
        if (ackBuffer().isEmpty()) {
            return;
        }
        scala.collection.immutable.Queue apply = Queue$.MODULE$.apply(ackBuffer());
        ackBuffer().clear();
        ackBufferSize_$eq(0);
        if (this.$outer.logger().underlying().isTraceEnabled()) {
            this.$outer.logger().underlying().trace("Acking {} messages", new Object[]{BoxesRunTime.boxToInteger(apply.size()).toString()});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        com$qubit$pubsub$akka$PubSubSource$$anon$$client().ack(this.$outer.com$qubit$pubsub$akka$PubSubSource$$subscription, apply).onFailure(new PubSubSource$$anon$1$$anonfun$com$qubit$pubsub$akka$PubSubSource$$anon$$ackConsumedMessages$1(this), FuturePool$.MODULE$.executionContext());
    }

    public /* synthetic */ PubSubSource com$qubit$pubsub$akka$PubSubSource$$anon$$$outer() {
        return this.$outer;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PubSubSource$$anon$1(PubSubSource pubSubSource, Attributes attributes) {
        super(pubSubSource.m7shape());
        if (pubSubSource == null) {
            throw null;
        }
        this.$outer = pubSubSource;
        this.com$qubit$pubsub$akka$PubSubSource$$anon$$client = ((Cpackage.PubSubClientAttribute) attributes.getAttribute(Cpackage.PubSubClientAttribute.class, new Cpackage.PubSubClientAttribute(package$PubSubClientAttribute$.MODULE$.apply$default$1()))).client();
        this.com$qubit$pubsub$akka$PubSubSource$$anon$$maxBufferSize = ((Cpackage.PubSubStageBufferSizeAttribute) attributes.getAttribute(Cpackage.PubSubStageBufferSizeAttribute.class, new Cpackage.PubSubStageBufferSizeAttribute(package$PubSubStageBufferSizeAttribute$.MODULE$.apply$default$1()))).bufferSize();
        this.com$qubit$pubsub$akka$PubSubSource$$anon$$pullTimeout = ((Cpackage.PubSubPullTimeoutAttribute) attributes.getAttribute(Cpackage.PubSubPullTimeoutAttribute.class, new Cpackage.PubSubPullTimeoutAttribute(package$PubSubPullTimeoutAttribute$.MODULE$.apply$default$1()))).timeout();
        this.maxRetries = ((Cpackage.PubSubStageMaxRetriesAttribute) attributes.getAttribute(Cpackage.PubSubStageMaxRetriesAttribute.class, new Cpackage.PubSubStageMaxRetriesAttribute(package$PubSubStageMaxRetriesAttribute$.MODULE$.apply$default$1()))).maxRetries();
        this.jitter = (Cpackage.PubSubStageRetryJitterAttribute) attributes.getAttribute(Cpackage.PubSubStageRetryJitterAttribute.class, new Cpackage.PubSubStageRetryJitterAttribute(package$PubSubStageRetryJitterAttribute$.MODULE$.apply$default$1(), package$PubSubStageRetryJitterAttribute$.MODULE$.apply$default$2()));
        this.com$qubit$pubsub$akka$PubSubSource$$anon$$buffer = scala.collection.mutable.Queue$.MODULE$.empty();
        this.com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize = 0;
        this.ackBuffer = scala.collection.mutable.Queue$.MODULE$.empty();
        this.ackBufferSize = 0;
        setHandler(pubSubSource.out(), new OutHandler(this) { // from class: com.qubit.pubsub.akka.PubSubSource$$anon$1$$anon$2
            private final /* synthetic */ PubSubSource$$anon$1 $outer;

            public void onPull() {
                if (this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize() == 0 && !this.$outer.isClosed(this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().out())) {
                    if (this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().logger().underlying().isTraceEnabled()) {
                        this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().logger().underlying().trace("Pulling next batch from PubSub");
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                    this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$pullNextBatch();
                }
                ReceivedPubSubMessage receivedPubSubMessage = (ReceivedPubSubMessage) this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$buffer().dequeue();
                this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize_$eq(this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize() - 1);
                this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$addToAckQueue(receivedPubSubMessage.ackId());
                if (this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().logger().underlying().isTraceEnabled()) {
                    this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().logger().underlying().trace(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Pushing message downstream. Buffer size = ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$bufferSize())})));
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                }
                this.$outer.push(this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().out(), receivedPubSubMessage.payload());
            }

            public void onDownstreamFinish() {
                if (this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().logger().underlying().isInfoEnabled()) {
                    this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$$outer().logger().underlying().info("Downstream finished. Committing consumed messages so far");
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
                this.$outer.com$qubit$pubsub$akka$PubSubSource$$anon$$ackConsumedMessages();
                OutHandler.class.onDownstreamFinish(this);
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.class.$init$(this);
            }
        });
    }
}
