package net.pincette.jes.util;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.pincette.function.SideEffect;
import net.pincette.util.Pair;
import net.pincette.util.TimedCache;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:net/pincette/jes/util/Streams.class */
public class Streams {
    private Streams() {
    }

    public static <K, V, U> KStream<K, V> duplicateFilter(KStream<K, V> kStream, BiFunction<K, V, U> biFunction, Duration duration) {
        TimedCache timedCache = new TimedCache(duration);
        return kStream.filterNot((obj, obj2) -> {
            return timedCache.get(biFunction.apply(obj, obj2)).isPresent();
        }).map((obj3, obj4) -> {
            return (KeyValue) Optional.of(biFunction.apply(obj3, obj4)).map(obj3 -> {
                return (KeyValue) SideEffect.run(() -> {
                    timedCache.put(obj3, obj3);
                }).andThenGet(() -> {
                    return new KeyValue(obj3, obj4);
                });
            }).orElse(null);
        });
    }

    private static Properties copy(Properties properties) {
        Properties properties2 = new Properties();
        properties.stringPropertyNames().forEach(str -> {
            properties2.setProperty(str, properties.getProperty(str));
        });
        return properties2;
    }

    public static Properties fromConfig(Config config, String str) {
        return (Properties) config.getConfig(str).entrySet().stream().reduce(new Properties(), (properties, entry) -> {
            properties.put(entry.getKey(), ((ConfigValue) entry.getValue()).unwrapped().toString());
            return properties;
        }, (properties2, properties3) -> {
            return properties2;
        });
    }

    public static boolean start(Topology topology, Properties properties) {
        return start(Stream.of(Pair.pair(topology, properties)));
    }

    public static boolean start(Stream<Pair<Topology, Properties>> stream) {
        List list = (List) stream.collect(Collectors.toList());
        Boolean[] boolArr = new Boolean[list.size()];
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        KafkaStreams[] kafkaStreamsArr = new KafkaStreams[list.size()];
        for (int i = 0; i < list.size(); i++) {
            kafkaStreamsArr[i] = new KafkaStreams((Topology) ((Pair) list.get(i)).first, streamsConfig((Properties) ((Pair) list.get(i)).second));
            kafkaStreamsArr[i].setStateListener((state, state2) -> {
                if (state.equals(KafkaStreams.State.ERROR)) {
                    boolArr[0] = true;
                    countDownLatch.countDown();
                }
            });
        }
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            for (KafkaStreams kafkaStreams : kafkaStreamsArr) {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        }));
        for (KafkaStreams kafkaStreams : kafkaStreamsArr) {
            kafkaStreams.start();
        }
        countDownLatch.getClass();
        net.pincette.util.Util.tryToDoRethrow(countDownLatch::await);
        return Arrays.stream(boolArr).noneMatch(bool -> {
            return bool.booleanValue();
        });
    }

    public static Properties streamsConfig(Properties properties) {
        Properties copy = copy(properties);
        if (copy.getProperty("default.deserialization.exception.handler") == null) {
            copy.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        }
        if (copy.getProperty("default.key.serde") == null) {
            copy.put("default.key.serde", Serdes.String().getClass());
        }
        if (copy.getProperty("default.value.serde") == null) {
            copy.put("default.value.serde", JsonSerde.class);
        }
        if (copy.getProperty("processing.guarantee") == null) {
            copy.put("processing.guarantee", "at_least_once");
        }
        return copy;
    }
}
