package com.arpnetworking.metrics.common.kafka;

import com.arpnetworking.commons.builder.OvalBuilder;
import com.arpnetworking.commons.builder.annotations.WovenValidation;
import com.arpnetworking.commons.maven.javassist.Processed;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import net.sf.oval.ConstraintViolation;
import net.sf.oval.Validator;
import net.sf.oval.constraint.CheckWith;
import net.sf.oval.constraint.CheckWithCheck;
import net.sf.oval.constraint.NotNull;
import net.sf.oval.constraint.NotNullCheck;
import net.sf.oval.context.FieldContext;
import net.sf.oval.context.OValContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;

/* loaded from: input_file:com/arpnetworking/metrics/common/kafka/RunnableConsumerImpl.class */
/**
 * {@link RunnableConsumer} implementation that repeatedly polls a Kafka
 * {@link Consumer}, hands every returned record to a {@link ConsumerListener}
 * and then commits synchronously.
 *
 * <p>The loop in {@link #run()} keeps going until {@link #stop()} clears the
 * volatile running flag. Exceptions raised inside one loop iteration are
 * forwarded to the listener rather than terminating the loop.
 *
 * <p>Instances are created through {@link Builder}.
 *
 * @param <V> the value type of the consumed records
 */
public class RunnableConsumerImpl<V> implements RunnableConsumer {
    private static final Logger LOGGER;
    private final ConsumerListener<V> _listener;
    private final Consumer<?, V> _consumer;
    private final Duration _pollTime;
    // Written by stop(), read by the polling loop; volatile so the write is visible across threads.
    private volatile boolean _isRunning = true;
    // AspectJ join point descriptor for the LogBuilder.log() call in run(); set by ajc$preClinit().
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;

    /**
     * Builder for {@link RunnableConsumerImpl}.
     *
     * <p>All three fields are required (not null). The poll time is additionally
     * checked with {@link PositiveDuration}. The constraint checks are evaluated
     * by the generated {@link #validate(List)} method below.
     *
     * @param <V> the value type of the consumed records
     */
    @Processed({"com.arpnetworking.commons.builder.ValidationProcessor"})
    @WovenValidation
    public static final class Builder<V> extends OvalBuilder<RunnableConsumerImpl<V>> {

        @NotNull
        private Consumer<?, V> _consumer;

        @NotNull
        private ConsumerListener<V> _listener;

        @CheckWith(value = PositiveDuration.class, message = "Poll duration must be positive.")
        @NotNull
        private Duration _pollTime;

        // One check instance plus one field context per constraint annotation above.
        // The check instances are configured from the annotations in the static initializer
        // at the bottom of this class, which must stay after these declarations.
        private static final NotNullCheck _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_consumer");
        private static final NotNullCheck _LISTENER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _LISTENER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_listener");
        private static final NotNullCheck _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_pollTime");
        private static final CheckWithCheck _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK = new CheckWithCheck();
        private static final OValContext _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT = new FieldContext(Builder.class, "_pollTime");

        /**
         * Check used by the {@code @CheckWith} constraint on the poll time.
         *
         * <p>Accepts any {@link Duration} that is not negative.
         * NOTE(review): a zero duration passes this check although the constraint
         * message reads "Poll duration must be positive." - confirm whether zero is
         * meant to be allowed before tightening it.
         */
        private static class PositiveDuration implements CheckWithCheck.SimpleCheck {
            private static final long serialVersionUID = 1;

            private PositiveDuration() {
            }

            /**
             * Tests the candidate value.
             *
             * @param validatedObject the object owning the field (unused)
             * @param value the field value under validation
             * @return {@code true} when {@code value} is a {@link Duration} that is not negative
             */
            @Override
            public boolean isSatisfied(Object validatedObject, Object value) {
                return (value instanceof Duration) && !((Duration) value).isNegative();
            }
        }

        /**
         * Creates an empty builder that constructs instances via the
         * {@code RunnableConsumerImpl(Builder)} constructor.
         */
        public Builder() {
            super(RunnableConsumerImpl::new);
        }

        /**
         * Sets the listener that receives records and exceptions. Required.
         *
         * @param consumerListener the listener
         * @return this builder
         */
        public Builder<V> setListener(ConsumerListener<V> consumerListener) {
            this._listener = consumerListener;
            return this;
        }

        /**
         * Sets the Kafka consumer to poll. Required.
         *
         * @param consumer the consumer
         * @return this builder
         */
        public Builder<V> setConsumer(Consumer<?, V> consumer) {
            this._consumer = consumer;
            return this;
        }

        /**
         * Sets the duration passed to each {@code poll} call. Required; must
         * satisfy {@link PositiveDuration}.
         *
         * @param duration the poll duration
         * @return this builder
         */
        public Builder<V> setPollTime(Duration duration) {
            this._pollTime = duration;
            return this;
        }

        /**
         * Evaluates every field constraint and appends a {@link ConstraintViolation}
         * to {@code list} for each one that is not satisfied. Checks are evaluated
         * independently, so more than one violation may be appended.
         *
         * @param list the list that collects violations
         */
        protected void validate(List list) {
            if (!_CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._consumer, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._consumer, _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_LISTENER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._listener, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_LISTENER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _LISTENER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._listener, _LISTENER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._pollTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._pollTime, _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
            }
            if (!_POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.isSatisfied(this, this._pollTime, (OValContext) null, (Validator) null)) {
                list.add(new ConstraintViolation(_POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK, _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.getMessage(), this, this._pollTime, _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK_CONTEXT));
            }
        }

        // Configures each check instance from the annotation declared on its field.
        static {
            try {
                _CONSUMER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_consumer").getDeclaredAnnotation(NotNull.class));
                _LISTENER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_listener").getDeclaredAnnotation(NotNull.class));
                _POLLTIME_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_pollTime").getDeclaredAnnotation(NotNull.class));
                _POLLTIME_NET_SF_OVAL_CONSTRAINT_CHECKWITHCHECK.configure(Builder.class.getDeclaredField("_pollTime").getDeclaredAnnotation(CheckWith.class));
            } catch (NoSuchFieldException e) {
                throw new RuntimeException("Constraint check configuration error", e);
            }
        }
    }

    static {
        ajc$preClinit();
        LOGGER = LoggerFactory.getLogger(RunnableConsumerImpl.class);
    }

    /**
     * Copies the consumer, listener and poll time out of the builder.
     *
     * @param builder the builder holding the configured values
     */
    RunnableConsumerImpl(Builder<V> builder) {
        // The builder's private fields are accessible from the enclosing class, so no raw
        // cast is needed and the generic types are preserved.
        this._consumer = builder._consumer;
        this._listener = builder._listener;
        this._pollTime = builder._pollTime;
    }

    /**
     * Runs the polling loop on the calling thread until {@link #stop()} is called.
     *
     * <p>First installs an uncaught-exception handler on the current thread that
     * logs the throwable at error level; the handler is not removed when this
     * method returns. Each loop iteration then polls the consumer for up to the
     * configured poll time, passes every record to the listener, and commits
     * synchronously. If any step throws an {@link Exception}, the remainder of
     * that iteration (including the commit) is skipped and the exception is
     * passed to the listener.
     */
    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setUncaughtExceptionHandler((thread, th) -> {
            LogBuilder logBuilder = LOGGER.error().setMessage("Unhandled exception").setThrowable(th);
            // Woven by AspectJ: lets the steno aspect add line/method context before logging.
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, (Object) null, logBuilder));
            logBuilder.log();
        });
        while (isRunning()) {
            try {
                for (ConsumerRecord<?, V> record : this._consumer.poll(this._pollTime)) {
                    this._listener.handle(record);
                }
                this._consumer.commitSync();
            } catch (Exception e) {
                this._listener.handle(e);
            }
        }
    }

    /**
     * Requests that the polling loop exit. The flag is checked at the top of
     * each loop iteration, so an iteration already in progress completes first.
     */
    @Override // com.arpnetworking.metrics.common.kafka.RunnableConsumer
    public void stop() {
        this._isRunning = false;
    }

    /**
     * Reports whether the polling loop should keep going.
     *
     * @return {@code true} until {@link #stop()} has been called
     */
    protected boolean isRunning() {
        return this._isRunning;
    }

    // AspectJ-generated: builds the static join point descriptor used by the handler in run().
    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("RunnableConsumerImpl.java", RunnableConsumerImpl.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 60);
    }
}
