package de.ruedigermoeller.fastcast.packeting;

import de.ruedigermoeller.fastcast.control.FlowControl;
import de.ruedigermoeller.fastcast.transport.Transport;
import de.ruedigermoeller.fastcast.util.FCLog;
import de.ruedigermoeller.fastcast.util.RateMeasure;
import de.ruedigermoeller.fastcast.util.Sleeper;
import de.ruedigermoeller.heapoff.structs.FSTStruct;
import de.ruedigermoeller.heapoff.structs.FSTTypedStructAllocator;
import de.ruedigermoeller.heapoff.structs.structtypes.StructArray;
import java.io.IOException;
import java.net.DatagramPacket;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:de/ruedigermoeller/fastcast/packeting/PacketSendBuffer.class */
public class PacketSendBuffer {
    // NOTE(review): not referenced by name in this class as shown; the literal 2 used in send()
    // is presumably this constant inlined by the compiler - confirm.
    public static final int MAX_BULK_SEND_DATA = 2;
    // Not referenced in this class as shown.
    public static final boolean RETRANSDEBUG = false;
    // Length of the sentRetransSeq / sentRetransTimes tables.
    private static final int RETRANS_MEM = 10000;
    // NOTE(review): not referenced by name here; presumably the 4 subtracted from payMaxLen
    // whenever a new packet is started - confirm.
    private static final int TAG_BUFF = 4;
    // Not referenced in this class as shown.
    public static final int RETRANS_PACKET_PAUSE_THRESHOLD = 5;
    // Not referenced in this class as shown.
    private static final boolean DEBUG_LAT = false;
    // Allocator built in the constructor from the DataPacket template.
    FSTTypedStructAllocator<DataPacket> packetAllocator;
    // Packet history, addressed by (sequence % historySize); see getVolatile().
    StructArray<DataPacket> history;
    // Second String constructor argument; also written into the packet templates as sender.
    String nodeId;
    // Length of the template packet's data array.
    int payMaxLen;
    // Bytes still available in the packet currently being filled; reset to payMaxLen - 4 for each new packet.
    volatile int currentAvail;
    // Topic id taken from topicEntry.getTopic().
    int topic;
    // Write cursor into the packet currently being filled; advanced by putInternal().
    FSTStruct currentPacketBytePointer;
    // Pre-built control packet of type ControlPacket.DROPPED, sent when a retransmission cannot be served.
    ControlPacket dropMsg;
    // Pause handed to sleeperSendMsg around sends; adjusted by rate.statsUpdated() and doFlowControl().
    int sendPauseMicros;
    // When true, putMessageRecursive() never splits a message across packets.
    boolean isUnordered;
    TopicEntry topicEntry;
    // Configured via conf.useSpinlockInSendQueue(); see useSpinLock() for the effective value.
    boolean useSpinLock;
    TopicStats stats;
    // May be null; doFlowControl() checks for that.
    FlowControl control;
    // Optional secondary history; only created when conf.getNumPacketOffHeapHistory() > 0.
    OffHeapHistory offheapHistory;
    // Cached history.size().
    int historySize;
    // Configured datagram rate; values <= 0 disable the adjustment in rate.statsUpdated().
    int maxDGramRate;
    // Scratch buffer that packets from offheapHistory are read into before being re-sent.
    byte[] offHeapRetransByte;
    // Lazily created struct pointer used to view offHeapRetransByte as a DataPacket.
    DataPacket offHeapReadDataPacket;
    // Pending retransmission requests: filled by addRetransmissionRequest(), swapped out and processed by send().
    ArrayList<RetransPacket> retransRequests = new ArrayList<>();
    // Sequence number of the packet currently being filled; incremented in fire().
    AtomicLong currentSequence = new AtomicLong(1);
    // Sequence number of the next packet to be sent; advanced at the end of sendPackets().
    AtomicLong nextSendMsg = new AtomicLong(1);
    Sleeper sleeperSendMsg = new Sleeper();
    // Guards message insertion (putMessageMT) and retransmission handling.
    private Lock putLock = new ReentrantLock();
    // 0 selects the putMessageST() path; any other configured value is raised to at least 8.
    int maxSendPacketsQueued = 8;
    boolean optForLatency = false;
    // Monitor notified by fire() when no spin lock is used; exposed through getSendWakeupLock().
    private Object sendWakeupLock = new Object[1];
    // Not read in this class as shown.
    long MaxRetransRepeatIntervalMS = 2;
    /**
     * Send-rate meter; {@code count()} is called once per datagram in {@code sendPackets}.
     * Its update callback nudges {@code sendPauseMicros} towards the configured
     * {@code maxDGramRate} while packets are queued for sending.
     */
    RateMeasure rate = new RateMeasure("sendDgramRate", 100) {
        /**
         * Adjusts the send pause from the reported rate value.
         *
         * @param j value reported by RateMeasure (presumably the measured datagram rate - confirm)
         */
        @Override
        protected void statsUpdated(long j) {
            final PacketSendBuffer buf = PacketSendBuffer.this;
            if (buf.maxDGramRate > 0 && buf.hasSendPressure()) {
                // Ratio of the reported value to the configured maximum.
                final float ratio = ((float) j) / buf.maxDGramRate;
                // Average of the scaled pause and the current pause.
                buf.sendPauseMicros = (int) (((buf.sendPauseMicros * ratio) + buf.sendPauseMicros) / 2.0f);
                if (buf.sendPauseMicros < 1) {
                    buf.sendPauseMicros = 1;
                }
                // Integer truncation can stall small pauses; bump by one when clearly over the limit.
                if (buf.sendPauseMicros < 10 && ratio > 1.2d) {
                    buf.sendPauseMicros++;
                }
            }
        }
    };
    // Sequence numbers of recently retransmitted packets, indexed by (sequence % 10000); see putRetransSent().
    long[] sentRetransSeq = new long[RETRANS_MEM];
    // Timestamps matching sentRetransSeq, same indexing.
    long[] sentRetransTimes = new long[RETRANS_MEM];
    // Largest (currentSequence - requestedSeq) seen so far for a request that was no longer in the history.
    int maxRetransAge = 0;
    // Sequence number of the last regular packet sent by sendPackets(); 0 until the first send.
    long debugSeq = 0;
    // Not used in this class as shown.
    int suppressedRetransCount = 0;

    /**
     * Creates the send buffer for one topic: builds the packet template, allocates the packet
     * history and the pre-filled drop message, and positions the write cursor on the first packet.
     *
     * @param i          size passed to {@code DataPacket.getTemplate} and used as the per-packet size
     *                   of the optional off-heap history
     * @param str        written into the packet templates as cluster
     * @param str2       written into the packet templates as sender; also stored as {@code nodeId}
     * @param topicEntry topic whose configuration, stats and flow control are used
     */
    public PacketSendBuffer(int i, String str, String str2, TopicEntry topicEntry) {
        // Provisional value; replaced by the configured pause further down.
        this.sendPauseMicros = 100;
        this.topic = topicEntry.getTopic();
        this.topicEntry = topicEntry;
        this.nodeId = str2;
        setOptForLatency(topicEntry.getConf().isOptForLatency());
        this.useSpinLock = topicEntry.getConf().useSpinlockInSendQueue();
        // The off-heap history is optional and only created when a positive size is configured.
        if (topicEntry.getConf().getNumPacketOffHeapHistory() > 0) {
            this.offheapHistory = OffHeapHistory.createDirectHistory(i * topicEntry.getConf().getNumPacketOffHeapHistory(), i);
            this.offHeapRetransByte = new byte[i];
        }
        // The template is configured first; the allocator and history array are then built from it.
        DataPacket template = DataPacket.getTemplate(i);
        this.payMaxLen = template.data.length;
        template.getCluster().setString(str);
        template.getSender().setString(str2);
        template.setTopic(this.topic);
        this.packetAllocator = new FSTTypedStructAllocator<>(template, 10);
        this.history = this.packetAllocator.newArray(this.topicEntry.getConf().getNumPacketHistory(), template);
        this.historySize = this.history.size();
        // Build the drop message on-heap, then replace it with a struct copy from the allocator.
        this.dropMsg = new ControlPacket();
        this.dropMsg.getCluster().setString(str);
        this.dropMsg.getSender().setString(str2);
        this.dropMsg.setTopic(this.topic);
        this.dropMsg.setType(ControlPacket.DROPPED);
        this.dropMsg = (ControlPacket) this.packetAllocator.newStruct(this.dropMsg);
        // Set up the first packet (sequence 1) as the one currently being filled.
        DataPacket dataPacket = getVolatile(this.currentSequence.get());
        this.currentPacketBytePointer = dataPacket.detach();
        dataPacket.setSeqNo(this.currentSequence.get());
        dataPacket.dataPointer(this.currentPacketBytePointer);
        this.currentAvail = this.payMaxLen - 4;
        setMaxSendPacketsQueued(this.topicEntry.getConf().getMaxSendPacketQueueSize());
        setUnordered(this.topicEntry.isUnordered());
        this.sendPauseMicros = this.topicEntry.getConf().getSendPauseMicros();
        this.stats = this.topicEntry.getStats();
        this.control = this.topicEntry.getControl();
        this.maxDGramRate = this.topicEntry.getConf().getDGramRate();
    }

    /**
     * @return the topic entry this buffer was constructed with
     */
    public TopicEntry getTopicEntry() {
        return topicEntry;
    }

    /**
     * @return the statistics object this buffer reports to
     */
    public TopicStats getStats() {
        return stats;
    }

    /**
     * @return {@code true} if messages are never split across packets
     */
    public boolean isUnordered() {
        return isUnordered;
    }

    /**
     * Sets the unordered flag.
     *
     * @param z new value of the unordered flag
     */
    public void setUnordered(boolean z) {
        isUnordered = z;
    }

    /**
     * @return the effective send queue limit; 0 means the single-threaded put path is used
     */
    public int getMaxSendPacketsQueued() {
        return maxSendPacketsQueued;
    }

    /**
     * Sets the send queue limit. Zero is stored as-is; every other value is raised to at least 8.
     *
     * @param i requested queue limit
     */
    public void setMaxSendPacketsQueued(int i) {
        this.maxSendPacketsQueued = (i == 0) ? 0 : Math.max(i, 8);
    }

    /**
     * Looks up the history slot for a sequence number.
     *
     * @param j packet sequence number
     * @return the history entry at {@code j % historySize}; callers compare its sequence number
     *         to find out whether the slot still holds that packet
     */
    DataPacket getVolatile(long j) {
        final int slot = getIndexFromSequence(j);
        return (DataPacket) this.history.get(slot);
    }

    /**
     * Maps a sequence number onto its slot in the packet history.
     *
     * @param j packet sequence number
     * @return {@code j} modulo the history size
     */
    private int getIndexFromSequence(long j) {
        final long slot = j % this.historySize;
        return (int) slot;
    }

    /**
     * Queues a message, choosing the single-threaded path when the send queue limit is 0
     * and the locking multi-threaded path otherwise.
     *
     * @param i    leading byte written before the payload when {@code >= 0}
     * @param bArr buffer holding the message
     * @param i2   offset of the message in {@code bArr}
     * @param i3   message length in bytes
     * @param z    if {@code true}, the multi-threaded path gives up instead of waiting for queue space
     * @return {@code false} only if the multi-threaded path refused the message
     */
    public boolean putMessage(int i, byte[] bArr, int i2, int i3, boolean z) {
        if (this.maxSendPacketsQueued != 0) {
            return putMessageMT(i, bArr, i2, i3, z);
        }
        return putMessageST(i, bArr, i2, i3, z);
    }

    /**
     * Single-threaded put: writes the message, closes the current packet and sends everything
     * that is pending on the calling thread.
     *
     * @param i    leading byte written before the payload when {@code >= 0}
     * @param bArr buffer holding the message
     * @param i2   offset of the message in {@code bArr}
     * @param i3   message length in bytes
     * @param z    unused on this path
     * @return always {@code true}; an {@link IOException} while sending is logged, not propagated
     */
    public boolean putMessageST(int i, byte[] bArr, int i2, int i3, boolean z) {
        putMessageRecursive(i, bArr, i2, i3);
        fire();
        try {
            final Transport transport = this.topicEntry.getTrans();
            final long firstSeq = this.nextSendMsg.get();
            final long endSeq = this.currentSequence.get();
            sendPackets(transport, firstSeq, endSeq, false, 0L);
        } catch (IOException sendFailure) {
            FCLog.log(sendFailure);
        }
        return true;
    }

    /**
     * Multi-threaded put: takes {@code putLock}, waits for room in the send queue and writes the
     * message into the packet buffer.
     *
     * <p>The lock is released exactly once, in the {@code finally} block. The earlier version
     * also unlocked explicitly on the success path, so the {@code finally} unlock ran on a lock
     * that was no longer held and {@link ReentrantLock#unlock()} threw
     * {@link IllegalMonitorStateException}.
     *
     * @param i    leading byte written before the payload when {@code >= 0}
     * @param bArr buffer holding the message
     * @param i2   offset of the message in {@code bArr}
     * @param i3   message length in bytes
     * @param z    if {@code true}, return {@code false} instead of waiting for queue space
     * @return {@code true} if the message was queued, {@code false} if the queue was full and
     *         {@code z} was set
     */
    public boolean putMessageMT(int i, byte[] bArr, int i2, int i3, boolean z) {
        this.putLock.lock();
        try {
            // waitForSenderMT may release and re-take putLock while waiting; it returns with the lock held.
            if (waitForSenderMT(i3, z)) {
                return false;
            }
            putMessageRecursive(i, bArr, i2, i3);
            return true;
        } finally {
            this.putLock.unlock();
        }
    }

    /**
     * @return {@code true} if more than two packets lie between the next packet to send and
     *         the packet currently being filled
     */
    boolean hasSendPressure() {
        final long backlog = this.currentSequence.get() - this.nextSendMsg.get();
        return backlog > 2;
    }

    /**
     * Blocks the producer until the sender has caught up far enough for a message of the given
     * size and no retransmission requests are pending. While waiting, {@code putLock} is released,
     * the thread yields, and the lock is taken again, so the caller must hold {@code putLock}
     * whenever the queue limit is non-zero.
     *
     * @param i size in bytes of the message about to be queued
     * @param z if {@code true}, do not wait
     * @return {@code true} if the queue is full and {@code z} forbade waiting; {@code false} once
     *         there is room (always {@code false} when the queue limit is 0)
     */
    private boolean waitForSenderMT(int i, boolean z) {
        if (this.maxSendPacketsQueued == 0) {
            return false;
        }
        long putSeq = this.currentSequence.get() + 1;
        long sendSeq = this.nextSendMsg.get();
        // Number of packets the message is expected to occupy.
        final int packetsNeeded = (i / this.payMaxLen) + 1;
        while (putSeq - sendSeq > this.maxSendPacketsQueued + packetsNeeded || this.retransRequests.size() > 0) {
            if (z) {
                return true;
            }
            // Give the sender a chance to run, then re-read both sequence counters.
            this.putLock.unlock();
            Thread.yield();
            this.putLock.lock();
            putSeq = this.currentSequence.get() + 1;
            sendSeq = this.nextSendMsg.get();
        }
        return false;
    }

    /**
     * Writes a message into the packet buffer, splitting it across several packets when it does
     * not fit into the remaining space of the current one.
     *
     * @param i  leading byte written before the payload when {@code >= 0}; set to -1 after the
     *           first fragment so later fragments carry no leading byte
     * @param bArr buffer holding the message
     * @param i2 offset of the (remaining) message in {@code bArr}
     * @param i3 (remaining) message length in bytes
     * @throws RuntimeException in unordered mode if the message can never fit into one packet
     */
    private void putMessageRecursive(int i, byte[] bArr, int i2, int i3) {
        // Number of fragments written since the last wait for the sender.
        int i4 = 0;
        while (true) {
            // Note: counted on every loop pass, i.e. once per fragment attempt, not once per message.
            this.stats.msgSent();
            // Fits completely: 4 bytes entry header (see putInternal) plus the 2 bytes fire() reserves.
            if (this.currentAvail > i3 + 4 + 2) {
                putInternal(i, (short) 1, bArr, i2, i3);
                // Nothing is waiting to be sent, so close the packet right away.
                if (this.nextSendMsg.get() == this.currentSequence.get()) {
                    fire();
                    return;
                }
                return;
            }
            if (isUnordered()) {
                // Unordered messages are never fragmented: they must fit into one empty packet.
                if (i3 > (this.payMaxLen - 4) - 2) {
                    throw new RuntimeException("unordered message size must not exceed packet size");
                }
                // Close the current packet and retry with a fresh one.
                fire();
                putMessageRecursive(i, bArr, i2, i3);
                return;
            }
            // Bytes of payload that can still go into the current packet.
            int i5 = (this.currentAvail - 4) - 8;
            if (i5 <= 8) {
                // Too little room left to be worth a fragment; start a new packet.
                fire();
            } else {
                // Write a partial fragment (tag 2), close the packet and continue with the rest.
                putInternal(i, (short) 2, bArr, i2, i5);
                fire();
                i = -1;
                i2 += i5;
                i3 -= i5;
                i4++;
                // After a queue's worth of fragments, let the sender catch up before writing more.
                if (i4 >= this.maxSendPacketsQueued) {
                    waitForSenderMT(i3, false);
                    i4 = 0;
                }
            }
        }
    }

    /**
     * @return the latency-optimization flag taken from the topic configuration
     */
    public boolean isOptForLatency() {
        return optForLatency;
    }

    /**
     * @return {@code true} if spinning is configured and the send queue limit is greater than 0;
     *         in that case {@code fire()} does not notify the wake-up monitor
     */
    public boolean useSpinLock() {
        if (!this.useSpinLock) {
            return false;
        }
        return this.maxSendPacketsQueued > 0;
    }

    /**
     * Sets the latency-optimization flag.
     *
     * @param z new value of the flag
     */
    public void setOptForLatency(boolean z) {
        optForLatency = z;
    }

    /**
     * Appends one entry to the packet currently being filled: a 2-byte tag, a 2-byte length,
     * an optional single leading byte, then the payload. Advances the write cursor and reduces
     * {@code currentAvail} by the bytes written.
     *
     * @param i    leading byte, written only when {@code >= 0}
     * @param s    entry tag
     * @param bArr source buffer
     * @param i2   offset of the payload in {@code bArr}
     * @param i3   number of payload bytes to copy
     */
    private void putInternal(int i, short s, byte[] bArr, int i2, int i3) {
        final boolean hasLeadByte = i >= 0;
        final int leadLen = hasLeadByte ? 1 : 0;
        final FSTStruct cursor = this.currentPacketBytePointer;

        // Header: tag, then length (payload plus optional leading byte).
        cursor.setShort(s);
        cursor.next(2);
        cursor.setShort((short) (i3 + leadLen));
        cursor.next(2);

        if (hasLeadByte) {
            cursor.setByte((byte) i);
            cursor.next(leadLen);
        }

        cursor.setBytes(bArr, i2, i3);
        cursor.next(i3);

        this.currentAvail -= (i3 + leadLen) + 4;
    }

    /**
     * Closes the packet currently being filled and makes the next history slot the current
     * packet. Incrementing {@code currentSequence} is what makes the closed packet visible to
     * the sending side, which uses that counter as its upper bound.
     *
     * @throws RuntimeException if fewer than 2 bytes are left for the closing tag
     */
    private void fire() {
        // NOTE(review): fresh packets start with currentAvail == payMaxLen - 4, so this guard
        // does not appear to match an empty packet; confirm whether payMaxLen - 4 was intended.
        if (this.currentAvail == this.payMaxLen) {
            return;
        }
        // Closing tag (value 3) at the cursor; the 2 bytes it occupies are deducted below.
        this.currentPacketBytePointer.setShort((short) 3);
        long j = this.currentSequence.get();
        this.currentAvail -= 2;
        if (this.currentAvail < 0) {
            throw new RuntimeException("negative bytes left " + this.currentAvail);
        }
        // Record the unused space on the packet being closed.
        getVolatile(j).setBytesLeft(this.currentAvail);
        // Prepare the next slot: presumably dataPointer() repositions the cursor onto its data area - confirm.
        long j2 = j + 1;
        DataPacket dataPacket = getVolatile(j2);
        dataPacket.dataPointer(this.currentPacketBytePointer);
        dataPacket.setSeqNo(j2);
        this.currentAvail = this.payMaxLen - 4;
        dataPacket.setSent(System.currentTimeMillis());
        // Publish: from here on the closed packet is below currentSequence and eligible for sending.
        this.currentSequence.incrementAndGet();
        if (useSpinLock()) {
            return;
        }
        // Wake a thread waiting on the monitor returned by getSendWakeupLock().
        synchronized (this.sendWakeupLock) {
            this.sendWakeupLock.notify();
        }
    }

    /**
     * Performs one send step: answers all pending retransmission requests, then sends at most
     * two queued packets.
     *
     * @param transport transport to send on
     * @return {@code true} if retransmissions were handled or packets were sent
     * @throws IOException declared by the underlying send operations
     */
    public boolean send(Transport transport) throws IOException {
        boolean handledRetrans = false;
        if (!this.retransRequests.isEmpty()) {
            handledRetrans = true;
            // Swap in a fresh list so new requests can be collected while these are processed.
            final ArrayList<RetransPacket> pending = this.retransRequests;
            this.retransRequests = new ArrayList<>();
            mergeRetransmissions(transport, pending);
        }
        final long firstSeq = this.nextSendMsg.get();
        final long endSeq = Math.min(firstSeq + 2, this.currentSequence.get());
        if (endSeq != firstSeq) {
            sendPackets(transport, firstSeq, endSeq, false, 0L);
            return true;
        }
        return handledRetrans;
    }

    /**
     * Records that a packet was retransmitted at a given time.
     *
     * @param j  sequence number of the retransmitted packet
     * @param j2 timestamp to store for it
     */
    void putRetransSent(long j, long j2) {
        final int slot = (int) (j % RETRANS_MEM);
        this.sentRetransSeq[slot] = j;
        this.sentRetransTimes[slot] = j2;
    }

    /**
     * Looks up when a packet was last retransmitted.
     *
     * @param j sequence number to look up
     * @return the stored timestamp, or 0 if the table slot holds a different sequence number
     */
    long getLastRetransmitted(long j) {
        final int slot = (int) (j % RETRANS_MEM);
        return this.sentRetransSeq[slot] == j ? this.sentRetransTimes[slot] : 0L;
    }

    /**
     * Answers each non-null retransmission request in the list, using one common timestamp.
     *
     * @param transport transport to send on
     * @param arrayList requests to answer; its size is re-read on every pass
     * @throws IOException declared by the underlying send operations
     */
    private void mergeRetransmissions(Transport transport, ArrayList<RetransPacket> arrayList) throws IOException {
        final long now = System.currentTimeMillis();
        int idx = 0;
        while (idx < arrayList.size()) {
            final RetransPacket request = arrayList.get(idx);
            idx++;
            if (request == null) {
                continue;
            }
            sendRetransmissionResponse(transport, request, now);
        }
    }

    /**
     * Answers every entry of one retransmission request. An entry whose first packet is still in
     * the in-memory history is re-sent from there; otherwise the off-heap history is tried and,
     * failing that, a drop message is sent to the requester.
     *
     * <p>Each entry is handled under {@code putLock}. The lock is now released in a
     * {@code finally} block, so an exception thrown while sending no longer leaves it held.
     * After all entries, the method sleeps one send pause per four requested packets.
     *
     * @param transport     transport to send on
     * @param retransPacket the retransmission request
     * @param j             timestamp recorded for packets re-sent from the in-memory history
     * @throws IOException declared by the underlying send operations
     */
    private void sendRetransmissionResponse(Transport transport, RetransPacket retransPacket, long j) throws IOException {
        for (int i = 0; i < retransPacket.getRetransIndex(); i++) {
            RetransEntry entry = retransPacket.retransEntries(i);
            this.putLock.lock();
            try {
                long from = entry.getFrom();
                // The history slot still holds the requested packet only if the sequence numbers match.
                if (getVolatile(from).getSeqNo() == from) {
                    sendPackets(transport, from, entry.getTo(), true, j);
                } else {
                    serveFromOffHeapHistoryOrDrop(transport, retransPacket, entry, from);
                }
            } finally {
                this.putLock.unlock();
            }
        }
        int numPackets = retransPacket.computeNumPackets();
        for (int k = 0; k < numPackets / 4; k++) {
            this.sleeperSendMsg.sleepMicros(this.sendPauseMicros);
        }
    }

    /**
     * Handles a retransmission entry whose packets are no longer in the in-memory history:
     * re-sends them from the off-heap history if it has them, otherwise sends a drop message.
     */
    private void serveFromOffHeapHistoryOrDrop(Transport transport, RetransPacket retransPacket, RetransEntry entry, long from) throws IOException {
        // Log only when a new maximum age is observed.
        if (this.currentSequence.get() - from > this.maxRetransAge) {
            this.maxRetransAge = (int) (this.currentSequence.get() - from);
            FCLog.get().warn("old retransmission from " + retransPacket.getSender() + " age " + this.maxRetransAge + " requested:" + from + " curseq " + this.currentSequence.get() + " topic " + this.topicEntry.getConf().getName());
        }
        boolean served = false;
        if (this.offheapHistory != null && this.offheapHistory.hasSequence(from)) {
            served = true;
            long to = entry.getTo();
            for (long seq = from; seq < to; seq++) {
                if (this.offheapHistory.getPacket(seq, this.offHeapRetransByte, 0) < 0) {
                    served = false;
                    FCLog.get().warn("history seq mismatch " + seq);
                    // NOTE(review): the buffer is still sent below after a failed read, as before;
                    // confirm whether its contents are meaningful in that case.
                }
                if (this.offHeapReadDataPacket == null) {
                    this.offHeapReadDataPacket = (DataPacket) FSTTypedStructAllocator.newPointer(DataPacket.class);
                }
                this.offHeapReadDataPacket.baseOn(this.offHeapRetransByte, 0);
                int dGramSize = this.offHeapReadDataPacket.getDGramSize();
                transport.send(new DatagramPacket(this.offHeapReadDataPacket.getBase(), this.offHeapReadDataPacket.getIndexInBase(), dGramSize));
                this.stats.retransRSPSent(1, dGramSize);
                this.sleeperSendMsg.sleepMicros(this.sendPauseMicros);
            }
        }
        if (!served) {
            // Tell the requester that the data is gone.
            this.dropMsg.setReceiver(retransPacket.getSender());
            this.dropMsg.setSeqNo(entry.getFrom());
            FCLog.get().warn("Sending Drop " + this.dropMsg + " requestedSeq " + from + " on service " + getTopicEntry().getConf().getName() + " currentSeq " + this.currentSequence + " age: " + (this.currentSequence.get() - entry.getFrom()));
            transport.send(new DatagramPacket(this.dropMsg.getBase(), this.dropMsg.getIndexInBase(), this.dropMsg.getByteSize()));
        }
    }

    /**
     * Sends the packets with sequence numbers in {@code [j, j2)} from the in-memory history,
     * sleeping {@code sendPauseMicros} before each one.
     *
     * <p>For regular sends ({@code z == false}) the sequence numbers are checked for continuity;
     * a gap is logged as fatal and terminates the JVM. Regular sends are also copied into the
     * off-heap history when one exists, and {@code nextSendMsg} is advanced to {@code j2} at the end.
     * For retransmissions ({@code z == true}) only the retransmission table and stats are updated.
     *
     * <p>Any {@code Throwable} raised while building or sending a datagram is wrapped in a
     * {@code RuntimeException}, so an {@code IOException} from the transport does not leave this
     * method as an {@code IOException}.
     *
     * @param transport transport to send on
     * @param j         first sequence number to send (inclusive)
     * @param j2        end sequence number (exclusive)
     * @param z         {@code true} when answering a retransmission request
     * @param j3        timestamp recorded for retransmitted packets; unused for regular sends
     * @throws IOException declared, see note above
     */
    private void sendPackets(Transport transport, long j, long j2, boolean z, long j3) throws IOException {
        long j4 = j;
        while (true) {
            long j5 = j4;
            if (j5 >= j2) {
                break;
            }
            DataPacket dataPacket = getVolatile(j5);
            if (z) {
                putRetransSent(j5, j3);
            } else {
                // Continuity check: every regular packet must directly follow the previous one sent.
                if (this.debugSeq != 0 && dataPacket.getSeqNo() != this.debugSeq + 1) {
                    FCLog.get().fatal("FATAL error, current seq:" + this.debugSeq + " expected Seq:[" + j5 + "] real read:" + dataPacket.getSeqNo());
                    FCLog.get().fatal("current put seq " + this.currentSequence.get());
                    FCLog.get().fatal("current send seq " + this.nextSendMsg.get());
                    FCLog.get().fatal("current pointer and currentpackpointer " + dataPacket.___offset + " " + this.currentPacketBytePointer.___offset);
                    FCLog.get().fatal(null, new Exception("stack trace"));
                    // Dump the sequence numbers found in the following 20 history slots.
                    for (int i = 0; i < 20; i++) {
                        FCLog.get().fatal("  =>" + getVolatile(j5 + i).getSeqNo());
                    }
                    // Pause one second before exiting (presumably to let the log output drain - confirm).
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        FCLog.log(e);
                    }
                    System.exit(1);
                }
                this.debugSeq = dataPacket.getSeqNo();
                // Stamp the packet with the sender's current pause.
                dataPacket.setSendPauseSender(this.sendPauseMicros);
            }
            this.sleeperSendMsg.sleepMicros(this.sendPauseMicros);
            int dGramSize = dataPacket.getDGramSize();
            try {
                DatagramPacket datagramPacket = new DatagramPacket(dataPacket.getBase(), dataPacket.getOffset(), dGramSize);
                this.rate.count();
                transport.send(datagramPacket);
                if (z) {
                    this.stats.retransRSPSent(1, dGramSize);
                } else {
                    this.stats.dataPacketSent(dGramSize);
                }
                // Keep a copy of regular packets in the off-heap history for late retransmission requests.
                if (this.offheapHistory != null && !z) {
                    this.offheapHistory.putPacket(dataPacket.getSeqNo(), dataPacket.getBase(), dataPacket.getOffset(), dGramSize);
                }
                j4 = j5 + 1;
            } catch (Throwable th) {
                // Print the failing position, then rethrow wrapped.
                System.out.println("seq " + j5 + " start " + j + " end " + j2 + " idx " + getIndexFromSequence(j5) + " len " + dataPacket.getBase().length + " off+siz " + (dataPacket.getOffset() + dGramSize));
                throw new RuntimeException(th);
            }
        }
        if (z) {
            return;
        }
        // Regular send: everything below j2 has now gone out.
        this.nextSendMsg.set(j2);
    }

    /**
     * Debug helper: prints the given packet next to the packet currently held in the off-heap
     * retransmission buffer. Not called from within this class as shown.
     *
     * @param dataPacket packet to print first
     */
    private void dumpDiff(DataPacket dataPacket) {
        final java.io.PrintStream out = System.out;
        out.println("OFFHEAP RETRANS MISMATCH");
        out.println("packet=>");
        out.println(dataPacket);
        // View the raw off-heap buffer through a fresh struct.
        final DataPacket offHeapView = (DataPacket) this.packetAllocator.newStruct();
        offHeapView.baseOn(this.offHeapRetransByte, 0);
        out.println("off packet=>");
        out.println(offHeapView);
    }

    /**
     * Queues a copy of a retransmission request for the next {@code send} step and records it
     * in the stats.
     *
     * @param retransPacket received request; a copy is stored, not the instance itself
     * @param transport     unused here
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public void addRetransmissionRequest(RetransPacket retransPacket, Transport transport) throws IOException {
        final RetransPacket copy = (RetransPacket) retransPacket.createCopy();
        final int requestedPackets = copy.computeNumPackets();
        this.stats.retransRQReceived(requestedPackets, copy.getSendPauseSender());
        this.retransRequests.add(copy);
    }

    /**
     * @return the monitor that {@code fire()} notifies when a packet becomes ready and no spin
     *         lock is in use
     */
    public Object getSendWakeupLock() {
        return sendWakeupLock;
    }

    /**
     * Runs one flow-control step: lets the flow control (if any) inspect the stats, clamps the
     * send pause to between one and two times the configured pause, publishes it to the stats
     * and resets them.
     */
    public void doFlowControl() {
        final int pauseBefore = this.sendPauseMicros;
        if (this.control != null) {
            // NOTE(review): any result of adjustSendPause is not used here; confirm that is intended.
            this.control.adjustSendPause(this.sendPauseMicros, this.stats);
            final int floored = Math.max(pauseBefore, this.topicEntry.getConf().getSendPauseMicros());
            this.sendPauseMicros = Math.min(floored, 2 * this.topicEntry.getConf().getSendPauseMicros());
        }
        this.stats.setLastSendPause(this.sendPauseMicros);
        this.stats.reset();
    }
}
