package monifu.reactive.internals;

import monifu.concurrent.Scheduler;
import monifu.concurrent.locks.Lock;
import monifu.concurrent.locks.Lock$;
import monifu.concurrent.locks.SpinLock;
import monifu.concurrent.locks.SpinLock$;
import monifu.reactive.Observable;
import monifu.reactive.Observer;
import monifu.reactive.api.Ack;
import monifu.reactive.api.Ack$Cancel$;
import monifu.reactive.api.Ack$Continue$;
import monifu.reactive.api.BufferPolicy;
import monifu.reactive.observers.BufferedObserver;
import monifu.reactive.observers.BufferedObserver$;
import scala.Option;
import scala.Predef$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;

/* compiled from: MergeBuffer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dcAB\u0001\u0003\u0005\u0019A1H\u0001\nC_VtG-\u001a3NKJ<WMQ;gM\u0016\u0014(BA\u0002\u0005\u0003%Ig\u000e^3s]\u0006d7O\u0003\u0002\u0006\r\u0005A!/Z1di&4XMC\u0001\b\u0003\u0019iwN\\5gkV\u0011\u0011BF\n\u0004\u0001)\u0001\u0002CA\u0006\u000f\u001b\u0005a!\"A\u0007\u0002\u000bM\u001c\u0017\r\\1\n\u0005=a!AB!osJ+g\rE\u0002\u0012%Qi\u0011\u0001B\u0005\u0003'\u0011\u0011\u0001b\u00142tKJ4XM\u001d\t\u0003+Ya\u0001\u0001B\u0003\u0018\u0001\t\u0007\u0011DA\u0001V\u0007\u0001\t\"AG\u000f\u0011\u0005-Y\u0012B\u0001\u000f\r\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"a\u0003\u0010\n\u0005}a!aA!os\"A\u0011\u0005\u0001B\u0001B\u0003%\u0001#\u0001\u0006e_^t7\u000f\u001e:fC6D\u0001b\t\u0001\u0003\u0002\u0003\u0006I\u0001J\u0001\u000f[\u0016\u0014x-\u001a\"bi\u000eD7+\u001b>f!\tYQ%\u0003\u0002'\u0019\t\u0019\u0011J\u001c;\t\u0011!\u0002!\u0011!Q\u0001\n%\nABY;gM\u0016\u0014\bk\u001c7jGf\u0004\"AK\u0017\u000e\u0003-R!\u0001\f\u0003\u0002\u0007\u0005\u0004\u0018.\u0003\u0002/W\ta!)\u001e4gKJ\u0004v\u000e\\5ds\"A\u0001\u0007\u0001B\u0001B\u0003-\u0011'A\u0005tG\",G-\u001e7feB\u0011!'N\u0007\u0002g)\u0011AGB\u0001\u000bG>t7-\u001e:sK:$\u0018B\u0001\u001c4\u0005%\u00196\r[3ek2,'\u000fC\u00039\u0001\u0011\u0005\u0011(\u0001\u0004=S:LGO\u0010\u000b\u0005uyz\u0004\t\u0006\u0002<{A\u0019A\b\u0001\u000b\u000e\u0003\tAQ\u0001M\u001cA\u0004EBQ!I\u001cA\u0002AAQaI\u001cA\u0002\u0011BQ\u0001K\u001cA\u0002%BaA\u0011\u0001!\u0002\u0013\u0019\u0015\u0001\u00027pG.\u0004\"\u0001R$\u000e\u0003\u0015S!AR\u001a\u0002\u000b1|7m[:\n\u0005!+%\u0001C*qS:dunY6\t\r)\u0003\u0001\u0015!\u0003L\u0003\u0019\u0011WO\u001a4feB\u0019Aj\u0014\u000b\u000e\u00035S!A\u0014\u0003\u0002\u0013=\u00147/\u001a:wKJ\u001c\u0018B\u0001)N\u0005A\u0011UO\u001a4fe\u0016$wJY:feZ,'\u000f\u0003\u0004S\u0001\u0001\u0006KaU\u0001\u000ba\u0016\u0014X.[:tS>t\u0007c\u0001+W16\tQK\u0003\u00025\u0019%\u0011q+\u0016\u0002\b!J|W.[:f!\tQ\u0013,\u0003\u0002[W\t\u0019\u0011iY6\t\rq\u0003\u0001\u0015)\u0003%\u00035\t7\r^5wKN#(/Z1ng\"1a\f\u0001Q!\n\u0011\na\u0002]3oI&twm\u0015;sK\u0006l7\u000f\u0003\u0004a\u0001\u0001\u0006K!Y\u0001\u0007SN$uN\\3\u0011\u0005-\u0011\u0017BA2\r\u0005\u001d\u0011un\u001c7fC:DQ!\u001a\u0001\u0005\u0002\u0019\fQ!\\3sO\u0016$2a\u001a6p!\r!\u0006\u000eW\u0005\u0003SV\u0013aAR;ukJ,\u0007\"B6e\u0001\u0004a\u0017\u0001C;qgR\u0014X-Y7\u0011\u0007EiG#\u0003\u0002o\t\tQqJY:feZ\f'\r\\3\t\u000fA$\u0007\u0013!a\u0001C\u0006Qq/Y:QK:$\u0017N\\4\t\rI\u0004\u0001\u0015\"\u0003t\u0003=\u0019\u0017M\\2fYN#(/Z1nS:<GC\u0001;x!\tYQ/\u0003\u0002w\u0019\t!QK\\5u\u0011\u001dA\u0018\u000f%AA\u0002e\f1b]5h]\u0006dWI\u001d:peB\u0019!0!\u0002\u000f\u0007m\f\tA\u0004\u0002}\u007f6\tQP\u0003\u0002\u007f1\u00051AH]8pizJ\u0011!D\u0005\u0004\u0003\u0007a\u0011a\u00029bG.\fw-Z\u0005\u0005\u0003\u000f\tIAA\u0005UQJ|w/\u00192mK*\u0019\u00111\u0001\u0007\t\u000f\u00055\u0001\u0001\"\u0001\u0002\u0010\u00051qN\u001c(fqR$2aZA\t\u0011\u001d\t\u0019\"a\u0003A\u0002Q\tA!\u001a7f[\"9\u0011q\u0003\u0001\u0005\u0002\u0005e\u0011aB8o\u000bJ\u0014xN\u001d\u000b\u0004i\u0006m\u0001bBA\u000f\u0003+\u0001\r!_\u0001\u0003KbDq!!\t\u0001\t\u0003\t\u0019#\u0001\u0006p]\u000e{W\u000e\u001d7fi\u0016$\u0012\u0001\u001e\u0005\n\u0003O\u0001\u0011\u0013!C\u0001\u0003S\tq\"\\3sO\u0016$C-\u001a4bk2$HEM\u000b\u0003\u0003WQ3!YA\u0017W\t\ty\u0003\u0005\u0003\u00022\u0005mRBAA\u001a\u0015\u0011\t)$a\u000e\u0002\u0013Ut7\r[3dW\u0016$'bAA\u001d\u0019\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005u\u00121\u0007\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007\"CA!\u0001E\u0005I\u0011BA\"\u0003e\u0019\u0017M\\2fYN#(/Z1nS:<G\u0005Z3gCVdG\u000fJ\u0019\u0016\u0005\u0005\u0015#fA=\u0002.\u0001")
/* loaded from: input_file:monifu/reactive/internals/BoundedMergeBuffer.class */
public final class BoundedMergeBuffer<U> implements Observer<U> {
    private final int mergeBatchSize;
    private final Scheduler scheduler;
    private final SpinLock lock;
    private final BufferedObserver<U> buffer;
    private Promise<Ack> permission;
    private int activeStreams;
    private int pendingStreams;
    private boolean isDone;

    public Future<Ack> merge(Observable<U> observable, boolean z) {
        boolean z2;
        Future<Ack> future;
        Lock Extensions = Lock$.MODULE$.Extensions(this.lock);
        if (Extensions.isAcquiredByCurrentThread()) {
            z2 = false;
        } else {
            Extensions.unsafeLock();
            z2 = true;
        }
        boolean z3 = z2;
        try {
            if (this.isDone) {
                future = Ack$Cancel$.MODULE$;
            } else if (this.mergeBatchSize <= 0 || this.activeStreams < this.mergeBatchSize + 1) {
                if (z) {
                    this.pendingStreams--;
                }
                this.activeStreams++;
                try {
                    observable.unsafeSubscribe(this);
                    future = Ack$Continue$.MODULE$;
                } catch (Throwable th) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (unapply.isEmpty()) {
                        throw th;
                    }
                    onError((Throwable) unapply.get());
                    future = Ack$Cancel$.MODULE$;
                }
            } else {
                if (!z) {
                    this.pendingStreams++;
                }
                future = this.permission.future().flatMap(new BoundedMergeBuffer$$anonfun$merge$1(this, observable), this.scheduler);
            }
            return future;
        } finally {
            if (z3) {
                Extensions.unsafeUnlock();
            }
        }
    }

    public boolean merge$default$2() {
        return false;
    }

    public void monifu$reactive$internals$BoundedMergeBuffer$$cancelStreaming(Throwable th) {
        boolean z;
        Lock Extensions = Lock$.MODULE$.Extensions(this.lock);
        if (Extensions.isAcquiredByCurrentThread()) {
            z = false;
        } else {
            Extensions.unsafeLock();
            z = true;
        }
        boolean z2 = z;
        try {
            if (!this.isDone) {
                this.isDone = true;
                this.activeStreams = 0;
                this.pendingStreams = 0;
                if (this.mergeBatchSize > 0) {
                    this.permission.success(Ack$Cancel$.MODULE$);
                } else {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                }
                if (th != null) {
                    this.buffer.onError(th);
                }
            }
        } finally {
            if (z2) {
                Extensions.unsafeUnlock();
            }
        }
    }

    public Throwable monifu$reactive$internals$BoundedMergeBuffer$$cancelStreaming$default$1() {
        return null;
    }

    @Override // monifu.reactive.Observer
    public Future<Ack> onNext(U u) {
        return package$FutureAckExtensions$.MODULE$.onCancel$extension(package$.MODULE$.FutureAckExtensions(this.buffer.onNext(u)), new BoundedMergeBuffer$$anonfun$onNext$1(this), this.scheduler);
    }

    @Override // monifu.reactive.Observer
    public void onError(Throwable th) {
        monifu$reactive$internals$BoundedMergeBuffer$$cancelStreaming(th);
    }

    @Override // monifu.reactive.Observer
    public void onComplete() {
        boolean z;
        Lock Extensions = Lock$.MODULE$.Extensions(this.lock);
        if (Extensions.isAcquiredByCurrentThread()) {
            z = false;
        } else {
            Extensions.unsafeLock();
            z = true;
        }
        boolean z2 = z;
        try {
            if (!this.isDone) {
                if (this.activeStreams == 1 && this.pendingStreams == 0) {
                    this.activeStreams = 0;
                    if (this.mergeBatchSize > 0) {
                        this.permission.success(Ack$Cancel$.MODULE$);
                    } else {
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    }
                    this.buffer.onComplete();
                    this.isDone = true;
                } else if (this.mergeBatchSize > 0 && this.activeStreams == this.mergeBatchSize + 1) {
                    this.permission.success(Ack$Continue$.MODULE$);
                    this.permission = Promise$.MODULE$.apply();
                    this.activeStreams--;
                } else if (this.activeStreams > 0) {
                    this.activeStreams--;
                }
            }
        } finally {
            if (z2) {
                Extensions.unsafeUnlock();
            }
        }
    }

    public BoundedMergeBuffer(Observer<U> observer, int i, BufferPolicy bufferPolicy, Scheduler scheduler) {
        this.mergeBatchSize = i;
        this.scheduler = scheduler;
        Predef$.MODULE$.require(i > 0, new BoundedMergeBuffer$$anonfun$1(this));
        this.lock = SpinLock$.MODULE$.apply();
        this.buffer = BufferedObserver$.MODULE$.apply(observer, bufferPolicy, scheduler);
        this.permission = i <= 0 ? null : Promise$.MODULE$.apply();
        this.activeStreams = 1;
        this.pendingStreams = 0;
        this.isDone = false;
    }
}
