package co.elastic.apm.agent.kafka.helper;

import co.elastic.apm.agent.configuration.MessagingConfiguration;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.GlobalTracer;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.matcher.WildcardMatcher;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.jctools.queues.atomic.AtomicQueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;

/**
 * Helper used by the Kafka producer instrumentation to create, enrich and deactivate the span
 * that represents a {@code KafkaProducer#send} call, and to wrap the user supplied send
 * {@link Callback} in a pooled {@link CallbackWrapper}.
 *
 * <p>A single shared instance, bound to the tracer returned by
 * {@link GlobalTracer#requireTracerImpl()}, is available through {@link #get()}.
 */
public class KafkaInstrumentationHelper {
    public static final Logger logger = LoggerFactory.getLogger(KafkaInstrumentationHelper.class);
    private static final KafkaInstrumentationHelper INSTANCE = new KafkaInstrumentationHelper(GlobalTracer.requireTracerImpl());

    /** Pool of reusable callback wrappers, backed by a bounded MPMC queue with a capacity of 256. */
    private final ObjectPool<CallbackWrapper> callbackWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(ConcurrentQueueSpec.createBoundedMpmc(256)), false, new CallbackWrapperAllocator());
    private final ElasticApmTracer tracer;
    private final MessagingConfiguration messagingConfiguration;

    /**
     * Allocator handed to the object pool; every {@link CallbackWrapper} it creates is bound to
     * the enclosing helper instance (hence this must stay a non-static inner class).
     */
    private final class CallbackWrapperAllocator implements Allocator<CallbackWrapper> {
        private CallbackWrapperAllocator() {
        }

        @Override
        public CallbackWrapper createInstance() {
            return new CallbackWrapper(KafkaInstrumentationHelper.this);
        }
    }

    /**
     * @return the shared helper instance
     */
    public static KafkaInstrumentationHelper get() {
        return INSTANCE;
    }

    /**
     * Creates a helper bound to the given tracer and to that tracer's
     * {@link MessagingConfiguration}.
     *
     * @param elasticApmTracer tracer used to create spans and to look up the messaging configuration
     */
    public KafkaInstrumentationHelper(ElasticApmTracer elasticApmTracer) {
        this.tracer = elasticApmTracer;
        this.messagingConfiguration = elasticApmTracer.getConfig(MessagingConfiguration.class);
    }

    /**
     * @param topicName name of the topic a record is being sent to
     * @return {@code true} if the topic matches any of the configured ignore-message-queues matchers
     */
    private boolean ignoreTopic(String topicName) {
        return WildcardMatcher.isAnyMatch(this.messagingConfiguration.getIgnoreMessageQueues(), topicName);
    }

    /**
     * Creates and activates an exit span describing the send of the given record.
     *
     * @param producerRecord the record about to be sent
     * @return the activated span, or {@code null} if the record's topic is ignored by configuration
     *         or the tracer did not create an exit span
     */
    @Nullable
    public Span onSendStart(ProducerRecord<?, ?> producerRecord) {
        String topic = producerRecord.topic();
        if (ignoreTopic(topic)) {
            return null;
        }

        Span span = this.tracer.createExitChildSpan();
        if (span == null) {
            return null;
        }

        span.withType("messaging").withSubtype("kafka").withAction("send").withName("KafkaProducer#send to ").appendToName(topic);
        span.getContext().getMessage().withQueue(topic);
        span.getContext().getServiceTarget().withType("kafka").withName(topic);
        span.activate();
        return span;
    }

    /**
     * Wraps the given send callback in a pooled {@link CallbackWrapper} associated with the span.
     *
     * @param callback the user supplied callback, may be {@code null}
     * @param span     the span associated with the send operation
     * @return the wrapping callback; the original {@code callback} if it already is a
     *         {@link CallbackWrapper} or if wrapping failed
     */
    @Nullable
    public Callback wrapCallback(@Nullable Callback callback, Span span) {
        if (callback instanceof CallbackWrapper) {
            // already wrapped - don't wrap twice
            return callback;
        }
        try {
            return this.callbackWrapperObjectPool.createInstance().wrap(callback, span);
        } catch (Throwable throwable) {
            // best effort: wrapping must never break the application's send call
            logger.debug("Failed to wrap Kafka send callback", throwable);
            return callback;
        }
    }

    /**
     * Returns the given wrapper to the object pool.
     *
     * @param callbackWrapper the wrapper to recycle
     */
    public void recycle(CallbackWrapper callbackWrapper) {
        this.callbackWrapperObjectPool.recycle(callbackWrapper);
    }

    /**
     * Finalizes the synchronous part of a send operation: optionally records the address and port
     * of the target partition's leader as the span destination, captures the given throwable on
     * the span and deactivates the span. The span is not ended here.
     *
     * @param span           the span returned by {@link #onSendStart(ProducerRecord)}
     * @param producerRecord the record that was sent
     * @param kafkaProducer  the producer used for sending, queried for the topic's partitions
     * @param th             the throwable raised by the send call, if any
     */
    public void onSendEnd(Span span, ProducerRecord<?, ?> producerRecord, KafkaProducer<?, ?> kafkaProducer, @Nullable Throwable th) {
        if (this.messagingConfiguration.shouldCollectQueueAddress()) {
            try {
                List<?> partitions = kafkaProducer.partitionsFor(producerRecord.topic());
                PartitionInfo partitionInfo = findPartitionInfo(partitions, producerRecord.partition());
                if (partitionInfo != null) {
                    Node leader = partitionInfo.leader();
                    if (leader != null) {
                        span.getContext().getDestination().withAddress(leader.host()).withPort(leader.port());
                    }
                }
            } catch (Exception e) {
                // destination discovery is best effort and must not interfere with the send itself
                logger.error("Failed to get Kafka producer's destination", e);
            }
        }
        span.captureException(th);
        span.deactivate();
    }

    /**
     * Selects the partition metadata that describes where a record is going.
     *
     * <p>The list is searched by partition id rather than indexed with it: the list returned by
     * {@code partitionsFor} is not documented to be ordered by, or dense in, partition id, so
     * using the id as an index may pick the wrong partition or run out of bounds.
     *
     * @param partitions metadata entries of the topic's partitions ({@link PartitionInfo} elements)
     * @param partition  the partition explicitly set on the record, or {@code null} if none was set
     * @return the entry whose id equals {@code partition}; the first entry if {@code partition} is
     *         {@code null} and the list is not empty; otherwise {@code null}
     */
    @Nullable
    private static PartitionInfo findPartitionInfo(List<?> partitions, @Nullable Integer partition) {
        if (partition == null) {
            // no explicit partition on the record - fall back to the first known entry, if any
            return partitions.isEmpty() ? null : (PartitionInfo) partitions.get(0);
        }
        int partitionId = partition.intValue();
        for (Object element : partitions) {
            PartitionInfo candidate = (PartitionInfo) element;
            if (candidate.partition() == partitionId) {
                return candidate;
            }
        }
        return null;
    }
}
