package co.paralleluniverse.strands.channels.reactivestreams;

import co.paralleluniverse.fibers.FiberFactory;
import co.paralleluniverse.strands.SuspendableAction2;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.ReceivePort;
import co.paralleluniverse.strands.channels.SendPort;
import co.paralleluniverse.strands.channels.Topic;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:co/paralleluniverse/strands/channels/reactivestreams/ReactiveStreams.class */
/**
 * Static helpers that bridge strand channels and Reactive Streams types:
 * a {@link Publisher} can be consumed through a {@link ReceivePort}, a channel or
 * {@link Topic} can be exposed as a {@link Publisher}, and a suspendable action
 * can be wrapped as a {@link Processor}.
 */
public class ReactiveStreams {
    /**
     * Subscribes a newly created channel-backed subscriber to {@code publisher} and
     * returns that subscriber as a receive port.
     *
     * @param i              size argument forwarded to {@code Channels.newChannel}
     * @param overflowPolicy overflow policy forwarded to {@code Channels.newChannel}
     * @param publisher      the publisher to subscribe to
     * @param <T>            element type
     * @return the subscriber that was registered with {@code publisher}
     */
    public static <T> ReceivePort<T> subscribe(int i, Channels.OverflowPolicy overflowPolicy, Publisher<T> publisher) {
        // NOTE(review): the two `true` flags and the `false` flag are forwarded as-is;
        // their meaning is defined by Channels.newChannel / ChannelSubscriber - confirm there.
        final Channel<T> buffer = Channels.newChannel(i, overflowPolicy, true, true);
        final ChannelSubscriber<T> channelSubscriber = new ChannelSubscriber<T>(buffer, false);
        publisher.subscribe(channelSubscriber);
        return channelSubscriber;
    }

    /**
     * Exposes {@code receivePort} as a {@link Publisher}.
     * <p>
     * When {@code Channels.isTickerChannel(receivePort)} is true, every subscription
     * reads through its own consumer obtained from {@code Channels.newTickerConsumerFor};
     * otherwise a plain {@code ChannelPublisher} is returned.
     *
     * @param receivePort  the channel to publish from
     * @param fiberFactory factory handed to {@code ChannelPublisher}; the one-argument
     *                     overload passes {@code null}
     * @param <T>          element type
     * @return a publisher backed by {@code receivePort}
     */
    public static <T> Publisher<T> toPublisher(ReceivePort<T> receivePort, FiberFactory fiberFactory) {
        if (!Channels.isTickerChannel(receivePort)) {
            // NOTE(review): the last flag is `true` here and `false` for the ticker/topic
            // variants - presumably it restricts the publisher to one subscriber; confirm
            // against ChannelPublisher.
            return new ChannelPublisher<T>(fiberFactory, receivePort, true);
        }
        return new ChannelPublisher<T>(fiberFactory, receivePort, false) {
            // Kept public as emitted by the decompiler (it reports the original modifier was protected).
            @Override
            public ChannelSubscription<T> newChannelSubscription(Subscriber<? super T> subscriber, Object obj) {
                // The publisher hands back the channel as Object; it is the ticker channel given above.
                @SuppressWarnings("unchecked")
                final Channel<T> tickerChannel = (Channel<T>) obj;
                return super.newChannelSubscription(subscriber, Channels.newTickerConsumerFor(tickerChannel));
            }
        };
    }

    /**
     * Same as {@link #toPublisher(ReceivePort, FiberFactory)} with a {@code null} fiber factory.
     *
     * @param receivePort the channel to publish from
     * @param <T>         element type
     * @return a publisher backed by {@code receivePort}
     */
    public static <T> Publisher<T> toPublisher(ReceivePort<T> receivePort) {
        return toPublisher(receivePort, (FiberFactory) null);
    }

    /**
     * Exposes {@code topic} as a {@link Publisher}. Each subscription creates a new
     * channel via {@code Channels.newChannel(0)}, subscribes it to the topic, and
     * unsubscribes it again when the subscription is cancelled.
     *
     * @param topic        the topic to publish from
     * @param fiberFactory factory handed to {@code ChannelPublisher}; the one-argument
     *                     overload passes {@code null}
     * @param <T>          element type
     * @return a publisher backed by {@code topic}
     */
    public static <T> Publisher<T> toPublisher(Topic<T> topic, FiberFactory fiberFactory) {
        return new ChannelPublisher<T>(fiberFactory, topic, false) {
            // Kept public as emitted by the decompiler (it reports the original modifier was protected).
            @Override
            public ChannelSubscription<T> newChannelSubscription(Subscriber<? super T> subscriber, Object obj) {
                // The publisher hands back its source as Object; it is the topic given above.
                @SuppressWarnings("unchecked")
                final Topic<T> source = (Topic<T>) obj;
                final Channel<T> subscriberChannel = Channels.newChannel(0);
                try {
                    source.subscribe(subscriberChannel);
                    return new ChannelSubscription<T>(subscriber, subscriberChannel) {
                        @Override
                        public void cancel() {
                            try {
                                super.cancel();
                            } finally {
                                // Always detach from the topic, even if super.cancel() throws,
                                // so the per-subscriber channel is not left registered.
                                source.unsubscribe(subscriberChannel);
                            }
                        }
                    };
                } catch (Exception e) {
                    // Undo the topic registration before propagating the failure.
                    source.unsubscribe(subscriberChannel);
                    throw e;
                }
            }
        };
    }

    /**
     * Same as {@link #toPublisher(Topic, FiberFactory)} with a {@code null} fiber factory.
     *
     * @param topic the topic to publish from
     * @param <T>   element type
     * @return a publisher backed by {@code topic}
     */
    public static <T> Publisher<T> toPublisher(Topic<T> topic) {
        return toPublisher(topic, (FiberFactory) null);
    }

    /**
     * Wraps {@code suspendableAction2} as a {@link Processor}. Two channels are created
     * with the same size and overflow policy and handed, together with the action,
     * to {@code ChannelProcessor}.
     *
     * @param fiberFactory       factory handed to {@code ChannelProcessor}; the three-argument
     *                           overload passes {@code null}
     * @param i                  size argument forwarded to {@code Channels.newChannel} for both channels
     * @param overflowPolicy     overflow policy forwarded to {@code Channels.newChannel} for both channels
     * @param suspendableAction2 the action given a receive port and a send port
     * @param <T>                element type accepted by the processor
     * @param <R>                element type emitted by the processor
     * @return the new processor
     */
    public static <T, R> Processor<T, R> toProcessor(FiberFactory fiberFactory, int i, Channels.OverflowPolicy overflowPolicy, SuspendableAction2<? extends ReceivePort<? super T>, ? extends SendPort<? extends R>> suspendableAction2) {
        // Created in the same order as the constructor arguments they feed.
        final Channel<T> in = Channels.newChannel(i, overflowPolicy, true, true);
        final Channel<R> out = Channels.newChannel(i, overflowPolicy, true, true);
        return new ChannelProcessor<T, R>(fiberFactory, false, in, out, suspendableAction2);
    }

    /**
     * Same as {@link #toProcessor(FiberFactory, int, Channels.OverflowPolicy, SuspendableAction2)}
     * with a {@code null} fiber factory.
     *
     * @param i                  size argument forwarded to {@code Channels.newChannel} for both channels
     * @param overflowPolicy     overflow policy forwarded to {@code Channels.newChannel} for both channels
     * @param suspendableAction2 the action given a receive port and a send port
     * @param <T>                element type accepted by the processor
     * @param <R>                element type emitted by the processor
     * @return the new processor
     */
    public static <T, R> Processor<T, R> toProcessor(int i, Channels.OverflowPolicy overflowPolicy, SuspendableAction2<? extends ReceivePort<? super T>, ? extends SendPort<? extends R>> suspendableAction2) {
        return toProcessor(null, i, overflowPolicy, suspendableAction2);
    }
}
