package com.gitlab.tixtix320.kiwi.internal.observable.decorator.single.collect;

import com.gitlab.tixtix320.kiwi.api.observable.ConditionalConsumer;
import com.gitlab.tixtix320.kiwi.api.observable.Observable;
import com.gitlab.tixtix320.kiwi.api.observable.Result;
import com.gitlab.tixtix320.kiwi.api.observable.Subscription;
import com.gitlab.tixtix320.kiwi.internal.observable.BaseObservable;
import com.gitlab.tixtix320.kiwi.internal.observable.decorator.DecoratorObservable;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

/* loaded from: input_file:com/gitlab/tixtix320/kiwi/internal/observable/decorator/single/collect/CollectorObservable.class */
/**
 * Decorator that buffers every value produced by a source observable and, when the source
 * completes, hands the buffered values to {@link #collect(Stream)} and delivers the single
 * collected result to the subscriber.
 *
 * <p>Each call to {@link #subscribeAndHandle(ConditionalConsumer)} uses its own buffer, so
 * several subscribers of the same instance do not see each other's buffered values.
 *
 * @param <S> type of the values emitted by the source observable
 * @param <R> type of the collected result delivered to subscribers
 */
public abstract class CollectorObservable<S, R> extends DecoratorObservable<R> {
    private final BaseObservable<S> observable;

    /**
     * Creates a collector on top of the given source observable.
     *
     * <p>Decompiler note: this constructor was package-private in the original class.
     *
     * @param baseObservable source whose values are buffered until it completes
     */
    public CollectorObservable(BaseObservable<S> baseObservable) {
        this.observable = baseObservable;
    }

    /**
     * Subscribes to the source, buffering its values, and registers a completion callback that
     * delivers the collected result to {@code conditionalConsumer}.
     *
     * <p>The value returned by {@code conditionalConsumer.consume(...)} is ignored.
     *
     * @param conditionalConsumer receiver of the single collected result
     * @return the subscription obtained from the source observable
     */
    @Override // com.gitlab.tixtix320.kiwi.api.observable.Observable
    public Subscription subscribeAndHandle(ConditionalConsumer<? super Result<? extends R>> conditionalConsumer) {
        // One buffer per subscription: a buffer shared through a field would be filled once per
        // subscriber and emptied by whichever completion callback happens to run first.
        // Note: ConcurrentLinkedQueue rejects null elements, so a null source value throws
        // NullPointerException here.
        final Queue<S> collected = new ConcurrentLinkedQueue<>();

        Subscription subscription = this.observable.subscribeAndHandle(result -> {
            collected.add(result.getValue());
            // Presumably "true" asks the source to keep delivering values - confirm against
            // the ConditionalConsumer contract.
            return true;
        });

        this.observable.onComplete(() -> {
            // NOTE(review): the meaning of the "false" flag passed to Result.of is not visible
            // in this file; it is preserved unchanged.
            conditionalConsumer.consume(Result.of(collect(collected.stream()), false));
            // Drop the buffered references once the result has been delivered.
            collected.clear();
        });

        return subscription;
    }

    /**
     * Reduces the buffered source values to the result delivered to the subscriber.
     *
     * @param stream stream over the values buffered for one subscription
     * @return the collected result
     */
    protected abstract R collect(Stream<S> stream);

    /**
     * Returns the observables this decorator wraps.
     *
     * @return a singleton collection containing the source observable
     */
    @Override // com.gitlab.tixtix320.kiwi.internal.observable.decorator.DecoratorObservable
    protected Collection<Observable<?>> observables() {
        return Collections.singleton(this.observable);
    }
}
