package me.ehp246.aufkafka.core.producer;

import java.util.concurrent.CompletableFuture;
import me.ehp246.aufkafka.api.producer.PartitionMapProvider;
import me.ehp246.aufkafka.api.producer.ProducerFn;
import me.ehp246.aufkafka.api.producer.ProducerFnProvider;
import me.ehp246.aufkafka.api.producer.ProducerProvider;
import me.ehp246.aufkafka.api.producer.ProducerRecordBuilder;
import me.ehp246.aufkafka.api.producer.ProducerRecordBuilderProvider;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:me/ehp246/aufkafka/core/producer/DefaultProducerFnProvider.class */
public final class DefaultProducerFnProvider implements ProducerFnProvider {
    private final ProducerProvider producerProvider;
    private final ProducerRecordBuilderProvider recordBuilderProvider;
    private final PartitionMapProvider partitionKeyMapProvider;

    DefaultProducerFnProvider(ProducerProvider producerProvider, PartitionMapProvider partitionMapProvider, ProducerRecordBuilderProvider producerRecordBuilderProvider) {
        this.producerProvider = producerProvider;
        this.partitionKeyMapProvider = partitionMapProvider;
        this.recordBuilderProvider = producerRecordBuilderProvider;
    }

    @Override // me.ehp246.aufkafka.api.producer.ProducerFnProvider
    public ProducerFn get(ProducerFnProvider.ProducerFnConfig producerFnConfig) {
        Producer<String, String> producer = this.producerProvider.get(producerFnConfig.producerConfigName(), producerFnConfig.producerProperties());
        ProducerRecordBuilder apply = this.recordBuilderProvider.apply(str -> {
            return producer.partitionsFor(str);
        }, this.partitionKeyMapProvider.get(producerFnConfig.partitionMapType()));
        return outboundRecord -> {
            ProducerRecord<String, String> apply2 = apply.apply(outboundRecord);
            CompletableFuture completableFuture = new CompletableFuture();
            producer.send(apply2, (recordMetadata, exc) -> {
                if (exc == null) {
                    completableFuture.complete(new ProducerFn.Sent(apply2, recordMetadata));
                } else {
                    completableFuture.completeExceptionally(exc);
                }
            });
            return completableFuture;
        };
    }
}
