package co.elastic.apm.agent.kafka.helper;

import co.elastic.apm.agent.common.util.WildcardMatcher;
import co.elastic.apm.agent.jms.JmsInstrumentationHelper;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.Tracer;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration;
import co.elastic.apm.agent.tracer.pooling.Allocator;
import co.elastic.apm.agent.tracer.pooling.ObjectPool;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

/* loaded from: input_file:agent/co/elastic/apm/agent/kafka/helper/KafkaInstrumentationHelper.esclazz */
public class KafkaInstrumentationHelper {
    public static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaInstrumentationHelper.class);
    private static final KafkaInstrumentationHelper INSTANCE = new KafkaInstrumentationHelper(GlobalTracer.get());
    private final ObjectPool<CallbackWrapper> callbackWrapperObjectPool;
    private final Tracer tracer;
    private final MessagingConfiguration messagingConfiguration;

    /* loaded from: input_file:agent/co/elastic/apm/agent/kafka/helper/KafkaInstrumentationHelper$CallbackWrapperAllocator.esclazz */
    private final class CallbackWrapperAllocator implements Allocator<CallbackWrapper> {
        private CallbackWrapperAllocator() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // co.elastic.apm.agent.tracer.pooling.Allocator
        public CallbackWrapper createInstance() {
            return new CallbackWrapper(KafkaInstrumentationHelper.this);
        }
    }

    public static KafkaInstrumentationHelper get() {
        return INSTANCE;
    }

    public KafkaInstrumentationHelper(Tracer tracer) {
        this.tracer = tracer;
        this.messagingConfiguration = (MessagingConfiguration) tracer.getConfig(MessagingConfiguration.class);
        this.callbackWrapperObjectPool = tracer.getObjectPoolFactory().createRecyclableObjectPool(256, new CallbackWrapperAllocator());
    }

    private boolean ignoreTopic(String str) {
        return WildcardMatcher.isAnyMatch(this.messagingConfiguration.getIgnoreMessageQueues(), str);
    }

    @Nullable
    public Span<?> onSendStart(ProducerRecord<?, ?> producerRecord) {
        Span<?> createExitSpan;
        String str = producerRecord.topic();
        if (ignoreTopic(str) || (createExitSpan = this.tracer.currentContext().createExitSpan()) == null) {
            return null;
        }
        ((Span) ((Span) createExitSpan.withType(JmsInstrumentationHelper.MESSAGING_TYPE)).withSubtype("kafka").withAction("send").withName("KafkaProducer#send to ")).appendToName(str);
        createExitSpan.getContext().getMessage().withQueue(str);
        createExitSpan.getContext().getServiceTarget().withType("kafka").withName(str);
        createExitSpan.activate();
        return createExitSpan;
    }

    @Nullable
    public Callback wrapCallback(@Nullable Callback callback, Span<?> span) {
        if (callback instanceof CallbackWrapper) {
            return callback;
        }
        try {
            return this.callbackWrapperObjectPool.createInstance().wrap(callback, span);
        } catch (Throwable th) {
            logger.debug("Failed to wrap Kafka send callback", th);
            return callback;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recycle(CallbackWrapper callbackWrapper) {
        this.callbackWrapperObjectPool.recycle(callbackWrapper);
    }

    public void onSendEnd(Span<?> span, ProducerRecord<?, ?> producerRecord, KafkaProducer<?, ?> kafkaProducer, @Nullable Throwable th) {
        Node leader;
        if (this.messagingConfiguration.shouldCollectQueueAddress()) {
            try {
                List partitionsFor = kafkaProducer.partitionsFor(producerRecord.topic());
                Integer partition = producerRecord.partition();
                PartitionInfo partitionInfo = null;
                if (partition != null) {
                    partitionInfo = (PartitionInfo) partitionsFor.get(partition.intValue());
                } else if (!partitionsFor.isEmpty()) {
                    partitionInfo = (PartitionInfo) partitionsFor.get(0);
                }
                if (partitionInfo != null && (leader = partitionInfo.leader()) != null) {
                    span.getContext().getDestination().withAddress(leader.host()).withPort(leader.port());
                }
            } catch (Exception e) {
                logger.error("Failed to get Kafka producer's destination", (Throwable) e);
            }
        }
        span.captureException(th);
        span.deactivate();
    }
}
