package com.expediagroup.rhapsody.kafka.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.api.Headed;
import com.expediagroup.rhapsody.kafka.record.RecordHeaderConversion;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.SenderResult;

/* loaded from: input_file:com/expediagroup/rhapsody/kafka/factory/KafkaValueSenderFactory.class */
/**
 * Sender factory for publishing values of type {@code V} where the record key is typed as
 * {@link Object}.
 *
 * <p>Each value is turned into a {@link ProducerRecord} whose topic and key are derived from the
 * value by caller-supplied functions. Record headers are built from {@link Headed} data: from the
 * value itself in {@link #sendValues}, and from the {@link Acknowledgeable} wrapper in
 * {@link #sendAcknowledgeableValues}.
 *
 * @param <V> type of the values being sent
 */
public class KafkaValueSenderFactory<V> extends KafkaSenderFactory<Object, V> {

    /**
     * Creates a factory backed by the given configuration factory.
     *
     * @param kafkaConfigFactory configuration source handed to the superclass constructor
     */
    public KafkaValueSenderFactory(KafkaConfigFactory kafkaConfigFactory) {
        super(kafkaConfigFactory);
    }

    /**
     * Converts every acknowledgeable value into an acknowledgeable producer record (see
     * {@link #createAcknowledgeableProducerRecord}) and passes the resulting stream to the
     * inherited {@code sendAcknowledgeable}.
     *
     * @param publisher source of acknowledgeable values
     * @param function  derives the destination topic from a value
     * @param function2 derives the record key from a value
     * @return the flux returned by {@code sendAcknowledgeable}
     */
    public Flux<Acknowledgeable<SenderResult<V>>> sendAcknowledgeableValues(Publisher<Acknowledgeable<V>> publisher, Function<V, String> function, Function<V, ?> function2) {
        return sendAcknowledgeable(Flux.from(publisher)
            .map(acknowledgeable -> createAcknowledgeableProducerRecord(acknowledgeable, function, function2)));
    }

    /**
     * Converts every value into a producer record and passes the resulting stream to the
     * inherited {@code send}. Headers are taken from the value via {@link #extractHeaders}.
     *
     * @param publisher source of values
     * @param function  derives the destination topic from a value
     * @param function2 derives the record key from a value
     * @return the flux returned by {@code send}
     */
    public Flux<SenderResult<V>> sendValues(Publisher<V> publisher, Function<V, String> function, Function<V, ?> function2) {
        // The partition argument is null, i.e. no explicit partition is set on the record.
        return send(Flux.from(publisher)
            .map(value -> new ProducerRecord<>(function.apply(value), (Integer) null, function2.apply(value), value, extractHeaders(value))));
    }

    /**
     * Maps an acknowledgeable value to an acknowledgeable producer record.
     *
     * <p>Unlike {@link #sendValues}, the headers here come from the {@code acknowledgeable}
     * wrapper rather than from the wrapped value.
     *
     * @param acknowledgeable wrapper around the value to send
     * @param function        derives the destination topic from the value
     * @param function2       derives the record key from the value
     * @return the result of mapping {@code acknowledgeable} to a producer record
     */
    protected Acknowledgeable<ProducerRecord<Object, V>> createAcknowledgeableProducerRecord(Acknowledgeable<V> acknowledgeable, Function<V, String> function, Function<V, ?> function2) {
        // The (Headed) cast selects the createHeaders(Headed) overload; partition is left null.
        return acknowledgeable.map(value ->
            new ProducerRecord<>(function.apply(value), (Integer) null, function2.apply(value), value, createHeaders((Headed) acknowledgeable)));
    }

    /**
     * Builds record headers from a value when {@link Headed#tryCast} yields a {@link Headed}
     * for it.
     *
     * @param v value to inspect
     * @return headers created from the value, or {@link Collections#emptyList()} when
     *         {@code tryCast} yields nothing
     */
    protected List<Header> extractHeaders(V v) {
        return Headed.tryCast(v)
            .map(KafkaValueSenderFactory::createHeaders)
            .orElseGet(Collections::emptyList);
    }

    /**
     * Builds record headers from the map form of {@code headed.header()}.
     *
     * @param headed source of header data
     * @return one record header per entry of the header map
     */
    protected static List<Header> createHeaders(Headed headed) {
        // The cast pins the argument type so the Map overload below is chosen.
        return createHeaders((Map<String, String>) headed.header().toMap());
    }

    /**
     * Converts each map entry to a record header via {@link RecordHeaderConversion#toHeader}.
     *
     * @param map header names mapped to header values
     * @return one record header per map entry, in the map's entry iteration order
     */
    protected static List<Header> createHeaders(Map<String, String> map) {
        return map.entrySet().stream()
            .map(entry -> RecordHeaderConversion.toHeader(entry.getKey(), entry.getValue()))
            .collect(Collectors.toList());
    }
}
