package com.expediagroup.rhapsody.kafka.acknowledgement;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.kafka.factory.AcknowledgeableConsumerRecordFactory;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverRecord;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/expediagroup/rhapsody/kafka/acknowledgement/AbstractReceiverAcknowledgementStrategy.class */
public abstract class AbstractReceiverAcknowledgementStrategy implements ReceiverAcknowledgementStrategy {
    @Override // com.expediagroup.rhapsody.kafka.acknowledgement.ReceiverAcknowledgementStrategy
    public final <K, V> Function<? super Publisher<ReceiverRecord<K, V>>, ? extends Publisher<Acknowledgeable<ConsumerRecord<K, V>>>> createRecordTransformer(Map<String, ?> map) {
        AcknowledgeableConsumerRecordFactory create = AcknowledgeableConsumerRecordFactory.create(map);
        long longValue = ReceiverAcknowledgementStrategy.loadMaxInFlightPerTopicPartition(map).orElse(Long.MAX_VALUE).longValue();
        return publisher -> {
            return Flux.defer(() -> {
                return transform(publisher, create, longValue);
            });
        };
    }

    protected final <K, V> Publisher<Acknowledgeable<ConsumerRecord<K, V>>> transform(Publisher<? extends ReceiverRecord<K, V>> publisher, AcknowledgeableConsumerRecordFactory<K, V> acknowledgeableConsumerRecordFactory, long j) {
        EmitterProcessor create = EmitterProcessor.create();
        FluxSink sink = create.sink();
        return Flux.from(publisher).map(receiverRecord -> {
            ReceiverOffset receiverOffset = receiverRecord.receiverOffset();
            receiverOffset.getClass();
            Runnable runnable = receiverOffset::acknowledge;
            sink.getClass();
            return acknowledgeableConsumerRecordFactory.create(receiverRecord, runnable, sink::error);
        }).mergeWith(create).transform(createOperator(j));
    }

    protected abstract <K, V> UnaryOperator<Publisher<Acknowledgeable<ConsumerRecord<K, V>>>> createOperator(long j);
}
