package de.ruedigermoeller.fastcast.control;

import de.ruedigermoeller.fastcast.packeting.ControlPacket;
import de.ruedigermoeller.fastcast.packeting.DataPacket;
import de.ruedigermoeller.fastcast.packeting.MsgReceiver;
import de.ruedigermoeller.fastcast.packeting.Packet;
import de.ruedigermoeller.fastcast.packeting.PacketSendBuffer;
import de.ruedigermoeller.fastcast.packeting.RetransPacket;
import de.ruedigermoeller.fastcast.packeting.TopicEntry;
import de.ruedigermoeller.fastcast.remoting.FCRemotingListener;
import de.ruedigermoeller.fastcast.remoting.FCTopicService;
import de.ruedigermoeller.fastcast.remoting.FastCast;
import de.ruedigermoeller.fastcast.transport.Transport;
import de.ruedigermoeller.fastcast.util.FCLog;
import de.ruedigermoeller.heapoff.bytez.Bytez;
import de.ruedigermoeller.heapoff.bytez.onheap.HeapBytez;
import de.ruedigermoeller.heapoff.structs.FSTStructAllocator;
import de.ruedigermoeller.heapoff.structs.structtypes.StructString;
import java.io.IOException;
import java.net.DatagramPacket;
import java.util.List;
import java.util.concurrent.locks.LockSupport;

/* loaded from: input_file:de/ruedigermoeller/fastcast/control/FCTransportDispatcher.class */
/**
 * Routes the traffic of one {@link Transport} to per-topic send and receive buffers.
 *
 * <p>Constructing an instance starts a receiver thread (running {@link #receiveLoop()}) and a
 * callback-cleaner thread (running {@link #cleanCBLoop()}). {@link #installSender(TopicEntry)}
 * additionally starts one sender thread per topic whose configured max send packet queue size
 * is non-zero.
 *
 * <p>Topic numbers are used directly as indices into the {@code receiver} and {@code sender}
 * arrays, which are sized with the value {@link #MAX_NUM_TOPICS} has at construction time.
 */
public class FCTransportDispatcher {
    /** Nanoseconds an idle send/receive loop backs off for once the idle threshold is exceeded. */
    public static final int IDLE_SPIN_LOCK_PARK_NANOS = 30000;
    /** Number of consecutive unproductive loop iterations tolerated before backing off. */
    public static final int IDLE_SPIN_IDLE_COUNT = 10000;
    /** Size used for the per-topic arrays of instances created afterwards. */
    public static int MAX_NUM_TOPICS = 256;

    /** Number of send-loop iterations between two flow-control / heartbeat time checks. */
    private static final int SEND_HOUSEKEEPING_PERIOD = 100;

    Transport trans;
    StructString clusterName;
    StructString nodeId;
    Thread receiverThread;
    /** Reusable struct view that is re-based onto each received datagram. */
    Packet receivedPacket;
    FSTStructAllocator alloc = new FSTStructAllocator(1);
    /** One-byte payload used for heartbeat messages. */
    Bytez heartbeat = new HeapBytez(new byte[]{99});
    ReceiveBufferDispatcher[] receiver = new ReceiveBufferDispatcher[MAX_NUM_TOPICS];
    PacketSendBuffer[] sender = new PacketSendBuffer[MAX_NUM_TOPICS];
    Thread calbackCleaner = new Thread("callback cleaner") {
        @Override
        public void run() {
            FCTransportDispatcher.this.cleanCBLoop();
        }
    };

    /**
     * Creates the dispatcher and immediately starts the receiver and callback-cleaner threads.
     *
     * @param transport   transport to receive from and send on
     * @param clusterName name of the cluster; packets carrying another cluster name are ignored
     * @param nodeId      id of this node; packets sent by this id are ignored
     */
    public FCTransportDispatcher(Transport transport, String clusterName, String nodeId) {
        this.trans = transport;
        this.nodeId = this.alloc.newStruct(new StructString(nodeId));
        this.clusterName = this.alloc.newStruct(new StructString(clusterName));
        this.receiverThread = new Thread("trans receiver " + transport.getConf().getName()) {
            @Override
            public void run() {
                FCTransportDispatcher.this.receiveLoop();
            }
        };
        this.receiverThread.start();
        this.calbackCleaner.start();
    }

    /**
     * Every 100 ms asks the callback map of each installed sender to release entries, passing
     * the current time. Runs until the executing thread is interrupted; any other exception is
     * logged and the loop continues.
     */
    void cleanCBLoop() {
        while (true) {
            try {
                Thread.sleep(100L);
                long now = System.currentTimeMillis();
                for (PacketSendBuffer sendBuffer : this.sender) {
                    if (sendBuffer != null) {
                        sendBuffer.getTopicEntry().getCbMap().release(now);
                    }
                }
            } catch (InterruptedException e) {
                // Honour the interrupt instead of looping forever: log it, restore the flag, stop.
                FCLog.log(e);
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                FCLog.log(e);
            }
        }
    }

    /**
     * Registers a receive dispatcher for the topic of {@code topicEntry}.
     *
     * @param topicEntry  topic to receive on
     * @param msgReceiver receiver handed to the new {@link ReceiveBufferDispatcher}
     * @throws RuntimeException if a receiver is already installed for that topic
     */
    public void installReceiver(TopicEntry topicEntry, MsgReceiver msgReceiver) {
        int topic = topicEntry.getTopic();
        // Check first so a rejected registration does not build a dispatcher for nothing.
        if (this.receiver[topic] != null) {
            throw new RuntimeException("double usage of topic " + topicEntry.getTopic() + " on transport " + this.trans.getConf().getName());
        }
        this.receiver[topic] = new ReceiveBufferDispatcher(
                this.trans.getConf().getDgramsize(),
                this.clusterName.toString(),
                this.nodeId.toString(),
                topicEntry,
                msgReceiver);
    }

    /** @return {@code true} if a receiver is installed for the topic of {@code topicEntry} */
    public boolean hasReceiver(TopicEntry topicEntry) {
        return this.receiver[topicEntry.getTopic()] != null;
    }

    /** @return {@code true} if a sender is installed for the topic of {@code topicEntry} */
    public boolean hasSender(TopicEntry topicEntry) {
        return this.sender[topicEntry.getTopic()] != null;
    }

    /**
     * Installs a send buffer for the topic of {@code topicEntry}; does nothing if one is already
     * installed. Unless the configured max send packet queue size is 0, a dedicated sender thread
     * running {@link #sendLoop(TopicEntry)} is started as well.
     *
     * @param topicEntry topic to send on; its transport and sender are set as a side effect
     */
    public void installSender(final TopicEntry topicEntry) {
        int topic = topicEntry.getTopic();
        if (this.sender[topic] != null) {
            return;
        }
        topicEntry.setTrans(this.trans);
        PacketSendBuffer sendBuffer = new PacketSendBuffer(
                this.trans.getConf().getDgramsize(),
                this.clusterName.toString(),
                this.nodeId.toString(),
                topicEntry);
        this.sender[topic] = sendBuffer;
        topicEntry.setSender(sendBuffer);
        if (topicEntry.getConf().getMaxSendPacketQueueSize() == 0) {
            return;
        }
        new Thread("trans sender " + this.trans.getConf().getName() + " " + topicEntry.getName()) {
            @Override
            public void run() {
                FCTransportDispatcher.this.sendLoop(topicEntry);
            }
        }.start();
    }

    /**
     * @param i topic number
     * @return the send buffer installed for topic {@code i}, or {@code null} if there is none
     */
    public PacketSendBuffer getSender(int i) {
        return this.sender[i];
    }

    /**
     * Body of a per-topic sender thread: waits until the transport and the topic's send buffer
     * are available, then sends continuously. Every {@value #SEND_HOUSEKEEPING_PERIOD} iterations
     * it checks whether the configured flow-control or heartbeat interval has elapsed.
     * Runs until the executing thread is interrupted; any other exception is logged and the loop
     * continues.
     *
     * @param topicEntry topic whose send buffer is served
     */
    public void sendLoop(TopicEntry topicEntry) {
        while (this.trans == null || this.sender[topicEntry.getTopic()] == null) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                FCLog.log(e);
                Thread.currentThread().interrupt();
                return;
            }
        }
        PacketSendBuffer sendBuffer = this.sender[topicEntry.getTopic()];
        long lastFlowControl = System.currentTimeMillis();
        long lastHeartbeat = System.currentTimeMillis();
        long flowControlInterval = topicEntry.getConf().getFlowControlInterval();
        long heartbeatInterval = topicEntry.getConf().getHeartbeatInterval();
        int iterations = 0;
        int idleCount = 0;
        while (true) {
            try {
                idleCount = sendBuffer.send(this.trans) ? 0 : idleCount + 1;
                if (!sendBuffer.useSpinLock() && idleCount > IDLE_SPIN_IDLE_COUNT) {
                    Object wakeupLock = sendBuffer.getSendWakeupLock();
                    idleCount = 0;
                    synchronized (wakeupLock) {
                        wakeupLock.wait(0L, IDLE_SPIN_LOCK_PARK_NANOS);
                    }
                }
                iterations++;
                if (iterations % SEND_HOUSEKEEPING_PERIOD == 0) {
                    long now = System.currentTimeMillis();
                    if (now - lastFlowControl > flowControlInterval) {
                        sendBuffer.doFlowControl();
                        lastFlowControl = now;
                    }
                    // The heartbeat timestamp only advances when the heartbeat was accepted,
                    // so a rejected heartbeat is retried at the next housekeeping check.
                    if (now - lastHeartbeat > heartbeatInterval && putHeartbeat(sendBuffer)) {
                        lastHeartbeat = now;
                    }
                }
            } catch (InterruptedException e) {
                FCLog.log(e);
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                FCLog.log(e);
            }
        }
    }

    /**
     * Puts the one-byte heartbeat message into {@code packetSendBuffer}.
     *
     * @return the result of {@code putMessage}
     */
    public boolean putHeartbeat(PacketSendBuffer packetSendBuffer) {
        return packetSendBuffer.putMessage(-1, this.heartbeat, 0, 1, true);
    }

    /**
     * Body of the receiver thread: receives datagrams forever and backs off briefly once more
     * than {@link #IDLE_SPIN_IDLE_COUNT} consecutive attempts yielded nothing. Any exception is
     * logged and the loop continues, so a single bad packet cannot stop reception.
     */
    void receiveLoop() {
        byte[] buffer = new byte[this.trans.getConf().getDgramsize()];
        DatagramPacket datagram = new DatagramPacket(buffer, buffer.length);
        this.receivedPacket = (Packet) this.alloc.newStruct(new Packet());
        int idleCount = 0;
        while (true) {
            try {
                if (receiveDatagram(datagram)) {
                    idleCount = 0;
                } else {
                    idleCount++;
                    if (idleCount > IDLE_SPIN_IDLE_COUNT) {
                        LockSupport.parkNanos(IDLE_SPIN_LOCK_PARK_NANOS);
                    }
                }
            } catch (Exception e) {
                FCLog.log(e);
            }
        }
    }

    /**
     * Tries to receive one datagram and dispatches it according to its packet type.
     *
     * @return {@code false} if the transport delivered nothing, {@code true} if a datagram was
     *         received (whether or not it was dispatched)
     */
    private boolean receiveDatagram(DatagramPacket datagramPacket) throws IOException {
        if (!this.trans.receive(datagramPacket)) {
            return false;
        }
        this.receivedPacket.baseOn(datagramPacket.getData(), datagramPacket.getOffset());
        boolean sameCluster = this.receivedPacket.getCluster().equals(this.clusterName);
        boolean sentByThisNode = this.receivedPacket.getSender().equals(this.nodeId);
        if (!sameCluster || sentByThisNode) {
            return true;
        }
        int topic = this.receivedPacket.getTopic();
        if (!isTopicIndex(topic)) {
            FCLog.get().warn("foreign traffic");
            return true;
        }
        if (this.receiver[topic] == null && this.sender[topic] == null) {
            return true;
        }
        Class<?> pointedClass = this.receivedPacket.getPointedClass();
        StructString addressee = this.receivedPacket.getReceiver();
        if (pointedClass == DataPacket.class) {
            if (this.receiver[topic] == null) {
                return true;
            }
            // A missing or empty addressee means the packet is meant for every node.
            boolean addressedElsewhere =
                    addressee != null && addressee.getLen() != 0 && !addressee.equals(this.nodeId);
            if (!addressedElsewhere) {
                dispatchDataPacket(this.receivedPacket, topic);
            }
            return true;
        }
        if (pointedClass == RetransPacket.class) {
            if (this.sender[topic] != null && isAddressedToThisNode(addressee)) {
                dispatchRetransmissionRequest(this.receivedPacket, topic);
            }
            return true;
        }
        if (pointedClass == ControlPacket.class
                && ((ControlPacket) this.receivedPacket.cast()).getType() == ControlPacket.DROPPED
                && isAddressedToThisNode(addressee)) {
            ReceiveBufferDispatcher dropped = this.receiver[topic];
            if (dropped != null) {
                handleDropped(dropped, topic);
            }
        }
        return true;
    }

    /** Returns whether {@code topic} is a valid index into both per-topic arrays. */
    private boolean isTopicIndex(int topic) {
        return topic >= 0 && topic < this.receiver.length && topic < this.sender.length;
    }

    /** Returns whether {@code addressee} is present and equal to this node's id. */
    private boolean isAddressedToThisNode(StructString addressee) {
        return addressee != null && addressee.equals(this.nodeId);
    }

    /** Logs the drop, notifies the topic service and remoting listener, and removes the receiver. */
    private void handleDropped(ReceiveBufferDispatcher dropped, int topic) {
        FCLog.get().warn(this.nodeId + " has been dropped by " + this.receivedPacket.getSender() + " on service " + dropped.getTopicEntry().getName());
        FCTopicService service = dropped.getTopicEntry().getService();
        if (service != null) {
            service.droppedFromReceiving();
        }
        FCRemotingListener remotingListener = FastCast.getRemoting().getRemotingListener();
        if (remotingListener != null) {
            remotingListener.droppedFromTopic(dropped.getTopicEntry().getTopic(), dropped.getTopicEntry().getName());
        }
        this.receiver[topic] = null;
    }

    /**
     * Hands a data packet to the receive buffer of its sender; if that yields a retransmission
     * request, the request is sent out on the transport.
     */
    private void dispatchDataPacket(Packet packet, int i) throws IOException {
        RetransPacket retransRequest = this.receiver[i]
                .getBuffer(packet.getSender())
                .receivePacket((DataPacket) packet.cast().detach());
        if (retransRequest != null) {
            int size = retransRequest.getByteSize();
            byte[] bytes = retransRequest.getBase().toBytes((int) retransRequest.getOffset(), size);
            this.trans.send(new DatagramPacket(bytes, 0, size));
        }
    }

    /** Forwards a retransmission request to the send buffer of topic {@code i}. */
    private void dispatchRetransmissionRequest(Packet packet, int i) throws IOException {
        this.sender[i].addRetransmissionRequest((RetransPacket) packet.cast().detach(), this.trans);
    }

    /**
     * Installs a receiver for {@code topicEntry} using the entry's own message receiver.
     *
     * @throws RuntimeException if a receiver is already installed for that topic
     */
    public void startListening(TopicEntry topicEntry) {
        installReceiver(topicEntry, topicEntry.getMsgReceiver());
    }

    /** Removes the receiver installed for the topic of {@code topicEntry}, if any. */
    public void stopListening(TopicEntry topicEntry) {
        this.receiver[topicEntry.getTopic()] = null;
    }

    /**
     * Logs, for each listed name, that heartbeats stopped arriving and lets the receiver of
     * topic {@code i} (if installed) clean up its state for that name.
     *
     * @param list names passed to the cluster log and to the receiver's {@code cleanup}
     * @param i    topic number
     */
    public void cleanup(List<String> list, int i) {
        for (int idx = 0; idx < list.size(); idx++) {
            String senderName = list.get(idx);
            ReceiveBufferDispatcher dispatcher = this.receiver[i];
            FCLog.get().cluster("stopped receiving heartbeats from " + senderName);
            if (dispatcher != null) {
                dispatcher.cleanup(senderName);
            }
        }
    }
}
