package org.nustaq.kontraktor.reactivestreams.impl;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.RemoteConnection;
import org.nustaq.kontraktor.annotations.AsCallback;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.impl.CallbackWrapper;
import org.nustaq.kontraktor.reactivestreams.KxPublisher;
import org.nustaq.kontraktor.reactivestreams.KxReactiveStreams;
import org.nustaq.kontraktor.remoting.base.RemotedActor;
import org.nustaq.kontraktor.util.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/nustaq/kontraktor/reactivestreams/impl/KxPublisherActor.class */
public class KxPublisherActor<IN, OUT> extends Actor<KxPublisherActor<IN, OUT>> implements Processor<IN, OUT>, KxPublisher<OUT>, RemotedActor {
    public static final boolean CRED_DEBUG = false;
    protected Map<Integer, SubscriberEntry> subscribers;
    protected Function<IN, OUT> processor;
    protected ArrayDeque pending;
    protected Object actorServer;
    public KxReactiveStreams _streams;
    public ArrayList<Subscriber> _callerSideSubscribers;
    protected Subscription producer;
    protected long batchSize;
    protected long requestNextTrigger;
    protected long openRequested;
    protected int subsIdCount = 1;
    protected ArrayList<Runnable> doOnSubscribe = new ArrayList<>();
    protected boolean isIteratorBased = false;
    protected boolean closeOnComplete = false;
    protected boolean lossy = false;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/nustaq/kontraktor/reactivestreams/impl/KxPublisherActor$KSubscription.class */
    public static class KSubscription implements Subscription, Serializable {
        protected KxPublisherActor publisher;
        protected int id;

        public KSubscription(KxPublisherActor kxPublisherActor, int i) {
            this.publisher = kxPublisherActor;
            this.id = i;
        }

        public int getId() {
            return this.id;
        }

        public void request(long j) {
            if (j <= 0) {
                this.publisher.onError(new IllegalArgumentException("spec rule 3.9: request > 0 elements"));
            } else {
                this.publisher._rq(j, this.id);
            }
        }

        public void cancel() {
            removeRegistration();
            this.publisher._cancel(this.id);
        }

        protected void removeRegistration() {
            if (this.publisher._callerSideSubscribers != null) {
                synchronized (this.publisher._callerSideSubscribers) {
                    this.publisher._callerSideSubscribers.remove(this);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/nustaq/kontraktor/reactivestreams/impl/KxPublisherActor$SubscriberEntry.class */
    public static class SubscriberEntry {
        protected int subsId;
        protected long credits;
        protected KSubscription subscription;
        protected Callback subscriber;

        public SubscriberEntry(int i, KSubscription kSubscription, Callback callback) {
            this.subsId = i;
            this.subscription = kSubscription;
            this.subscriber = callback;
        }

        public void addCredits(long j) {
            this.credits += j;
        }

        public int getSubsId() {
            return this.subsId;
        }

        public long getCredits() {
            return this.credits;
        }

        public KSubscription getSubscription() {
            return this.subscription;
        }

        public void onError(Throwable th) {
            this.subscriber.reject(th);
        }

        public void onNext(Object obj) {
            this.subscriber.pipe(obj);
            this.credits--;
        }

        public void onComplete() {
            this.subscriber.finish();
        }
    }

    public void init(Function<IN, OUT> function) {
        this.pending = new ArrayDeque();
        this.processor = function;
    }

    public void initFromIterator(final Iterator<IN> it) {
        this.pending = new ArrayDeque();
        this.isIteratorBased = true;
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 1, 10L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        this.producer = new Subscription() { // from class: org.nustaq.kontraktor.reactivestreams.impl.KxPublisherActor.1
            boolean complete = false;

            public void request(long j) {
                Executor executor = threadPoolExecutor;
                Iterator it2 = it;
                executor.execute(() -> {
                    if (this.complete) {
                        return;
                    }
                    long j2 = j;
                    while (it2.hasNext()) {
                        try {
                            long j3 = j2;
                            j2 = j3 - 1;
                            if (j3 <= 0) {
                                break;
                            } else {
                                ((KxPublisherActor) KxPublisherActor.this.self()).onNext(it2.next());
                            }
                        } catch (Throwable th) {
                            ((KxPublisherActor) KxPublisherActor.this.self()).onError(th);
                            return;
                        }
                    }
                    if (!it2.hasNext()) {
                        this.complete = true;
                        ((KxPublisherActor) KxPublisherActor.this.self()).onComplete();
                    }
                });
            }

            public void cancel() {
            }
        };
        this.processor = obj -> {
            return obj;
        };
        onSubscribe(this.producer);
        Thread.currentThread().setName(Thread.currentThread() + " (rx async stream processor)");
    }

    @CallerSideMethod
    public void subscribe(Subscriber<? super OUT> subscriber) {
        if (isRemote()) {
            synchronized (this) {
                if (this._callerSideSubscribers == null) {
                    this._callerSideSubscribers = new ArrayList<>();
                }
                this._callerSideSubscribers.add(subscriber);
            }
        }
        if (subscriber == null) {
            subscriber.onError(new IllegalArgumentException("cannot subscibe null"));
        } else {
            _subscribe((obj, obj2) -> {
                if (isError(obj2)) {
                    subscriber.onError((Throwable) obj2);
                } else if (isComplete(obj2)) {
                    subscriber.onComplete();
                } else {
                    subscriber.onNext(obj);
                }
            }).then(kSubscription -> {
                Log.Info(this, "stream subscribe acknowledged");
                subscriber.onSubscribe(kSubscription);
            });
        }
    }

    public IPromise<KSubscription> _subscribe(Callback callback) {
        if (this.subscribers == null) {
            this.subscribers = new HashMap();
        }
        int i = this.subsIdCount;
        this.subsIdCount = i + 1;
        KSubscription kSubscription = new KSubscription((KxPublisherActor) self(), i);
        this.subscribers.put(Integer.valueOf(i), new SubscriberEntry(i, kSubscription, callback));
        return new Promise(kSubscription);
    }

    public void _cancel(int i) {
        if (this.doOnSubscribe != null) {
            this.doOnSubscribe.add(() -> {
                _cancel(i);
            });
        } else {
            this.subscribers.remove(Integer.valueOf(i));
        }
    }

    @AsCallback
    public void _rq(long j, int i) {
        if (this.doOnSubscribe != null) {
            this.doOnSubscribe.add(() -> {
                _rq(j, i);
            });
            return;
        }
        SubscriberEntry se = getSE(Integer.valueOf(i));
        if (se != null) {
            se.addCredits(j);
        } else {
            Log.Warn(this, "ignored credits " + j + " on id " + i);
        }
        emitRequestNext();
    }

    protected SubscriberEntry getSE(Integer num) {
        return this.subscribers.get(num);
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
        this.requestNextTrigger = i / KxReactiveStreams.REQU_NEXT_DIVISOR;
    }

    @CallerSideMethod
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw null;
        }
        ((KxPublisherActor) self())._onSubscribe(subscription);
    }

    public void _onSubscribe(Subscription subscription) {
        this.producer = subscription;
        ArrayList<Runnable> arrayList = this.doOnSubscribe;
        this.doOnSubscribe = null;
        arrayList.forEach(runnable -> {
            runnable.run();
        });
    }

    protected void emitRequestNext() {
        if (this.openRequested < this.requestNextTrigger) {
            long j = Long.MAX_VALUE;
            Iterator<Map.Entry<Integer, SubscriberEntry>> it = this.subscribers.entrySet().iterator();
            while (it.hasNext()) {
                long j2 = it.next().getValue().credits;
                if (j > j2) {
                    j = j2;
                }
                if (j2 < this.openRequested) {
                    return;
                }
            }
            if (!this.isIteratorBased) {
                this.producer.request(this.batchSize);
                this.openRequested += this.batchSize;
            } else if (j > 0) {
                long min = Math.min(j, this.batchSize);
                this.producer.request(min);
                this.openRequested += min;
            }
        }
    }

    @CallerSideMethod
    public void onNext(IN in) {
        if (in == null) {
            throw null;
        }
        _onNext(in);
    }

    public void _onNext(IN in) {
        if (this.subscribers == null) {
            return;
        }
        this.openRequested--;
        try {
            OUT apply = this.processor.apply(in);
            if (apply != null) {
                forwardMessage(apply);
            } else {
                emitRequestNext();
            }
        } catch (Throwable th) {
            th.printStackTrace();
            forwardError(th);
        }
    }

    protected void forwardError(Throwable th) {
        if (this.subscribers == null) {
            th.printStackTrace();
        } else {
            this.subscribers.forEach((num, subscriberEntry) -> {
                subscriberEntry.onError(th);
            });
        }
    }

    protected void forwardMessage(Object obj) {
        if (this.subscribers == null) {
            return;
        }
        if (this.lossy) {
            ArrayList arrayList = null;
            Iterator<Map.Entry<Integer, SubscriberEntry>> it = this.subscribers.entrySet().iterator();
            while (it.hasNext()) {
                SubscriberEntry value = it.next().getValue();
                if (value.getCredits() > 0) {
                    try {
                        value.onNext(obj);
                    } catch (Throwable th) {
                        if (arrayList == null) {
                            arrayList = new ArrayList();
                        }
                        arrayList.add(value);
                    }
                }
            }
            if (arrayList != null) {
                removeSubscribers(arrayList);
            }
            if (this.subscribers.size() > 0) {
                emitRequestNext();
                return;
            }
            return;
        }
        long calcMinCredits = calcMinCredits();
        if (calcMinCredits <= 0) {
            this.pending.addFirst(obj);
            return;
        }
        List list = null;
        if (this.pending.size() > 0) {
            this.pending.addFirst(obj);
            list = forwardPending(calcMinCredits, null);
        } else {
            Iterator<Map.Entry<Integer, SubscriberEntry>> it2 = this.subscribers.entrySet().iterator();
            while (it2.hasNext()) {
                SubscriberEntry value2 = it2.next().getValue();
                try {
                    value2.onNext(obj);
                } catch (Throwable th2) {
                    if (list == null) {
                        list = new ArrayList();
                    }
                    list.add(value2);
                }
            }
            long j = calcMinCredits - 1;
        }
        if (list != null) {
            removeSubscribers(list);
        }
        if (this.subscribers.size() > 0) {
            emitRequestNext();
            return;
        }
        if (this.openRequested > 0 || this.pending.size() > 0) {
            Log.Info(this, "no subscribers, deleting " + this.pending.size() + " messages");
            this.pending.clear();
        }
        this.openRequested = 0L;
    }

    protected long calcMinCredits() {
        long j = Long.MAX_VALUE;
        Iterator<Map.Entry<Integer, SubscriberEntry>> it = this.subscribers.entrySet().iterator();
        while (it.hasNext()) {
            SubscriberEntry value = it.next().getValue();
            if (j > value.getCredits()) {
                j = value.getCredits();
            }
        }
        return j;
    }

    protected void removeSubscribers(List list) {
        list.forEach(obj -> {
            int subsId = ((SubscriberEntry) obj).getSubsId();
            try {
                _cancel(subsId);
            } catch (Throwable th) {
            }
            this.subscribers.remove(Integer.valueOf(subsId));
            subscriberDisconnected(subsId);
        });
    }

    protected List forwardPending(long j, List list) {
        while (this.pending.size() > 0 && j > 0) {
            Object removeLast = this.pending.removeLast();
            Iterator<Map.Entry<Integer, SubscriberEntry>> it = this.subscribers.entrySet().iterator();
            while (it.hasNext()) {
                SubscriberEntry value = it.next().getValue();
                try {
                    value.onNext(removeLast);
                } catch (Throwable th) {
                    if (list == null) {
                        list = new ArrayList();
                    }
                    list.add(value);
                }
            }
            j--;
        }
        return list;
    }

    @CallerSideMethod
    public void onError(Throwable th) {
        if (th == null) {
            throw null;
        }
        _onError(th);
    }

    public void _onError(Throwable th) {
        forwardError(th);
        stop();
    }

    public void onComplete() {
        List forwardPending;
        if (this.pending.size() <= 0) {
            this.subscribers.forEach((num, subscriberEntry) -> {
                subscriberEntry.onComplete();
            });
            stop();
            return;
        }
        if (this.pending.size() > 0 && (forwardPending = forwardPending(calcMinCredits(), null)) != null && forwardPending.size() > 0) {
            removeSubscribers(forwardPending);
        }
        delayed(1L, () -> {
            ((KxPublisherActor) self()).onComplete();
        });
    }

    public void hasBeenUnpublished() {
        Iterator<Map.Entry<Integer, SubscriberEntry>> it = this.subscribers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, SubscriberEntry> next = it.next();
            CallbackWrapper callbackWrapper = next.getValue().subscriber;
            if ((callbackWrapper instanceof CallbackWrapper) && callbackWrapper.isTerminated()) {
                it.remove();
                subscriberDisconnected(next.getKey().intValue());
            }
        }
    }

    public void stop() {
        if (isPublished() && this.closeOnComplete) {
            ConcurrentLinkedQueue connections = getConnections();
            close();
            Iterator it = connections.iterator();
            while (it.hasNext()) {
                ((RemoteConnection) it.next()).closeNetwork();
            }
        }
        super.stop();
    }

    public void subscriberDisconnected(int i) {
        Log.Info(this, "a stream client disconnected id:" + i + " remaining:" + this.subscribers.size());
        emitRequestNext();
    }

    public void setCloseOnComplete(boolean z) {
        this.closeOnComplete = z;
    }

    public void setLossy(boolean z) {
        this.lossy = z;
    }

    @Override // org.nustaq.kontraktor.reactivestreams.KxPublisher
    @CallerSideMethod
    public KxReactiveStreams getKxStreamsInstance() {
        if (this._streams != null) {
            if (this._streams == null) {
                System.out.println("POK");
            }
            return this._streams;
        }
        KxReactiveStreams kxReactiveStreams = ((KxPublisherActor) getActor())._streams;
        if (kxReactiveStreams == null) {
            System.out.println("POK");
        }
        return kxReactiveStreams;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 308548898:
                if (implMethodName.equals("lambda$subscribe$67338a2f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case CRED_DEBUG /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/kontraktor/reactivestreams/impl/KxPublisherActor") && serializedLambda.getImplMethodSignature().equals("(Lorg/reactivestreams/Subscriber;Ljava/lang/Object;Ljava/lang/Object;)V")) {
                    Subscriber subscriber = (Subscriber) serializedLambda.getCapturedArg(0);
                    return (obj, obj2) -> {
                        if (isError(obj2)) {
                            subscriber.onError((Throwable) obj2);
                        } else if (isComplete(obj2)) {
                            subscriber.onComplete();
                        } else {
                            subscriber.onNext(obj);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
