package com.arpnetworking.metrics.common.sources;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.io.Tcp;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;
import com.arpnetworking.commons.builder.annotations.WovenValidation;
import com.arpnetworking.commons.maven.javassist.Processed;
import com.arpnetworking.metrics.common.parsers.Parser;
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
import com.arpnetworking.metrics.common.sources.BaseTcpSource;
import com.arpnetworking.metrics.mad.model.Record;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import net.sf.oval.ConstraintViolation;
import net.sf.oval.Validator;
import net.sf.oval.constraint.NotNull;
import net.sf.oval.constraint.NotNullCheck;
import net.sf.oval.context.FieldContext;
import net.sf.oval.context.OValContext;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;

/* loaded from: input_file:com/arpnetworking/metrics/common/sources/TcpLineSource.class */
public final class TcpLineSource extends BaseTcpSource {
    private final Parser<List<Record>, ByteBuffer> _parser;
    private static final Logger LOGGER = LoggerFactory.getLogger(TcpLineSource.class);

    @Processed({"com.arpnetworking.commons.builder.ValidationProcessor"})
    @WovenValidation
    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/TcpLineSource$Builder.class */
    public static final class Builder extends BaseTcpSource.Builder<Builder, TcpLineSource> {

        @NotNull
        private Parser<List<Record>, ByteBuffer> _parser;
        private static final NotNullCheck _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        private static final OValContext _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_parser");

        public Builder() {
            super(builder -> {
                return new TcpLineSource(builder, null);
            });
        }

        public Builder setParser(Parser<List<Record>, ByteBuffer> parser) {
            this._parser = parser;
            return self();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.arpnetworking.metrics.common.sources.BaseSource.Builder
        public Builder self() {
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.arpnetworking.metrics.common.sources.BaseTcpSource.Builder, com.arpnetworking.metrics.common.sources.ActorSource.Builder, com.arpnetworking.metrics.common.sources.BaseSource.Builder
        public void validate(List list) {
            super.validate(list);
            if (_PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(this, this._parser, (OValContext) null, (Validator) null)) {
                return;
            }
            list.add(new ConstraintViolation(_PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK, _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(), this, this._parser, _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT));
        }

        static {
            try {
                _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_parser").getDeclaredAnnotation(NotNull.class));
            } catch (NoSuchFieldException e) {
                throw new RuntimeException("Constraint check configuration error", e);
            }
        }
    }

    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/TcpLineSource$TcpListenerActor.class */
    static final class TcpListenerActor extends BaseTcpSource.BaseTcpListenerActor {
        static Props props(TcpLineSource tcpLineSource) {
            return Props.create(TcpListenerActor.class, new Object[]{tcpLineSource});
        }

        @Override // com.arpnetworking.metrics.common.sources.BaseTcpSource.BaseTcpListenerActor
        protected ActorRef createHandler(BaseTcpSource baseTcpSource, Tcp.Connected connected) {
            return getContext().actorOf(Props.create(TcpRequestHandlerActor.class, new Object[]{getSink(), connected.remoteAddress()}));
        }

        TcpListenerActor(TcpLineSource tcpLineSource) {
            super(tcpLineSource);
        }
    }

    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/TcpLineSource$TcpRequestHandlerActor.class */
    static final class TcpRequestHandlerActor extends AbstractActor {
        private final ByteStringBuilder _buffer = new ByteStringBuilder();
        private final TcpLineSource _sink;
        private final InetSocketAddress _remoteAddress;
        private static final Logger BAD_REQUEST_LOGGER;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_3;
        private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_4;

        static {
            ajc$preClinit();
            BAD_REQUEST_LOGGER = LoggerFactory.getRateLimitLogger(TcpLineSource.class, Duration.ofSeconds(30L));
        }

        TcpRequestHandlerActor(TcpLineSource tcpLineSource, InetSocketAddress inetSocketAddress) {
            this._sink = tcpLineSource;
            this._remoteAddress = inetSocketAddress;
        }

        public AbstractActor.Receive createReceive() {
            return receiveBuilder().match(Tcp.Received.class, this::tcpReceived).match(Tcp.ConnectionClosed.class, connectionClosed -> {
                getContext().stop(getSelf());
                LogBuilder addData = TcpLineSource.LOGGER.debug().setMessage("Tcp connection close").addData("name", this._sink.getName()).addData("remoteAddress", this._remoteAddress.getAddress().getHostAddress()).addData("remotePort", Integer.valueOf(this._remoteAddress.getPort()));
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_4, this, addData));
                addData.log();
            }).build();
        }

        private void tcpReceived(Tcp.Received received) {
            ByteString data = received.data();
            LogBuilder addData = TcpLineSource.LOGGER.trace().setMessage("Tcp data received").addData("name", this._sink.getName()).addData("remoteAddress", this._remoteAddress.getAddress().getHostAddress()).addData("remotePort", Integer.valueOf(this._remoteAddress.getPort())).addData("data", data);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, addData));
            addData.log();
            try {
                processData(data);
            } catch (RuntimeException e) {
                LogBuilder throwable = BAD_REQUEST_LOGGER.warn().setMessage("Error processing data").addData("name", this._sink.getName()).addData("remoteAddress", this._remoteAddress.getAddress().getHostAddress()).addData("remotePort", Integer.valueOf(this._remoteAddress.getPort())).addData("data", data).setThrowable(e);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, throwable));
                throwable.log();
            }
        }

        private void processData(ByteString byteString) {
            int i = 0;
            int indexOf = byteString.indexOf('\n');
            while (true) {
                int i2 = indexOf;
                if (i2 < 0) {
                    break;
                }
                this._buffer.append(byteString.slice(i, i2));
                processRecords(this._buffer.result());
                this._buffer.clear();
                if (i2 + 1 < byteString.size()) {
                    i = i2 + 1;
                    indexOf = byteString.indexOf('\n', i);
                } else {
                    i = -1;
                    indexOf = -1;
                }
            }
            if (i >= 0) {
                this._buffer.append(byteString.slice(i, byteString.size()));
            }
        }

        private void processRecords(ByteString byteString) {
            try {
                List<Record> parse = this._sink.getParser().parse(byteString.toByteBuffer());
                LogBuilder addData = TcpLineSource.LOGGER.trace().setMessage("Parsed records").addData("name", this._sink.getName()).addData("records", Integer.valueOf(parse.size())).addData("remoteAddress", this._remoteAddress.getAddress().getHostAddress()).addData("remotePort", Integer.valueOf(this._remoteAddress.getPort()));
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, addData));
                addData.log();
                TcpLineSource tcpLineSource = this._sink;
                tcpLineSource.getClass();
                parse.forEach((v1) -> {
                    r1.notify(v1);
                });
            } catch (ParsingException e) {
                LogBuilder throwable = BAD_REQUEST_LOGGER.warn().setMessage("Error processing records").addData("name", this._sink.getName()).addData("remoteAddress", this._remoteAddress.getAddress().getHostAddress()).addData("remotePort", Integer.valueOf(this._remoteAddress.getPort())).setThrowable(e);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_3, this, throwable));
                throwable.log();
            }
        }

        private static /* synthetic */ void ajc$preClinit() {
            Factory factory = new Factory("TcpLineSource.java", TcpRequestHandlerActor.class);
            ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 135);
            ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 149);
            ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 195);
            ajc$tjp_3 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 205);
            ajc$tjp_4 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 121);
        }
    }

    @Override // com.arpnetworking.metrics.common.sources.ActorSource
    protected Props createProps() {
        return TcpListenerActor.props(this);
    }

    Parser<List<Record>, ByteBuffer> getParser() {
        return this._parser;
    }

    private TcpLineSource(Builder builder) {
        super(builder);
        this._parser = builder._parser;
    }

    /* synthetic */ TcpLineSource(Builder builder, TcpLineSource tcpLineSource) {
        this(builder);
    }
}
