package me.escoffier.fluid.kafka;

import io.reactivex.Completable;
import io.vertx.core.Future;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.impl.AsyncResultCompletable;
import java.util.LinkedHashMap;
import java.util.Map;
import me.escoffier.fluid.config.Config;
import me.escoffier.fluid.impl.DataExpressionFactories;
import me.escoffier.fluid.models.Message;
import me.escoffier.fluid.models.Sink;
import me.escoffier.fluid.spi.DataExpression;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:me/escoffier/fluid/kafka/KafkaSink.class */
/**
 * A {@link Sink} that writes every dispatched message to a Kafka topic through a Vert.x
 * {@link KafkaWriteStream}.
 *
 * <p>The sink is configured from its own {@link Config} section, layered on top of the optional
 * root-level {@code kafka} section (see {@link #toMap(Config)}). The following sink-level keys are
 * read directly by this class:
 * <ul>
 *   <li>{@code topic} - target topic; falls back to the sink name when absent,</li>
 *   <li>{@code partition} - target partition; falls back to {@code 0} when absent,</li>
 *   <li>{@code timestamp} - fixed record timestamp; {@code null} when absent,</li>
 *   <li>{@code key} - expression evaluated against each message to compute the record key,</li>
 *   <li>{@code bootstrap.servers} - broker list, exposed through {@link #brokers()}.</li>
 * </ul>
 *
 * @param <T> the type of the payload written as the record value
 */
public class KafkaSink<T> implements Sink<T> {
    private final KafkaWriteStream<String, T> stream;
    private final String topic;
    private final Integer partition;
    private final DataExpression key;
    private final String name;
    private final String brokers;
    /** Fixed timestamp applied to every record, or {@code null} when not configured. */
    private final Long timestamp;

    /**
     * Creates the sink and the underlying Kafka write stream.
     *
     * @param vertx  the (RX-ified) Vert.x instance whose delegate owns the write stream
     * @param name   the sink name, also used as the topic when no {@code topic} is configured
     * @param config the configuration of this sink
     * @throws NullPointerException if the merged configuration has no {@code bootstrap.servers}
     *                              entry (or it is {@code null}) by the time it is read here
     */
    public KafkaSink(Vertx vertx, String name, Config config) {
        Map<String, Object> kafkaConfig = toMap(config);
        this.stream = KafkaWriteStream.create(vertx.getDelegate(), kafkaConfig);
        this.topic = config.getString("topic", name);
        this.partition = config.getInt("partition", 0);
        this.timestamp = (Long) config.getLong("timestamp").orElse(null);
        this.key = DataExpressionFactories.requiredEventExpression(config.getString("key", (String) null));
        this.brokers = kafkaConfig.get("bootstrap.servers").toString();
        this.name = name;
    }

    /**
     * @return the topic the records are written to
     */
    public String topic() {
        return this.topic;
    }

    /**
     * @return the configured {@code bootstrap.servers} value, as a string
     */
    public String brokers() {
        return this.brokers;
    }

    /**
     * Builds the configuration map handed to the Kafka write stream.
     *
     * <p>The entries of the root-level {@code kafka} section (if any) are copied first, then the
     * entries of the sink's own configuration, so a sink-level entry overrides the shared one
     * with the same name. Every value is read as a string; an entry that cannot be read as a
     * string is stored as {@code null}.
     *
     * @param config the configuration of the sink
     * @return the merged configuration, in insertion order
     */
    private static Map<String, Object> toMap(Config config) {
        Map<String, Object> merged = new LinkedHashMap<>();
        Config shared = (Config) config.root().getConfig("kafka").orElse(Config.empty());
        shared.names().forEachRemaining(entry -> {
            merged.put(entry, shared.getString(entry).orElse(null));
        });
        // Copied last on purpose: sink-specific settings take precedence over the shared ones.
        config.names().forEachRemaining(entry -> {
            merged.put(entry, config.getString(entry).orElse(null));
        });
        return merged;
    }

    /**
     * Writes the payload of the given message to the configured topic and partition, using the
     * configured timestamp and the key computed by evaluating the key expression on the message.
     *
     * @param message the message to send
     * @return a {@link Completable} that completes when the write is acknowledged and fails with
     *         the write's cause otherwise
     */
    public Completable dispatch(Message<T> message) {
        // The return type of DataExpression#evaluate is not visible from this class, so the record
        // is built through the raw constructor; the suppression is confined to this one statement.
        @SuppressWarnings({"unchecked", "rawtypes"})
        ProducerRecord<String, T> record = new ProducerRecord(this.topic, this.partition, this.timestamp, this.key.evaluate(message), message.payload());
        return new AsyncResultCompletable(handler -> {
            this.stream.write(record, written -> {
                // Only the outcome is propagated; the write's result value is dropped.
                if (written.succeeded()) {
                    handler.handle(Future.succeededFuture());
                } else {
                    handler.handle(Future.failedFuture(written.cause()));
                }
            });
        });
    }

    /**
     * @return the name this sink was created with
     */
    public String name() {
        return this.name;
    }
}
