package com.arpnetworking.metrics.common.sources;

import com.arpnetworking.commons.builder.annotations.WovenValidation;
import com.arpnetworking.commons.maven.javassist.Processed;
import com.arpnetworking.logback.annotations.LogValue;
import com.arpnetworking.metrics.common.kafka.ConsumerListener;
import com.arpnetworking.metrics.common.kafka.RunnableConsumer;
import com.arpnetworking.metrics.common.kafka.RunnableConsumerImpl;
import com.arpnetworking.metrics.common.parsers.Parser;
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
import com.arpnetworking.metrics.common.sources.BaseSource;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.mad.model.Record;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.LogValueMapFactory;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Stopwatch;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.sf.oval.ConstraintViolation;
import net.sf.oval.Validator;
import net.sf.oval.constraint.CheckWith;
import net.sf.oval.constraint.CheckWithCheck;
import net.sf.oval.constraint.Min;
import net.sf.oval.constraint.MinCheck;
import net.sf.oval.constraint.NotNull;
import net.sf.oval.constraint.NotNullCheck;
import net.sf.oval.context.FieldContext;
import net.sf.oval.context.OValContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;

/* loaded from: input_file:com/arpnetworking/metrics/common/sources/KafkaSource.class */
public final class KafkaSource<V> extends BaseSource {
    private static final Logger LOGGER;
    private final Consumer<?, V> _consumer;
    private final RunnableConsumer _runnableConsumer;
    private final ExecutorService _consumerExecutor;
    private final ExecutorService _parserExecutor;
    private final Parser<List<Record>, V> _parser;
    private final Logger _logger;
    private final Duration _shutdownAwaitTime;
    private final Duration _backoffTime;
    private final Integer _numWorkerThreads;
    private final BlockingQueue<V> _buffer;
    private final KafkaSource<V>.ParsingWorker _parsingWorker;
    private final PeriodicMetrics _periodicMetrics;
    private final AtomicLong _currentRecordsProcessedCount;
    private final AtomicLong _currentRecordsIngestedCount;
    private final String _parsingTimeMetricName;
    final String _recordsInCountMetricName;
    final String _recordsOutCountMetricName;
    final String _parsingExceptionCountMetricName;
    final String _kafkaExceptionCountMetricName;
    final String _consumerExceptionCountMetricName;
    final String _queueSizeGaugeMetricName;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;

    @Processed({"com.arpnetworking.commons.builder.ValidationProcessor"})
    @WovenValidation
    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/KafkaSource$Builder.class */
    public static final class Builder<V> extends BaseSource.Builder<Builder<V>, KafkaSource<V>> {

        @NotNull
        private Consumer<?, V> _consumer;

        @NotNull
        private Parser<List<Record>, V> _parser;

        @CheckWith(value = PositiveDuration.class, message = "Poll duration must be positive.")
        @NotNull
        private Duration _pollTime;

        @CheckWith(value = PositiveDuration.class, message = "Shutdown await time must be positive.")
        @NotNull
        private Duration _shutdownAwaitTime;

        @CheckWith(value = PositiveDuration.class, message = "Backoff time must be positive.")
        @NotNull
        private Duration _backoffTime;

        @NotNull
        @Min(1.0d)
        private Integer _numWorkerThreads;

        @NotNull
        @Min(1.0d)
        private Integer _bufferSize;

        @NotNull
        @JacksonInject
        private PeriodicMetrics _periodicMetrics;
        private static final NotNullCheck _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_consumer");
        private static final NotNullCheck _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_parser");
        private static final NotNullCheck _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_pollTime");
        private static final CheckWithCheck _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK = new CheckWithCheck();
        private static final OValContext _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT = new FieldContext(Builder.class, "_pollTime");
        private static final NotNullCheck _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_shutdownAwaitTime");
        private static final CheckWithCheck _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK = new CheckWithCheck();
        private static final OValContext _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT = new FieldContext(Builder.class, "_shutdownAwaitTime");
        private static final NotNullCheck _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_backoffTime");
        private static final CheckWithCheck _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK = new CheckWithCheck();
        private static final OValContext _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT = new FieldContext(Builder.class, "_backoffTime");
        private static final NotNullCheck _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_numWorkerThreads");
        private static final MinCheck _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_MINCHECK = new MinCheck();
        private static final OValContext _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_MINCHECK_CONTEXT = new FieldContext(Builder.class, "_numWorkerThreads");
        private static final NotNullCheck _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_bufferSize");
        private static final MinCheck _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_MINCHECK = new MinCheck();
        private static final OValContext _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_MINCHECK_CONTEXT = new FieldContext(Builder.class, "_bufferSize");
        private static final NotNullCheck _PERIODICMETRICS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _PERIODICMETRICS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_periodicMetrics");

        /* loaded from: input_file:com/arpnetworking/metrics/common/sources/KafkaSource$Builder$PositiveDuration.class */
        /**
         * OVal check referenced by the {@code @CheckWith} constraints on the builder's
         * {@link Duration} fields.
         *
         * NOTE(review): the constraint messages say the duration "must be positive", but this
         * check only rejects negative values, so a zero duration is accepted. Confirm whether
         * zero is intentionally allowed before tightening.
         */
        private static class PositiveDuration implements CheckWithCheck.SimpleCheck {
            private static final long serialVersionUID = 1;

            private PositiveDuration() {
            }

            /**
             * Tests the value under validation.
             *
             * @param obj the object being validated (unused)
             * @param obj2 the field value to test
             * @return {@code true} when {@code obj2} is a {@link Duration} that is not negative
             */
            public boolean isSatisfied(Object obj, Object obj2) {
                if (!(obj2 instanceof Duration)) {
                    // Covers null as well as values of any other type.
                    return false;
                }
                final Duration candidate = (Duration) obj2;
                return !candidate.isNegative();
            }
        }

        /**
         * Creates a builder with its defaults applied: one worker thread, a buffer size of
         * 1000, a one second backoff time and a ten second shutdown await time. All other
         * fields start out unset.
         */
        public Builder() {
            super(b -> new KafkaSource(b, (KafkaSource) null));
            this._numWorkerThreads = 1;
            this._bufferSize = 1000;
            this._backoffTime = Duration.ofSeconds(1L);
            this._shutdownAwaitTime = Duration.ofSeconds(10L);
        }

        /**
         * Sets the Kafka {@code Consumer} used by the source. Required; the field is
         * annotated {@code @NotNull}.
         *
         * @param consumer the consumer instance
         * @return this builder
         */
        public Builder<V> setConsumer(Consumer<?, V> consumer) {
            this._consumer = consumer;
            return this;
        }

        /**
         * Sets the parser that turns a consumed value into a list of {@code Record}
         * instances. Required; the field is annotated {@code @NotNull}.
         *
         * @param parser the parser instance
         * @return this builder
         */
        public Builder<V> setParser(Parser<List<Record>, V> parser) {
            this._parser = parser;
            return this;
        }

        /**
         * Sets the poll time handed to the {@code RunnableConsumerImpl} builder. Required
         * (no default is assigned in the constructor) and must not be negative.
         *
         * @param duration the poll time
         * @return this builder
         */
        public Builder<V> setPollTime(Duration duration) {
            this._pollTime = duration;
            return this;
        }

        /**
         * Sets the timeout used when awaiting termination of each executor in
         * {@code KafkaSource.stop()}. Defaults to ten seconds; must not be null or negative.
         *
         * @param duration the await time
         * @return this builder
         */
        public Builder<V> setShutdownAwaitTime(Duration duration) {
            this._shutdownAwaitTime = duration;
            return this;
        }

        /**
         * Sets the backoff time. The source sleeps for this long after a consumer error and
         * also uses it as the timeout when a parsing worker polls the buffer. Defaults to one
         * second; must not be null or negative.
         *
         * @param duration the backoff time
         * @return this builder
         */
        public Builder<V> setBackoffTime(Duration duration) {
            this._backoffTime = duration;
            return this;
        }

        /**
         * Sets the number of parsing worker threads; this sizes the parser thread pool.
         * Defaults to 1; must not be null and must be at least 1.
         *
         * @param num the number of worker threads
         * @return this builder
         */
        public Builder<V> setNumWorkerThreads(Integer num) {
            this._numWorkerThreads = num;
            return this;
        }

        /**
         * Sets the capacity of the {@code ArrayBlockingQueue} created by the constructors
         * that do not take an explicit queue. Defaults to 1000; must not be null and must be
         * at least 1.
         *
         * @param num the buffer capacity
         * @return this builder
         */
        public Builder<V> setBufferSize(Integer num) {
            this._bufferSize = num;
            return this;
        }

        /**
         * Sets the {@code PeriodicMetrics} instance the source records its metrics to.
         * Required; the field is annotated {@code @NotNull} and {@code @JacksonInject}.
         *
         * @param periodicMetrics the metrics instance
         * @return this builder
         */
        public Builder<V> setPeriodicMetrics(PeriodicMetrics periodicMetrics) {
            this._periodicMetrics = periodicMetrics;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Returns this builder typed as {@code Builder<V>}.
         *
         * @return this builder
         */
        @Override // com.arpnetworking.metrics.common.sources.BaseSource.Builder
        public Builder<V> self() {
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Evaluates the constraint checks for every annotated builder field and appends a
         * {@link ConstraintViolation} to {@code list} for each check that is not satisfied.
         * The superclass validation runs first. All checks are evaluated; a failure does not
         * short-circuit the remaining ones.
         *
         * NOTE(review): this looks like code woven in by the {@code ValidationProcessor}
         * named in the class's {@code @Processed} annotation rather than hand-written code.
         *
         * @param list the list that collects violations
         */
        @Override // com.arpnetworking.metrics.common.sources.BaseSource.Builder
        public void validate(List list) {
            super.validate(list);
            if (!_CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._consumer, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._consumer, _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._parser, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._parser, _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._pollTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._pollTime, _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.isSatisfied(this, this._pollTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK, _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.getMessage(), this, this._pollTime, _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT));
            }
            if (!_SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._shutdownAwaitTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._shutdownAwaitTime, _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.isSatisfied(this, this._shutdownAwaitTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK, _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.getMessage(), this, this._shutdownAwaitTime, _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT));
            }
            if (!_BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._backoffTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._backoffTime, _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.isSatisfied(this, this._backoffTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK, _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.getMessage(), this, this._backoffTime, _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT));
            }
            if (!_NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._numWorkerThreads, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._numWorkerThreads, _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_MINCHECK.isSatisfied(this, this._numWorkerThreads, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_MINCHECK, _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_MINCHECK.getMessage(), this, this._numWorkerThreads, _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_MINCHECK_CONTEXT));
            }
            if (!_BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._bufferSize, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._bufferSize, _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_MINCHECK.isSatisfied(this, this._bufferSize, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_MINCHECK, _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_MINCHECK.getMessage(), this, this._bufferSize, _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_MINCHECK_CONTEXT));
            }
            // Last check: returning early when satisfied is equivalent to the if-not-satisfied form used above.
            if (_PERIODICMETRICS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._periodicMetrics, (OValContext) null, (Validator) null)) {
                return;
            }
            list.add(new ConstraintViolation(_PERIODICMETRICS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _PERIODICMETRICS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._periodicMetrics, _PERIODICMETRICS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
        }

        // Configures each static constraint check from the annotation declared on the
        // corresponding builder field, looked up reflectively by field name. A missing field
        // is rethrown as a RuntimeException, so it fails class initialization.
        static {
            try {
                _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_consumer").getDeclaredAnnotation(NotNull.class));
                _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_parser").getDeclaredAnnotation(NotNull.class));
                _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_pollTime").getDeclaredAnnotation(NotNull.class));
                _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.configure(Builder.class.getDeclaredField("_pollTime").getDeclaredAnnotation(CheckWith.class));
                _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_shutdownAwaitTime").getDeclaredAnnotation(NotNull.class));
                _SHUTDOWNAWAITTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.configure(Builder.class.getDeclaredField("_shutdownAwaitTime").getDeclaredAnnotation(CheckWith.class));
                _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_backoffTime").getDeclaredAnnotation(NotNull.class));
                _BACKOFFTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.configure(Builder.class.getDeclaredField("_backoffTime").getDeclaredAnnotation(CheckWith.class));
                _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_numWorkerThreads").getDeclaredAnnotation(NotNull.class));
                _NUMWORKERTHREADS_NET_SF_OVAL_CONSTRAINT_MINCHECK.configure(Builder.class.getDeclaredField("_numWorkerThreads").getDeclaredAnnotation(Min.class));
                _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_bufferSize").getDeclaredAnnotation(NotNull.class));
                _BUFFERSIZE_NET_SF_OVAL_CONSTRAINT_MINCHECK.configure(Builder.class.getDeclaredField("_bufferSize").getDeclaredAnnotation(Min.class));
                _PERIODICMETRICS_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_periodicMetrics").getDeclaredAnnotation(NotNull.class));
            } catch (NoSuchFieldException e) {
                throw new RuntimeException("Constraint check configuration error", e);
            }
        }
    }

    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/KafkaSource$LogConsumerListener.class */
    /**
     * Listener attached to the source's {@code RunnableConsumer}. Consumed values are placed on
     * the source's buffer; failures are counted, logged and followed by a backoff sleep. In
     * every path where an interruption is observed, the thread's interrupt status is restored
     * and the runnable consumer is asked to stop.
     */
    private class LogConsumerListener implements ConsumerListener<V> {
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_3;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_4;

        private LogConsumerListener() {
        }

        /**
         * Puts the record's value on the buffer (blocking while the buffer is full), counts it
         * as ingested and records the current queue size gauge.
         *
         * @param consumerRecord the record received from the consumer
         */
        @Override // com.arpnetworking.metrics.common.kafka.ConsumerListener
        public void handle(ConsumerRecord<?, V> consumerRecord) {
            try {
                KafkaSource.this._buffer.put(consumerRecord.value());
                KafkaSource.this._currentRecordsIngestedCount.getAndIncrement();
                KafkaSource.this._periodicMetrics.recordGauge(KafkaSource.this._queueSizeGaugeMetricName, KafkaSource.this._buffer.size());
            } catch (InterruptedException e) {
                // Restore the interrupt status that the blocking put cleared, so code further up
                // the consumer thread can still observe it. This matches handle(Throwable) and backoff.
                Thread.currentThread().interrupt();
                LogBuilder throwable = KafkaSource.this._logger.info().setMessage("Consumer thread interrupted").addData("source", KafkaSource.this).addData("action", "stopping").setThrowable(e);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, throwable));
                throwable.log();
                KafkaSource.this._runnableConsumer.stop();
            }
        }

        /**
         * Handles a failure reported by the consumer. An {@link InterruptedException} stops the
         * consumer. A {@code KafkaException} or any other throwable increments the matching
         * exception counter, is logged at error level and is followed by a backoff sleep.
         *
         * @param th the failure to handle
         */
        @Override // com.arpnetworking.metrics.common.kafka.ConsumerListener
        public void handle(Throwable th) {
            if (th instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                LogBuilder throwable = KafkaSource.this._logger.info().setMessage("Consumer thread interrupted").addData("source", KafkaSource.this).addData("action", "stopping").setThrowable(th);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, throwable));
                throwable.log();
                KafkaSource.this._runnableConsumer.stop();
                return;
            }
            if (th instanceof KafkaException) {
                KafkaSource.this._periodicMetrics.recordCounter(KafkaSource.this._kafkaExceptionCountMetricName, 1L);
                LogBuilder throwable2 = KafkaSource.this._logger.error().setMessage("Consumer received Kafka Exception").addData("source", KafkaSource.this).addData("action", "sleeping").setThrowable(th);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, throwable2));
                throwable2.log();
                backoff(th);
                return;
            }
            KafkaSource.this._periodicMetrics.recordCounter(KafkaSource.this._consumerExceptionCountMetricName, 1L);
            LogBuilder throwable3 = KafkaSource.this._logger.error().setMessage("Consumer thread error").addData("source", KafkaSource.this).addData("action", "sleeping").setThrowable(th);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_3, this, throwable3));
            throwable3.log();
            backoff(th);
        }

        /**
         * Sleeps for the configured backoff time. If the sleep is interrupted, the interrupt
         * status is restored, the event is logged and the consumer is stopped.
         *
         * @param th the failure that triggered the backoff; it is attached to the log entry
         *     written when the sleep is interrupted
         */
        private void backoff(Throwable th) {
            try {
                Thread.sleep(KafkaSource.this._backoffTime.toMillis());
            } catch (InterruptedException unused) {
                Thread.currentThread().interrupt();
                LogBuilder throwable = KafkaSource.this._logger.info().setMessage("Sleep interrupted").addData("source", KafkaSource.this).addData("action", "stopping").setThrowable(th);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_4, this, throwable));
                throwable.log();
                KafkaSource.this._runnableConsumer.stop();
            }
        }

        /* synthetic */ LogConsumerListener(KafkaSource kafkaSource, LogConsumerListener logConsumerListener) {
            this();
        }

        static {
            ajc$preClinit();
        }

        // Creates the AspectJ static join points for the five LogBuilder.log() call sites above.
        private static /* synthetic */ void ajc$preClinit() {
            Factory factory = new Factory("KafkaSource.java", LogConsumerListener.class);
            ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 233);
            ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 247);
            ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 256);
            ajc$tjp_3 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 265);
            ajc$tjp_4 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 281);
        }
    }

    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/KafkaSource$ParsingWorker.class */
    /**
     * Runnable that takes values off the source's buffer, parses them and notifies observers
     * with each resulting record. The same instance is submitted once per worker thread, so the
     * running flag is shared by all workers.
     */
    private class ParsingWorker implements Runnable {
        private volatile boolean _isRunning;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;

        private ParsingWorker() {
            this._isRunning = true;
        }

        /**
         * Polls the buffer until {@link #stop()} has been called and the buffer is empty.
         * Each poll waits at most the configured backoff time, so a stop request is noticed
         * even when no data arrives. Parse failures are counted and logged without ending
         * the loop.
         *
         * If the thread is interrupted, the interrupt status is restored, the worker is
         * stopped and this method returns immediately. The loop must not continue in that
         * case: with the interrupt flag set, the timed poll may throw again straight away
         * without removing an element, which would leave the thread spinning for as long as
         * the buffer is non-empty.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (this._isRunning || !KafkaSource.this._buffer.isEmpty()) {
                final V value;
                try {
                    value = KafkaSource.this._buffer.poll(KafkaSource.this._backoffTime.toMillis(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException unused) {
                    Thread.currentThread().interrupt();
                    stop();
                    return;
                }
                KafkaSource.this._periodicMetrics.recordGauge(KafkaSource.this._queueSizeGaugeMetricName, KafkaSource.this._buffer.size());
                if (value == null) {
                    // The poll timed out; re-evaluate the loop condition.
                    continue;
                }
                try {
                    Stopwatch parseTimer = Stopwatch.createStarted();
                    List<Record> records = KafkaSource.this._parser.parse(value);
                    parseTimer.stop();
                    KafkaSource.this._periodicMetrics.recordTimer(KafkaSource.this._parsingTimeMetricName, parseTimer.elapsed(TimeUnit.NANOSECONDS), Optional.of(TimeUnit.NANOSECONDS));
                    for (Record record : records) {
                        KafkaSource.this.notify(record);
                        KafkaSource.this._currentRecordsProcessedCount.getAndIncrement();
                    }
                } catch (ParsingException e) {
                    KafkaSource.this._periodicMetrics.recordCounter(KafkaSource.this._parsingExceptionCountMetricName, 1L);
                    LogBuilder throwable = KafkaSource.this._logger.error().setMessage("Failed to parse data").setThrowable(e);
                    LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, throwable));
                    throwable.log();
                }
            }
        }

        /**
         * Requests that the worker loop end once the buffer has been drained.
         */
        public void stop() {
            this._isRunning = false;
        }

        /* synthetic */ ParsingWorker(KafkaSource kafkaSource, ParsingWorker parsingWorker) {
            this();
        }

        static {
            ajc$preClinit();
        }

        // Creates the AspectJ static join point for the LogBuilder.log() call in run().
        private static /* synthetic */ void ajc$preClinit() {
            Factory factory = new Factory("KafkaSource.java", ParsingWorker.class);
            ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 199);
        }
    }

    // Class initialization: set up the AspectJ static join points, then create the class logger.
    static {
        ajc$preClinit();
        LOGGER = LoggerFactory.getLogger(KafkaSource.class);
    }

    /**
     * Starts the source: submits the runnable consumer to the consumer executor, then submits
     * the shared parsing worker to the parser executor once per configured worker thread.
     */
    @Override // com.arpnetworking.metrics.common.sources.Source
    public void start() {
        this._consumerExecutor.execute(this._runnableConsumer);
        int remaining = this._numWorkerThreads.intValue();
        while (remaining > 0) {
            this._parserExecutor.execute(this._parsingWorker);
            remaining--;
        }
    }

    /**
     * Stops the source in two phases. First the runnable consumer is stopped, its executor is
     * shut down and awaited for up to the shutdown await time, and the Kafka consumer is closed
     * whether or not the wait was interrupted. Then the parsing worker is stopped and its
     * executor is shut down and awaited in the same way.
     *
     * If the calling thread is interrupted during either wait, the interruption is logged and
     * shutdown continues. The interrupt status is restored only after both phases have run, so
     * that the second wait is not cut short by an interruption seen in the first.
     */
    @Override // com.arpnetworking.metrics.common.sources.Source
    public void stop() {
        boolean interrupted = false;
        this._runnableConsumer.stop();
        this._consumerExecutor.shutdown();
        try {
            this._consumerExecutor.awaitTermination(this._shutdownAwaitTime.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
            LogBuilder throwable = this._logger.warn().setMessage("Unable to shutdown kafka consumer executor").setThrowable(e);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, throwable));
            throwable.log();
        } finally {
            this._consumer.close();
        }
        this._parsingWorker.stop();
        this._parserExecutor.shutdown();
        try {
            this._parserExecutor.awaitTermination(this._shutdownAwaitTime.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e2) {
            interrupted = true;
            LogBuilder throwable2 = this._logger.warn().setMessage("Unable to shutdown parsing worker executor").setThrowable(e2);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, throwable2));
            throwable2.log();
        }
        if (interrupted) {
            // Let the caller see that an interruption occurred instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the structured log representation of this source: the superclass's log value
     * under the key {@code "super"} and the Kafka consumer under {@code "consumer"}.
     *
     * @return the log value produced by {@code LogValueMapFactory}
     */
    @Override // com.arpnetworking.metrics.common.sources.BaseSource
    @LogValue
    public Object toLogValue() {
        return LogValueMapFactory.builder(this).put("super", super.toLogValue()).put("consumer", this._consumer).build();
    }

    /**
     * Returns the string form of {@link #toLogValue()}.
     *
     * @return the string representation of this source
     */
    @Override // com.arpnetworking.metrics.common.sources.BaseSource
    public String toString() {
        final Object logValue = toLogValue();
        return logValue.toString();
    }

    /**
     * Creates a source that uses the class logger and a new {@link ArrayBlockingQueue} whose
     * capacity is the builder's buffer size.
     *
     * @param builder the validated builder
     */
    private KafkaSource(Builder<V> builder) {
        this(builder, LOGGER, new ArrayBlockingQueue<>(builder._bufferSize.intValue()));
    }

    /**
     * Package-private constructor that takes an explicit logger. The buffer is a new
     * {@link ArrayBlockingQueue} whose capacity is the builder's buffer size.
     *
     * @param builder the validated builder
     * @param logger the logger to use instead of the class logger
     */
    KafkaSource(Builder<V> builder, Logger logger) {
        this(builder, logger, new ArrayBlockingQueue<>(builder._bufferSize.intValue()));
    }

    /**
     * Package-private constructor that takes an explicit buffer and uses the class logger.
     * The builder's buffer size is not consulted on this path.
     *
     * @param builder the validated builder
     * @param blockingQueue the queue to use as the buffer between consumer and parsing workers
     */
    KafkaSource(final Builder<V> builder, final BlockingQueue<V> blockingQueue) {
        this(builder, LOGGER, blockingQueue);
    }

    private KafkaSource(Builder<V> builder, Logger logger, BlockingQueue<V> blockingQueue) {
        super(builder);
        this._parsingWorker = new ParsingWorker(this, null);
        this._currentRecordsProcessedCount = new AtomicLong(0L);
        this._currentRecordsIngestedCount = new AtomicLong(0L);
        this._parsingTimeMetricName = "sources/kafka/" + getMetricSafeName() + "/parsing_time";
        this._recordsInCountMetricName = "sources/kafka/" + getMetricSafeName() + "/records_in";
        this._recordsOutCountMetricName = "sources/kafka/" + getMetricSafeName() + "/records_out";
        this._parsingExceptionCountMetricName = "sources/kafka/" + getMetricSafeName() + "/parsing_exceptions";
        this._kafkaExceptionCountMetricName = "sources/kafka/" + getMetricSafeName() + "/kafka_exceptions";
        this._consumerExceptionCountMetricName = "sources/kafka/" + getMetricSafeName() + "/consumer_exceptions";
        this._queueSizeGaugeMetricName = "sources/kafka/" + getMetricSafeName() + "/queue_size";
        this._consumer = ((Builder) builder)._consumer;
        this._parser = ((Builder) builder)._parser;
        this._runnableConsumer = (RunnableConsumer) new RunnableConsumerImpl.Builder().setConsumer(((Builder) builder)._consumer).setListener(new LogConsumerListener(this, null)).setPollTime(((Builder) builder)._pollTime).build();
        this._numWorkerThreads = ((Builder) builder)._numWorkerThreads;
        this._consumerExecutor = Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, "KafkaConsumer");
        });
        this._parserExecutor = Executors.newFixedThreadPool(this._numWorkerThreads.intValue());
        this._shutdownAwaitTime = ((Builder) builder)._shutdownAwaitTime;
        this._backoffTime = ((Builder) builder)._backoffTime;
        this._periodicMetrics = ((Builder) builder)._periodicMetrics;
        this._periodicMetrics.registerPolledMetric(periodicMetrics -> {
            periodicMetrics.recordCounter(this._recordsOutCountMetricName, this._currentRecordsProcessedCount.getAndSet(0L));
        });
        this._periodicMetrics.registerPolledMetric(periodicMetrics2 -> {
            periodicMetrics2.recordCounter(this._recordsInCountMetricName, this._currentRecordsIngestedCount.getAndSet(0L));
        });
        this._logger = logger;
        this._buffer = blockingQueue;
    }

    // Synthetic bridge constructor: gives Builder's factory lambda access to the private
    // single-argument constructor. The second parameter is unused and only distinguishes the signature.
    /* synthetic */ KafkaSource(Builder builder, KafkaSource kafkaSource) {
        this(builder);
    }

    // Creates the AspectJ static join points for the two LogBuilder.log() call sites in stop().
    // NOTE(review): the trailing integers are presumably line numbers in the original KafkaSource.java.
    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("KafkaSource.java", KafkaSource.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 102);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 115);
    }
}
