package com.arpnetworking.tsdcore.sinks;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.io.Tcp;
import akka.io.TcpMessage;
import akka.util.ByteString;
import com.arpnetworking.logback.annotations.LogValue;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.LogValueMapFactory;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.arpnetworking.tsdcore.model.PeriodicData;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import org.joda.time.DateTime;
import org.joda.time.Period;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:com/arpnetworking/tsdcore/sinks/TcpSinkActor.class */
public class TcpSinkActor extends AbstractActor {
    private final TcpSink _sink;
    private final String _serverAddress;
    private final int _serverPort;
    private final int _maximumQueueSize;
    private final Duration _exponentialBackoffBase;
    private static final Logger LOGGER;
    private static final Logger EVICTED_LOGGER;
    private static final double EXPONENTIAL_BACKOFF_MULTIPLIER = 1.3d;
    private static final int EXPONENTIAL_BACKOFF_MAX_EXPONENT = 20;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_3;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_4;
    private long _recordsWritten = 0;
    private boolean _waitingForAck = false;
    private ActorRef _client = null;
    private int _connectionAttempt = 1;
    private final LinkedList<ByteString> _pendingRequests = new LinkedList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/TcpSinkActor$Ack.class */
    public static final class Ack implements Tcp.Event {
        private final ByteString _data;

        private Ack(ByteString byteString) {
            this._data = byteString;
        }

        /* synthetic */ Ack(ByteString byteString, Ack ack) {
            this(byteString);
        }
    }

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/TcpSinkActor$Connect.class */
    private static final class Connect {
        private Connect() {
        }

        /* synthetic */ Connect(Connect connect) {
            this();
        }
    }

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/TcpSinkActor$EmitAggregation.class */
    public static final class EmitAggregation {
        private final PeriodicData _data;

        public EmitAggregation(PeriodicData periodicData) {
            this._data = periodicData;
        }

        public PeriodicData getData() {
            return this._data;
        }
    }

    static {
        ajc$preClinit();
        LOGGER = LoggerFactory.getLogger(TcpSinkActor.class);
        EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(TcpSinkActor.class, Duration.ofSeconds(30L));
    }

    public static Props props(TcpSink tcpSink, String str, int i, int i2, Period period) {
        return Props.create(TcpSinkActor.class, new Object[]{tcpSink, str, Integer.valueOf(i), Integer.valueOf(i2), period});
    }

    public TcpSinkActor(TcpSink tcpSink, String str, int i, int i2, Period period) {
        this._sink = tcpSink;
        this._serverAddress = str;
        this._serverPort = i;
        this._maximumQueueSize = i2;
        this._exponentialBackoffBase = Duration.ofMillis(period.toDurationFrom(DateTime.now()).getMillis());
        connect();
    }

    @LogValue
    public Object toLogValue() {
        return LogValueMapFactory.builder(this).put("sink", this._sink).put("serverAddress", this._serverAddress).put("serverPort", Integer.valueOf(this._serverPort)).put("exponentialBackoffBase", this._exponentialBackoffBase).build();
    }

    public String toString() {
        return toLogValue().toString();
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(EmitAggregation.class, this::processEmitAggregation).match(Ack.class, ack -> {
            this._waitingForAck = false;
            dispatchPending();
        }).match(Tcp.Connected.class, connected -> {
            this._client = sender();
            this._client.tell(TcpMessage.register(self(), true, true), self());
            this._connectionAttempt = 1;
        }).match(Tcp.CommandFailed.class, commandFailed -> {
            if (!(commandFailed.cmd() instanceof Tcp.Connect)) {
                if (commandFailed.cmd() instanceof Tcp.Write) {
                    this._pendingRequests.offerFirst(((Ack) commandFailed.cmd().ack())._data);
                    if (sender().equals(this._client)) {
                        this._client.tell(TcpMessage.resumeWriting(), self());
                        return;
                    }
                    return;
                }
                return;
            }
            LogBuilder addData = LOGGER.warn().setMessage("Failed to connect").addData("serverAddress", this._serverAddress).addData("serverPort", Integer.valueOf(this._serverPort));
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_3, this, addData));
            addData.log();
            long random = (((int) (Math.random() * Math.pow(EXPONENTIAL_BACKOFF_MULTIPLIER, Math.min(this._connectionAttempt, EXPONENTIAL_BACKOFF_MAX_EXPONENT)))) + 1) * this._exponentialBackoffBase.toMillis();
            this._connectionAttempt++;
            LogBuilder addData2 = LOGGER.info().setMessage("Waiting to reconnect").addData("serverAddress", this._serverAddress).addData("serverPort", Integer.valueOf(this._serverPort)).addData("currentReconnectWait", Long.valueOf(random));
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_4, this, addData2));
            addData2.log();
            context().system().scheduler().scheduleOnce(FiniteDuration.apply(random, TimeUnit.MILLISECONDS), self(), new Connect(null), context().dispatcher(), self());
        }).match(Tcp.WritingResumed.class, writingResumed -> {
            this._waitingForAck = false;
            dispatchPending();
        }).match(Tcp.ConnectionClosed.class, connectionClosed -> {
            this._client = null;
            connect();
        }).match(Connect.class, connect -> {
            connect();
        }).build();
    }

    public void postStop() throws Exception {
        super.postStop();
        LogBuilder addData = LOGGER.info().setMessage("Shutdown sink actor").addData("sink", this._sink).addData("recordsWritten", Long.valueOf(this._recordsWritten));
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, addData));
        addData.log();
    }

    private void connect() {
        Tcp.get(context().system()).manager().tell(TcpMessage.connect(new InetSocketAddress(this._serverAddress, this._serverPort)), self());
    }

    private void processEmitAggregation(EmitAggregation emitAggregation) {
        PeriodicData data = emitAggregation.getData();
        LogBuilder addContext = LOGGER.debug().setMessage("Writing aggregated data").addData("sink", this._sink).addData("dataSize", Integer.valueOf(data.getData().size())).addData("conditionsSize", Integer.valueOf(data.getConditions().size())).addContext("actor", self());
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, addContext));
        addContext.log();
        if (data.getData().isEmpty() && data.getConditions().isEmpty()) {
            return;
        }
        ByteString serializeData = this._sink.serializeData(data);
        if (this._pendingRequests.size() >= this._maximumQueueSize) {
            LogBuilder addContext2 = EVICTED_LOGGER.warn().setMessage("Evicted data from HTTP sink queue").addData("sink", this._sink).addData("count", 1).addContext("actor", self());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, addContext2));
            addContext2.log();
            this._pendingRequests.poll();
        }
        this._pendingRequests.offer(serializeData);
        dispatchPending();
    }

    private void dispatchPending() {
        if (this._waitingForAck || this._pendingRequests.isEmpty() || this._client == null) {
            return;
        }
        int i = 0;
        ByteString empty = ByteString.empty();
        while (i < 10 && !this._pendingRequests.isEmpty()) {
            empty = empty.concat(this._pendingRequests.poll());
            i++;
            this._recordsWritten++;
        }
        this._client.tell(TcpMessage.write(empty, new Ack(empty, null)), self());
        this._waitingForAck = true;
    }

    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("TcpSinkActor.java", TcpSinkActor.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 188);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 205);
        ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 219);
        ajc$tjp_3 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 131);
        ajc$tjp_4 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 144);
    }
}
