package net.pincette.rs;

import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Supplier;
import net.pincette.util.Util;

/* loaded from: input_file:net/pincette/rs/Buffered.class */
public abstract class Buffered<T, R> extends ProcessorBase<T, R> {
    private final Deque<R> buf = new ConcurrentLinkedDeque();
    private final long requestSize;
    private boolean completed;
    private boolean completedSent;
    private boolean lastRequested;
    private long received;
    private long requested;
    private long requestedUpstream;

    /* JADX INFO: Access modifiers changed from: protected */
    public Buffered(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Request size should be at least 1.");
        }
        this.requestSize = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addValues(List<R> list) {
        trace(() -> {
            return "addValues values: " + list;
        });
        Deque<R> deque = this.buf;
        Objects.requireNonNull(deque);
        list.forEach(deque::addFirst);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void dispatch(Runnable runnable) {
        Serializer.dispatch(runnable);
    }

    private boolean done() {
        return this.completed && (this.received == 0 || this.buf.isEmpty());
    }

    private void doLast() {
        if (this.lastRequested) {
            return;
        }
        this.lastRequested = true;
        last();
    }

    @Override // net.pincette.rs.ProcessorBase
    protected void emit(long j) {
        trace(() -> {
            return "dispatch emit number: " + j;
        });
        dispatch(() -> {
            trace(() -> {
                return "emit number: " + j;
            });
            this.requested += j;
            more();
            emit();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emit() {
        trace(() -> {
            return "dispatch emit";
        });
        dispatch(() -> {
            trace(() -> {
                return "emit";
            });
            if (getRequested() > 0) {
                trace(() -> {
                    return "emit buf: " + this.buf;
                });
                trace(() -> {
                    return "emit requested: " + getRequested();
                });
                Util.nextValues(this.buf, getRequested()).ifPresent(list -> {
                    this.requested -= list.size();
                    sendValues(list);
                });
                more();
            }
        });
    }

    protected long getRequested() {
        return this.requested;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isCompleted() {
        return this.completed;
    }

    protected void last() {
    }

    private void more() {
        trace(() -> {
            return "dispatch more";
        });
        dispatch(() -> {
            trace(() -> {
                return "more";
            });
            if (needMore()) {
                this.requestedUpstream += this.requestSize;
                trace(() -> {
                    return "more requestedUpstream: " + this.requestedUpstream;
                });
                trace(() -> {
                    return "more subscription request: " + this.requestSize;
                });
                this.subscription.request(this.requestSize);
            }
        });
    }

    private boolean needMore() {
        return !isCompleted() && this.received == this.requestedUpstream && getRequested() > ((long) this.buf.size());
    }

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        trace(() -> {
            return "dispatch onComplete";
        });
        dispatch(() -> {
            trace(() -> {
                return "onComplete buf: " + this.buf;
            });
            this.completed = true;
            doLast();
            if (!done()) {
                emit();
            } else {
                trace(() -> {
                    return "sendComplete from onComplete";
                });
                sendComplete();
            }
        });
    }

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("Can't throw null.");
        }
        setError(true);
        this.subscriber.onError(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        if (t == null) {
            throw new NullPointerException("Can't emit null.");
        }
        if (getError()) {
            return;
        }
        if (this.received != this.requestedUpstream) {
            trace(() -> {
                return "dispatch onNext value: " + t;
            });
            dispatch(() -> {
                this.received++;
                trace(() -> {
                    return "onNext received: " + this.received;
                });
                if (onNextAction(t)) {
                    return;
                }
                trace(() -> {
                    return "onNext onNextAction false";
                });
                more();
            });
        } else {
            String name = this.subscription.getClass().getName();
            long j = this.requestedUpstream;
            getClass().getName();
            Util.GeneralException generalException = new Util.GeneralException("Backpressure violation in " + name + ". Requested " + j + " elements in " + generalException + ", which have already been received.");
            throw generalException;
        }
    }

    protected abstract boolean onNextAction(T t);

    private void sendComplete() {
        trace(() -> {
            return "dispatch sendComplete";
        });
        dispatch(() -> {
            if (this.completedSent) {
                return;
            }
            this.completedSent = true;
            trace(() -> {
                return "send onComplete";
            });
            this.subscriber.onComplete();
        });
    }

    private void sendValues(List<R> list) {
        if (getError()) {
            return;
        }
        trace(() -> {
            return "dispatch values: " + list;
        });
        list.forEach(obj -> {
            dispatch(() -> {
                trace(() -> {
                    return "sendValue: " + obj;
                });
                this.subscriber.onNext(obj);
            });
        });
        dispatch(() -> {
            if (this.completed) {
                doLast();
                if (this.buf.isEmpty()) {
                    trace(() -> {
                        return "sendComplete from sendValues";
                    });
                    sendComplete();
                }
            }
        });
    }

    private void trace(Supplier<String> supplier) {
        Util.LOGGER.finest(() -> {
            return getClass().getName() + ": " + ((String) supplier.get());
        });
    }
}
