package com.arpnetworking.tsdcore.sinks;

import com.arpnetworking.logback.annotations.LogValue;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.LogValueMapFactory;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.arpnetworking.tsdcore.sinks.BaseSink;
import com.google.common.collect.EvictingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import net.sf.oval.constraint.Min;
import net.sf.oval.constraint.NotEmpty;
import net.sf.oval.constraint.NotNull;
import net.sf.oval.constraint.Range;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Context;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.VertxFactory;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.impl.DefaultContext;
import org.vertx.java.core.impl.DefaultVertx;
import org.vertx.java.core.net.NetClient;
import org.vertx.java.core.net.NetSocket;

/* loaded from: input_file:com/arpnetworking/tsdcore/sinks/VertxSink.class */
public abstract class VertxSink extends BaseSink {
    /** Host name or address of the server this sink connects to. */
    private final String _serverAddress;
    /** TCP port of the server this sink connects to. */
    private final int _serverPort;
    /** Vert.x instance created for this sink; provides timers and the net client. */
    private final Vertx _vertx;
    /** Client used to open the connection to the server. */
    private final NetClient _client;
    /** Context that dispatched work runs on; {@code null} when the Vert.x instance is not a DefaultVertx. */
    private final Context _context;
    /** Currently connected socket, or {@code null} while disconnected. */
    private final AtomicReference<NetSocket> _socket;
    /** Buffers waiting to be written to the socket; bounded by the builder's max queue size. */
    private final EvictingQueue<Buffer> _pendingData;
    /** {@code true} while a connection attempt is in progress. */
    private final AtomicBoolean _connecting;
    /** When the "not connected" message was last logged; {@code null} if it never was. */
    private DateTime _lastNotConnectedNotify;
    /** {@link System#currentTimeMillis()} of the latest connection attempt; 0 forces an immediate retry. */
    private volatile long _lastConnectionAttempt;
    /** Number of the current connection attempt; reset to 1 after a successful connect. */
    private volatile int _connectionAttempt;
    /** Multiplier, in milliseconds, applied to the randomized backoff factor. */
    private final int _exponentialBackoffBase;
    /** Milliseconds to wait before the next reconnect attempt. */
    private int _currentReconnectWait;
    private static final Logger LOGGER;
    /**
     * Soft cap on the bytes written in a single consume-loop pass: 2 to the power 20 (1 MiB).
     * Written as a shift because {@code ^} is XOR in Java; {@code 2^20} evaluates to 22, which is
     * the value this constant previously held.
     */
    private static final long MAX_FLUSH_BYTES = 1L << 20;
    /** Delay in milliseconds before the consume loop runs again after a pass that wrote nothing. */
    private static final int NO_DATA_CONSUME_LOOP_INTERVAL = 100;
    // AspectJ join point descriptors for the woven LogBuilder.log() call sites in this class.
    // They are populated by ajc$preClinit() during class initialization, so they cannot be
    // declared final with an initializer: a final field may not be assigned a second time.
    private static JoinPoint.StaticPart ajc$tjp_0;
    private static JoinPoint.StaticPart ajc$tjp_1;
    private static JoinPoint.StaticPart ajc$tjp_2;
    private static JoinPoint.StaticPart ajc$tjp_3;
    private static JoinPoint.StaticPart ajc$tjp_4;
    private static JoinPoint.StaticPart ajc$tjp_5;
    private static JoinPoint.StaticPart ajc$tjp_6;
    private static JoinPoint.StaticPart ajc$tjp_7;
    private static JoinPoint.StaticPart ajc$tjp_8;
    private static JoinPoint.StaticPart ajc$tjp_9;
    private static JoinPoint.StaticPart ajc$tjp_10;
    private static JoinPoint.StaticPart ajc$tjp_11;
    private static JoinPoint.StaticPart ajc$tjp_12;
    private static JoinPoint.StaticPart ajc$tjp_13;

    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/VertxSink$Builder.class */
    /**
     * Base builder for {@link VertxSink} subclasses. Collects the server address, server port
     * and maximum pending-queue size; the queue size defaults to 10000 and the exponential
     * backoff base defaults to 500.
     *
     * @param <B> the concrete builder type returned by the fluent setters
     * @param <S> the sink type produced by the builder
     */
    public static abstract class Builder<B extends BaseSink.Builder<B, S>, S extends Sink> extends BaseSink.Builder<B, S> {

        @NotNull
        @NotEmpty
        private String _serverAddress;

        @NotNull
        @Range(min = 1.0d, max = 65535.0d)
        private Integer _serverPort;

        @NotNull
        @Min(0.0d)
        private Integer _maxQueueSize = Integer.valueOf(10000);

        @NotNull
        private Integer _exponentialBackoffBase = Integer.valueOf(500);

        /**
         * Creates the builder.
         *
         * @param function factory handed to the parent builder to create the sink
         */
        // Decompiler note: the access modifier was originally protected.
        public Builder(Function<B, S> function) {
            super(function);
        }

        /**
         * Sets the server address. Must not be null or empty.
         *
         * @param value the server host name or address
         * @return this builder
         */
        public B setServerAddress(String value) {
            _serverAddress = value;
            return self();
        }

        /**
         * Sets the server port. Must not be null and must lie in the range 1 to 65535.
         *
         * @param value the server port
         * @return this builder
         */
        public B setServerPort(Integer value) {
            _serverPort = value;
            return self();
        }

        /**
         * Sets the maximum number of buffers held while waiting to be written. Must not be
         * null or negative.
         *
         * @param value the maximum queue size
         * @return this builder
         */
        public B setMaxQueueSize(Integer value) {
            _maxQueueSize = value;
            return self();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/arpnetworking/tsdcore/sinks/VertxSink$ConnectionHandler.class */
    public class ConnectionHandler implements AsyncResultHandler<NetSocket> {
        private static final JoinPoint.StaticPart ajc$tjp_0 = null;
        private static final JoinPoint.StaticPart ajc$tjp_1 = null;
        private static final JoinPoint.StaticPart ajc$tjp_2 = null;

        private ConnectionHandler() {
        }

        public void handle(AsyncResult<NetSocket> asyncResult) {
            if (asyncResult.succeeded()) {
                LogBuilder addData = VertxSink.LOGGER.info().setMessage("Connected to server").addData("sink", VertxSink.this.getName()).addData("address", VertxSink.this._serverAddress).addData("port", Integer.valueOf(VertxSink.this._serverPort)).addData("attempt", Integer.valueOf(VertxSink.this._connectionAttempt));
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, addData));
                addData.log();
                NetSocket netSocket = (NetSocket) asyncResult.result();
                netSocket.exceptionHandler(VertxSink.this.createSocketExceptionHandler());
                netSocket.endHandler(VertxSink.this.createSocketCloseHandler(netSocket));
                VertxSink.this._connectionAttempt = 1;
                VertxSink.this.onConnect(netSocket);
                VertxSink.this._connecting.set(false);
                VertxSink.this._socket.set(netSocket);
                return;
            }
            if (asyncResult.failed()) {
                LogBuilder throwable = VertxSink.LOGGER.warn().setMessage("Error connecting to server").addData("sink", VertxSink.this.getName()).addData("address", VertxSink.this._serverAddress).addData("port", Integer.valueOf(VertxSink.this._serverPort)).setThrowable(asyncResult.cause());
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, throwable));
                throwable.log();
                VertxSink.this._connectionAttempt++;
                VertxSink.this._currentReconnectWait = (((int) (Math.random() * Math.pow(1.3d, Math.min(VertxSink.this._connectionAttempt, 20)))) + 1) * VertxSink.this._exponentialBackoffBase;
                LogBuilder addData2 = VertxSink.LOGGER.info().setMessage("Waiting").addData("sink", VertxSink.this.getName()).addData("currentReconnectWait", Integer.valueOf(VertxSink.this._currentReconnectWait));
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, addData2));
                addData2.log();
                VertxSink.this.getVertx().setTimer(VertxSink.this._currentReconnectWait, l -> {
                    VertxSink.this.connectToServer();
                });
                NetSocket netSocket2 = (NetSocket) asyncResult.result();
                if (netSocket2 != null) {
                    netSocket2.close();
                }
                VertxSink.this._connecting.set(false);
                VertxSink.this._socket.set(null);
            }
        }

        /* synthetic */ ConnectionHandler(VertxSink vertxSink, ConnectionHandler connectionHandler) {
            this();
        }

        static {
            ajc$preClinit();
        }

        private static void ajc$preClinit() {
            Factory factory = new Factory("VertxSink.java", ConnectionHandler.class);
            ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 400);
            ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 417);
            ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 429);
        }
    }

    // Class initialization: build the AspectJ join point descriptors first, then create the logger.
    static {
        ajc$preClinit();
        LOGGER = LoggerFactory.getLogger(VertxSink.class);
    }

    /**
     * Schedules a task that detaches the current socket from this sink and closes it.
     * Nothing is closed when no socket is attached at the time the task runs.
     */
    @Override // com.arpnetworking.tsdcore.sinks.Sink
    public void close() {
        final Handler<Void> closeTask = ignored -> {
            final NetSocket current = this._socket.getAndSet(null);
            if (current == null) {
                return;
            }
            current.close();
        };
        dispatch(closeTask);
    }

    /**
     * Builds the structured log representation of this sink: the parent's log value plus the
     * server address, server port, connecting flag and current pending-queue size.
     *
     * @return the log value for this sink
     */
    @Override // com.arpnetworking.tsdcore.sinks.BaseSink
    @LogValue
    public Object toLogValue() {
        final Integer port = Integer.valueOf(this._serverPort);
        final Integer pendingCount = Integer.valueOf(this._pendingData.size());
        return LogValueMapFactory.builder(this)
                .put("super", super.toLogValue())
                .put("serverAddress", this._serverAddress)
                .put("serverPort", port)
                .put("connecting", this._connecting)
                .put("pendingDataSize", pendingCount)
                .build();
    }

    /**
     * Hook invoked by the connection handler after a connection succeeds. It runs once the
     * socket's exception and end handlers are installed and before the socket is stored as
     * this sink's current socket.
     *
     * @param netSocket the newly connected socket
     */
    protected abstract void onConnect(NetSocket netSocket);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Schedules a task that appends a buffer to the pending-data queue. If the queue has no
     * remaining capacity when the task runs, a warning is logged before the buffer is added.
     *
     * @param buffer the data to queue for writing
     */
    public void enqueueData(Buffer buffer) {
        final Handler<Void> enqueueTask = ignored -> {
            final boolean queueFull = this._pendingData.remainingCapacity() == 0;
            if (queueFull) {
                LogBuilder dropWarning = LOGGER.warn()
                        .setMessage("Dropping data due to queue full")
                        .addData("sink", getName());
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_9, this, dropWarning));
                dropWarning.log();
            }
            this._pendingData.add(buffer);
        };
        dispatch(enqueueTask);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Schedules a task that writes a buffer straight to the current socket, bypassing the
     * pending-data queue. When no socket is connected a warning is logged and the buffer is
     * not written. If the write throws, the socket is closed, the error is logged and the
     * exception is rethrown from the task.
     *
     * @param buffer the data to write
     */
    public void sendRawData(Buffer buffer) {
        final Handler<Void> writeTask = ignored -> {
            final NetSocket socket = this._socket.get();
            try {
                if (socket == null) {
                    LogBuilder notConnected = LOGGER.warn()
                            .setMessage("Could not write data to socket, socket is not connected")
                            .addData("sink", getName());
                    LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_10, this, notConnected));
                    notConnected.log();
                } else {
                    socket.write(buffer);
                }
            } catch (Exception e) {
                if (socket != null) {
                    socket.close();
                }
                LogBuilder writeError = LOGGER.error()
                        .setMessage("Error writing data to socket")
                        .addData("sink", getName())
                        .setThrowable(e);
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_11, this, writeError));
                writeError.log();
                throw e;
            }
        };
        dispatch(writeTask);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the Vert.x instance held by this sink.
     *
     * @return the Vert.x instance
     */
    public Vertx getVertx() {
        return this._vertx;
    }

    /**
     * Runs a handler on this sink's context, or on the Vert.x instance when no context was
     * captured at construction time.
     *
     * @param handler the work to run
     */
    private void dispatch(Handler<Void> handler) {
        if (this._context == null) {
            this._vertx.runOnContext(handler);
            return;
        }
        this._context.runOnContext(handler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts a connection attempt unless a socket is already attached, another attempt is in
     * progress, or the current reconnect wait has not yet elapsed since the last attempt.
     */
    public void connectToServer() {
        final boolean alreadyConnected = this._socket.get() != null;
        if (alreadyConnected) {
            return;
        }

        // Claim the "connecting" flag; failing to flip it means another attempt owns it.
        if (!this._connecting.compareAndSet(false, true)) {
            LogBuilder alreadyConnecting = LOGGER.debug()
                    .setMessage("Already connecting, not attempting another connection at this time")
                    .addData("sink", getName());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, alreadyConnecting));
            alreadyConnecting.log();
            return;
        }

        final long now = System.currentTimeMillis();
        final long sinceLastAttempt = now - this._lastConnectionAttempt;
        if (sinceLastAttempt < this._currentReconnectWait) {
            LogBuilder tooSoon = LOGGER.debug()
                    .setMessage("Not attempting connection")
                    .addData("sink", getName());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, tooSoon));
            tooSoon.log();
            // Release the flag claimed above; no attempt is being made.
            this._connecting.set(false);
            return;
        }

        LogBuilder connectingLog = LOGGER.info()
                .setMessage("Connecting to server")
                .addData("sink", getName())
                .addData("attempt", Integer.valueOf(this._connectionAttempt))
                .addData("address", this._serverAddress)
                .addData("port", Integer.valueOf(this._serverPort));
        LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, connectingLog));
        connectingLog.log();

        this._lastConnectionAttempt = now;
        final ConnectionHandler resultHandler = new ConnectionHandler(this, null);
        this._client.connect(this._serverPort, this._serverAddress, resultHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the handler installed as a socket's end handler. When invoked it closes the
     * socket, logs a warning, clears the current socket reference, resets the last-attempt
     * timestamp to zero and starts a reconnect attempt.
     *
     * @param netSocket the socket the handler is bound to; may be {@code null}
     * @return the end handler
     */
    public Handler<Void> createSocketCloseHandler(NetSocket netSocket) {
        final Handler<Void> onSocketEnd = ignored -> {
            if (netSocket != null) {
                netSocket.close();
            }
            LogBuilder closedWarning = LOGGER.warn()
                    .setMessage("Server socket closed; forcing reconnect attempt")
                    .addData("sink", getName());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_12, this, closedWarning));
            closedWarning.log();
            this._socket.set(null);
            // A zero timestamp makes the elapsed-time check in connectToServer() pass immediately.
            this._lastConnectionAttempt = 0L;
            connectToServer();
        };
        return onSocketEnd;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the handler installed as a socket's exception handler. It only logs the
     * exception as a warning.
     *
     * @return the exception handler
     */
    public Handler<Throwable> createSocketExceptionHandler() {
        final Handler<Throwable> onSocketError = cause -> {
            LogBuilder socketWarning = LOGGER.warn()
                    .setMessage("Server socket exception")
                    .addData("sink", getName())
                    .setThrowable(cause);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_13, this, socketWarning));
            socketWarning.log();
        };
        return onSocketError;
    }

    /**
     * Runs one pass of the consume loop and always schedules the next one. While a socket is
     * attached, queued buffers are written until the queue is empty or the bytes written in
     * this pass reach {@code MAX_FLUSH_BYTES}. When no socket is attached, a "not connected"
     * message is logged at most once every 30 seconds. The next pass is dispatched right away
     * if anything was written, otherwise after a 100 ms timer.
     */
    private void consumeLoop() {
        long bytesFlushed = 0;
        try {
            NetSocket socket = this._socket.get();
            if (!this._pendingData.isEmpty()) {
                LogBuilder pendingLog = LOGGER.debug()
                        .setMessage("Pending data")
                        .addData("sink", getName())
                        .addData("size", Integer.valueOf(this._pendingData.size()));
                LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_3, this, pendingLog));
                pendingLog.log();
            }

            while (socket != null) {
                final boolean passComplete = this._pendingData.size() <= 0 || bytesFlushed >= MAX_FLUSH_BYTES;
                if (!passComplete) {
                    bytesFlushed += flushBuffer(this._pendingData.poll(), socket);
                }
                // Re-read the reference on every iteration, including the last one.
                socket = this._socket.get();
                if (passComplete) {
                    break;
                }
            }

            if (socket == null) {
                final boolean notifyDue = this._lastNotConnectedNotify == null
                        || this._lastNotConnectedNotify.plus(Duration.standardSeconds(30L)).isBeforeNow();
                if (notifyDue) {
                    LogBuilder notConnected = LOGGER.debug()
                            .setMessage("Not connected to server. Data will be flushed when reconnected. Suppressing this message for 30 seconds.")
                            .addData("sink", getName());
                    LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_4, this, notConnected));
                    notConnected.log();
                    this._lastNotConnectedNotify = DateTime.now();
                }
            }
        } catch (Exception e) {
            LogBuilder loopError = LOGGER.error()
                    .setMessage("Error in consume loop")
                    .addData("sink", getName())
                    .setThrowable(e);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_5, this, loopError));
            loopError.log();
            throw e;
        } finally {
            // Reschedule even when the pass failed, so the loop never stops.
            if (bytesFlushed > 0) {
                dispatch(ignored -> consumeLoop());
            } else {
                // 100 ms: the same value as NO_DATA_CONSUME_LOOP_INTERVAL.
                getVertx().setTimer(100L, timerId -> consumeLoop());
            }
        }
    }

    /**
     * Writes one buffer to the socket and reports its size. Any exception is logged together
     * with the buffer and then rethrown.
     *
     * @param buffer the buffer to write
     * @param netSocket the socket to write to
     * @return the length of the buffer that was written
     */
    private int flushBuffer(Buffer buffer, NetSocket netSocket) {
        try {
            final int byteCount = buffer.length();
            LogBuilder writeLog = LOGGER.debug()
                    .setMessage("Writing buffer to socket")
                    .addData("sink", getName())
                    .addData("length", Integer.valueOf(byteCount));
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_6, this, writeLog));
            writeLog.log();
            netSocket.write(buffer);
            return byteCount;
        } catch (Exception e) {
            LogBuilder writeError = LOGGER.error()
                    .setMessage("Error writing AggregatedData data to socket")
                    .addData("sink", getName())
                    .addData("buffer", buffer)
                    .setThrowable(e);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_7, this, writeError));
            writeError.log();
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the sink from its builder: copies the connection settings, creates the Vert.x
     * instance and net client, allocates the bounded pending-data queue, then starts the first
     * connection attempt and the consume loop.
     *
     * @param builder the builder holding the server address, port, queue size and backoff base
     */
    public VertxSink(Builder<?, ?> builder) {
        super(builder);
        this._connecting = new AtomicBoolean(false);
        this._lastNotConnectedNotify = null;
        this._lastConnectionAttempt = 0L;
        this._connectionAttempt = 1;
        this._currentReconnectWait = 3000;
        this._serverAddress = builder._serverAddress;
        this._serverPort = builder._serverPort.intValue();
        this._vertx = VertxFactory.newVertx();
        if (this._vertx instanceof DefaultVertx) {
            // The field is typed as the Vertx interface, so an explicit downcast is required.
            final DefaultVertx defaultVertx = (DefaultVertx) this._vertx;
            final DefaultContext orCreateContext = defaultVertx.getOrCreateContext();
            defaultVertx.setContext(orCreateContext);
            this._context = orCreateContext;
        } else {
            // No context captured; dispatch() falls back to the Vert.x instance itself.
            this._context = null;
            LogBuilder addData = LOGGER.warn().setMessage("Vertx instance not a DefaultVertx as expected. Threading may be incorrect.").addData("sink", getName());
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_8, this, addData));
            addData.log();
        }
        // Reconnects are driven by this class's own backoff, so the client's built-in retry is set to 0.
        this._client = (NetClient) ((NetClient) this._vertx.createNetClient().setReconnectAttempts(0).setConnectTimeout(5000).setTCPNoDelay(true)).setTCPKeepAlive(true);
        this._socket = new AtomicReference<>();
        this._pendingData = EvictingQueue.create(builder._maxQueueSize.intValue());
        this._exponentialBackoffBase = builder._exponentialBackoffBase.intValue();
        connectToServer();
        consumeLoop();
    }

    // Populates the ajc$tjp_N join point descriptors, one per woven LogBuilder.log() call site
    // in this class. The trailing integer passed to makeSJP differs per call site.
    // NOTE(review): it looks like the line number in the original VertxSink.java - confirm.
    // Called from the static initializer before LOGGER is created.
    private static void ajc$preClinit() {
        Factory factory = new Factory("VertxSink.java", VertxSink.class);
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 205);
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 215);
        ajc$tjp_10 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 153);
        ajc$tjp_11 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 166);
        ajc$tjp_12 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 243);
        ajc$tjp_13 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 255);
        ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 227);
        ajc$tjp_3 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 268);
        ajc$tjp_4 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 287);
        ajc$tjp_5 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 297);
        ajc$tjp_6 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 318);
        ajc$tjp_7 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 329);
        ajc$tjp_8 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 355);
        ajc$tjp_9 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 131);
    }
}
