package reactor.rx.action.aggregation;

import java.util.concurrent.TimeUnit;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.fn.timer.Timer;
import reactor.rx.Stream;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;
import reactor.rx.subscription.ReactiveSubscription;

/* loaded from: input_file:reactor/rx/action/aggregation/WindowAction.class */
/**
 * A {@link BatchAction} that re-emits its input as a sequence of nested
 * {@link Stream}s ("windows") instead of individual values.
 *
 * <p>A new window stream is created and broadcast downstream from
 * {@link #firstCallback(Object)}, each value is forwarded to the currently
 * open window from {@link #nextCallback(Object)}, and the open window is
 * completed from {@link #flushCallback(Object)}. When those callbacks are
 * invoked is decided by {@link BatchAction}, which is not part of this file.
 *
 * <p>NOTE(review): {@code currentWindow} is a plain, non-volatile field; this
 * class adds no synchronization of its own around it.
 *
 * @param <T> the type of the values routed into the windows
 */
public class WindowAction<T> extends BatchAction<T, Stream<T>> {
    /** Environment handed to every window broadcaster and returned by {@link #getEnvironment()}. */
    private final Environment environment;

    /** Subscription feeding the window that is currently open, or {@code null} when none is open. */
    private ReactiveSubscription<T> currentWindow;

    /**
     * Creates a window action without a timer.
     *
     * @param environment environment used when creating window broadcasters; also exposed
     *                    through {@link #getEnvironment()}
     * @param dispatcher  dispatcher passed to the superclass constructor
     * @param i           passed to the superclass as its int argument — presumably the number of
     *                    items per window; confirm against {@code BatchAction}
     */
    public WindowAction(Environment environment, Dispatcher dispatcher, int i) {
        // NOTE(review): the three boolean flags presumably enable the next/first/flush
        // callbacks overridden below — confirm against the BatchAction constructor.
        super(dispatcher, i, true, true, true);
        this.environment = environment;
    }

    /**
     * Creates a window action that additionally hands a time span and a timer to the superclass.
     *
     * @param environment environment used when creating window broadcasters; also exposed
     *                    through {@link #getEnvironment()}
     * @param dispatcher  dispatcher passed to the superclass constructor
     * @param i           passed to the superclass as its int argument — presumably the number of
     *                    items per window; confirm against {@code BatchAction}
     * @param j           passed to the superclass as its long argument — presumably the window
     *                    time span expressed in {@code timeUnit}; confirm against {@code BatchAction}
     * @param timeUnit    unit that qualifies {@code j}
     * @param timer       timer passed to the superclass
     */
    public WindowAction(Environment environment, Dispatcher dispatcher, int i, long j, TimeUnit timeUnit, Timer timer) {
        super(dispatcher, i, true, true, true, j, timeUnit, timer);
        this.environment = environment;
    }

    /**
     * Returns the subscription feeding the currently open window.
     *
     * @return the open window's subscription, or {@code null} if no window is open
     */
    public ReactiveSubscription<T> currentWindow() {
        return this.currentWindow;
    }

    /**
     * Creates a new window, records its subscription as the current window and
     * returns the stream that downstream consumers observe.
     *
     * <p>A {@link SerializedBroadcaster} is used only when a timer is present and
     * the dispatcher is {@link SynchronousDispatcher#INSTANCE}; in every other
     * case a plain {@link Broadcaster} is used. (Presumably because a timer
     * callback and the upstream may then signal the window concurrently —
     * confirm.)
     *
     * <p>Any previously open window is replaced without being terminated here.
     *
     * @return the newly created window stream
     */
    protected Stream<T> createWindowStream() {
        // if/else rather than a conditional expression, so that the generic factory
        // calls are typed from the assignment on pre-Java-8 compilers as well.
        final Broadcaster<T> window;
        if (this.timer != null && this.dispatcher == SynchronousDispatcher.INSTANCE) {
            window = SerializedBroadcaster.create(this.environment, this.dispatcher);
        } else {
            window = Broadcaster.create(this.environment, this.dispatcher);
        }

        ReactiveSubscription<T> windowSubscription = new ReactiveSubscription<>(null, window);
        this.currentWindow = windowSubscription;
        window.onSubscribe(windowSubscription);
        return window;
    }

    /**
     * Propagates an error to the open window (if any) and then to the superclass.
     *
     * <p>The window reference is cleared once the error has been delivered, in
     * line with {@link #doComplete()} and {@link #flushCallback(Object)}, so that
     * a later next or flush callback cannot signal a window that has already
     * terminated.
     *
     * @param th the error to propagate
     */
    @Override
    public void doError(Throwable th) {
        // Visibility is public here (wider than the overridden method); kept so existing callers still compile.
        ReactiveSubscription<T> window = this.currentWindow;
        if (window != null) {
            window.onError(th);
            this.currentWindow = null;
        }
        super.doError(th);
    }

    /**
     * Completes the open window (if any) and then delegates to the superclass.
     */
    @Override
    protected void doComplete() {
        completeCurrentWindow();
        super.doComplete();
    }

    /**
     * Opens a new window and broadcasts its stream downstream.
     *
     * @param t the triggering value; not used here (values reach the window through
     *          {@link #nextCallback(Object)})
     */
    @Override
    protected void firstCallback(T t) {
        broadcastNext(createWindowStream());
    }

    /**
     * Forwards a value to the open window; the value is dropped when no window is open.
     *
     * @param t the value to forward
     */
    @Override
    protected void nextCallback(T t) {
        ReactiveSubscription<T> window = this.currentWindow;
        if (window != null) {
            window.onNext(t);
        }
    }

    /**
     * Completes the open window, if any.
     *
     * @param t the triggering value; not used here
     */
    @Override
    protected void flushCallback(T t) {
        completeCurrentWindow();
    }

    /**
     * Returns the environment supplied at construction time.
     *
     * @return the environment given to the constructor
     */
    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }

    /** Sends {@code onComplete} to the open window, if any, and then forgets it. */
    private void completeCurrentWindow() {
        ReactiveSubscription<T> window = this.currentWindow;
        if (window != null) {
            window.onComplete();
            this.currentWindow = null;
        }
    }
}
