package org.nustaq.fastcast.impl;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.DatagramPacket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.nustaq.fastcast.api.FCPublisher;
import org.nustaq.fastcast.config.PhysicalTransportConf;
import org.nustaq.fastcast.config.PublisherConf;
import org.nustaq.fastcast.transport.PhysicalTransport;
import org.nustaq.fastcast.util.FCLog;
import org.nustaq.offheap.bytez.ByteSource;
import org.nustaq.offheap.bytez.Bytez;
import org.nustaq.offheap.bytez.malloc.MallocBytezAllocator;
import org.nustaq.offheap.bytez.onheap.HeapBytez;
import org.nustaq.offheap.structs.FSTStruct;
import org.nustaq.offheap.structs.FSTStructAllocator;
import org.nustaq.offheap.structs.structtypes.StructArray;
import org.nustaq.serialization.util.FSTUtil;

/* loaded from: input_file:org/nustaq/fastcast/impl/PacketSendBuffer.class */
public class PacketSendBuffer implements FCPublisher {
    public static long CHECK_PACKET_RATE_BULKSEND_THRESHOLD = 10;
    public static final boolean RETRANSDEBUG = true;
    public static final String KEEP_SUBS_NODEID = "KEEPRECEIVER";
    private static final boolean DEBUG_LAT = false;
    private static final int RETRANS_MEM = 10000;
    private static final int TAG_BUFF = 4;
    final PhysicalTransport trans;
    FSTStructAllocator offheapAllocator;
    FSTStructAllocator heapAllocator;
    String nodeId;
    int payMaxLen;
    int currentAvail;
    int topic;
    FSTStruct currentPacketBytePointer;
    StructArray<DataPacket> history;
    int historySize;
    ControlPacket dropMsg;
    DataPacket template;
    ByteBuffer tmpSend;
    Topic topicEntry;
    boolean isUnordered;
    int packetRateLimit;
    int packetRateLimitWindowDivisor;
    int pps;
    int ppsWindowNanos;
    int packetCounter;
    long lastPpsRateCheckNanos;
    Bytez heartbeat;
    long hbInvtervalMS;
    ConcurrentLinkedQueue<RetransPacket> retransRequests = new ConcurrentLinkedQueue<>();
    long currentSequence = 1;
    long nextSendMsg = 1;
    String currentReceiver = null;
    boolean batchOnLimit = true;
    long[] sentRetransSeq = new long[RETRANS_MEM];
    long[] sentRetransTimes = new long[RETRANS_MEM];
    int maxRetransAge = 0;
    int suppressedRetransCount = 0;
    ThreadLocal<byte[]> msgBytes = new ThreadLocal<>();
    AtomicBoolean sendLock = new AtomicBoolean(false);
    public volatile long lastMsgFlush = System.currentTimeMillis();
    HeapBytez hp = new HeapBytez((byte[]) null, 0, 0);

    public PacketSendBuffer(PhysicalTransport physicalTransport, String str, Topic topic) {
        this.hbInvtervalMS = 1000L;
        this.trans = physicalTransport;
        this.topic = topic.getTopicId();
        this.topicEntry = topic;
        this.nodeId = str;
        this.hbInvtervalMS = topic.getPublisherConf().getHeartbeatInterval();
        FCLog.log("init send buffer for topic " + topic.getTopicId());
        PhysicalTransportConf conf = physicalTransport.getConf();
        this.template = DataPacket.getTemplate(conf.getDgramsize());
        this.payMaxLen = this.template.data.length;
        this.template.getSender().setString(str);
        this.template.setTopic(this.topic);
        this.offheapAllocator = new FSTStructAllocator(0, new MallocBytezAllocator());
        this.heapAllocator = new FSTStructAllocator(0);
        PublisherConf publisherConf = this.topicEntry.getPublisherConf();
        int numPacketHistory = publisherConf.getNumPacketHistory();
        if (numPacketHistory * conf.getDgramsize() > Integer.MAX_VALUE - (2 * conf.getDgramsize())) {
            int dgramsize = (Integer.MAX_VALUE - (2 * conf.getDgramsize())) / conf.getDgramsize();
            publisherConf.numPacketHistory(dgramsize);
            FCLog.get().warn("int overflow, degrading history size from " + numPacketHistory + " to " + dgramsize);
            numPacketHistory = dgramsize;
        }
        this.history = this.offheapAllocator.newArray(numPacketHistory, this.template);
        FCLog.log("allocating send buffer for topic " + this.topicEntry.getTopicId() + " of " + ((this.history.getByteSize() / 1024) / 1024) + " MByte");
        this.historySize = this.history.size();
        setUnordered(this.topicEntry.isUnordered());
        initDropMsgPacket(str);
        DataPacket packetAt = getPacketAt(this.currentSequence);
        this.currentPacketBytePointer = packetAt.detach();
        packetAt.setSeqNo(this.currentSequence);
        packetAt.dataPointer(this.currentPacketBytePointer);
        this.currentAvail = this.payMaxLen - 4;
        try {
            initTmpBBuf();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (NoSuchFieldException e2) {
            e2.printStackTrace();
        }
        this.packetRateLimitWindowDivisor = publisherConf.getPpsWindow();
        setPacketRateLimit(publisherConf.getPps());
        this.heartbeat = new HeapBytez(new byte[]{1});
        flush();
    }

    private void initTmpBBuf() throws NoSuchFieldException, IllegalAccessException {
        this.tmpSend = ByteBuffer.allocateDirect(0);
        Field field = null;
        Field field2 = null;
        ArrayList arrayList = new ArrayList();
        FSTUtil.getAllFields(arrayList, this.tmpSend.getClass());
        for (int i = 0; i < arrayList.size(); i++) {
            Field field3 = (Field) arrayList.get(i);
            if (field3.getName().equals("address")) {
                field = field3;
            } else if (field3.getName().equals("capacity")) {
                field2 = field3;
            }
        }
        field.setAccessible(true);
        field2.setAccessible(true);
        field.setLong(this.tmpSend, this.history.getBase().getBaseAdress() + this.history.getOffset());
        field2.setInt(this.tmpSend, this.history.getByteSize());
    }

    private void moveBuff(DataPacket dataPacket) {
        this.tmpSend.limit((int) (dataPacket.getOffset() + dataPacket.getDGramSize()));
        this.tmpSend.position((int) dataPacket.getOffset());
    }

    protected void initDropMsgPacket(String str) {
        this.dropMsg = new ControlPacket();
        this.dropMsg.getSender().setString(str);
        this.dropMsg.setTopic(this.topic);
        this.dropMsg.setType((short) 0);
        this.dropMsg = (ControlPacket) this.heapAllocator.newStruct(this.dropMsg);
    }

    public void free() {
        this.offheapAllocator.free();
    }

    public Topic getTopicEntry() {
        return this.topicEntry;
    }

    public boolean isUnordered() {
        return this.isUnordered;
    }

    public void setUnordered(boolean z) {
        this.isUnordered = z;
    }

    private DataPacket getPacketAt(long j) {
        return (DataPacket) this.history.get(getIndexFromSequence(j));
    }

    private int getIndexFromSequence(long j) {
        return (int) (j % this.historySize);
    }

    private boolean putMessage(int i, ByteSource byteSource, long j, int i2, boolean z) {
        putMessageRecursive(i, byteSource, j, i2);
        return true;
    }

    private void putMessageRecursive(int i, ByteSource byteSource, long j, int i2) {
        while (this.currentAvail <= i2 + 4 + 2) {
            if (isUnordered()) {
                if (i2 > (this.payMaxLen - 4) - 2) {
                    throw new RuntimeException("unordered message size must not exceed packet size");
                }
                fire();
                putMessageRecursive(i, byteSource, j, i2);
                return;
            }
            int i3 = (this.currentAvail - 4) - 8;
            if (i3 <= 8) {
                fire();
            } else {
                putInternal(i, (short) 2, byteSource, j, i3);
                fire();
                i = -1;
                j += i3;
                i2 -= i3;
            }
        }
        putInternal(i, (short) 1, byteSource, j, i2);
    }

    private void putInternal(int i, short s, ByteSource byteSource, long j, int i2) {
        int i3 = 0;
        if (i >= 0) {
            i3 = 1;
        }
        this.currentPacketBytePointer.setShort(s);
        this.currentPacketBytePointer.next(2);
        this.currentPacketBytePointer.setShort((short) (i2 + i3));
        this.currentPacketBytePointer.next(2);
        if (i >= 0) {
            this.currentPacketBytePointer.setByte((byte) i);
            this.currentPacketBytePointer.next(i3);
        }
        this.currentPacketBytePointer.setBytes(byteSource, j, i2);
        this.currentPacketBytePointer.next(i2);
        this.currentAvail -= (i2 + i3) + 4;
    }

    private void fire() {
        if (isCurrentPacketEmpty()) {
            return;
        }
        this.currentPacketBytePointer.setShort((short) 3);
        long j = this.currentSequence;
        this.currentAvail -= 2;
        if (this.currentAvail < 0) {
            throw new RuntimeException("negative bytes left " + this.currentAvail);
        }
        getPacketAt(j).setBytesLeft(this.currentAvail);
        long j2 = j + 1;
        DataPacket packetAt = getPacketAt(j2);
        packetAt.dataPointer(this.currentPacketBytePointer);
        packetAt.setSeqNo(j2);
        this.currentAvail = this.payMaxLen - 4;
        packetAt.getReceiver().setString(this.currentReceiver);
        packetAt.setRetrans(false);
        this.currentSequence++;
    }

    private boolean isCurrentPacketEmpty() {
        return this.currentAvail == this.payMaxLen - 4;
    }

    private boolean sendPendingPackets() throws IOException {
        if (this.currentSequence <= this.nextSendMsg) {
            return false;
        }
        sendPackets(this.nextSendMsg, this.currentSequence, false, 0L);
        return true;
    }

    private boolean sendPendingRetrans() throws IOException {
        RetransPacket poll;
        if (this.retransRequests.peek() == null) {
            return false;
        }
        ArrayList<RetransPacket> arrayList = new ArrayList<>();
        do {
            poll = this.retransRequests.poll();
            arrayList.add(poll);
        } while (poll != null);
        mergeRetransmissions(arrayList);
        return true;
    }

    private void putRetransSent(long j, long j2) {
        int i = (int) (j % 10000);
        this.sentRetransSeq[i] = j;
        this.sentRetransTimes[i] = j2;
    }

    private long getLastRetransmitted(long j) {
        int i = (int) (j % 10000);
        if (this.sentRetransSeq[i] == j) {
            return this.sentRetransTimes[i];
        }
        return 0L;
    }

    private void mergeRetransmissions(ArrayList<RetransPacket> arrayList) throws IOException {
        long currentTimeMillis = System.currentTimeMillis();
        long j = 0;
        for (int i = 0; i < arrayList.size(); i++) {
            RetransPacket retransPacket = arrayList.get(i);
            if (retransPacket != null) {
                j = sendRetransmissionResponse(j, retransPacket, currentTimeMillis);
            }
        }
    }

    private long sendRetransmissionResponse(long j, RetransPacket retransPacket, long j2) throws IOException {
        for (int i = 0; i < retransPacket.getRetransIndex(); i++) {
            RetransEntry retransEntries = retransPacket.retransEntries(i);
            if (getPacketAt(retransEntries.getFrom()).getSeqNo() != retransEntries.getFrom()) {
                long from = retransEntries.getFrom();
                if (this.currentSequence - from > this.maxRetransAge) {
                    this.maxRetransAge = (int) (this.currentSequence - from);
                    FCLog.get().warn("old retransmission from " + retransPacket.getSender() + " age " + this.maxRetransAge + " requested:" + from + " curseq " + this.currentSequence + " topic " + this.topicEntry.getTopicId());
                }
                this.dropMsg.setReceiver(retransPacket.getSender());
                this.dropMsg.setSeqNo(retransEntries.getFrom());
                FCLog.get().warn("Sending Drop " + this.dropMsg + " requestedSeq " + from + " on service " + getTopicEntry().getTopicId() + " currentSeq " + this.currentSequence + " age: " + (this.currentSequence - retransEntries.getFrom()));
                this.packetCounter++;
                this.trans.send(new DatagramPacket(this.dropMsg.getBase().toBytes((int) this.dropMsg.getOffset(), this.dropMsg.getByteSize()), 0, this.dropMsg.getByteSize()));
            } else {
                long from2 = retransEntries.getFrom();
                long to = retransEntries.getTo();
                if (from2 >= j || to > j) {
                    long max = Math.max(from2, j);
                    j = Math.max(to, j);
                    sendPackets(max, to, true, j2);
                }
            }
        }
        return j;
    }

    private void sendPackets(long j, long j2, boolean z, long j3) throws IOException {
        long j4 = j2 - j;
        long j5 = 0;
        if (!z && j4 > CHECK_PACKET_RATE_BULKSEND_THRESHOLD) {
            j5 = System.nanoTime();
        }
        long j6 = j;
        while (true) {
            long j7 = j6;
            if (j7 >= j2) {
                break;
            }
            DataPacket packetAt = getPacketAt(j7);
            if (z) {
                putRetransSent(j7, j3);
            } else if (packetAt.getSeqNo() != j7) {
                FCLog.get().fatal("FATAL error, current seq:" + this.currentSequence + " expected Seq:[" + j7 + "] real read:" + packetAt.getSeqNo());
                FCLog.get().fatal("current put seq " + this.currentSequence);
                FCLog.get().fatal("current send seq " + this.nextSendMsg);
                FCLog.get().fatal("current pointer and currentpackpointer " + packetAt.___offset + " " + this.currentPacketBytePointer.___offset);
                FCLog.get().fatal(null, new Exception("stack trace"));
                for (int i = 0; i < 20; i++) {
                    FCLog.get().fatal("  =>" + getPacketAt(j7 + i).getSeqNo());
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    FCLog.log(e);
                }
                System.exit(2);
            }
            packetAt.setRetrans(z);
            moveBuff(packetAt);
            if (!z) {
                this.packetCounter++;
            }
            this.trans.send(this.tmpSend);
            if (j4 > CHECK_PACKET_RATE_BULKSEND_THRESHOLD && !z) {
                long j8 = 2 * this.pps;
                long nanoTime = System.nanoTime() - j5;
                int i2 = this.ppsWindowNanos;
                while (j7 - j > j8 * (nanoTime / i2)) {
                    sendPendingRetrans();
                    Thread.yield();
                    j8 = 2 * this.pps;
                    nanoTime = System.nanoTime() - j5;
                    i2 = this.ppsWindowNanos;
                }
            }
            j6 = j7 + 1;
        }
        if (z) {
            return;
        }
        this.nextSendMsg = j2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addRetransmissionRequest(RetransPacket retransPacket, PhysicalTransport physicalTransport) throws IOException {
        RetransPacket retransPacket2 = (RetransPacket) retransPacket.createCopy();
        System.out.println("received retrans request and add to Q " + retransPacket2);
        this.retransRequests.add(retransPacket2);
        flush();
    }

    private void lock() {
        do {
        } while (!this.sendLock.compareAndSet(false, true));
    }

    private void unlock() {
        this.sendLock.set(false);
    }

    protected boolean offerNoLock(String str, ByteSource byteSource, long j, int i, boolean z) {
        long nanoTime = System.nanoTime();
        if (str != KEEP_SUBS_NODEID) {
            setReceiver(str);
        }
        if (nanoTime - this.lastPpsRateCheckNanos > this.ppsWindowNanos) {
            this.packetCounter = Math.max(0, this.packetCounter - this.pps);
            this.lastPpsRateCheckNanos += this.ppsWindowNanos;
        }
        if (byteSource != null) {
            if (this.packetCounter > this.pps * 2) {
                return false;
            }
            if (this.packetCounter > this.pps) {
                if (!this.batchOnLimit) {
                    return false;
                }
                z = false;
            }
        }
        boolean putMessage = byteSource == null ? true : putMessage(-1, byteSource, j, i, true);
        if (nanoTime - this.lastMsgFlush > this.hbInvtervalMS * 1000 * 1000) {
            long j2 = this.lastMsgFlush;
            this.lastMsgFlush = nanoTime;
            if (!offerNoLock(null, this.heartbeat, 0L, 1, false)) {
                this.lastMsgFlush = j2;
            }
            return putMessage;
        }
        if (z && !isCurrentPacketEmpty()) {
            this.lastMsgFlush = nanoTime;
            fire();
        }
        try {
            if (!sendPendingRetrans() && sendPendingPackets()) {
                this.lastMsgFlush = nanoTime;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return putMessage;
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public boolean offer(String str, byte[] bArr, int i, int i2, boolean z) {
        this.hp.setBase(bArr, i, i2);
        return offer(str, this.hp, z);
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public boolean offer(String str, ByteSource byteSource, long j, int i, boolean z) {
        try {
            lock();
            boolean offerNoLock = offerNoLock(str, byteSource, j, i, z);
            unlock();
            return offerNoLock;
        } catch (Throwable th) {
            unlock();
            throw th;
        }
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public boolean offer(String str, ByteSource byteSource, boolean z) {
        try {
            lock();
            boolean offerNoLock = offerNoLock(str, byteSource, 0L, (int) byteSource.length(), z);
            unlock();
            return offerNoLock;
        } catch (Throwable th) {
            unlock();
            throw th;
        }
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public int getTopicId() {
        return this.topicEntry.getTopicId();
    }

    private void setReceiver(String str) {
        if (str == null) {
            if (this.currentReceiver != null) {
                this.currentReceiver = null;
                updateCurrentReceiver();
                return;
            }
            return;
        }
        if (str.equals(this.currentReceiver)) {
            return;
        }
        this.currentReceiver = str;
        updateCurrentReceiver();
    }

    private void updateCurrentReceiver() {
        if (isCurrentPacketEmpty()) {
            getPacketAt(this.currentSequence).getReceiver().setString(this.currentReceiver);
        } else {
            fire();
            updateCurrentReceiver();
        }
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public void setPacketRateLimit(int i) {
        this.packetRateLimit = i;
        this.pps = this.packetRateLimit / this.packetRateLimitWindowDivisor;
        this.ppsWindowNanos = 1000000000 / this.packetRateLimitWindowDivisor;
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public int getPacketRateLimit() {
        return this.packetRateLimit;
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public FCPublisher batchOnLimit(boolean z) {
        this.batchOnLimit = z;
        return this;
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public boolean isBatchOnLimit(boolean z) {
        return z;
    }

    @Override // org.nustaq.fastcast.api.FCPublisher
    public void flush() {
        offer(KEEP_SUBS_NODEID, (ByteSource) null, 0L, 0, true);
    }
}
