package kr.jm.utils.kafka.streams;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import kr.jm.utils.exception.JMExceptionManager;
import kr.jm.utils.helper.JMLambda;
import kr.jm.utils.helper.JMLog;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kr/jm/utils/kafka/streams/JMKafkaStreamsHelper.class */
/**
 * Static helpers for assembling Kafka Streams {@link Topology} objects whose
 * record keys and values are consumed as Strings, with values parsed from
 * JSON into typed objects through Jackson.
 *
 * <p>Two flavours are offered for both {@link KStream} and {@link KTable}:
 * the {@code ...WithOpt} variants keep every record and expose the parse
 * result as an {@link Optional}, while the plain variants drop records whose
 * value could not be parsed and expose the unwrapped value.
 */
public class JMKafkaStreamsHelper {
    private static final Logger log = LoggerFactory.getLogger(JMKafkaStreamsHelper.class);

    /** Consumes both key and value with the String serde. */
    private static final Consumed<String, String> STRING_STRING_CONSUMED =
            Consumed.with(Serdes.String(), Serdes.String());

    /**
     * Mapper used whenever a caller passes {@code null} for the mapper
     * argument: unknown properties are ignored and unknown enum values are
     * read as {@code null}.
     */
    private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper()
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
            .enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL);

    /**
     * Parses a JSON string into the type described by {@code typeReference}.
     *
     * @param objectMapper2 mapper to use, or {@code null} to use this class's default mapper
     * @param str           JSON text to parse
     * @param typeReference target type
     * @param <T>           target type
     * @return the parsed value; an empty Optional when the JSON text is the
     *         literal {@code null}; on any exception, whatever
     *         {@code JMExceptionManager.handleExceptionAndReturnEmptyOptional} returns
     */
    public static <T> Optional<T> buildJsonStringAsOpt(ObjectMapper objectMapper2, String str, TypeReference<T> typeReference) {
        ObjectMapper mapper = objectMapper2 != null ? objectMapper2 : DEFAULT_OBJECT_MAPPER;
        try {
            // Parse the String directly: going through str.getBytes() would
            // encode with the platform default charset before Jackson decodes it.
            T parsed = mapper.readValue(str, typeReference);
            return Optional.ofNullable(parsed);
        } catch (Exception e) {
            // Parsing failures (and a null input string) must end up here,
            // so the whole parse is inside the try block.
            return JMExceptionManager.handleExceptionAndReturnEmptyOptional(log, e, "buildJsonStringAsOpt", new Object[]{str});
        }
    }

    /**
     * Same as {@link #buildKStreamTopologyWithOpt(Consumer, ObjectMapper, TypeReference, String...)}
     * using the default mapper.
     */
    public static <T> Topology buildKStreamTopologyWithOpt(Consumer<KStream<String, Optional<T>>> consumer, TypeReference<T> typeReference, String... strArr) {
        return buildKStreamTopologyWithOpt(consumer, null, typeReference, strArr);
    }

    /**
     * Builds a topology around a stream of Optional-wrapped parsed values.
     *
     * @param consumer      receives the stream so it can attach further processing
     * @param objectMapper2 mapper to use, or {@code null} for the default mapper
     * @param typeReference target type of the JSON values
     * @param strArr        source topic names
     * @return the built topology
     */
    public static <T> Topology buildKStreamTopologyWithOpt(Consumer<KStream<String, Optional<T>>> consumer, ObjectMapper objectMapper2, TypeReference<T> typeReference, String... strArr) {
        return buildTopology(builder ->
                consumer.accept(buildKStreamWithOpt(builder, objectMapper2, typeReference, strArr)));
    }

    /**
     * Same as {@link #buildKStreamWithOpt(StreamsBuilder, ObjectMapper, TypeReference, String...)}
     * using the default mapper.
     */
    public static <T> KStream<String, Optional<T>> buildKStreamWithOpt(StreamsBuilder streamsBuilder, TypeReference<T> typeReference, String... strArr) {
        return buildKStreamWithOpt(streamsBuilder, null, typeReference, strArr);
    }

    /**
     * Creates a String/String stream over the given topics and maps every
     * value through {@link #buildJsonStringAsOpt(ObjectMapper, String, TypeReference)}.
     *
     * @param streamsBuilder builder the stream is registered on
     * @param objectMapper2  mapper to use, or {@code null} for the default mapper
     * @param typeReference  target type of the JSON values
     * @param strArr         source topic names
     * @return stream of Optional-wrapped parsed values
     */
    public static <T> KStream<String, Optional<T>> buildKStreamWithOpt(StreamsBuilder streamsBuilder, ObjectMapper objectMapper2, TypeReference<T> typeReference, String... strArr) {
        KStream<String, String> rawStream = buildKStream(streamsBuilder, strArr);
        return rawStream.mapValues(json -> buildJsonStringAsOpt(objectMapper2, json, typeReference));
    }

    /**
     * Creates a stream of raw String keys and values over the given topics.
     *
     * @param streamsBuilder builder the stream is registered on
     * @param strArr         source topic names
     * @return the raw String/String stream
     */
    public static KStream<String, String> buildKStream(StreamsBuilder streamsBuilder, String[] strArr) {
        return streamsBuilder.stream(Arrays.asList(strArr), STRING_STRING_CONSUMED);
    }

    /**
     * Same as {@link #buildKStreamTopology(Consumer, ObjectMapper, TypeReference, String...)}
     * using the default mapper.
     */
    public static <T> Topology buildKStreamTopology(Consumer<KStream<String, T>> consumer, TypeReference<T> typeReference, String... strArr) {
        return buildKStreamTopology(consumer, null, typeReference, strArr);
    }

    /**
     * Builds a topology around a stream of parsed values; records whose
     * value could not be parsed are logged and filtered out.
     *
     * @param consumer      receives the stream so it can attach further processing
     * @param objectMapper2 mapper to use, or {@code null} for the default mapper
     * @param typeReference target type of the JSON values
     * @param strArr        source topic names
     * @return the built topology
     */
    public static <T> Topology buildKStreamTopology(Consumer<KStream<String, T>> consumer, ObjectMapper objectMapper2, TypeReference<T> typeReference, String... strArr) {
        return buildTopology(builder -> {
            KStream<String, Optional<T>> parsed = buildKStreamWithOpt(builder, objectMapper2, typeReference, strArr);
            // The filter guarantees presence, so Optional::get is safe afterwards.
            consumer.accept(parsed.filter(JMKafkaStreamsHelper::logAndFilter).mapValues(Optional::get));
        });
    }

    /**
     * Creates a fresh {@link StreamsBuilder}, lets {@code consumer} populate
     * it, and builds the resulting topology.
     *
     * @param consumer callback that registers sources and processors on the builder
     * @return the built topology
     */
    public static <T> Topology buildTopology(Consumer<StreamsBuilder> consumer) {
        StreamsBuilder builder = new StreamsBuilder();
        consumer.accept(builder);
        return builder.build();
    }

    /**
     * Filter predicate that keeps records with a present value and logs a
     * warning for each record that is about to be dropped.
     *
     * @param str      record key
     * @param optional parse result for the record value
     * @return {@code true} when the value is present
     */
    private static <T> boolean logAndFilter(String str, Optional<T> optional) {
        boolean present = optional.isPresent();
        if (!present) {
            // Warn only for dropped records; warning on every kept record
            // would flood the log and hide the actual parse failures.
            JMLog.warn(log, "logAndFilter", new Object[]{str, optional});
        }
        return present;
    }

    /**
     * Same as {@link #buildTableTopologyWithOpt(Consumer, ObjectMapper, TypeReference, String)}
     * using the default mapper.
     */
    public static <T> Topology buildTableTopologyWithOpt(Consumer<KTable<String, Optional<T>>> consumer, TypeReference<T> typeReference, String str) {
        return buildTableTopologyWithOpt(consumer, null, typeReference, str);
    }

    /**
     * Builds a topology around a table of Optional-wrapped parsed values.
     *
     * @param consumer      receives the table so it can attach further processing
     * @param objectMapper2 mapper to use, or {@code null} for the default mapper
     * @param typeReference target type of the JSON values
     * @param str           source topic name
     * @return the built topology
     */
    public static <T> Topology buildTableTopologyWithOpt(Consumer<KTable<String, Optional<T>>> consumer, ObjectMapper objectMapper2, TypeReference<T> typeReference, String str) {
        return buildTopology(builder ->
                consumer.accept(buildKTableWithOpt(builder, objectMapper2, typeReference, str)));
    }

    /**
     * Same as {@link #buildKTableWithOpt(StreamsBuilder, ObjectMapper, TypeReference, String)}
     * using the default mapper.
     */
    public static <T> KTable<String, Optional<T>> buildKTableWithOpt(StreamsBuilder streamsBuilder, TypeReference<T> typeReference, String str) {
        return buildKTableWithOpt(streamsBuilder, null, typeReference, str);
    }

    /**
     * Creates a String/String table over the given topic and maps every
     * value through {@link #buildJsonStringAsOpt(ObjectMapper, String, TypeReference)}.
     *
     * @param streamsBuilder builder the table is registered on
     * @param objectMapper2  mapper to use, or {@code null} for the default mapper
     * @param typeReference  target type of the JSON values
     * @param str            source topic name
     * @return table of Optional-wrapped parsed values
     */
    public static <T> KTable<String, Optional<T>> buildKTableWithOpt(StreamsBuilder streamsBuilder, ObjectMapper objectMapper2, TypeReference<T> typeReference, String str) {
        KTable<String, String> rawTable = streamsBuilder.table(str, STRING_STRING_CONSUMED);
        return rawTable.mapValues(json -> buildJsonStringAsOpt(objectMapper2, json, typeReference));
    }

    /**
     * Same as {@link #buildTableTopology(Consumer, ObjectMapper, TypeReference, String)}
     * using the default mapper.
     */
    public static <T> Topology buildTableTopology(Consumer<KTable<String, T>> consumer, TypeReference<T> typeReference, String str) {
        return buildTableTopology(consumer, null, typeReference, str);
    }

    /**
     * Builds a topology around a table of parsed values; entries whose
     * value could not be parsed are logged and filtered out.
     *
     * @param consumer      receives the table so it can attach further processing
     * @param objectMapper2 mapper to use, or {@code null} for the default mapper
     * @param typeReference target type of the JSON values
     * @param str           source topic name
     * @return the built topology
     */
    public static <T> Topology buildTableTopology(Consumer<KTable<String, T>> consumer, ObjectMapper objectMapper2, TypeReference<T> typeReference, String str) {
        return buildTopology(builder -> {
            KTable<String, Optional<T>> parsed = buildKTableWithOpt(builder, objectMapper2, typeReference, str);
            // The filter guarantees presence, so Optional::get is safe afterwards.
            consumer.accept(parsed.filter(JMKafkaStreamsHelper::logAndFilter).mapValues(Optional::get));
        });
    }

    /**
     * Creates a {@link KafkaStreams} instance with a minimal configuration:
     * application id, bootstrap servers, offset reset policy and String
     * default serdes.
     *
     * @param z        {@code true} sets {@code auto.offset.reset} to {@code latest},
     *                 {@code false} to {@code earliest}
     * @param str      value for {@code bootstrap.servers}
     * @param str2     value for {@code application.id}
     * @param topology topology to run
     * @return the configured, not yet started, streams instance
     */
    public static KafkaStreams buildKafkaStreams(boolean z, String str, String str2, Topology topology) {
        // A plain Properties instance instead of double-brace initialisation,
        // which would create an extra anonymous subclass for no benefit.
        Properties properties = new Properties();
        properties.put("application.id", str2);
        properties.put("bootstrap.servers", str);
        properties.put("auto.offset.reset", z ? "latest" : "earliest");
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.String().getClass());
        return buildKafkaStreams(topology, properties);
    }

    /**
     * Creates a {@link KafkaStreams} instance for the topology and installs
     * an uncaught-exception handler that logs through {@code JMExceptionManager}.
     *
     * @param topology   topology to run
     * @param properties streams configuration
     * @return the configured, not yet started, streams instance
     */
    public static KafkaStreams buildKafkaStreams(Topology topology, Properties properties) {
        JMLog.info(log, "buildKafkaStreams", new Object[]{topology.describe(), properties});
        KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                JMExceptionManager.logException(log, throwable, "uncaughtException", new Object[]{thread}));
        return streams;
    }

    /**
     * Builds a streams instance with {@code auto.offset.reset=earliest} and starts it.
     *
     * @param str      value for {@code bootstrap.servers}
     * @param str2     value for {@code application.id}
     * @param topology topology to run
     * @return the started streams instance
     */
    public static KafkaStreams buildKafkaStreamsWithStart(String str, String str2, Topology topology) {
        return startKafkaStream(buildKafkaStreams(false, str, str2, topology));
    }

    /**
     * Builds a streams instance from the given configuration and starts it.
     *
     * @param properties streams configuration
     * @param topology   topology to run
     * @return the started streams instance
     */
    public static KafkaStreams buildKafkaStreamsWithStart(Properties properties, Topology topology) {
        return startKafkaStream(buildKafkaStreams(topology, properties));
    }

    /** Logs, starts the given streams instance and returns it. */
    private static KafkaStreams startKafkaStream(KafkaStreams kafkaStreams) {
        JMLog.info(log, "startKafkaStream");
        kafkaStreams.start();
        return kafkaStreams;
    }
}
