package net.pincette.rs;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Flow;
import net.pincette.util.ScheduledCompletionStage;

/* loaded from: input_file:net/pincette/rs/AskForever.class */
public class AskForever<T> extends ProcessorBase<T, T> {
    private final Duration timeout;
    private boolean completed;
    private Instant last = Instant.now();

    public AskForever(Duration duration) {
        if (duration == null || duration.isZero() || duration.isNegative()) {
            throw new IllegalArgumentException("The timeout should be positive.");
        }
        this.timeout = duration;
    }

    public static <T> Flow.Processor<T, T> askForever(Duration duration) {
        return new AskForever(duration);
    }

    @Override // net.pincette.rs.ProcessorBase
    protected void emit(long j) {
        this.subscription.request(j);
    }

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        this.completed = true;
        super.onComplete();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        this.last = Instant.now();
        this.subscriber.onNext(t);
    }

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        super.onSubscribe(subscription);
        runTimeout();
    }

    private void runTimeout() {
        ScheduledCompletionStage.runAsyncAfter(() -> {
            if (this.completed) {
                return;
            }
            if (Duration.between(this.last, Instant.now()).compareTo(this.timeout) > 0) {
                this.subscription.request(1L);
            }
            runTimeout();
        }, this.timeout);
    }
}
