package net.uncontended.precipice.samples.kafka;

import net.uncontended.precipice.GuardRail;
import net.uncontended.precipice.Precipice;
import net.uncontended.precipice.concurrent.Eventual;
import net.uncontended.precipice.concurrent.PrecipiceFuture;
import net.uncontended.precipice.factories.Asynchronous;
import net.uncontended.precipice.rejected.Rejected;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.TimeoutException;

/* loaded from: input_file:net/uncontended/precipice/samples/kafka/KafkaService.class */
public class KafkaService<K, V> implements Precipice<ProduceStatus, Rejected> {
    private final GuardRail<ProduceStatus, Rejected> guardRail;
    private final KafkaProducer<K, V> producer;

    public KafkaService(GuardRail<ProduceStatus, Rejected> guardRail, KafkaProducer<K, V> kafkaProducer) {
        this.guardRail = guardRail;
        this.producer = kafkaProducer;
    }

    public PrecipiceFuture<ProduceStatus, RecordMetadata> sendRecordAction(ProducerRecord<K, V> producerRecord) {
        final Eventual acquirePermitsAndPromise = Asynchronous.acquirePermitsAndPromise(this.guardRail, 1L);
        this.producer.send(producerRecord, new Callback() { // from class: net.uncontended.precipice.samples.kafka.KafkaService.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    acquirePermitsAndPromise.complete(ProduceStatus.SUCCESS, recordMetadata);
                    return;
                }
                if (exc instanceof TimeoutException) {
                    acquirePermitsAndPromise.completeExceptionally(ProduceStatus.TIMEOUT, exc);
                } else if (exc instanceof NetworkException) {
                    acquirePermitsAndPromise.completeExceptionally(ProduceStatus.NETWORK_EXCEPTION, exc);
                } else {
                    acquirePermitsAndPromise.completeExceptionally(ProduceStatus.OTHER_ERROR, exc);
                }
            }
        });
        return acquirePermitsAndPromise.future();
    }

    public GuardRail<ProduceStatus, Rejected> guardRail() {
        return this.guardRail;
    }

    public void shutdown() {
        shutdown(true);
    }

    public void shutdown(boolean z) {
        if (z) {
            this.producer.close();
        }
    }
}
