package cern.streaming.pool.ext.tensorics.streamfactory;

import cern.streaming.pool.core.service.DiscoveryService;
import cern.streaming.pool.core.service.ReactiveStream;
import cern.streaming.pool.core.service.StreamFactory;
import cern.streaming.pool.core.service.StreamId;
import cern.streaming.pool.core.service.util.ReactiveStreams;
import cern.streaming.pool.ext.tensorics.streamid.BufferedStreamId;
import cern.streaming.pool.ext.tensorics.streamid.FunctionStreamId;
import java.util.Optional;
import org.tensorics.core.commons.operations.Conversion;
import org.tensorics.core.function.DiscreteFunction;
import org.tensorics.core.function.MapBackedDiscreteFunction;

@Deprecated
/* loaded from: input_file:cern/streaming/pool/ext/tensorics/streamfactory/DiscreteFunctionStreamFactory.class */
public class DiscreteFunctionStreamFactory implements StreamFactory {
    public <T> Optional<ReactiveStream<T>> create(StreamId<T> streamId, DiscoveryService discoveryService) {
        return !(streamId instanceof FunctionStreamId) ? Optional.empty() : Optional.of(createFunctionStream((FunctionStreamId) streamId, discoveryService));
    }

    <R, X, Y> ReactiveStream<DiscreteFunction<X, Y>> createFunctionStream(FunctionStreamId<R, X, Y> functionStreamId, DiscoveryService discoveryService) {
        BufferedStreamId<R> sourceStream = functionStreamId.getSourceStream();
        Conversion<? super R, ? extends X> toX = functionStreamId.getToX();
        Conversion<? super R, ? extends Y> toY = functionStreamId.getToY();
        return ReactiveStreams.fromRx(ReactiveStreams.rxFrom(discoveryService.discover(sourceStream)).map(list -> {
            MapBackedDiscreteFunction.Builder builder = MapBackedDiscreteFunction.builder();
            for (Object obj : list) {
                builder.put(toX.apply(obj), toY.apply(obj));
            }
            return builder.build();
        }));
    }
}
