package net.pincette.rs;

import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.function.Function;

/* loaded from: input_file:net/pincette/rs/Commit.class */
public class Commit<T> extends ProcessorBase<T, T> {
    private final Function<List<T>, CompletionStage<Boolean>> fn;
    private final Deque<T> uncommitted = new ConcurrentLinkedDeque();
    private boolean completed;
    private long requested;

    public Commit(Function<List<T>, CompletionStage<Boolean>> function) {
        this.fn = function;
    }

    public static <T> Flow.Processor<T, T> commit(Function<List<T>, CompletionStage<Boolean>> function) {
        return new Commit(function);
    }

    @Override // net.pincette.rs.ProcessorBase
    protected void emit(long j) {
        ((CompletionStage) last().map(this.fn).orElseGet(() -> {
            return CompletableFuture.completedFuture(true);
        })).thenAccept(bool -> {
            if (Boolean.TRUE.equals(bool)) {
                Serializer.dispatch(() -> {
                    if (this.completed) {
                        super.onComplete();
                    } else {
                        request(j);
                    }
                });
            }
        });
    }

    private Optional<List<T>> last() {
        ArrayList arrayList = new ArrayList();
        while (!this.uncommitted.isEmpty()) {
            arrayList.add(this.uncommitted.removeLast());
        }
        return Optional.of(arrayList).filter(list -> {
            return !list.isEmpty();
        });
    }

    @Override // net.pincette.rs.ProcessorBase, java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        Serializer.dispatch(() -> {
            this.completed = true;
            if (this.requested > 0) {
                super.onComplete();
            }
        });
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        Serializer.dispatch(() -> {
            this.uncommitted.addFirst(t);
            this.requested--;
            this.subscriber.onNext(t);
        });
    }

    private void request(long j) {
        this.requested += j;
        this.subscription.request(j);
    }
}
