package net.pincette.rs;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.logging.Logger;
import net.pincette.util.ScheduledCompletionStage;

/* loaded from: input_file:net/pincette/rs/Buffered.class */
public abstract class Buffered<T, R> extends ProcessorBase<T, R> {
    private final Deque<R> buf;
    private final Logger logger;
    private final long requestSize;
    private final Duration timeout;
    private boolean cancelled;
    private boolean completed;
    private boolean completedSent;
    private boolean lastRequested;
    private long received;
    private long requested;
    private long requestedUpstream;

    /* JADX INFO: Access modifiers changed from: protected */
    public Buffered(int i) {
        this(i, Duration.ofNanos(0L));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Buffered(int i, Duration duration) {
        this.buf = new ArrayDeque(1000);
        this.logger = Logger.getLogger(getClass().getName());
        if (i < 1) {
            throw new IllegalArgumentException("Request size should be at least 1.");
        }
        if (duration != null && duration.isNegative()) {
            throw new IllegalArgumentException("The timeout should be positive.");
        }
        this.requestSize = i;
        this.timeout = duration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addValues(List<R> list) {
        Util.trace(this.logger, () -> {
            return "addValues values: " + list;
        });
        Deque<R> deque = this.buf;
        Objects.requireNonNull(deque);
        list.forEach(deque::addFirst);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // net.pincette.rs.ProcessorBase
    public void cancelling() {
        dispatch(() -> {
            this.cancelled = true;
            doLast();
            emit();
        });
        super.cancelling();
    }

    private boolean done() {
        return this.completed && (this.received == 0 || this.buf.isEmpty());
    }

    private void doLast() {
        if (this.lastRequested) {
            return;
        }
        this.lastRequested = true;
        last();
    }

    @Override // net.pincette.rs.ProcessorBase
    protected void emit(long j) {
        Util.trace(this.logger, () -> {
            return "dispatch emit number: " + j;
        });
        dispatch(() -> {
            Util.trace(this.logger, () -> {
                return "emit number: " + j;
            });
            this.requested += j;
            more();
            emit();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emit() {
        Util.trace(this.logger, () -> {
            return "dispatch emit";
        });
        dispatch(() -> {
            Util.trace(this.logger, () -> {
                return "emit";
            });
            if (getRequested() > 0) {
                Util.trace(this.logger, () -> {
                    return "emit buf: " + this.buf;
                });
                Util.trace(this.logger, () -> {
                    return "emit requested: " + getRequested();
                });
                Util.nextValues(this.buf, getRequested()).ifPresent(list -> {
                    this.requested -= list.size();
                    sendValues(list);
                });
                more();
            }
        });
    }

    protected long getRequested() {
        return this.requested;
    }

    protected boolean isCancelled() {
        return this.cancelled;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isCompleted() {
        return this.completed;
    }

    protected void last() {
    }

    private void keepItGoing() {
        if (shouldWakeUp()) {
            Util.trace(this.logger, () -> {
                return "shouldWakeUp";
            });
            more(1L);
        }
    }

    private void more() {
        Util.trace(this.logger, () -> {
            return "dispatch more";
        });
        dispatch(() -> {
            Util.trace(this.logger, () -> {
                return "more";
            });
            if (needMore()) {
                Util.trace(this.logger, () -> {
                    return "needMore";
                });
                more(this.requestSize);
            } else {
                if (this.timeout == null || !this.timeout.isZero()) {
                    return;
                }
                keepItGoing();
            }
        });
    }

    private void more(long j) {
        this.requestedUpstream += j;
        Util.trace(this.logger, () -> {
            return "more requestedUpstream: " + this.requestedUpstream;
        });
        Util.trace(this.logger, () -> {
            return "more subscription request: " + j;
        });
        this.subscription.request(j);
    }

    private boolean needMore() {
        return !isCompleted() && !isCancelled() && this.received == this.requestedUpstream && getRequested() > ((long) this.buf.size());
    }

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        Util.trace(this.logger, () -> {
            return "dispatch onComplete";
        });
        dispatch(() -> {
            Util.trace(this.logger, () -> {
                return "onComplete buf: " + this.buf;
            });
            this.completed = true;
            doLast();
            if (!done()) {
                emit();
            } else {
                Util.trace(this.logger, () -> {
                    return "sendComplete from onComplete";
                });
                sendComplete();
            }
        });
    }

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("Can't throw null.");
        }
        dispatch(() -> {
            setError(true);
            this.subscriber.onError(th);
        });
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        if (t == null) {
            throw new NullPointerException("Can't emit null.");
        }
        if (getError()) {
            return;
        }
        Util.trace(this.logger, () -> {
            return "dispatch onNext value: " + t;
        });
        dispatch(() -> {
            if (this.received == this.requestedUpstream) {
                Util.throwBackpressureViolation(this, this.subscription, this.requestedUpstream);
            }
            this.received++;
            Util.trace(this.logger, () -> {
                long j = this.received;
                long j2 = this.requestedUpstream;
                this.buf.size();
                return "onNext received: " + j + ", requested upstream: " + j + ", buffer size: " + j2;
            });
            if (!onNextAction(t)) {
                Util.trace(this.logger, () -> {
                    return "onNext onNextAction false";
                });
                more();
            }
            Util.trace(this.logger, () -> {
                return "onNextAction buffer size: " + this.buf.size();
            });
        });
    }

    protected abstract boolean onNextAction(T t);

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        super.onSubscribe(subscription);
        if (this.timeout == null || this.timeout.isZero()) {
            return;
        }
        runRequestTimeout();
    }

    private void runRequestTimeout() {
        ScheduledCompletionStage.runAsyncAfter(() -> {
            dispatch(() -> {
                if (isCompleted() || getError()) {
                    return;
                }
                runRequestTimeout();
                keepItGoing();
            });
        }, this.timeout);
    }

    private void sendComplete() {
        Util.trace(this.logger, () -> {
            return "dispatch sendComplete";
        });
        dispatch(() -> {
            if (this.completedSent) {
                return;
            }
            this.completedSent = true;
            Util.trace(this.logger, () -> {
                return "send onComplete";
            });
            this.subscriber.onComplete();
        });
    }

    private void sendValues(List<R> list) {
        if (getError()) {
            return;
        }
        Util.trace(this.logger, () -> {
            return "dispatch values: " + list;
        });
        list.forEach(obj -> {
            dispatch(() -> {
                Util.trace(this.logger, () -> {
                    return "sendValue: " + obj;
                });
                this.subscriber.onNext(obj);
            });
        });
        dispatch(() -> {
            if (this.completed) {
                doLast();
                if (this.buf.isEmpty()) {
                    Util.trace(this.logger, () -> {
                        return "sendComplete from sendValues";
                    });
                    sendComplete();
                }
            }
        });
    }

    private boolean shouldWakeUp() {
        return !isCompleted() && !getError() && this.received < this.requestedUpstream && ((long) this.buf.size()) < this.requestSize && getRequested() > 0;
    }
}
