package io.vlingo.symbio.store.state.jdbc;

import io.vlingo.actors.Logger;
import io.vlingo.actors.Stage;
import io.vlingo.common.Completes;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.PublisherConfiguration;
import io.vlingo.reactivestreams.Sink;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.reactivestreams.StreamPublisher;
import io.vlingo.reactivestreams.StreamSubscriber;
import io.vlingo.reactivestreams.Streams;
import io.vlingo.symbio.State;
import io.vlingo.symbio.StateAdapterProvider;
import io.vlingo.symbio.StateBundle;
import java.sql.ResultSet;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * A {@link Stream} that wires a JDBC {@link ResultSet} to a {@link Sink}.
 *
 * <p>{@link #flowInto(Sink, long, int)} creates a {@link ResultSetSource} over the
 * result set, starts a {@link StreamPublisher} actor for it and subscribes a
 * {@link StateStreamSubscriber} actor that delivers to the sink. {@link #request(long)}
 * and {@link #stop()} act on the {@link Subscription} captured by that subscriber.
 *
 * <p>NOTE(review): this file was recovered by a decompiler; parameter names such as
 * {@code j} and {@code i} are decompiler artifacts and are kept only to leave the
 * signatures untouched.
 *
 * @param <RS> the element type declared for the publisher, source and subscriber
 */
public class JDBCStateStoreStream<RS> implements Stream {
    /** Flow rate used by {@link #flowInto(Sink)} when the caller supplies none. */
    private static final long DEFAULT_FLOW_ELEMENTS_RATE = 100L;
    /** Second argument handed to {@code PublisherConfiguration.with}; presumably "no max throttle" - TODO confirm. */
    private static final int PUBLISHER_MAX_THROTTLE = -1;
    /** Third argument handed to {@code PublisherConfiguration.with}; presumably the buffer size - TODO confirm. */
    private static final int PUBLISHER_BUFFER_SIZE = 256;

    private final JDBCStorageDelegate<State.TextState> delegate;
    private long flowElementsRate;
    private final Logger logger;
    private Publisher<RS> publisher;
    private final Stage stage;
    private final ResultSet resultSet;
    private ResultSetSource<RS> resultSetSource;
    private final StateAdapterProvider stateAdapterProvider;
    /** Assigned by the {@link StateStreamSubscriber} constructor; {@code null} until {@code flowInto} has run. */
    private StateStreamSubscriber<RS> subscriber;

    /**
     * The {@link Source} handed to the {@link StreamPublisher}; it holds the
     * {@link ResultSet} and the collaborators passed to its constructor.
     *
     * <p>NOTE(review): the decompiler reported that this class was originally
     * {@code private}; it is left {@code public} here to keep the recovered interface.
     *
     * @param <RS> the element type produced by this source
     */
    public static final class ResultSetSource<RS> implements Source<RS> {
        final JDBCStorageDelegate<State.TextState> delegate;
        private final long flowElementsRate;
        private final Logger logger;
        private final ResultSet resultSet;
        private final StateAdapterProvider stateAdapterProvider;

        /**
         * Stores the given collaborators; performs no other work.
         *
         * @param resultSet the JDBC result set held by this source
         * @param jDBCStorageDelegate the storage delegate held by this source
         * @param stateAdapterProvider the adapter provider held by this source
         * @param j the flow elements rate held by this source
         * @param logger the logger held by this source
         */
        public ResultSetSource(ResultSet resultSet, JDBCStorageDelegate<State.TextState> jDBCStorageDelegate, StateAdapterProvider stateAdapterProvider, long j, Logger logger) {
            this.resultSet = resultSet;
            this.delegate = jDBCStorageDelegate;
            this.stateAdapterProvider = stateAdapterProvider;
            this.flowElementsRate = j;
            this.logger = logger;
        }

        /**
         * Placeholder for the element-producing method.
         *
         * <p>The decompiler failed to restructure this method (it reported a lost
         * block at B:16:0x0044 containing {@code r8 = true;} and skipped a dump of
         * 260 instructions), so the real body is NOT present in this file. Until it
         * is restored from the original sources or bytecode, every call fails.
         *
         * @return never returns normally
         * @throws UnsupportedOperationException always
         */
        public Completes<Elements<RS>> next() {
            throw new UnsupportedOperationException("Method not decompiled: io.vlingo.symbio.store.state.jdbc.JDBCStateStoreStream.ResultSetSource.next():io.vlingo.common.Completes");
        }

        /**
         * Delegates to {@link #next()}; the argument is ignored.
         *
         * @param i ignored
         * @return whatever {@link #next()} returns
         */
        public Completes<Elements<RS>> next(int i) {
            return next();
        }

        /**
         * Delegates to {@link #next()}; the argument is ignored.
         *
         * @param j ignored
         * @return whatever {@link #next()} returns
         */
        public Completes<Elements<RS>> next(long j) {
            return next();
        }

        /**
         * Delegates to {@link #next()}; both arguments are ignored.
         *
         * @param j ignored
         * @param i ignored
         * @return whatever {@link #next()} returns
         */
        public Completes<Elements<RS>> next(long j, int i) {
            return next();
        }

        /**
         * Reports that this source is not slow.
         *
         * @return a successfully completed {@code false}
         */
        public Completes<Boolean> isSlow() {
            return Completes.withSuccess(false);
        }

        /**
         * Copies the given list into a new array.
         *
         * <p>Not referenced by any decompiled code in this file; presumably used by
         * the unrecovered body of {@link #next()}, so it is retained.
         *
         * @param list the bundles to copy
         * @return a new array holding the list's elements in order
         */
        private StateBundle[] arrayFrom(List<StateBundle> list) {
            return list.toArray(new StateBundle[0]);
        }
    }

    /**
     * A {@link StreamSubscriber} that registers itself with its owning
     * {@link JDBCStateStoreStream} and remembers the {@link Subscription} it is
     * given, so the stream can later request more elements or cancel.
     *
     * @param <RS> the element type consumed by this subscriber
     */
    public static class StateStreamSubscriber<RS> extends StreamSubscriber<RS> {
        /** The subscription received in {@link #onSubscribe(Subscription)}; {@code null} before that call. */
        Subscription subscriptionHook;

        /**
         * Creates the subscriber and publishes it to the owning stream.
         *
         * @param sink the sink passed to the superclass
         * @param j the request threshold passed to the superclass
         * @param jDBCStateStoreStream the stream whose {@code subscriber} field is set to this instance
         */
        public StateStreamSubscriber(Sink<RS> sink, long j, JDBCStateStoreStream<RS> jDBCStateStoreStream) {
            super(sink, j);
            // A nested class may touch the enclosing class's private field directly.
            jDBCStateStoreStream.subscriber = this;
        }

        /** Forwards completion to the superclass unchanged. */
        public void onComplete() {
            super.onComplete();
        }

        /**
         * Records the subscription for later use by the owning stream, then lets
         * the superclass handle it.
         *
         * @param subscription the subscription supplied by the publisher
         */
        public void onSubscribe(Subscription subscription) {
            this.subscriptionHook = subscription;
            super.onSubscribe(subscription);
        }
    }

    /**
     * Stores the collaborators needed to start streaming; nothing flows until one
     * of the {@code flowInto} methods is called.
     *
     * @param stage the stage used to create the publisher and subscriber actors
     * @param jDBCStorageDelegate the storage delegate handed to the source
     * @param stateAdapterProvider the adapter provider handed to the source
     * @param resultSet the JDBC result set handed to the source
     * @param logger the logger handed to the source
     */
    public JDBCStateStoreStream(Stage stage, JDBCStorageDelegate<State.TextState> jDBCStorageDelegate, StateAdapterProvider stateAdapterProvider, ResultSet resultSet, Logger logger) {
        this.stage = stage;
        this.delegate = jDBCStorageDelegate;
        this.stateAdapterProvider = stateAdapterProvider;
        this.resultSet = resultSet;
        this.logger = logger;
    }

    /**
     * Records the new flow rate and requests that many elements from the
     * current subscription.
     *
     * @param j the number of elements to request
     * @throws IllegalStateException if no subscription has been established yet
     */
    public void request(long j) {
        this.flowElementsRate = j;
        establishedSubscription().request(this.flowElementsRate);
    }

    /**
     * Starts the flow using the default flow rate of 100 and the default probe interval.
     *
     * @param sink the sink that receives the elements
     * @param <S> the sink's element type
     */
    public <S> void flowInto(Sink<S> sink) {
        flowInto(sink, DEFAULT_FLOW_ELEMENTS_RATE, DefaultProbeInterval);
    }

    /**
     * Starts the flow using the given flow rate and the default probe interval.
     *
     * @param sink the sink that receives the elements
     * @param j the flow elements rate
     * @param <S> the sink's element type
     */
    public <S> void flowInto(Sink<S> sink, long j) {
        flowInto(sink, j, DefaultProbeInterval);
    }

    /**
     * Starts the flow: builds the source, creates the publisher actor for it and
     * subscribes a newly created {@link StateStreamSubscriber} actor bound to the sink.
     *
     * @param sink the sink that receives the elements
     * @param j the flow elements rate, given to both the source and the subscriber
     * @param i the probe interval given to the publisher configuration
     * @param <S> the sink's element type
     */
    @SuppressWarnings("unchecked") // actorFor is keyed by raw Class tokens, so the results are raw
    public <S> void flowInto(Sink<S> sink, long j, int i) {
        this.flowElementsRate = j;
        final PublisherConfiguration configuration = PublisherConfiguration.with(i, PUBLISHER_MAX_THROTTLE, PUBLISHER_BUFFER_SIZE, Streams.OverflowPolicy.DropCurrent);
        this.resultSetSource = new ResultSetSource<>(this.resultSet, this.delegate, this.stateAdapterProvider, j, this.logger);
        this.publisher = this.stage.actorFor(Publisher.class, StreamPublisher.class, new Object[]{this.resultSetSource, configuration});
        // The subscriber's constructor receives `this` and stores itself in this.subscriber.
        final Subscriber<RS> streamSubscriber = this.stage.actorFor(Subscriber.class, StateStreamSubscriber.class, new Object[]{sink, Long.valueOf(j), this});
        this.publisher.subscribe(streamSubscriber);
    }

    /**
     * Cancels the current subscription.
     *
     * @throws IllegalStateException if no subscription has been established yet
     */
    public void stop() {
        establishedSubscription().cancel();
    }

    /**
     * Returns the subscription captured by the subscriber, failing with a clear
     * message instead of a bare {@link NullPointerException} when the stream has
     * not been started or {@code onSubscribe} has not been delivered yet.
     */
    private Subscription establishedSubscription() {
        final StateStreamSubscriber<RS> current = this.subscriber;
        final Subscription subscription = current == null ? null : current.subscriptionHook;
        if (subscription == null) {
            throw new IllegalStateException("No active subscription: flowInto() must be called and the subscription established before request() or stop().");
        }
        return subscription;
    }
}
