package io.kipe.streams.test.kafka;

import io.micronaut.configuration.kafka.serde.JsonSerdeRegistry;
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:io/kipe/streams/test/kafka/TopologyTestContext.class */
public class TopologyTestContext {
    private static final Properties CONFIG = new Properties();
    private static final JsonSerdeRegistry JSONSERDEREGISTRY;
    private final ConfiguredStreamBuilder streamBuilder;
    private TopologyTestDriver driver = null;

    public static TopologyTestContext create(Map<String, String> map) {
        CONFIG.putAll(map);
        return new TopologyTestContext(new ConfiguredStreamBuilder(CONFIG));
    }

    private TopologyTestContext(ConfiguredStreamBuilder configuredStreamBuilder) {
        this.streamBuilder = configuredStreamBuilder;
    }

    public ConfiguredStreamBuilder getStreamsBuilder() {
        return this.streamBuilder;
    }

    public JsonSerdeRegistry getJsonSerdeRegistry() {
        return JSONSERDEREGISTRY;
    }

    public <K, V> KStream<K, V> createKStream(String str, Class<K> cls, Class<V> cls2) {
        return this.streamBuilder.stream(str, Consumed.with(JSONSERDEREGISTRY.getSerde(cls), JSONSERDEREGISTRY.getSerde(cls2)).withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
    }

    public <K, V> KStream<K, V> createKStream(String str, Serde<K> serde, Serde<V> serde2) {
        return this.streamBuilder.stream(str, Consumed.with(serde, serde2).withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
    }

    public <K, V> KStream<K, V> createKStream(String str) {
        return this.streamBuilder.stream(str);
    }

    public TopologyTestDriver initTopologyTestDriver() {
        this.driver = new TopologyTestDriver(this.streamBuilder.build(), CONFIG);
        return this.driver;
    }

    public <K, V> TestInputTopic<K, V> createTestInputTopic(String str, Class<K> cls, Class<V> cls2) {
        Objects.requireNonNull(this.driver, "TopologyTestDriver must be initialized before by calling 'initTopologyTestDriver()'");
        return this.driver.createInputTopic(str, JSONSERDEREGISTRY.getSerializer(cls), JSONSERDEREGISTRY.getSerializer(cls2));
    }

    public <K, V> TestInputTopic<K, V> createTestInputTopic(String str, Serde<K> serde, Serde<V> serde2) {
        Objects.requireNonNull(this.driver, "TopologyTestDriver must be initialized before by calling 'initTopologyTestDriver()'");
        return this.driver.createInputTopic(str, serde.serializer(), serde2.serializer());
    }

    public <K, V> TestOutputTopic<K, V> createTestOutputTopic(String str, Class<K> cls, Class<V> cls2) {
        Objects.requireNonNull(this.driver, "TopologyTestDriver must be initialized before by calling 'initTopologyTestDriver()'");
        return this.driver.createOutputTopic(str, JSONSERDEREGISTRY.getDeserializer(cls), JSONSERDEREGISTRY.getDeserializer(cls2));
    }

    public <K, V> TestOutputTopic<K, V> createTestOutputTopic(String str, Serde<K> serde, Serde<V> serde2) {
        Objects.requireNonNull(this.driver, "TopologyTestDriver must be initialized before by calling 'initTopologyTestDriver()'");
        return this.driver.createOutputTopic(str, serde.deserializer(), serde2.deserializer());
    }

    public void close() {
        Objects.requireNonNull(this.driver, "TopologyTestDriver must be initialized before by calling 'initTopologyTestDriver()'");
        this.driver.close();
        this.driver = null;
    }

    static {
        CONFIG.put("application.id", "test");
        CONFIG.put("bootstrap.servers", "dummy:1234");
        JSONSERDEREGISTRY = MockedJsonSerdeRegistry.create();
    }
}
