package com.arpnetworking.tsdcore.sinks;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.pattern.PatternsCS;
import com.arpnetworking.logback.annotations.LogValue;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.LogValueMapFactory;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.arpnetworking.tsdcore.model.PeriodicData;
import com.arpnetworking.tsdcore.model.RequestEntry;
import com.google.common.base.Charsets;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:com/arpnetworking/tsdcore/sinks/HttpPostSinkActor.class */
public class HttpPostSinkActor extends AbstractActor {
    private int _inflightRequestsCount = 0;
    private long _postRequests = 0;
    private boolean _waiting = false;
    private final int _maximumConcurrency;
    private final EvictingQueue<RequestEntry> _pendingRequests;
    private final AsyncHttpClient _client;
    private final HttpPostSink _sink;
    private final int _spreadingDelayMillis;
    private final PeriodicMetrics _periodicMetrics;
    private final ImmutableSet<Integer> _acceptedStatusCodes;
    private final String _evictedRequestsName;
    private final String _requestLatencyName;
    private final String _inQueueLatencyName;
    private final String _requestSuccessName;
    private final String _responseStatusName;
    private final String _samplesDroppedName;
    private final String _samplesSentName;
    private static final Logger LOGGER;
    private static final Logger POST_ERROR_LOGGER;
    private static final Logger EVICTED_LOGGER;
    private static final ImmutableList<Integer> STATUS_CLASSES;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_3;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_4;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_5;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_6;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_7;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_8;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_9;

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/HttpPostSinkActor$EmitAggregation.class */
    public static final class EmitAggregation {
        private final PeriodicData _data;

        /* JADX INFO: Access modifiers changed from: package-private */
        public EmitAggregation(PeriodicData periodicData) {
            this._data = periodicData;
        }

        public PeriodicData getData() {
            return this._data;
        }
    }

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/HttpPostSinkActor$PostFailure.class */
    private static final class PostFailure {
        private final Request _request;
        private final Throwable _throwable;

        private PostFailure(Request request, Throwable th) {
            this._request = request;
            this._throwable = th;
        }

        public Request getRequest() {
            return this._request;
        }

        public Throwable getCause() {
            return this._throwable;
        }

        /* synthetic */ PostFailure(Request request, Throwable th, PostFailure postFailure) {
            this(request, th);
        }
    }

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/HttpPostSinkActor$PostRejected.class */
    private static final class PostRejected {
        private final Request _request;
        private final Response _response;

        private PostRejected(Request request, Response response) {
            this._request = request;
            this._response = response;
        }

        public Request getRequest() {
            return this._request;
        }

        public Response getResponse() {
            return this._response;
        }

        /* synthetic */ PostRejected(Request request, Response response, PostRejected postRejected) {
            this(request, response);
        }
    }

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/HttpPostSinkActor$PostSuccess.class */
    private static final class PostSuccess {
        private final Response _response;

        private PostSuccess(Response response) {
            this._response = response;
        }

        public Response getResponse() {
            return this._response;
        }

        /* synthetic */ PostSuccess(Response response, PostSuccess postSuccess) {
            this(response);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/HttpPostSinkActor$ResponseAsyncCompletionHandler.class */
    public static final class ResponseAsyncCompletionHandler extends AsyncCompletionHandler<Response> {
        private final CompletableFuture<Response> _promise;

        ResponseAsyncCompletionHandler(CompletableFuture<Response> completableFuture) {
            this._promise = completableFuture;
        }

        /* renamed from: onCompleted, reason: merged with bridge method [inline-methods] */
        public Response m157onCompleted(Response response) {
            this._promise.complete(response);
            return response;
        }

        public void onThrowable(Throwable th) {
            this._promise.completeExceptionally(th);
        }
    }

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/HttpPostSinkActor$WaitTimeExpired.class */
    private static final class WaitTimeExpired {
        private static final WaitTimeExpired INSTANCE = new WaitTimeExpired();

        public static WaitTimeExpired getInstance() {
            return INSTANCE;
        }

        private WaitTimeExpired() {
        }
    }

    static {
        ajc$preClinit();
        LOGGER = LoggerFactory.getLogger(HttpPostSinkActor.class);
        POST_ERROR_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30L));
        EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30L));
        STATUS_CLASSES = ImmutableList.of(2, 3, 4, 5);
    }

    public static Props props(AsyncHttpClient asyncHttpClient, HttpPostSink httpPostSink, int i, int i2, Duration duration, PeriodicMetrics periodicMetrics) {
        return Props.create(HttpPostSinkActor.class, new Object[]{asyncHttpClient, httpPostSink, Integer.valueOf(i), Integer.valueOf(i2), duration, periodicMetrics});
    }

    public HttpPostSinkActor(AsyncHttpClient asyncHttpClient, HttpPostSink httpPostSink, int i, int i2, Duration duration, PeriodicMetrics periodicMetrics) {
        this._client = asyncHttpClient;
        this._sink = httpPostSink;
        this._acceptedStatusCodes = httpPostSink.getAcceptedStatusCodes();
        this._maximumConcurrency = i;
        this._pendingRequests = EvictingQueue.create(i2);
        if (Objects.equals(Duration.ZERO, duration)) {
            this._spreadingDelayMillis = 0;
        } else {
            this._spreadingDelayMillis = new Random().nextInt((int) duration.toMillis());
        }
        LogBuilder addData = LOGGER.info().setMessage("Http post sink actor spread period").addData("spreadPeriodMillis", Integer.valueOf(this._spreadingDelayMillis));
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, addData));
        addData.log();
        this._periodicMetrics = periodicMetrics;
        this._evictedRequestsName = "sinks/http_post/" + httpPostSink.getMetricSafeName() + "/evicted_requests";
        this._requestLatencyName = "sinks/http_post/" + httpPostSink.getMetricSafeName() + "/request_latency";
        this._inQueueLatencyName = "sinks/http_post/" + httpPostSink.getMetricSafeName() + "/queue_time";
        this._requestSuccessName = "sinks/http_post/" + httpPostSink.getMetricSafeName() + "/success";
        this._responseStatusName = "sinks/http_post/" + httpPostSink.getMetricSafeName() + "/status";
        this._samplesDroppedName = "sinks/http_post/" + httpPostSink.getMetricSafeName() + "/samples_dropped";
        this._samplesSentName = "sinks/http_post/" + httpPostSink.getMetricSafeName() + "/samples_sent";
    }

    @LogValue
    public Object toLogValue() {
        return LogValueMapFactory.builder(this).put("sink", this._sink).put("acceptedStatusCodes", this._acceptedStatusCodes).put("maximumConcurrency", Integer.valueOf(this._maximumConcurrency)).put("spreadingDelayMillis", Integer.valueOf(this._spreadingDelayMillis)).put("waiting", Boolean.valueOf(this._waiting)).put("inflightRequestsCount", Integer.valueOf(this._inflightRequestsCount)).put("pendingRequestsCount", Integer.valueOf(this._pendingRequests.size())).put("periodicMetrics", this._periodicMetrics).build();
    }

    public String toString() {
        return toLogValue().toString();
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(EmitAggregation.class, this::processEmitAggregation).match(PostSuccess.class, this::processSuccessRequest).match(PostRejected.class, this::processRejectedRequest).match(PostFailure.class, this::processFailedRequest).match(WaitTimeExpired.class, this::waitTimeExpired).build();
    }

    private void waitTimeExpired(WaitTimeExpired waitTimeExpired) {
        LogBuilder addContext = LOGGER.debug().setMessage("Received WaitTimeExpired message").addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, addContext));
        addContext.log();
        this._waiting = false;
        dispatchPending();
    }

    private void processFailedRequest(PostFailure postFailure) {
        this._inflightRequestsCount--;
        LogBuilder throwable = POST_ERROR_LOGGER.error().setMessage("Post error").addData("sink", this._sink).addContext("actor", self()).setThrowable(postFailure.getCause());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, throwable));
        throwable.log();
        dispatchPending();
    }

    private void processSuccessRequest(PostSuccess postSuccess) {
        this._postRequests++;
        this._inflightRequestsCount--;
        LogBuilder addContext = LOGGER.debug().setMessage("Post accepted").addData("sink", this._sink).addData("status", Integer.valueOf(postSuccess.getResponse().getStatusCode())).addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_3, this, addContext));
        addContext.log();
        dispatchPending();
    }

    private void processRejectedRequest(PostRejected postRejected) {
        this._postRequests++;
        this._inflightRequestsCount--;
        Response response = postRejected.getResponse();
        LogBuilder addContext = LOGGER.warn().setMessage("Post rejected").addData("sink", this._sink).addData("status", Integer.valueOf(response.getStatusCode())).addData("request", new String(postRejected.getRequest().getByteData(), Charsets.UTF_8)).addData("response", Optional.ofNullable(response.getResponseBody())).addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_4, this, addContext));
        addContext.log();
        dispatchPending();
    }

    private void processEmitAggregation(EmitAggregation emitAggregation) {
        PeriodicData data = emitAggregation.getData();
        LogBuilder addContext = LOGGER.debug().setMessage("Writing aggregated data").addData("sink", this._sink).addData("dataSize", Integer.valueOf(data.getData().size())).addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_5, this, addContext));
        addContext.log();
        if (data.getData().isEmpty()) {
            return;
        }
        Collection<RequestEntry.Builder> createRequests = this._sink.createRequests(this._client, data);
        boolean isEmpty = this._pendingRequests.isEmpty();
        int max = Math.max(0, createRequests.size() - this._pendingRequests.remainingCapacity());
        Iterator<RequestEntry.Builder> it = createRequests.iterator();
        while (it.hasNext()) {
            this._pendingRequests.offer((RequestEntry) it.next().setEnterTime(Instant.now()).build());
        }
        if (max > 0) {
            this._periodicMetrics.recordCounter(this._evictedRequestsName, max);
            LogBuilder addContext2 = EVICTED_LOGGER.warn().setMessage("Evicted data from HTTP sink queue").addData("sink", this._sink).addData("count", Integer.valueOf(max)).addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_6, this, addContext2));
            addContext2.log();
        }
        if (this._spreadingDelayMillis <= 0) {
            dispatchPending();
            return;
        }
        if (this._waiting || !isEmpty) {
            if (this._waiting) {
                return;
            }
            dispatchPending();
        } else {
            this._waiting = true;
            LogBuilder addContext3 = LOGGER.debug().setMessage("Scheduling http requests for later transmission").addData("delayMs", Integer.valueOf(this._spreadingDelayMillis)).addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_7, this, addContext3));
            addContext3.log();
            context().system().scheduler().scheduleOnce(FiniteDuration.apply(this._spreadingDelayMillis, TimeUnit.MILLISECONDS), self(), WaitTimeExpired.getInstance(), context().dispatcher(), self());
        }
    }

    private void dispatchPending() {
        LogBuilder addContext = LOGGER.debug().setMessage("Dispatching requests").addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_8, this, addContext));
        addContext.log();
        while (this._inflightRequestsCount < this._maximumConcurrency && !this._pendingRequests.isEmpty()) {
            fireNextRequest();
        }
    }

    private void fireNextRequest() {
        RequestEntry requestEntry = (RequestEntry) this._pendingRequests.poll();
        this._periodicMetrics.recordTimer(this._inQueueLatencyName, Duration.between(requestEntry.getEnterTime(), Instant.now()).toMillis(), Optional.of(TimeUnit.MILLISECONDS));
        Request request = requestEntry.getRequest();
        this._inflightRequestsCount++;
        CompletableFuture completableFuture = new CompletableFuture();
        long currentTimeMillis = System.currentTimeMillis();
        this._client.executeRequest(request, new ResponseAsyncCompletionHandler(completableFuture));
        PatternsCS.pipe(completableFuture.handle((response, th) -> {
            Object postFailure;
            this._periodicMetrics.recordTimer(this._requestLatencyName, System.currentTimeMillis() - currentTimeMillis, Optional.of(TimeUnit.MILLISECONDS));
            if (th == null) {
                int statusCode = response.getStatusCode();
                int i = statusCode / 100;
                Iterator it = STATUS_CLASSES.iterator();
                while (it.hasNext()) {
                    int intValue = ((Integer) it.next()).intValue();
                    this._periodicMetrics.recordCounter(String.format("%s/%dxx", this._responseStatusName, Integer.valueOf(intValue)), i == intValue ? 1 : 0);
                }
                if (this._acceptedStatusCodes.contains(Integer.valueOf(statusCode))) {
                    postFailure = new PostSuccess(response, null);
                    this._periodicMetrics.recordCounter(this._samplesSentName, requestEntry.getPopulationSize());
                } else {
                    postFailure = new PostRejected(request, response, null);
                    this._periodicMetrics.recordCounter(this._samplesDroppedName, requestEntry.getPopulationSize());
                }
            } else {
                postFailure = new PostFailure(request, th, null);
                this._periodicMetrics.recordCounter(this._samplesDroppedName, requestEntry.getPopulationSize());
            }
            this._periodicMetrics.recordCounter(this._requestSuccessName, postFailure instanceof PostSuccess ? 1 : 0);
            return postFailure;
        }), context().dispatcher()).to(self());
    }

    public void postStop() throws Exception {
        super.postStop();
        LogBuilder addData = LOGGER.info().setMessage("Shutdown sink actor").addData("sink", this._sink).addData("recordsWritten", Long.valueOf(this._postRequests));
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_9, this, addData));
        addData.log();
    }

    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("HttpPostSinkActor.java", HttpPostSinkActor.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 106);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 156);
        ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 168);
        ajc$tjp_3 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 181);
        ajc$tjp_4 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 199);
        ajc$tjp_5 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 211);
        ajc$tjp_6 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 231);
        ajc$tjp_7 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 242);
        ajc$tjp_8 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 268);
        ajc$tjp_9 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 324);
    }
}
