package com.arpnetworking.metrics.common.sources;

import akka.Done;
import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.http.javadsl.model.HttpHeader;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializerSettings;
import akka.stream.FanInShape2;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.Supervision;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Zip;
import akka.util.ByteString;
import com.arpnetworking.commons.builder.annotations.WovenValidation;
import com.arpnetworking.commons.maven.javassist.Processed;
import com.arpnetworking.http.RequestReply;
import com.arpnetworking.metrics.common.parsers.Parser;
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
import com.arpnetworking.metrics.common.sources.ActorSource;
import com.arpnetworking.metrics.mad.model.HttpRequest;
import com.arpnetworking.metrics.mad.model.Record;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.google.common.collect.ImmutableMultimap;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import net.sf.oval.ConstraintViolation;
import net.sf.oval.Validator;
import net.sf.oval.constraint.NotNull;
import net.sf.oval.constraint.NotNullCheck;
import net.sf.oval.context.FieldContext;
import net.sf.oval.context.OValContext;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;

/* loaded from: input_file:com/arpnetworking/metrics/common/sources/HttpSource.class */
public class HttpSource extends ActorSource {
    /** Parser turning a request model into records; assigned from the builder and read by the nested {@code Actor}. */
    private final Parser<List<Record>, HttpRequest> _parser;

    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/HttpSource$Actor.class */
    static final class Actor extends AbstractActor {
        /** Terminal stage of the per-request stream; assigned in the constructor. */
        private final Sink<Record, CompletionStage<Done>> _sink;
        /** Parser taken from the owning {@code HttpSource} in the constructor. */
        private final Parser<List<Record>, HttpRequest> _parser;
        /** Materializer created against this actor's context with the stopping supervision decider. */
        private final Materializer _materializer = ActorMaterializer.create(ActorMaterializerSettings.create(context().system()).withSupervisionStrategy(Supervision.stoppingDecider()), context());
        /**
         * Flow from an Akka HTTP request to the records parsed out of it.
         *
         * The request is broadcast to two branches (body and headers), the two results are
         * zipped into a pair, and the pair is converted to the request model and parsed.
         */
        private final Graph<FlowShape<akka.http.javadsl.model.HttpRequest, Record>, NotUsed> _processGraph = GraphDSL.create(builder -> {
            // Body branch: request -> entity -> data bytes, concatenated pairwise into one ByteString.
            // NOTE(review): Akka's reduce is documented to fail on an empty stream; confirm whether an
            // entity that emits no data chunks can reach this stage.
            Flow named = Flow.create().map(httpRequest -> {
                return httpRequest.entity();
            }).flatMapConcat(requestEntity -> {
                return requestEntity.getDataBytes();
            }).reduce((byteString, byteString2) -> {
                return byteString.concat(byteString2);
            }).named("getBody");
            // Header branch: request headers -> multimap built by createHeaderMultimap.
            Flow named2 = Flow.create().map(httpRequest2 -> {
                return httpRequest2.getHeaders();
            }).map(iterable -> {
                return createHeaderMultimap(iterable);
            }).named("getHeaders");
            // Final stage: (body, headers) pair -> request model -> records returned by parseRecords.
            Flow named3 = Flow.create().map(pair -> {
                return mapModel(pair);
            }).mapConcat(httpRequest3 -> {
                return parseRecords(httpRequest3);
            }).named("createAndParseRequest");
            // Register every stage with the builder before wiring.
            UniformFanOutShape add = builder.add(Broadcast.create(2));
            FlowShape add2 = builder.add(named);
            FlowShape add3 = builder.add(named2);
            FanInShape2 add4 = builder.add(Zip.create());
            FlowShape add5 = builder.add(named3);
            // Wiring: broadcast.out(0) -> body -> zip.in0; broadcast.out(1) -> headers -> zip.in1; zip.out -> parse.
            builder.from(add.out(0)).via(add2).toInlet(add4.in0());
            builder.from(add.out(1)).via(add3).toInlet(add4.in1());
            builder.from(add4.out()).toInlet(add5.in());
            // Expose the broadcast inlet and the parse outlet as the flow's single in/out.
            return new FlowShape(add.in(), add5.out());
        });
        /** Rate-limit logger created for {@code HttpSource} with a 30 second duration; used when request handling fails. */
        private static final Logger BAD_REQUEST_LOGGER;
        /**
         * Static join point descriptor assigned in {@code ajc$preClinit()}.
         * NOTE(review): declared final with a null initializer yet assigned in another method;
         * this looks like a decompiler artifact and should be confirmed before compiling.
         */
        private static final JoinPoint.StaticPart ajc$tjp_0 = null;

        static {
            // Populate the join point descriptor first, then create the logger.
            ajc$preClinit();
            BAD_REQUEST_LOGGER = LoggerFactory.getRateLimitLogger(HttpSource.class, Duration.ofSeconds(30L));
        }

        /**
         * Builds the {@link Props} describing how to create this actor.
         *
         * @param httpSource the source passed as the sole constructor argument
         * @return props for {@code Actor} bound to {@code httpSource}
         */
        static Props props(HttpSource httpSource) {
            final Object[] constructorArgs = {httpSource};
            return Props.create(Actor.class, constructorArgs);
        }

        /**
         * Defines the actor's message handling.
         *
         * Each {@code RequestReply} is run through the process graph into the sink. When the
         * stream completes, the reply's response future is completed with status 200 on success,
         * 400 when the failure is a {@link ParsingException}, and 500 for any other failure.
         * Failures are logged through the rate-limited logger before the response is completed.
         *
         * @return the receive behavior matching {@code RequestReply} messages
         */
        @Override
        public AbstractActor.Receive createReceive() {
            return receiveBuilder().match(RequestReply.class, requestReply -> {
                // Typed local instead of a raw cast so the completion callback sees (Done, Throwable).
                final CompletionStage<Done> completion = akka.stream.javadsl.Source.single(requestReply.getRequest())
                        .via(this._processGraph)
                        .toMat(this._sink, Keep.right())
                        .run(this._materializer);
                completion.whenComplete((done, failure) -> {
                    final CompletableFuture<HttpResponse> response = requestReply.getResponse();
                    if (failure == null) {
                        response.complete(HttpResponse.create().withStatus(200));
                        return;
                    }
                    final LogBuilder logBuilder = BAD_REQUEST_LOGGER.warn().setMessage("Error handling http post").setThrowable(failure);
                    // Woven call: hands the join point for the log() call below to the logging aspect.
                    LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, (Object) null, logBuilder));
                    logBuilder.log();
                    if (failure instanceof ParsingException) {
                        response.complete(HttpResponse.create().withStatus(400));
                    } else {
                        response.complete(HttpResponse.create().withStatus(500));
                    }
                });
            }).build();
        }

        /**
         * Creates the actor for the given source.
         *
         * @param httpSource source that supplies the parser and is notified of every record
         */
        Actor(HttpSource httpSource) {
            _sink = Sink.foreach(record -> httpSource.notify(record));
            _parser = httpSource._parser;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Collects HTTP headers into an immutable multimap.
         *
         * @param iterable the headers to collect
         * @return multimap keyed by each header's lowercase name, holding the header values
         */
        public static ImmutableMultimap<String, String> createHeaderMultimap(Iterable<HttpHeader> iterable) {
            final ImmutableMultimap.Builder<String, String> headers = ImmutableMultimap.builder();
            iterable.forEach(header -> headers.put(header.lowercaseName(), header.value()));
            return headers.build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Converts a (body, headers) pair into the request model.
         *
         * @param pair first element is the body bytes, second element is the header multimap
         * @return request model constructed from the headers and the body
         */
        public static HttpRequest mapModel(Pair<ByteString, ImmutableMultimap<String, String>> pair) {
            final ImmutableMultimap<String, String> headers = pair.second();
            final ByteString body = pair.first();
            return new HttpRequest(headers, body);
        }

        /**
         * Parses a request model with this actor's parser.
         *
         * @param httpRequest the request model to parse
         * @return the records produced by the parser
         * @throws ParsingException if the parser rejects the request
         */
        private List<Record> parseRecords(HttpRequest httpRequest) throws ParsingException {
            final List<Record> records = _parser.parse(httpRequest);
            return records;
        }

        /**
         * Rebuilds a lambda of this class from its serialized form.
         *
         * Works in two steps: the implementation method name is mapped to a case selector, then the
         * matching case verifies the method kind, functional interface, and signatures recorded in
         * {@code serializedLambda} before returning an equivalent lambda.
         *
         * NOTE(review): the method is marked synthetic and the selector {@code z} is a boolean
         * assigned integer values; this looks like decompiler output rather than hand-written
         * code. Confirm before attempting to compile or edit it.
         *
         * @param serializedLambda description of the lambda to rebuild
         * @return the rebuilt lambda
         * @throws IllegalArgumentException if no case matches the serialized description
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            // Step 1: resolve the implementation method name to a selector (hash first, then equals).
            switch (implMethodName.hashCode()) {
                case -1647345004:
                    if (implMethodName.equals("lambda$1")) {
                        z = false;
                        break;
                    }
                    break;
                case -1647345003:
                    if (implMethodName.equals("lambda$2")) {
                        z = true;
                        break;
                    }
                    break;
                case -1647345001:
                    if (implMethodName.equals("lambda$4")) {
                        z = 2;
                        break;
                    }
                    break;
                case -1647345000:
                    if (implMethodName.equals("lambda$5")) {
                        z = 3;
                        break;
                    }
                    break;
                case -1647344999:
                    if (implMethodName.equals("lambda$6")) {
                        z = 4;
                        break;
                    }
                    break;
                case -1647344998:
                    if (implMethodName.equals("lambda$7")) {
                        z = 5;
                        break;
                    }
                    break;
                case -1647344997:
                    if (implMethodName.equals("lambda$8")) {
                        z = 6;
                        break;
                    }
                    break;
                case -1647344996:
                    if (implMethodName.equals("lambda$9")) {
                        z = 7;
                        break;
                    }
                    break;
                case 471912476:
                    if (implMethodName.equals("lambda$10")) {
                        z = 8;
                        break;
                    }
                    break;
            }
            // Step 2: verify the recorded signatures for the selected case and rebuild the lambda.
            switch (z) {
                case false:
                    // Sink procedure: forwards each record to the captured HttpSource.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lcom/arpnetworking/metrics/common/sources/HttpSource;Lcom/arpnetworking/metrics/mad/model/Record;)V")) {
                        HttpSource httpSource = (HttpSource) serializedLambda.getCapturedArg(0);
                        return record -> {
                            httpSource.notify(record);
                        };
                    }
                    break;
                case true:
                    // Graph construction function: same body as the _processGraph initializer.
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/GraphDSL$Builder;)Lakka/stream/FlowShape;")) {
                        Actor actor = (Actor) serializedLambda.getCapturedArg(0);
                        return builder -> {
                            Flow named = Flow.create().map(httpRequest -> {
                                return httpRequest.entity();
                            }).flatMapConcat(requestEntity -> {
                                return requestEntity.getDataBytes();
                            }).reduce((byteString, byteString2) -> {
                                return byteString.concat(byteString2);
                            }).named("getBody");
                            Flow named2 = Flow.create().map(httpRequest2 -> {
                                return httpRequest2.getHeaders();
                            }).map(iterable -> {
                                return createHeaderMultimap(iterable);
                            }).named("getHeaders");
                            Flow named3 = Flow.create().map(pair -> {
                                return mapModel(pair);
                            }).mapConcat(httpRequest3 -> {
                                return parseRecords(httpRequest3);
                            }).named("createAndParseRequest");
                            UniformFanOutShape add = builder.add(Broadcast.create(2));
                            FlowShape add2 = builder.add(named);
                            FlowShape add3 = builder.add(named2);
                            FanInShape2 add4 = builder.add(Zip.create());
                            FlowShape add5 = builder.add(named3);
                            builder.from(add.out(0)).via(add2).toInlet(add4.in0());
                            builder.from(add.out(1)).via(add3).toInlet(add4.in1());
                            builder.from(add4.out()).toInlet(add5.in());
                            return new FlowShape(add.in(), add5.out());
                        };
                    }
                    break;
                case true:
                    // Request -> entity.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/HttpRequest;)Lakka/http/javadsl/model/RequestEntity;")) {
                        return httpRequest -> {
                            return httpRequest.entity();
                        };
                    }
                    break;
                case true:
                    // Entity -> data bytes source.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/RequestEntity;)Lakka/stream/Graph;")) {
                        return requestEntity -> {
                            return requestEntity.getDataBytes();
                        };
                    }
                    break;
                case true:
                    // Pairwise ByteString concatenation.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lakka/util/ByteString;Lakka/util/ByteString;)Lakka/util/ByteString;")) {
                        return (byteString, byteString2) -> {
                            return byteString.concat(byteString2);
                        };
                    }
                    break;
                case true:
                    // Request -> headers.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/HttpRequest;)Ljava/lang/Iterable;")) {
                        return httpRequest2 -> {
                            return httpRequest2.getHeaders();
                        };
                    }
                    break;
                case true:
                    // Headers -> multimap.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Lcom/google/common/collect/ImmutableMultimap;")) {
                        return iterable -> {
                            return createHeaderMultimap(iterable);
                        };
                    }
                    break;
                case true:
                    // (body, headers) pair -> request model.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lakka/japi/Pair;)Lcom/arpnetworking/metrics/mad/model/HttpRequest;")) {
                        return pair -> {
                            return mapModel(pair);
                        };
                    }
                    break;
                case true:
                    // Request model -> parsed records.
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/common/sources/HttpSource$Actor") && serializedLambda.getImplMethodSignature().equals("(Lcom/arpnetworking/metrics/mad/model/HttpRequest;)Ljava/lang/Iterable;")) {
                        Actor actor2 = (Actor) serializedLambda.getCapturedArg(0);
                        return httpRequest3 -> {
                            return parseRecords(httpRequest3);
                        };
                    }
                    break;
            }
            // No case matched or the recorded signatures did not verify.
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }

        /**
         * Creates the static join point descriptor for the {@code LogBuilder.log} method call
         * and stores it in {@code ajc$tjp_0}. Invoked from the static initializer.
         */
        private static void ajc$preClinit() {
            final Factory joinPointFactory = new Factory("HttpSource.java", Actor.class);
            ajc$tjp_0 = joinPointFactory.makeSJP(
                    "method-call",
                    joinPointFactory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"),
                    112);
        }
    }

    @Processed({"com.arpnetworking.commons.builder.ValidationProcessor"})
    @WovenValidation
    /* loaded from: input_file:com/arpnetworking/metrics/common/sources/HttpSource$Builder.class */
    public static abstract class Builder<B extends Builder<B, S>, S extends HttpSource> extends ActorSource.Builder<B, S> {

        /** Parser to hand to the built source; set through {@code setParser} and null-checked in {@code validate}. */
        @NotNull
        private Parser<List<Record>, HttpRequest> _parser;
        /** Null check applied to {@code _parser}; configured from the field's annotation in the static initializer. */
        private static final NotNullCheck _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK = new NotNullCheck();
        /** Field context for {@code _parser}, attached to the violation reported by {@code validate}. */
        private static final OValContext _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT = new FieldContext(Builder.class, "_parser");

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Creates the builder, forwarding the given function to the superclass builder.
         *
         * @param function function from builder to built instance, passed to the superclass constructor
         */
        public Builder(Function<B, S> function) {
            super(function);
        }

        /**
         * Sets the parser. The value is checked for null by {@code validate}.
         *
         * @param parser the parser to store on this builder
         * @return this builder, as returned by {@code self()}
         */
        public B setParser(Parser<List<Record>, HttpRequest> parser) {
            _parser = parser;
            final B builder = (B) self();
            return builder;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Runs the superclass validation, then appends a constraint violation to {@code list}
         * when the parser fails the not-null check.
         *
         * @param list collection that receives any constraint violations
         */
        @Override
        public void validate(List list) {
            super.validate(list);
            final boolean parserSatisfied = _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.isSatisfied(
                    this, this._parser, (OValContext) null, (Validator) null);
            if (!parserSatisfied) {
                final ConstraintViolation violation = new ConstraintViolation(
                        _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK,
                        _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.getMessage(),
                        this,
                        this._parser,
                        _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK_CONTEXT);
                list.add(violation);
            }
        }

        static {
            try {
                _PARSER_NET_SF_OVAL_CONSTRAINT_NOTNULLCHECK.configure(Builder.class.getDeclaredField("_parser").getDeclaredAnnotation(NotNull.class));
            } catch (NoSuchFieldException e) {
                throw new RuntimeException("Constraint check configuration error", e);
            }
        }
    }

    /**
     * Supplies the props for this source's actor.
     *
     * @return the result of {@code Actor.props} for this source
     */
    @Override // com.arpnetworking.metrics.common.sources.ActorSource
    protected Props createProps() {
        return Actor.props(this);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the source from a builder, taking the parser the builder holds.
     *
     * @param builder builder passed to the superclass constructor and supplying the parser
     */
    public HttpSource(Builder<?, ? extends HttpSource> builder) {
        super(builder);
        _parser = builder._parser;
    }
}
