package org.nustaq.kontraktor.reactivestreams;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.Iterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.ActorProxy;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.annotations.CallerSideMethod;
import org.nustaq.kontraktor.impl.SimpleScheduler;
import org.nustaq.kontraktor.reactivestreams.impl.KxPublisherActor;
import org.nustaq.kontraktor.reactivestreams.impl.KxSubscriber;
import org.nustaq.kontraktor.reactivestreams.impl.SyncProcessor;
import org.nustaq.kontraktor.remoting.base.ActorClientConnector;
import org.nustaq.kontraktor.remoting.base.ActorPublisher;
import org.nustaq.kontraktor.remoting.base.ConnectableActor;
import org.nustaq.kontraktor.util.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:org/nustaq/kontraktor/reactivestreams/KxReactiveStreams.class */
public class KxReactiveStreams extends Actors {
    public static final int MAX_BATCH_SIZE = 50000;
    public static final int DEFQSIZE = 128000;
    public static final int DEFBATCHSIZE = 50000;
    public static int REQU_NEXT_DIVISOR = 1;
    protected static KxReactiveStreams instance = new KxReactiveStreams(true);
    protected int batchSize;
    SimpleScheduler scheduler;

    public static KxReactiveStreams get() {
        return instance;
    }

    public KxReactiveStreams() {
        this(false);
    }

    public KxReactiveStreams(boolean z) {
        this(50000, DEFQSIZE, z);
    }

    public KxReactiveStreams(int i, int i2, boolean z) {
        this.batchSize = 50000;
        if (i * 2 > i2) {
            throw new RuntimeException("queuesize must be >= 2 * batchSize");
        }
        this.scheduler = new SimpleScheduler(i2, z);
        this.batchSize = i;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void terminateScheduler() {
        this.scheduler.setKeepAlive(false);
    }

    public <T> KxPublisher<T> asKxPublisher(final Publisher<T> publisher) {
        return publisher instanceof KxPublisher ? (KxPublisher) publisher : new KxPublisher<T>() { // from class: org.nustaq.kontraktor.reactivestreams.KxReactiveStreams.1
            public void subscribe(Subscriber<? super T> subscriber) {
                publisher.subscribe(subscriber);
            }

            @Override // org.nustaq.kontraktor.reactivestreams.KxPublisher
            @CallerSideMethod
            public KxReactiveStreams getKxStreamsInstance() {
                return KxReactiveStreams.this;
            }
        };
    }

    public <T> Subscriber<T> subscriber(Callback<T> callback) {
        return subscriber(this.batchSize, callback);
    }

    public <T> Subscriber<T> subscriber(int i, Callback<T> callback) {
        if (i > 50000) {
            throw new RuntimeException("batch size exceeds maximum of 50000");
        }
        return new KxSubscriber(i, callback);
    }

    public <T> KxPublisher<T> connect(Class<T> cls, ConnectableActor connectableActor) {
        return (KxPublisher) connect(cls, connectableActor, null).await();
    }

    public <T> KxPublisher<T> produce(Stream<T> stream) {
        return produce(this.batchSize, stream.iterator());
    }

    public <T> KxPublisher<T> produce(Collection<T> collection) {
        return produce(this.batchSize, collection.iterator());
    }

    public KxPublisher<Integer> produce(IntStream intStream) {
        return produce(this.batchSize, intStream.mapToObj(i -> {
            return Integer.valueOf(i);
        }).iterator());
    }

    public KxPublisher<Long> produce(LongStream longStream) {
        return produce(this.batchSize, longStream.mapToObj(j -> {
            return Long.valueOf(j);
        }).iterator());
    }

    public KxPublisher<Double> produce(DoubleStream doubleStream) {
        return produce(this.batchSize, doubleStream.mapToObj(d -> {
            return Double.valueOf(d);
        }).iterator());
    }

    public <T> KxPublisher<T> produce(int i, Stream<T> stream) {
        return produce(i, stream.iterator());
    }

    public <T> KxPublisher<T> produce(Iterator<T> it) {
        return produce(this.batchSize, it);
    }

    public <T> KxPublisher<T> produce(int i, Iterator<T> it) {
        if (i > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxPublisherActor kxPublisherActor = (KxPublisherActor) Actors.AsActor(KxPublisherActor.class, this.scheduler);
        kxPublisherActor._streams = this;
        ((KxPublisherActor) kxPublisherActor.getActor())._streams = this;
        kxPublisherActor.setBatchSize(i);
        kxPublisherActor.setThrowExWhenBlocked(true);
        kxPublisherActor.initFromIterator(it);
        return kxPublisherActor;
    }

    public <T> Stream<T> stream(Publisher<T> publisher) {
        return stream(publisher, this.batchSize);
    }

    public <T> Stream<T> stream(Publisher publisher, int i) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator(publisher, i), 1280), false);
    }

    public <T> Iterator<T> iterator(Publisher<T> publisher) {
        return iterator(publisher, this.batchSize);
    }

    public <T> Iterator<T> iterator(Publisher<T> publisher, int i) {
        if (i > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxSubscriber kxSubscriber = new KxSubscriber(i);
        publisher.subscribe(kxSubscriber);
        return kxSubscriber;
    }

    public <T> IPromise<KxPublisher<T>> connect(Class<T> cls, ConnectableActor connectableActor, Callback<ActorClientConnector> callback) {
        IPromise connect = connectableActor.actorClass(KxPublisherActor.class).inboundQueueSize(this.scheduler.getDefaultQSize()).connect((actorClientConnector, obj) -> {
            Log.Info(this, "Client disconnected");
            actorClientConnector.closeClient();
            if (callback != null) {
                callback.complete(actorClientConnector, obj);
            }
        }, actor -> {
            if (((KxPublisherActor) actor)._callerSideSubscribers != null) {
                ((KxPublisherActor) actor)._callerSideSubscribers.forEach(obj2 -> {
                    ((Subscriber) obj2).onError(new IOException("connection lost"));
                });
                ((KxPublisherActor) actor)._callerSideSubscribers = null;
            }
        });
        Promise promise = new Promise();
        connect.then((kxPublisher, obj2) -> {
            if (kxPublisher == null) {
                promise.reject(obj2);
            } else {
                ((KxPublisherActor) kxPublisher)._streams = this;
                promise.resolve(kxPublisher);
            }
        });
        return promise;
    }

    public <OUT> IPromise serve(Publisher<OUT> publisher, ActorPublisher actorPublisher, boolean z, Consumer<Actor> consumer) {
        if (actorPublisher.getClass().getSimpleName().equals("HttpPublisher")) {
            throw new RuntimeException("Http long poll cannot be supported. Use WebSockets instead.");
        }
        if (!(publisher instanceof KxPublisherActor) || !(publisher instanceof ActorProxy)) {
            Publisher<OUT> newAsyncProcessor = newAsyncProcessor(obj -> {
                return obj;
            });
            publisher.subscribe(newAsyncProcessor);
            publisher = newAsyncProcessor;
        }
        ((KxPublisherActor) publisher).setCloseOnComplete(z);
        return actorPublisher.facade((Actor) publisher).publish(consumer);
    }

    public <IN, OUT> Processor<IN, OUT> newAsyncProcessor(Function<IN, OUT> function) {
        return newAsyncProcessor(function, this.scheduler, this.batchSize);
    }

    public <IN, OUT> Processor<IN, OUT> newAsyncProcessor(Function<IN, OUT> function, int i) {
        return newAsyncProcessor(function, this.scheduler, i);
    }

    public <IN, OUT> Processor<IN, OUT> newAsyncProcessor(Function<IN, OUT> function, Scheduler scheduler, int i) {
        if (i > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxPublisherActor kxPublisherActor = (KxPublisherActor) Actors.AsActor(KxPublisherActor.class, scheduler);
        kxPublisherActor._streams = this;
        ((KxPublisherActor) kxPublisherActor.getActor())._streams = this;
        kxPublisherActor.setBatchSize(i);
        kxPublisherActor.setThrowExWhenBlocked(true);
        kxPublisherActor.init(function);
        return kxPublisherActor;
    }

    public <IN, OUT> Processor<IN, OUT> newSyncProcessor(Function<IN, OUT> function) {
        return new SyncProcessor(this.batchSize, function, this);
    }

    public <T, OUT> Processor<T, OUT> newLossyProcessor(Function<T, OUT> function, int i) {
        if (i > 50000) {
            throw new RuntimeException("batch size exceeds max of 50000");
        }
        KxPublisherActor kxPublisherActor = (KxPublisherActor) Actors.AsActor(KxPublisherActor.class, this.scheduler);
        kxPublisherActor._streams = this;
        ((KxPublisherActor) kxPublisherActor.getActor())._streams = this;
        kxPublisherActor.setBatchSize(i);
        kxPublisherActor.setThrowExWhenBlocked(true);
        kxPublisherActor.setLossy(true);
        kxPublisherActor.init(function);
        return kxPublisherActor;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 407866063:
                if (implMethodName.equals("lambda$connect$7c519408$1")) {
                    z = false;
                    break;
                }
                break;
            case 1034786821:
                if (implMethodName.equals("lambda$connect$fba679c1$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case KxPublisherActor.CRED_DEBUG /* 0 */:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/kontraktor/reactivestreams/KxReactiveStreams") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/kontraktor/Promise;Lorg/nustaq/kontraktor/reactivestreams/KxPublisher;Ljava/lang/Object;)V")) {
                    KxReactiveStreams kxReactiveStreams = (KxReactiveStreams) serializedLambda.getCapturedArg(0);
                    Promise promise = (Promise) serializedLambda.getCapturedArg(1);
                    return (kxPublisher, obj2) -> {
                        if (kxPublisher == null) {
                            promise.reject(obj2);
                        } else {
                            ((KxPublisherActor) kxPublisher)._streams = this;
                            promise.resolve(kxPublisher);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/nustaq/kontraktor/Callback") && serializedLambda.getFunctionalInterfaceMethodName().equals("complete") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/nustaq/kontraktor/reactivestreams/KxReactiveStreams") && serializedLambda.getImplMethodSignature().equals("(Lorg/nustaq/kontraktor/Callback;Lorg/nustaq/kontraktor/remoting/base/ActorClientConnector;Ljava/lang/Object;)V")) {
                    KxReactiveStreams kxReactiveStreams2 = (KxReactiveStreams) serializedLambda.getCapturedArg(0);
                    Callback callback = (Callback) serializedLambda.getCapturedArg(1);
                    return (actorClientConnector, obj) -> {
                        Log.Info(this, "Client disconnected");
                        actorClientConnector.closeClient();
                        if (callback != null) {
                            callback.complete(actorClientConnector, obj);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
