package net.pincette.jes.util;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import net.pincette.function.SideEffect;
import net.pincette.json.JsonUtil;
import net.pincette.util.Collections;
import net.pincette.util.Pair;
import net.pincette.util.TimedCache;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:net/pincette/jes/util/Streams.class */
public class Streams {
    private static final String ACTION = "action";
    private static final String APPLICATION = "application";
    private static final String IN = "in";
    private static final String OUT = "out";
    private static final String START = "start";
    private static final String STOP = "stop";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/pincette/jes/util/Streams$TopologyEntry.class */
    /**
     * Immutable holder that keeps a topology together with the properties it was
     * configured with and the {@link KafkaStreams} instance created for it.
     */
    public static class TopologyEntry {
        private final Topology topology;
        private final Properties properties;
        private final KafkaStreams streams;

        private TopologyEntry(Topology topology, Properties properties, KafkaStreams kafkaStreams) {
            this.streams = kafkaStreams;
            this.properties = properties;
            this.topology = topology;
        }
    }

    /* loaded from: input_file:net/pincette/jes/util/Streams$TopologyLifeCycle.class */
    /**
     * Callback interface for topology start and stop notifications. Within this
     * file the second argument of both methods is always the value of the
     * {@code application.id} property of the topology's configuration.
     */
    public interface TopologyLifeCycle {
        /**
         * Called after the Kafka Streams instance of a topology has been started.
         *
         * @param topology the topology that was started.
         * @param str the application ID of the topology.
         */
        void started(Topology topology, String str);

        /**
         * Called when the streams of a topology are about to be closed.
         *
         * @param topology the topology that is being stopped.
         * @param str the application ID of the topology.
         */
        void stopped(Topology topology, String str);
    }

    /* loaded from: input_file:net/pincette/jes/util/Streams$TopologyLifeCycleEmitter.class */
    /**
     * A {@link TopologyLifeCycle} that publishes a JSON start or stop message to a
     * Kafka topic for every life cycle event. Each message is sent with a random
     * UUID as its key, and the call blocks until the send has completed.
     */
    public static class TopologyLifeCycleEmitter implements TopologyLifeCycle {
        private final KafkaProducer<String, JsonObject> producer;
        private final String topic;

        /**
         * @param str the name of the topic the life cycle messages are sent to.
         * @param kafkaProducer the producer used to send the messages.
         */
        public TopologyLifeCycleEmitter(String str, KafkaProducer<String, JsonObject> kafkaProducer) {
            this.topic = str;
            this.producer = kafkaProducer;
        }

        /**
         * Builds the start/stop message for the topology and sends it synchronously.
         */
        private void sendMessage(Topology topology, String application, String action) {
            Pair<Set<String>, Set<String>> inAndOut = Streams.topics(topology);
            String key = UUID.randomUUID().toString();
            JsonObject message =
                Streams.startStopMessage(application, inAndOut.first, inAndOut.second, action);
            ProducerRecord<String, JsonObject> record =
                new ProducerRecord<String, JsonObject>(this.topic, key, message);

            // Wait for the send to finish before returning to the caller.
            Kafka.send(this.producer, record).toCompletableFuture().join();
        }

        @Override // net.pincette.jes.util.Streams.TopologyLifeCycle
        public void started(Topology topology, String str) {
            sendMessage(topology, str, Streams.START);
        }

        @Override // net.pincette.jes.util.Streams.TopologyLifeCycle
        public void stopped(Topology topology, String str) {
            sendMessage(topology, str, Streams.STOP);
        }
    }

    private Streams() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Notifies the life cycle listener, if there is one, that every topology is
     * stopping and then closes all streams. The notifications for all entries are
     * issued before the first stream is closed.
     *
     * @param list the topology entries to close.
     * @param countDownLatch the latch that is counted down once per closed entry.
     * @param topologyLifeCycle the listener to notify. It may be {@code null}.
     */
    public static void closeStreams(List<TopologyEntry> list, CountDownLatch countDownLatch, TopologyLifeCycle topologyLifeCycle) {
        if (topologyLifeCycle != null) {
            for (TopologyEntry entry : list) {
                topologyLifeCycle.stopped(entry.topology, getApplication(entry.properties));
            }
        }

        for (TopologyEntry entry : list) {
            KafkaStreams streams = entry.streams;

            streams.close(Duration.ofSeconds(5L));
            streams.cleanUp();
            countDownLatch.countDown();
        }
    }

    /**
     * Creates a new {@link Properties} object with all string-valued properties
     * of the given one. Only the names reported by
     * {@link Properties#stringPropertyNames()} are copied.
     *
     * @param properties the properties to copy.
     * @return a new, independent properties object.
     */
    private static Properties copy(Properties properties) {
        Properties duplicate = new Properties();

        for (String name : properties.stringPropertyNames()) {
            duplicate.setProperty(name, properties.getProperty(name));
        }

        return duplicate;
    }

    /**
     * Filters out records for which the criterion value has already been seen.
     * The criterion values of the records that pass are remembered in a
     * {@link TimedCache} that is created with the given duration (presumably the
     * time an entry is retained; confirm against {@code TimedCache}).
     *
     * <p>The criterion function must not return {@code null}, because its result
     * is wrapped with {@link Optional#of(Object)}.
     *
     * @param kStream the stream to filter.
     * @param biFunction computes the criterion value from the key and the value
     *     of a record.
     * @param duration the duration handed to the cache of seen criterion values.
     * @param <K> the key type.
     * @param <V> the value type.
     * @param <U> the criterion type.
     * @return the stream without the records whose criterion value is in the cache.
     */
    public static <K, V, U> KStream<K, V> duplicateFilter(KStream<K, V> kStream, BiFunction<K, V, U> biFunction, Duration duration) {
        TimedCache<U, U> seen = new TimedCache<>(duration);

        return kStream
            .filterNot((key, value) -> seen.get(biFunction.apply(key, value)).isPresent())
            .map((key, value) ->
                Optional.of(biFunction.apply(key, value))
                    .map(criterion -> {
                        // Remember the criterion value, but pass the record on unchanged.
                        seen.put(criterion, criterion);
                        return new KeyValue<K, V>(key, value);
                    })
                    .orElse(null));
    }

    /**
     * Converts the configuration object found at the given path into
     * {@link Properties}. Every entry value is unwrapped and stored as its
     * string representation.
     *
     * @param config the configuration to read from.
     * @param str the path of the configuration object to convert.
     * @return the new properties.
     */
    public static Properties fromConfig(Config config, String str) {
        Properties result = new Properties();

        config
            .getConfig(str)
            .entrySet()
            .forEach(entry -> result.put(entry.getKey(), entry.getValue().unwrapped().toString()));

        return result;
    }

    /**
     * Returns the value of the {@code application.id} property.
     *
     * @param properties the properties to read.
     * @return the application ID, or {@code null} when the property is not set.
     */
    private static String getApplication(Properties properties) {
        return properties.getProperty("application.id");
    }

    /**
     * Returns the nodes of all subtopologies of the topology, together with the
     * nodes reachable from them, each node at most once.
     *
     * @param topology the topology to describe.
     * @return the stream of distinct nodes.
     */
    private static Stream<TopologyDescription.Node> getNodes(Topology topology) {
        Stream<TopologyDescription.Node> subtopologyNodes =
            topology.describe().subtopologies().stream().flatMap(sub -> sub.nodes().stream());

        return getNodes(subtopologyNodes, new HashSet<>());
    }

    /**
     * Walks the given nodes and, recursively, their predecessors and successors.
     * The set records the nodes that were already visited, so that every node is
     * emitted only once and the recursion terminates.
     *
     * @param stream the nodes to start from.
     * @param set the mutable set of nodes that have already been visited.
     * @return the stream of newly visited nodes.
     */
    private static Stream<TopologyDescription.Node> getNodes(Stream<TopologyDescription.Node> stream, Set<TopologyDescription.Node> set) {
        return stream
            // Set.add returns false for a node that was already visited.
            .filter(set::add)
            .flatMap(node -> {
                Stream<TopologyDescription.Node> neighbours =
                    Stream.concat(node.predecessors().stream(), node.successors().stream());

                return Stream.concat(Stream.of(node), getNodes(neighbours, set));
            });
    }

    /**
     * Tells whether a topic name is treated as internal, which is the case when
     * it contains the text {@code KSTREAM}.
     *
     * @param str the topic name.
     * @return {@code true} when the name contains {@code KSTREAM}.
     */
    private static boolean internalTopic(String str) {
        return str.indexOf("KSTREAM") >= 0;
    }

    /**
     * Starts one topology by delegating to
     * {@code start(topology, properties, null)}, i.e. with a {@code null} life
     * cycle listener.
     *
     * @param topology the topology to run.
     * @param properties the Kafka Streams properties for the topology.
     * @return the result of the delegated call.
     */
    public static boolean start(Topology topology, Properties properties) {
        return start(topology, properties, null);
    }

    /**
     * Starts one topology by wrapping it in a single-element stream and
     * delegating to the stream variant of this method.
     *
     * @param topology the topology to run.
     * @param properties the Kafka Streams properties for the topology.
     * @param topologyLifeCycle the life cycle listener that is passed on.
     * @return the result of the delegated call.
     */
    public static boolean start(Topology topology, Properties properties, TopologyLifeCycle topologyLifeCycle) {
        return start((Stream<Pair<Topology, Properties>>) Stream.of(Pair.pair(topology, properties)), topologyLifeCycle);
    }

    /**
     * Starts several topologies by delegating to the two-argument stream variant
     * with a {@code null} life cycle listener.
     *
     * @param stream the topologies, each paired with its Kafka Streams properties.
     * @return the result of the delegated call.
     */
    public static boolean start(Stream<Pair<Topology, Properties>> stream) {
        return start(stream, (TopologyLifeCycle) null);
    }

    /**
     * Starts a Kafka Streams instance for every given topology and blocks until
     * all of them have been closed. When one of the instances moves to the
     * {@code ERROR} state, all instances are closed. A JVM shutdown hook that
     * closes all instances is registered as well.
     *
     * @param stream the topologies, each paired with its Kafka Streams properties.
     * @param topologyLifeCycle the listener that is notified when a topology has
     *     been started or is being stopped. It may be {@code null}, in which case
     *     no notifications are issued.
     * @return {@code false} when one of the instances went to the {@code ERROR}
     *     state, {@code true} otherwise.
     */
    public static boolean start(Stream<Pair<Topology, Properties>> stream, TopologyLifeCycle topologyLifeCycle) {
        List<TopologyEntry> entries = topologyEntries(stream);
        // A one-element array, so that the state listener lambda can record a failure.
        boolean[] failed = new boolean[1];
        CountDownLatch closed = new CountDownLatch(entries.size());

        entries.forEach(entry -> {
            entry.streams.setStateListener((newState, oldState) -> {
                if (newState.equals(KafkaStreams.State.ERROR)) {
                    failed[0] = true;
                    closeStreams(entries, closed, topologyLifeCycle);
                }
            });
            entry.streams.start();

            // The overloads without a listener pass null, so it must be optional here,
            // as it already is in closeStreams.
            if (topologyLifeCycle != null) {
                topologyLifeCycle.started(entry.topology, getApplication(entry.properties));
            }
        });

        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeStreams(entries, closed, topologyLifeCycle)));

        // Block until closeStreams has counted down once for every entry.
        net.pincette.util.Util.tryToDoRethrow(closed::await);

        return !failed[0];
    }

    /**
     * Creates the life cycle message with the action {@code start}.
     *
     * @param str the application name that goes in the {@code application} field.
     * @param set the topic names that go in the {@code in} field.
     * @param set2 the topic names that go in the {@code out} field.
     * @return the JSON message.
     */
    public static JsonObject startMessage(String str, Set<String> set, Set<String> set2) {
        return startStopMessage(str, set, set2, START);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a life cycle message with the fields {@code application},
     * {@code in}, {@code out} and {@code action}, added in that order.
     *
     * @param str the application name.
     * @param set the topic names for the {@code in} array.
     * @param set2 the topic names for the {@code out} array.
     * @param str2 the value of the {@code action} field.
     * @return the JSON message.
     */
    public static JsonObject startStopMessage(String str, Set<String> set, Set<String> set2, String str2) {
        return JsonUtil
            .createObjectBuilder()
            .add(APPLICATION, str)
            .add(IN, toJsonArray(set))
            .add(OUT, toJsonArray(set2))
            .add(ACTION, str2)
            .build();
    }

    /**
     * Creates the life cycle message with the action {@code stop}.
     *
     * @param str the application name that goes in the {@code application} field.
     * @param set the topic names that go in the {@code in} field.
     * @param set2 the topic names that go in the {@code out} field.
     * @return the JSON message.
     */
    public static JsonObject stopMessage(String str, Set<String> set, Set<String> set2) {
        return startStopMessage(str, set, set2, STOP);
    }

    /**
     * Returns a copy of the given properties in which defaults are filled in for
     * the deserialization exception handler, the key and value serdes and the
     * processing guarantee, but only for the settings that are not yet present.
     * The given properties are not modified.
     *
     * @param properties the Kafka Streams properties to complete.
     * @return the completed copy.
     */
    public static Properties streamsConfig(Properties properties) {
        // The copy holds string values only, so an absent key is the same as a
        // property lookup that yields null.
        Properties completed = copy(properties);

        completed.putIfAbsent("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        completed.putIfAbsent("default.key.serde", Serdes.String().getClass());
        completed.putIfAbsent("default.value.serde", JsonSerde.class);
        completed.putIfAbsent("processing.guarantee", "at_least_once");

        return completed;
    }

    /**
     * Adds all strings of the set to a new JSON array builder.
     *
     * @param set the strings to add.
     * @return the builder that contains the strings.
     */
    private static JsonArrayBuilder toJsonArray(Set<String> set) {
        JsonArrayBuilder builder = JsonUtil.createArrayBuilder();

        set.forEach(value -> builder.add(value));

        return builder;
    }

    /**
     * Collects the topic names of the source and sink nodes of a topology. Topic
     * names that are considered internal are left out.
     *
     * @param topology the topology to inspect.
     * @return a pair with the source topic names as the first element and the
     *     sink topic names as the second one.
     */
    public static Pair<Set<String>, Set<String>> topics(Topology topology) {
        Set<String> consumed = new HashSet<>();
        Set<String> produced = new HashSet<>();

        getNodes(topology).forEach(node -> {
            if (node instanceof TopologyDescription.Source) {
                consumed.addAll(topics((TopologyDescription.Source) node));
            } else if (node instanceof TopologyDescription.Sink) {
                produced.addAll(topics((TopologyDescription.Sink) node));
            }
        });

        return Pair.pair(consumed, produced);
    }

    /**
     * Returns the non-internal topic names a source node reads from.
     *
     * @param source the source node.
     * @return the topic names, which is an empty set when the node reports no
     *     topic set.
     */
    private static Set<String> topics(TopologyDescription.Source source) {
        Set<String> names = source.topicSet();

        // The topic set is null when the source subscribes with a pattern instead
        // of with explicit topic names.
        if (names == null) {
            return new HashSet<>();
        }

        return names.stream().filter(name -> !internalTopic(name)).collect(Collectors.toSet());
    }

    /**
     * Returns the topic name a sink node writes to as a set. The set is empty
     * when the node reports no topic name or when the name is considered
     * internal.
     *
     * @param sink the sink node.
     * @return a set with at most one topic name.
     */
    private static Set<String> topics(TopologyDescription.Sink sink) {
        String name = sink.topic();

        if (name == null || internalTopic(name)) {
            return java.util.Collections.emptySet();
        }

        return Collections.set(new String[]{name});
    }

    /**
     * Creates a {@link KafkaStreams} instance for every topology, using the
     * completed version of its properties, and bundles each with its topology
     * and original properties.
     *
     * @param stream the topologies, each paired with its Kafka Streams properties.
     * @return the list of entries, in encounter order.
     */
    private static List<TopologyEntry> topologyEntries(Stream<Pair<Topology, Properties>> stream) {
        return stream
            .map(pair -> {
                Topology topology = pair.first;
                Properties properties = pair.second;

                return new TopologyEntry(topology, properties, new KafkaStreams(topology, streamsConfig(properties)));
            })
            .collect(Collectors.toList());
    }
}
