package io.kgraph.library;

import io.kgraph.AbstractIntegrationTest;
import io.kgraph.GraphAlgorithm;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.TestGraphUtils;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphGenerators;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.StreamUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/library/ConnectedComponentsTest.class */
public class ConnectedComponentsTest extends AbstractIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(ConnectedComponentsTest.class);
    GraphAlgorithm<Long, Long, Long, KTable<Long, Long>> algorithm;

    /* loaded from: input_file:io/kgraph/library/ConnectedComponentsTest$InitVertices.class */
    private static final class InitVertices implements ValueMapper<Long, Long> {
        private InitVertices() {
        }

        public Long apply(Long l) {
            return l;
        }
    }

    /* loaded from: input_file:io/kgraph/library/ConnectedComponentsTest$InitVerticesFromId.class */
    private static final class InitVerticesFromId<VV> implements ValueMapperWithKey<Long, VV, Long> {
        private InitVerticesFromId() {
        }

        public Long apply(Long l, VV vv) {
            return l;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ Object apply(Object obj, Object obj2) {
            return apply((Long) obj, (Long) obj2);
        }
    }

    /**
     * Runs {@code ConnectedComponents} on the graph built from
     * {@link TestGraphUtils#getTwoChains()} and compares the contents of the
     * {@code solutionSetStore-cc} store with the expected labelling:
     * vertices 0-9 map to 0 and vertices 10-20 map to 10.
     *
     * @throws Exception if preparing, running or reading the algorithm fails
     */
    @Test
    public void testConnectedComponents() throws Exception {
        StreamsBuilder prepareBuilder = new StreamsBuilder();
        Properties producerProps = ClientUtils.producerConfig(
            CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        KGraph graph = KGraph.fromEdges(
            StreamUtils.tableFromCollection(
                prepareBuilder, producerProps, new KryoSerde(), Serdes.Long(), TestGraphUtils.getTwoChains()),
            new InitVertices(),
            GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));

        this.algorithm = new ConnectedComponents(
            (String) null, "run-cc", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(),
            "vertices-cc", "edgesGroupedBySource-cc", graph.serialized(),
            "solutionSet-cc", "solutionSetStore-cc", "workSet-cc", 2, (short) 1);

        // Publish the vertices and the source-grouped edges, and wait for that to finish.
        Properties prepareProps = ClientUtils.streamsConfig(
            "prepare-cc", "prepare-client-cc", CLUSTER.bootstrapServers(),
            graph.keySerde().getClass(), graph.vertexValueSerde().getClass());
        GraphUtils.groupEdgesBySourceAndRepartition(
            prepareBuilder, prepareProps, graph, "vertices-cc", "edgesGroupedBySource-cc", 2, (short) 1).get();

        StreamsBuilder runBuilder = new StreamsBuilder();
        Properties runProps = ClientUtils.streamsConfig(
            "run-cc", "run-client-cc", CLUSTER.bootstrapServers(), graph.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(runBuilder, runProps).streams();

        GraphAlgorithmState state = this.algorithm.run();
        state.result().get();
        // NOTE(review): fixed pause before reading the store; presumably lets the
        // store contents settle -- confirm whether it is still required.
        Thread.sleep(2000L);

        Map<?, ?> actual = StreamUtils.mapFromStore(state.streams(), "solutionSetStore-cc");
        log.debug("result: {}", actual);

        Map<Long, Long> expected = new HashMap<>();
        for (long vertex = 0L; vertex <= 9L; vertex++) {
            expected.put(vertex, 0L);
        }
        for (long vertex = 10L; vertex <= 20L; vertex++) {
            expected.put(vertex, 10L);
        }
        Assert.assertEquals(expected, actual);
    }

    /**
     * Runs {@code ConnectedComponents} on the graph produced by
     * {@code GraphGenerators.gridGraph(..., 10, 10)} and asserts that each of the
     * keys 0-99 in the {@code solutionSetStore-grid} store maps to 0.
     *
     * @throws Exception if preparing, running or reading the algorithm fails
     */
    @Test
    public void testGridConnectedComponents() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KGraph gridGraph = GraphGenerators.gridGraph(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties()), 10, 10);
        // Re-initialise every vertex value to the vertex's own key.
        KGraph kGraph = new KGraph(gridGraph.vertices().mapValues(new InitVerticesFromId()), gridGraph.edges(), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
        this.algorithm = new ConnectedComponents((String) null, "run-grid", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-grid", "edgesGroupedBySource-grid", kGraph.serialized(), "solutionSet-grid", "solutionSetStore-grid", "workSet-grid", 2, (short) 1);
        GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-grid", "prepare-client-grid", CLUSTER.bootstrapServers(), kGraph.keySerde().getClass(), kGraph.vertexValueSerde().getClass()), kGraph, "vertices-grid", "edgesGroupedBySource-grid", 2, (short) 1).get();
        this.algorithm.configure(new StreamsBuilder(), ClientUtils.streamsConfig("run-grid", "run-client-grid", CLUSTER.bootstrapServers(), kGraph.keySerde().getClass(), KryoSerde.class)).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        // NOTE(review): fixed pause before reading the store; presumably lets the
        // store contents settle -- confirm whether it is still required.
        Thread.sleep(2000L);
        Map<?, ?> components = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-grid");
        log.debug("result: {}", components);

        // Compare boxed values so a vertex missing from the store is reported as an
        // assertion failure naming the vertex, not as a bare NullPointerException.
        final Long expectedComponent = Long.valueOf(0L);
        for (long vertexId = 0L; vertexId < 100L; vertexId++) {
            Assert.assertEquals("component of vertex " + vertexId,
                expectedComponent, components.get(Long.valueOf(vertexId)));
        }
    }

    /**
     * Runs two {@code ConnectedComponents} clients that share the application id
     * {@code run-grid} against the 10x10 grid graph and asserts that every key
     * 0-99 maps to 0 in whichever client's store holds it.
     *
     * <p>This method carries no {@code @Test} annotation, so JUnit 4 does not run
     * it. It talks to the hard-coded endpoints {@code localhost:9092} and
     * {@code localhost:2181} rather than the embedded {@code CLUSTER};
     * NOTE(review): presumably kept for manual runs against an external broker --
     * confirm before enabling.
     *
     * @throws Exception if preparing, running or reading the algorithm fails
     */
    public void testGridConnectedComponentsMultipleClients() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KGraph gridGraph = GraphGenerators.gridGraph(streamsBuilder, ClientUtils.producerConfig("localhost:9092", LongSerializer.class, LongSerializer.class, new Properties()), 10, 10);
        // Re-initialise every vertex value to the vertex's own key.
        KGraph kGraph = new KGraph(gridGraph.vertices().mapValues(new InitVerticesFromId()), gridGraph.edges(), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
        ConnectedComponents connectedComponents = new ConnectedComponents((String) null, "run-grid", "localhost:9092", "localhost:2181", "vertices-grid", "edgesGroupedBySource-grid", kGraph.serialized(), "solutionSet-grid", "solutionSetStore-grid", "workSet-grid", 2, (short) 1);
        ConnectedComponents connectedComponents2 = new ConnectedComponents((String) null, "run-grid", "localhost:9092", "localhost:2181", "vertices-grid", "edgesGroupedBySource-grid", kGraph.serialized(), "solutionSet-grid", "solutionSetStore-grid", "workSet-grid", 2, (short) 1);
        // Close both clients on every exit path (like tearDown() does for the
        // single-client tests); previously a failed assertion skipped both close() calls.
        try {
            GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-grid", "prepare-client-grid", "localhost:9092", kGraph.keySerde().getClass(), kGraph.vertexValueSerde().getClass()), kGraph, "vertices-grid", "edgesGroupedBySource-grid", 2, (short) 1).get();
            Properties streamsConfig = ClientUtils.streamsConfig("run-grid", "run-client-grid", "localhost:9092", kGraph.keySerde().getClass(), KryoSerde.class);
            StreamsBuilder streamsBuilder2 = new StreamsBuilder();
            StreamsBuilder streamsBuilder3 = new StreamsBuilder();
            connectedComponents.configure(streamsBuilder2, streamsConfig);
            // The second client is configured with a fresh temporary state directory.
            streamsConfig.put("state.dir", ClientUtils.tempDirectory().getAbsolutePath());
            connectedComponents2.configure(streamsBuilder3, streamsConfig).streams();
            GraphAlgorithmState run = connectedComponents.run(Integer.MAX_VALUE);
            GraphAlgorithmState run2 = connectedComponents2.run(Integer.MAX_VALUE);
            run.result().get();
            run2.result().get();
            // NOTE(review): fixed pause before reading the stores; presumably lets
            // the store contents settle -- confirm whether it is still required.
            Thread.sleep(2000L);
            Map<?, ?> firstStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-grid");
            Map<?, ?> secondStore = StreamUtils.mapFromStore(run2.streams(), "solutionSetStore-grid");
            log.debug("result: {}", firstStore);
            log.debug("result: {}", secondStore);

            final Long expectedComponent = Long.valueOf(0L);
            for (long vertexId = 0L; vertexId < 100L; vertexId++) {
                Long key = Long.valueOf(vertexId);
                // A vertex may live in either client's store: try the first, then the second.
                Object component = firstStore.get(key);
                if (component == null) {
                    component = secondStore.get(key);
                }
                // Boxed comparison: a vertex absent from both stores fails with a
                // message naming it instead of a NullPointerException.
                Assert.assertEquals("component of vertex " + vertexId, expectedComponent, component);
            }
        } finally {
            try {
                connectedComponents.close();
            } finally {
                connectedComponents2.close();
            }
        }
    }

    /**
     * Loads vertices and edges from the classpath resources {@code /vertices.txt}
     * and {@code /edges.txt} into the topics {@code initVertices-file} and
     * {@code initEdges-file}, runs two {@code ConnectedComponents} clients that
     * share the application id {@code run-file}, and asserts that keys 0-9 map to
     * 0 and keys 10-20 map to 10 in whichever client's store holds them.
     *
     * <p>This method carries no {@code @Test} annotation, so JUnit 4 does not run
     * it. It talks to the hard-coded endpoints {@code localhost:9092} and
     * {@code localhost:2181} rather than the embedded {@code CLUSTER};
     * NOTE(review): presumably kept for manual runs against an external broker --
     * confirm before enabling.
     *
     * @throws Exception if preparing, running or reading the algorithm fails
     */
    public void testConnectedComponentsMultipleClientsFromFile() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties producerConfig = ClientUtils.producerConfig("localhost:9092", LongSerializer.class, LongSerializer.class, new Properties());
        GraphUtils.verticesToTopic(GraphUtils.class.getResourceAsStream("/vertices.txt"), Long::parseLong, new LongSerializer(), producerConfig, "initVertices-file", 50, (short) 1);
        GraphUtils.edgesToTopic(GraphUtils.class.getResourceAsStream("/edges.txt"), Long::parseLong, new LongSerializer(), producerConfig, "initEdges-file", 50, (short) 1);
        KGraph kGraph = new KGraph(streamsBuilder.table("initVertices-file", Consumed.with(Serdes.Long(), Serdes.Long())), streamsBuilder.table("initEdges-file", Consumed.with(new KryoSerde(), Serdes.Long())), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
        ConnectedComponents connectedComponents = new ConnectedComponents((String) null, "run-file", "localhost:9092", "localhost:2181", "vertices-file", "edgesGroupedBySource-file", kGraph.serialized(), "solutionSet-file", "solutionSetStore-file", "workSet-file", 2, (short) 1);
        ConnectedComponents connectedComponents2 = new ConnectedComponents((String) null, "run-file", "localhost:9092", "localhost:2181", "vertices-file", "edgesGroupedBySource-file", kGraph.serialized(), "solutionSet-file", "solutionSetStore-file", "workSet-file", 2, (short) 1);
        // Close both clients on every exit path (like tearDown() does for the
        // single-client tests); previously a failed assertion skipped both close() calls.
        try {
            GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-file", "prepare-client-file", "localhost:9092", kGraph.keySerde().getClass(), kGraph.vertexValueSerde().getClass()), kGraph, "vertices-file", "edgesGroupedBySource-file", 2, (short) 1).get();
            Properties streamsConfig = ClientUtils.streamsConfig("run-file", "run-client-file", "localhost:9092", kGraph.keySerde().getClass(), KryoSerde.class);
            StreamsBuilder streamsBuilder2 = new StreamsBuilder();
            StreamsBuilder streamsBuilder3 = new StreamsBuilder();
            connectedComponents.configure(streamsBuilder2, streamsConfig).streams();
            // The second client is configured with a fresh temporary state directory.
            streamsConfig.put("state.dir", ClientUtils.tempDirectory().getAbsolutePath());
            connectedComponents2.configure(streamsBuilder3, streamsConfig).streams();
            GraphAlgorithmState run = connectedComponents.run();
            GraphAlgorithmState run2 = connectedComponents2.run();
            run.result().get();
            run2.result().get();
            // NOTE(review): fixed pause before reading the stores; presumably lets
            // the store contents settle -- confirm whether it is still required.
            Thread.sleep(2000L);
            Map<?, ?> firstStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-file");
            Map<?, ?> secondStore = StreamUtils.mapFromStore(run2.streams(), "solutionSetStore-file");
            log.debug("result: {}", firstStore);
            log.debug("result: {}", secondStore);

            // Keys 0-9 are expected to map to 0, keys 10-20 to 10.
            for (long vertexId = 0L; vertexId < 21L; vertexId++) {
                Long expectedComponent = Long.valueOf(vertexId < 10L ? 0L : 10L);
                Long key = Long.valueOf(vertexId);
                // A vertex may live in either client's store: try the first, then the second.
                Object component = firstStore.get(key);
                if (component == null) {
                    component = secondStore.get(key);
                }
                // Boxed comparison: a vertex absent from both stores fails with a
                // message naming it instead of a NullPointerException.
                Assert.assertEquals("component of vertex " + vertexId, expectedComponent, component);
            }
        } finally {
            try {
                connectedComponents.close();
            } finally {
                connectedComponents2.close();
            }
        }
    }

    /**
     * Closes the algorithm created by the test that just ran, if there is one.
     *
     * @throws Exception if closing the algorithm fails
     */
    @After
    public void tearDown() throws Exception {
        // The field stays null when a test fails before assigning it, or never
        // assigns it at all; skip the close then so the NullPointerException does
        // not pile onto the test's own failure.
        if (this.algorithm != null) {
            this.algorithm.close();
        }
    }
}
