package me.escoffier.fluid.models;

import io.reactivex.Flowable;
import io.reactivex.Single;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.util.Strings;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:me/escoffier/fluid/models/DefaultSource.class */
/**
 * Default {@link Source} implementation backed by a Reactive Streams {@link Publisher} of
 * {@link Message}s.
 *
 * <p>Instances are immutable: the wrapped publisher, the (optional) name and the attribute map are
 * fixed at construction time. Every operator returns a <em>new</em> {@code DefaultSource} wrapping a
 * derived {@link Flowable}; unless stated otherwise the derived source keeps the name and the
 * attributes of the source it was created from.
 *
 * @param <T> the type of the payload carried by the messages of this source
 */
public class DefaultSource<T> implements Source<T> {
    public static final String FUNCTION_CANNOT_BE_NULL_MESSAGE = "The `mapper` function cannot be `null`";
    public static final String FILTER_CANNOT_BE_NULL_MESSAGE = "The `filter` function cannot be `null`";

    /** Upstream of messages wrapped by this source. */
    private final Publisher<Message<T>> flow;

    /** Name of the source, {@code null} when the source is unnamed. */
    private final String name;

    /** Unmodifiable attributes attached to this source (never {@code null}). */
    private final Map<String, Object> attributes;

    /**
     * Creates a source.
     *
     * @param publisher the stream of messages, must not be {@code null}
     * @param str the name of the source, {@code null} for an unnamed source
     * @param map the attributes; copied defensively, {@code null} is treated as "no attributes"
     * @throws NullPointerException if {@code publisher} is {@code null}
     */
    public DefaultSource(Publisher<Message<T>> publisher, String str, Map<String, Object> map) {
        Objects.requireNonNull(publisher, "The given `items` cannot be `null`");
        this.flow = publisher;
        this.name = str;
        if (map != null) {
            // Copy first so later changes to the caller's map cannot leak into this source.
            this.attributes = Collections.unmodifiableMap(new HashMap<>(map));
        } else {
            this.attributes = Collections.emptyMap();
        }
    }

    /**
     * Returns a copy of this source carrying the given name.
     *
     * @param str the new name
     * @return the renamed source
     * @throws IllegalArgumentException if the name is {@code null} or blank
     */
    @Override
    public Source<T> named(String str) {
        if (Strings.isBlank(str)) {
            throw new IllegalArgumentException("The name cannot be `null` or blank");
        }
        return new DefaultSource<>(this.flow, str, this.attributes);
    }

    /**
     * Returns a copy of this source without a name.
     *
     * @return the unnamed source
     */
    @Override
    public Source<T> unnamed() {
        return new DefaultSource<>(this.flow, null, this.attributes);
    }

    /**
     * Returns a copy of this source with an attribute added (or replaced).
     *
     * @param str the attribute key
     * @param obj the attribute value
     * @return the new source
     * @throws NullPointerException if the key or the value is {@code null}
     */
    @Override
    public Source<T> withAttribute(String str, Object obj) {
        Map<String, Object> copy = new HashMap<>(this.attributes);
        copy.put(
            Objects.requireNonNull(str, "The key must not be `null`"),
            Objects.requireNonNull(obj, "The value must not be `null`"));
        return new DefaultSource<>(this.flow, this.name, copy);
    }

    /**
     * Returns a copy of this source with the given attribute removed. Removing an attribute that is
     * not present is a no-op.
     *
     * @param str the attribute key
     * @return the new source
     * @throws NullPointerException if the key is {@code null}
     */
    @Override
    public Source<T> withoutAttribute(String str) {
        Objects.requireNonNull(str, "name must not be `null`");
        Map<String, Object> copy = new HashMap<>(this.attributes);
        copy.remove(str);
        return new DefaultSource<>(this.flow, this.name, copy);
    }

    /**
     * Subscribes the given subscriber directly to the wrapped publisher.
     *
     * @param subscriber the subscriber
     */
    @Override
    public void subscribe(Subscriber<? super Message<T>> subscriber) {
        this.flow.subscribe(subscriber);
    }

    /**
     * Returns a source that switches to {@code source} when this source completes without emitting
     * any message.
     *
     * @param source the alternative source
     * @return the new source
     * @throws NullPointerException if {@code source} is {@code null}
     */
    @Override
    public Source<T> orElse(Source<T> source) {
        Objects.requireNonNull(source, "The alternative source must not be `null`");
        Flowable<Message<T>> withFallback = Flowable.fromPublisher(this.flow).switchIfEmpty(source);
        return new DefaultSource<>(withFallback, this.name, this.attributes);
    }

    /**
     * @return the name of this source, {@code null} if it is unnamed
     */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Looks up an attribute.
     *
     * <p>The stored value is cast to {@code T} without any runtime check.
     *
     * @param str the attribute key
     * @return the attribute value, empty if there is no attribute with that key
     * @throws NullPointerException if the key is {@code null}
     */
    @Override
    @SuppressWarnings("unchecked")
    public Optional<T> attr(String str) {
        Object value = this.attributes.get(Objects.requireNonNull(str, "The key must not be `null`"));
        return Optional.ofNullable((T) value);
    }

    /**
     * Transforms each message with the given function.
     *
     * @param function the mapper, applied to every message
     * @param <X> the payload type of the resulting messages
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> map(Function<Message<T>, Message<X>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> mapped = Flowable.fromPublisher(this.flow).map(function::apply);
        return new DefaultSource<>(mapped, this.name, this.attributes);
    }

    /**
     * Transforms the payload of each message; the new payload is attached to the incoming message
     * through {@code Message.with}.
     *
     * @param function the payload mapper
     * @param <X> the resulting payload type
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> mapPayload(Function<T, X> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> mapped = Flowable.fromPublisher(this.flow)
            .map(message -> message.with(function.apply(message.payload())));
        return new DefaultSource<>(mapped, this.name, this.attributes);
    }

    /**
     * Keeps only the messages accepted by the predicate.
     *
     * @param predicate the test applied to every message
     * @return the new source
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public Source<T> filter(Predicate<Message<T>> predicate) {
        Objects.requireNonNull(predicate, FILTER_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<T>> filtered = Flowable.fromPublisher(this.flow).filter(predicate::test);
        return new DefaultSource<>(filtered, this.name, this.attributes);
    }

    /**
     * Keeps only the messages whose payload is accepted by the predicate.
     *
     * @param predicate the test applied to every payload
     * @return the new source
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public Source<T> filterPayload(Predicate<T> predicate) {
        Objects.requireNonNull(predicate, FILTER_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<T>> filtered = Flowable.fromPublisher(this.flow)
            .filter(message -> predicate.test(message.payload()));
        return new DefaultSource<>(filtered, this.name, this.attributes);
    }

    /**
     * Keeps only the messages rejected by the predicate.
     *
     * @param predicate the test applied to every message
     * @return the new source
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public Source<T> filterNot(Predicate<Message<T>> predicate) {
        Objects.requireNonNull(predicate, FILTER_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<T>> filtered = Flowable.fromPublisher(this.flow)
            .filter(message -> !predicate.test(message));
        return new DefaultSource<>(filtered, this.name, this.attributes);
    }

    /**
     * Keeps only the messages whose payload is rejected by the predicate.
     *
     * @param predicate the test applied to every payload
     * @return the new source
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public Source<T> filterNotPayload(Predicate<T> predicate) {
        Objects.requireNonNull(predicate, FILTER_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<T>> filtered = Flowable.fromPublisher(this.flow)
            .filter(message -> !predicate.test(message.payload()));
        return new DefaultSource<>(filtered, this.name, this.attributes);
    }

    /**
     * Maps each message to a publisher and flattens the results using {@code Flowable.flatMap}.
     *
     * @param function the mapper producing one publisher per message
     * @param <X> the payload type of the resulting messages
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> flatMap(Function<Message<T>, Publisher<Message<X>>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> flattened = Flowable.fromPublisher(this.flow).flatMap(function::apply);
        return new DefaultSource<>(flattened, this.name, this.attributes);
    }

    /**
     * Maps each message to a publisher and flattens the results using {@code Flowable.concatMap}.
     *
     * @param function the mapper producing one publisher per message
     * @param <X> the payload type of the resulting messages
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> concatMap(Function<Message<T>, Publisher<Message<X>>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> flattened = Flowable.fromPublisher(this.flow).concatMap(function::apply);
        return new DefaultSource<>(flattened, this.name, this.attributes);
    }

    /**
     * Same as {@link #flatMap(Function)}, passing {@code i} as the {@code maxConcurrency} argument
     * of {@code Flowable.flatMap}.
     *
     * @param function the mapper producing one publisher per message
     * @param i the maximum concurrency, at least 1
     * @param <X> the payload type of the resulting messages
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     * @throws IllegalArgumentException if {@code i} is less than 1
     */
    @Override
    public <X> Source<X> flatMap(Function<Message<T>, Publisher<Message<X>>> function, int i) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        if (i < 1) {
            throw new IllegalArgumentException("The `maxConcurrency` cannot be less than 1");
        }
        Flowable<Message<X>> flattened = Flowable.fromPublisher(this.flow).flatMap(function::apply, i);
        return new DefaultSource<>(flattened, this.name, this.attributes);
    }

    /**
     * Maps each payload to a publisher of payloads and flattens the results using
     * {@code Flowable.flatMap}. Each produced payload is attached to the message that triggered it.
     *
     * @param function the mapper producing one publisher per payload
     * @param <X> the resulting payload type
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> flatMapPayload(Function<T, Publisher<X>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> flattened = Flowable.fromPublisher(this.flow)
            .flatMap(message -> payloadsAsMessages(message, function.apply(message.payload())));
        return new DefaultSource<>(flattened, this.name, this.attributes);
    }

    /**
     * Maps each payload to a publisher of payloads and flattens the results using
     * {@code Flowable.concatMap}. Each produced payload is attached to the message that triggered
     * it.
     *
     * @param function the mapper producing one publisher per payload
     * @param <X> the resulting payload type
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> concatMapPayload(Function<T, Publisher<X>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> flattened = Flowable.fromPublisher(this.flow)
            .concatMap(message -> payloadsAsMessages(message, function.apply(message.payload())));
        return new DefaultSource<>(flattened, this.name, this.attributes);
    }

    /**
     * Same as {@link #flatMapPayload(Function)}, passing {@code i} as the {@code maxConcurrency}
     * argument of {@code Flowable.flatMap}.
     *
     * @param function the mapper producing one publisher per payload
     * @param i the maximum concurrency, at least 1
     * @param <X> the resulting payload type
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     * @throws IllegalArgumentException if {@code i} is less than 1
     */
    @Override
    public <X> Source<X> flatMapPayload(Function<T, Publisher<X>> function, int i) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        // Same guard (and message) as flatMap(Function, int) so both overloads fail identically.
        if (i < 1) {
            throw new IllegalArgumentException("The `maxConcurrency` cannot be less than 1");
        }
        Flowable<Message<X>> flattened = Flowable.fromPublisher(this.flow)
            .flatMap(message -> payloadsAsMessages(message, function.apply(message.payload())), i);
        return new DefaultSource<>(flattened, this.name, this.attributes);
    }

    /**
     * Accumulates messages with {@code Flowable.scan}, starting from the given seed.
     *
     * @param message the seed (initial accumulator value)
     * @param biFunction the accumulator, receiving the current state and the incoming message
     * @param <X> the payload type of the accumulated messages
     * @return the new source
     * @throws NullPointerException if the accumulator or the seed is {@code null}
     */
    @Override
    public <X> Source<X> scan(Message<X> message, BiFunction<Message<X>, Message<T>, Message<X>> biFunction) {
        Objects.requireNonNull(biFunction, "The `function` cannot be `null`");
        Objects.requireNonNull(message, "The `zero` item (seed) cannot be `null`");
        Flowable<Message<X>> scanned = Flowable.fromPublisher(this.flow).scan(message, biFunction::apply);
        return new DefaultSource<>(scanned, this.name, this.attributes);
    }

    /**
     * Accumulates payloads with {@code Flowable.scan}, starting from the given seed. Every
     * accumulated value is wrapped into a <em>new</em> {@link Message}, so the incoming messages
     * themselves are not propagated.
     *
     * @param x the seed (initial accumulator value)
     * @param biFunction the accumulator, receiving the current state and the incoming payload
     * @param <X> the type of the accumulated value
     * @return the new source
     * @throws NullPointerException if the accumulator or the seed is {@code null}
     */
    @Override
    public <X> Source<X> scanPayloads(X x, BiFunction<X, T, X> biFunction) {
        Objects.requireNonNull(biFunction, "The `function` cannot be `null`");
        // Checked here for symmetry with scan(); Flowable.scan rejects a null seed as well.
        Objects.requireNonNull(x, "The `zero` item (seed) cannot be `null`");
        Flowable<T> payloads = Flowable.fromPublisher(this.flow).map(Message::payload);
        Flowable<X> states = payloads.scan(x, biFunction::apply);
        Flowable<Message<X>> messages = states.map(Message::new);
        return new DefaultSource<>(messages, this.name, this.attributes);
    }

    /**
     * Groups the messages by key. Each group produced by {@code Flowable.groupBy} is wrapped into a
     * {@link GroupedDataStream} carrying the group key.
     *
     * @param function the function computing the key of a message
     * @param <K> the type of the key
     * @return a publisher emitting one {@link GroupedDataStream} per distinct key
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <K> Publisher<GroupedDataStream<K, T>> groupBy(Function<Message<T>, K> function) {
        Objects.requireNonNull(function, "The function computing the key must not be `null`");
        return Flowable.fromPublisher(this.flow)
            .groupBy(function::apply)
            .flatMapSingle(group -> {
                GroupedDataStream<K, T> stream = new GroupedDataStream<>(group.getKey(), group);
                return Single.just(stream);
            });
    }

    /**
     * Returns a source that logs every message (INFO), the completion (INFO) and any error (ERROR)
     * on the logger obtained from {@code LogManager.getLogger(str)}.
     *
     * @param str the name of the logger
     * @return the new source
     */
    @Override
    public Source<T> log(String str) {
        // The name is final, so the wording identifying this source can be computed once.
        final String label = this.name != null ? "source '" + this.name + "'" : "unnamed source";
        Flowable<Message<T>> logged = Flowable.fromPublisher(this.flow)
            .doOnNext(message ->
                LogManager.getLogger(str).info("Received data on " + label + ": " + message.toString()))
            .doOnComplete(() ->
                LogManager.getLogger(str).info("End of emissions on " + label))
            .doOnError(th ->
                LogManager.getLogger(str).error("Error emitted on " + label, th));
        return new DefaultSource<>(logged, this.name, this.attributes);
    }

    /**
     * Splits this source into {@code i} sources sharing a single upstream subscription. The shared
     * stream is built with {@code publish().autoConnect(i)}, i.e. it is configured to connect to the
     * upstream once {@code i} subscribers have subscribed.
     *
     * @param i the number of branches, at least 2
     * @return the branches, all carrying the name and attributes of this source
     * @throws IllegalArgumentException if {@code i} is less than 2
     */
    @Override
    public List<Source<T>> broadcast(int i) {
        if (i <= 1) {
            throw new IllegalArgumentException("The number of branch must be at least 2");
        }
        Flowable<Message<T>> shared = Flowable.fromPublisher(this.flow).publish().autoConnect(i);
        List<Source<T>> branches = new ArrayList<>(i);
        for (int index = 0; index < i; index++) {
            branches.add(new DefaultSource<>(shared, this.name, this.attributes));
        }
        return branches;
    }

    /**
     * Splits this source into named sources sharing a single upstream subscription. The shared
     * stream is built with {@code publish().autoConnect(strArr.length)}.
     *
     * <p>NOTE(review): duplicate names collapse into a single map entry while the shared stream is
     * still configured for {@code strArr.length} subscribers - callers should pass distinct names.
     *
     * @param strArr the names of the branches, at least 2, none {@code null} or blank
     * @return the branches indexed by name, in the order of {@code strArr}
     * @throws IllegalArgumentException if fewer than 2 names are given or a name is {@code null} or
     *     blank
     */
    @Override
    public Map<String, Source<T>> broadcast(String... strArr) {
        if (strArr == null || strArr.length <= 1) {
            throw new IllegalArgumentException("The number of branch must be at least 2");
        }
        Flowable<Message<T>> shared =
            Flowable.fromPublisher(this.flow).publish().autoConnect(strArr.length);
        Map<String, Source<T>> branches = new LinkedHashMap<>();
        for (String branchName : strArr) {
            if (Strings.isBlank(branchName)) {
                throw new IllegalArgumentException("Illegal name for source. The name must not be `null` or blank");
            }
            branches.put(branchName, new DefaultSource<>(shared, branchName, this.attributes));
        }
        return branches;
    }

    /**
     * Splits this source in two according to a predicate on the messages.
     *
     * @param predicate the test applied to every message
     * @return a pair whose left side receives the accepted messages and whose right side receives
     *     the rejected ones
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public Pair<Source<T>, Source<T>> branch(Predicate<Message<T>> predicate) {
        List<Source<T>> branches = broadcast(2);
        // One broadcast branch per side (the second branch used to be left unused).
        Source<T> accepted = branches.get(0).filter(predicate);
        Source<T> rejected = branches.get(1).filterNot(predicate);
        return Pair.pair(accepted, rejected);
    }

    /**
     * Splits this source in two according to a predicate on the payloads.
     *
     * @param predicate the test applied to every payload
     * @return a pair whose left side receives the messages with an accepted payload and whose right
     *     side receives the others
     * @throws NullPointerException if {@code predicate} is {@code null}
     */
    @Override
    public Pair<Source<T>, Source<T>> branchOnPayload(Predicate<T> predicate) {
        List<Source<T>> branches = broadcast(2);
        // One broadcast branch per side (the second branch used to be left unused).
        Source<T> accepted = branches.get(0).filterPayload(predicate);
        Source<T> rejected = branches.get(1).filterNotPayload(predicate);
        return Pair.pair(accepted, rejected);
    }

    /**
     * Subscribes to this source immediately and dispatches every message to the given sink.
     * Errors reaching the end of the pipeline are logged.
     *
     * @param sink the sink receiving the messages
     * @return the given sink
     * @throws NullPointerException if {@code sink} is {@code null}
     */
    @Override
    public Sink<T> to(Sink<T> sink) {
        Objects.requireNonNull(sink, "The sink must not be `null`");
        Flowable.fromPublisher(this.flow)
            .flatMapCompletable(sink::dispatch)
            // Report through the logging framework used by the rest of this class rather than
            // dumping the stack trace on stderr.
            .doOnError(err -> LogManager.getLogger(DefaultSource.class)
                .error("Error caught while dispatching messages to the sink", err))
            .subscribe();
        return sink;
    }

    /**
     * @return this source as a {@link Flowable}
     */
    @Override
    public Flowable<Message<T>> asFlowable() {
        return Flowable.fromPublisher(this);
    }

    /**
     * Zips this source with another stream. The payloads are combined into a {@link Pair} which is
     * attached to the message coming from <em>this</em> source.
     *
     * @param publisher the other stream
     * @param <O> the payload type of the other stream
     * @return the new source
     */
    @Override
    public <O> Source<Pair<T, O>> zipWith(Publisher<Message<O>> publisher) {
        Flowable<Message<Pair<T, O>>> zipped = asFlowable()
            .zipWith(publisher, (left, right) -> left.with(Pair.pair(left.payload(), right.payload())));
        return new DefaultSource<>(zipped, this.name, this.attributes);
    }

    /**
     * Zips this source with the given sources; see {@link #zipWith(Publisher[])}.
     *
     * @param sourceArr the other sources
     * @return the new source
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Source<Tuple> zipWith(Source... sourceArr) {
        // The interface exposes raw types here, hence the unchecked array conversion.
        Publisher<Message>[] publishers = (Publisher<Message>[]) Arrays.stream(sourceArr)
            .map(source -> source.asFlowable())
            .toArray(Publisher[]::new);
        return zipWith(publishers);
    }

    /**
     * Zips this source with the given streams. The payloads are combined, in order (this source
     * first), into a {@link Tuple} which is attached to the message coming from <em>this</em>
     * source.
     *
     * @param publisherArr the other streams
     * @return the new source
     */
    @Override
    @SuppressWarnings("rawtypes")
    public Source<Tuple> zipWith(Publisher<Message>... publisherArr) {
        List<Publisher<?>> streams = new ArrayList<>();
        streams.add(Flowable.fromPublisher(this));
        for (Publisher<Message> publisher : publisherArr) {
            streams.add(Flowable.fromPublisher(publisher));
        }
        Flowable<Message<Tuple>> zipped =
            Flowable.<Object, Message<Tuple>>zip(streams, DefaultSource::toTupleMessage);
        return new DefaultSource<>(zipped, this.name, this.attributes);
    }

    /**
     * Merges this source with another stream using {@code Flowable.mergeWith}.
     *
     * @param publisher the other stream
     * @return the new source
     */
    @Override
    public Source<T> mergeWith(Publisher<Message<T>> publisher) {
        Flowable<Message<T>> merged = asFlowable().mergeWith(publisher);
        return new DefaultSource<>(merged, this.name, this.attributes);
    }

    /**
     * Merges this source with the given streams using {@code Flowable.merge}.
     *
     * @param publisherArr the other streams
     * @return the new source
     */
    @Override
    public Source<T> mergeWith(Publisher<Message<T>>... publisherArr) {
        List<Publisher<Message<T>>> streams = new ArrayList<>();
        streams.add(this);
        streams.addAll(Arrays.asList(publisherArr));
        Flowable<Message<T>> merged = Flowable.merge(streams);
        return new DefaultSource<>(merged, this.name, this.attributes);
    }

    /**
     * Applies a transformation to the whole stream of messages.
     *
     * @param function the transformation
     * @param <X> the payload type of the resulting messages
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> compose(Function<Publisher<Message<T>>, Publisher<Message<X>>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> composed = asFlowable().compose(function::apply);
        return new DefaultSource<>(composed, this.name, this.attributes);
    }

    /**
     * Applies a {@link Flowable}-based transformation to the whole stream of messages.
     *
     * @param function the transformation
     * @param <X> the payload type of the resulting messages
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> composeFlowable(Function<Flowable<Message<T>>, Flowable<Message<X>>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> composed = asFlowable().compose(function::apply);
        return new DefaultSource<>(composed, this.name, this.attributes);
    }

    /**
     * Applies a {@link Flowable}-based transformation to the stream of payloads. Every value
     * produced by the transformation is attached to the most recent message seen from this source.
     * The state tracking that message is created per subscription (inside
     * {@code Flowable.defer}).
     *
     * <p>NOTE(review): if the transformation emits a value before any message has been received
     * from this source, there is no message to attach it to and the lookup yields {@code null}.
     *
     * @param function the transformation applied to the payloads
     * @param <X> the resulting payload type
     * @return the new source
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <X> Source<X> composePayloadFlowable(Function<Flowable<T>, Flowable<X>> function) {
        Objects.requireNonNull(function, FUNCTION_CANNOT_BE_NULL_MESSAGE);
        Flowable<Message<X>> composed = asFlowable()
            .compose(upstream -> Flowable.defer(() -> transformPayloads(upstream, function)));
        return new DefaultSource<>(composed, this.name, this.attributes);
    }

    /** Wraps each payload emitted by {@code payloads} into a message derived from {@code origin}. */
    private static <I, O> Flowable<Message<O>> payloadsAsMessages(Message<I> origin, Publisher<O> payloads) {
        return Flowable.fromPublisher(payloads).map(origin::with);
    }

    /**
     * Combines the items emitted together by the zipped streams into a single message holding a
     * {@link Tuple} of their payloads; the resulting message is derived from the first item.
     */
    private static Message<Tuple> toTupleMessage(Object[] items) {
        List<Object> payloads = new ArrayList<>(items.length);
        Message<?> first = null;
        for (Object item : items) {
            if (!(item instanceof Message)) {
                throw new IllegalArgumentException("Invalid incoming item - " + Message.class.getName() + " expected, received " + item.getClass().getName());
            }
            Message<?> current = (Message<?>) item;
            if (first == null) {
                first = current;
            }
            payloads.add(current.payload());
        }
        if (first == null) {
            throw new IllegalStateException("Invalid set of stream");
        }
        return first.with(Tuple.tuple(payloads.toArray()));
    }

    /**
     * Runs {@code transformation} on the payloads of {@code upstream} and re-attaches each produced
     * value to the last message observed on {@code upstream}.
     */
    private static <I, O> Flowable<Message<O>> transformPayloads(
            Flowable<Message<I>> upstream, Function<Flowable<I>, Flowable<O>> transformation) {
        AtomicReference<Message<I>> latest = new AtomicReference<>();
        // Remember the message being processed before handing its payload to the transformation.
        Flowable<I> payloads = upstream.doOnNext(latest::set).map(Message::payload);
        return transformation.apply(payloads).map(payload -> latest.get().with(payload));
    }
}
