package me.escoffier.fluid.kafka;

import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumer;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecord;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import me.escoffier.fluid.config.Config;
import me.escoffier.fluid.models.DefaultSource;
import me.escoffier.fluid.models.Message;
import me.escoffier.fluid.models.Source;

/* loaded from: input_file:me/escoffier/fluid/kafka/KafkaSource.class */
/**
 * A {@link Source} backed by a Vert.x Kafka consumer.
 *
 * <p>Each record received from the subscribed topic is wrapped into a {@link Message} whose
 * headers carry the record metadata (see {@link #createDataFromRecord}).
 *
 * @param <T> the type of the Kafka record values, i.e. the message payload type
 */
public class KafkaSource<T> extends DefaultSource<T> implements Source<T> {

    /**
     * Creates the consumer, subscribes it to the topic and exposes the records as a flow of
     * messages.
     *
     * <p>The topic is read from the {@code topic} configuration key and falls back to
     * {@code str} when that key is absent. When {@code multicast.buffer.size} is greater than
     * zero the flow is replayed with that buffer size; otherwise, when
     * {@code multicast.buffer.period.ms} is set to anything but {@code -1}, the flow is replayed
     * over that time window (in milliseconds); otherwise the flow is used as is.
     *
     * @param vertx  the Vert.x instance used to create the Kafka consumer
     * @param str    the source name, also used as the default topic
     * @param config the source configuration
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSource(Vertx vertx, String str, Config config) {
        // The explicit <String, T> witness is required: without it the chained call infers
        // <Object, Object> and createDataFromRecord no longer matches the record type.
        super(KafkaConsumer.<String, T>create(vertx, toMap(config))
                .subscribe(config.getString("topic", str))
                .toFlowable()
                .map(KafkaSource::createDataFromRecord)
                .compose(flowable -> {
                    int bufferSize = config.getInt("multicast.buffer.size", 0);
                    if (bufferSize > 0) {
                        return flowable.replay(bufferSize).autoConnect();
                    }
                    int bufferPeriodMs = config.getInt("multicast.buffer.period.ms", -1);
                    if (bufferPeriodMs != -1) {
                        return flowable.replay(bufferPeriodMs, TimeUnit.MILLISECONDS).autoConnect();
                    }
                    return flowable;
                }), str, getAttributes(config));
    }

    /**
     * Builds the attributes attached to this source.
     *
     * @param config the source configuration
     * @return a map holding {@code kafka-broker} (the {@code bootstrap.servers} setting) and
     *         {@code kafka-topic} (the {@code topic} setting); either value is {@code null}
     *         when the corresponding setting is not configured
     */
    private static Map<String, Object> getAttributes(Config config) {
        Map<String, Object> attributes = new HashMap<>();
        Map<String, String> settings = toMap(config);
        attributes.put("kafka-broker", settings.get("bootstrap.servers"));
        attributes.put("kafka-topic", settings.get("topic"));
        return attributes;
    }

    /**
     * Wraps a Kafka record into a {@link Message}.
     *
     * <p>The payload is the record value. The headers contain {@code timestamp},
     * {@code timestamp-type}, {@code partition}, {@code checksum}, {@code fluid.key} (record key),
     * {@code fluid.address} (record topic) and {@code fluid.original} (the record itself).
     *
     * @param kafkaConsumerRecord the record received from Kafka
     * @param <T>                 the type of the record value
     * @return the message wrapping the record value and its metadata
     */
    private static <T> Message<T> createDataFromRecord(KafkaConsumerRecord<String, T> kafkaConsumerRecord) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("timestamp", kafkaConsumerRecord.timestamp());
        headers.put("timestamp-type", kafkaConsumerRecord.timestampType());
        headers.put("fluid.original", kafkaConsumerRecord);
        headers.put("partition", kafkaConsumerRecord.partition());
        headers.put("checksum", kafkaConsumerRecord.checksum());
        headers.put("fluid.key", kafkaConsumerRecord.key());
        headers.put("fluid.address", kafkaConsumerRecord.topic());
        return new Message<>(kafkaConsumerRecord.value(), headers);
    }

    /**
     * Flattens the configuration into the string map handed to the Kafka consumer.
     *
     * <p>Entries of the root-level {@code kafka} section (if any) are copied first, then the
     * entries of {@code config} itself, so source-specific settings override the shared defaults.
     * Insertion order is preserved.
     *
     * @param config the source configuration
     * @return the merged settings, never {@code null}
     */
    private static Map<String, String> toMap(Config config) {
        Map<String, String> settings = new LinkedHashMap<>();
        Config defaults = (Config) config.root().getConfig("kafka").orElse(Config.empty());
        copyEntries(defaults, settings);
        copyEntries(config, settings);
        return settings;
    }

    /**
     * Copies every named entry of {@code source} into {@code target}, overwriting existing keys.
     *
     * <p>NOTE(review): the per-entry copy was lost in decompilation (the lambdas were empty);
     * it is reconstructed here with {@code getString(name, default)}. Entries for which no string
     * value is returned are skipped - confirm this matches how {@code Config} exposes
     * non-string values.
     */
    private static void copyEntries(Config source, Map<String, String> target) {
        source.names().forEachRemaining(name -> {
            String value = source.getString(name, null);
            if (value != null) {
                target.put(name, value);
            }
        });
    }
}
