package org.nustaq.fastcast.impl;

import java.io.IOException;
import java.net.DatagramPacket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.nustaq.fastcast.api.FCPublisher;
import org.nustaq.fastcast.api.FCSubscriber;
import org.nustaq.fastcast.api.FastCast;
import org.nustaq.fastcast.config.PhysicalTransportConf;
import org.nustaq.fastcast.config.PublisherConf;
import org.nustaq.fastcast.config.SubscriberConf;
import org.nustaq.fastcast.transport.PhysicalTransport;
import org.nustaq.fastcast.util.FCLog;
import org.nustaq.offheap.structs.FSTStructAllocator;
import org.nustaq.offheap.structs.structtypes.StructString;

/* loaded from: input_file:org/nustaq/fastcast/impl/TransportDriver.class */
/**
 * Couples one {@link PhysicalTransport} with per-topic receive dispatchers and send buffers.
 *
 * <p>Constructing a driver starts two threads:
 * <ul>
 *   <li>a receiver thread running {@link #receiveLoop()}, which reads datagrams from the transport
 *       and routes them by topic id and packet class;</li>
 *   <li>a housekeeping thread running {@link #houseKeepingLoop()}, which periodically looks for
 *       timed-out senders and flushes send buffers whose {@code lastMsgFlush} did not change.</li>
 * </ul>
 *
 * <p>{@link #terminate()} swaps the transport for a no-op instance; both loops detect this via
 * identity comparison and exit.
 */
public class TransportDriver {
    private static final boolean RETRANS_DEBUG = false;

    /** Size of the per-topic tables allocated when a driver is constructed. */
    public static int MAX_NUM_TOPICS = 256;

    /** Microseconds the receive loop keeps spinning after the last datagram; overwritten from the transport conf. */
    int spinIdleLoopMicros = 10000000;
    /** Microseconds to park per idle iteration once spinning has ended; overwritten from the transport conf. */
    int idleParkMicros = 500;
    /** Active transport; replaced by {@link #emptyTransport} on {@link #terminate()}. */
    volatile PhysicalTransport trans;
    /** Receive dispatcher per topic id, {@code null} where nothing is subscribed. */
    ReceiveBufferDispatcher[] receiver;
    /** Send buffer per topic id, {@code null} where nothing is published. */
    PacketSendBuffer[] sender;
    /** Last observed {@code lastMsgFlush} per topic id, used by the housekeeping auto flush. */
    long[] lastMsg;
    StructString nodeId;
    Thread receiverThread;
    Thread houseKeeping;
    /** Sleep time in milliseconds between two housekeeping passes. */
    long autoFlushMS;
    /** Struct view based on the receive byte array; re-read after every received datagram. */
    Packet receivedPacket;
    /** Reused detached data packet instance handed to the receive buffers. */
    DataPacket tmpP;
    FSTStructAllocator alloc = new FSTStructAllocator(1);
    private final ConcurrentHashMap<Integer, Topic> topics = new ConcurrentHashMap<>();
    /** Iteration counter of the receive loop used to rate-limit termination checks. */
    int tCheckCounter = 0;
    /** Incremented once by the housekeeping thread when it has left its loop. */
    volatile int terminationCounter = 0;
    long lastTimeoutCheck = System.currentTimeMillis();

    /** No-op transport installed by {@link #terminate()}; doubles as the "terminated" marker. */
    PhysicalTransport emptyTransport = new PhysicalTransport() { // from class: org.nustaq.fastcast.impl.TransportDriver.3
        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public boolean receive(ByteBuffer byteBuffer) throws IOException {
            return false;
        }

        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public boolean receive(DatagramPacket datagramPacket) throws IOException {
            return false;
        }

        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public void send(DatagramPacket datagramPacket) throws IOException {
        }

        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public void send(ByteBuffer byteBuffer) throws IOException {
        }

        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public void join() throws IOException {
        }

        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public PhysicalTransportConf getConf() {
            return null;
        }

        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public void close() {
        }

        @Override // org.nustaq.fastcast.transport.PhysicalTransport
        public boolean isBlocking() {
            return false;
        }
    };

    /**
     * Creates the driver, reads timing settings from the transport's configuration and starts the
     * receiver and housekeeping threads.
     *
     * @param physicalTransport transport to read from and write to; its {@code getConf()} must not be null
     * @param str               node id of this driver; packets whose sender equals it are ignored
     */
    public TransportDriver(PhysicalTransport physicalTransport, String str) {
        this.trans = physicalTransport;
        this.nodeId = this.alloc.newStruct(new StructString(str));
        PhysicalTransportConf conf = physicalTransport.getConf();
        this.autoFlushMS = conf.getAutoFlushMS();
        this.spinIdleLoopMicros = conf.getSpinLoopMicros();
        this.idleParkMicros = conf.getIdleParkMicros();
        this.receiver = new ReceiveBufferDispatcher[MAX_NUM_TOPICS];
        this.sender = new PacketSendBuffer[MAX_NUM_TOPICS];
        this.lastMsg = new long[MAX_NUM_TOPICS];
        this.receiverThread = new Thread("trans receiver " + conf.getName()) { // from class: org.nustaq.fastcast.impl.TransportDriver.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                TransportDriver.this.receiveLoop();
            }
        };
        this.receiverThread.start();
        this.houseKeeping = new Thread("trans houseKeeping " + conf.getName()) { // from class: org.nustaq.fastcast.impl.TransportDriver.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                TransportDriver.this.houseKeepingLoop();
            }
        };
        this.houseKeeping.start();
    }

    /**
     * Registers a receive dispatcher for the topic's id.
     *
     * @throws RuntimeException if a dispatcher is already installed for that topic id
     */
    private void installReceiver(Topic topic, FCSubscriber fCSubscriber) {
        int topicId = topic.getTopicId();
        // Check first so that no dispatcher is built just to be thrown away.
        if (this.receiver[topicId] != null) {
            throw new RuntimeException("double usage of topic " + topicId + " on transport " + this.trans.getConf().getName());
        }
        this.receiver[topicId] = new ReceiveBufferDispatcher(this.trans.getConf().getDgramsize(), this.nodeId.toString(), topic, fCSubscriber);
    }

    /**
     * @param i topic id
     * @return {@code true} if a receive dispatcher is installed for topic {@code i}
     */
    public boolean hasReceiver(int i) {
        return this.receiver[i] != null;
    }

    /**
     * @param i topic id
     * @return {@code true} if a send buffer is installed for topic {@code i}
     */
    public boolean hasSender(int i) {
        return this.sender[i] != null;
    }

    /**
     * Returns the send buffer of the topic, creating and registering it (in the sender table and
     * on the topic itself) if none exists yet.
     */
    private PacketSendBuffer installSender(Topic topic) {
        int topicId = topic.getTopicId();
        PacketSendBuffer existing = this.sender[topicId];
        if (existing != null) {
            return existing;
        }
        PacketSendBuffer created = new PacketSendBuffer(this.trans, this.nodeId.toString(), topic);
        this.sender[topicId] = created;
        topic.setSender(created);
        return created;
    }

    /**
     * Body of the housekeeping thread. Until the driver is terminated it repeatedly
     * <ol>
     *   <li>asks every subscribed topic for timed-out senders (at most once per 2 seconds) and
     *       cleans them up,</li>
     *   <li>flushes every send buffer whose {@code lastMsgFlush} is unchanged since the previous
     *       pass,</li>
     *   <li>sleeps for {@code autoFlushMS} milliseconds.</li>
     * </ol>
     * On exit {@code terminationCounter} is incremented, which the receive loop waits for.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void houseKeepingLoop() {
        List<String> scratch = new ArrayList<>();
        while (!isTerminated()) {
            try {
                long now = System.currentTimeMillis();
                if (now - this.lastTimeoutCheck > 2000) {
                    this.lastTimeoutCheck = now;
                    for (int topicId = 0; topicId < this.receiver.length; topicId++) {
                        ReceiveBufferDispatcher dispatcher = this.receiver[topicId];
                        if (dispatcher == null) {
                            continue;
                        }
                        Topic topicEntry = dispatcher.getTopicEntry();
                        scratch.clear();
                        List<String> timedOutSenders = topicEntry.getTimedOutSenders(scratch, now, topicEntry.getHbTimeoutMS());
                        if (timedOutSenders != null && timedOutSenders.size() > 0 && !isTerminated()) {
                            cleanup(timedOutSenders, topicId);
                        }
                    }
                }
                for (int topicId = 0; topicId < this.sender.length; topicId++) {
                    PacketSendBuffer sendBuffer = this.sender[topicId];
                    if (sendBuffer == null) {
                        continue;
                    }
                    long flushStamp = sendBuffer.lastMsgFlush;
                    if (this.lastMsg[topicId] == 0 || this.lastMsg[topicId] != flushStamp) {
                        // First observation or the buffer flushed by itself: just remember the stamp.
                        this.lastMsg[topicId] = flushStamp;
                    } else if (!isTerminated()) {
                        // Stamp unchanged since the previous pass: flush on behalf of the sender.
                        sendBuffer.flush();
                    }
                }
            } catch (Exception e) {
                FCLog.log(e);
            }
            // Pause even after a failed pass; otherwise a persistent error would spin this thread.
            try {
                // Clamp so a negative setting cannot kill this thread with an IllegalArgumentException.
                Thread.sleep(Math.max(0L, this.autoFlushMS));
            } catch (InterruptedException e) {
                // The loop is ended via terminate(), not via interruption, so only report it.
                FCLog.log(e);
            }
        }
        // Only this thread writes the counter, so the non-atomic increment is sufficient.
        this.terminationCounter++;
    }

    /** @return {@code true} once {@link #terminate()} has replaced the transport with the no-op instance */
    private boolean isTerminated() {
        return this.trans == this.emptyTransport;
    }

    /**
     * Body of the receiver thread. Polls the transport for datagrams and dispatches them until the
     * driver is terminated, then waits for the housekeeping thread to finish, frees the struct
     * allocator and calls {@code cleanupTopic()} on every installed receive dispatcher.
     *
     * <p>While no datagram arrives the loop keeps spinning for {@code spinIdleLoopMicros}; after
     * that, for non-blocking transports, it parks {@code idleParkMicros} per iteration.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void receiveLoop() {
        byte[] rawPacket = new byte[this.trans.getConf().getDgramsize()];
        ByteBuffer receiveBuffer = ByteBuffer.wrap(rawPacket);
        this.receivedPacket = (Packet) this.alloc.newStruct(new Packet());
        long lastActivityNanos = System.nanoTime();
        this.receivedPacket.baseOn(rawPacket, 0);
        while (true) {
            boolean idle = true;
            try {
                receiveBuffer.position(0);
                if (receiveDatagram(receiveBuffer, rawPacket)) {
                    lastActivityNanos = System.nanoTime();
                    idle = false;
                } else if (System.nanoTime() - lastActivityNanos <= this.spinIdleLoopMicros * 1000L) {
                    // long arithmetic: the int product overflows for spin times above ~2.1 seconds.
                    idle = false;
                    // Random +/-1 jitter kept from the original; its purpose is not evident from this file.
                    lastActivityNanos = (ThreadLocalRandom.current().nextInt() & 1) == 0 ? lastActivityNanos + 1 : lastActivityNanos - 1;
                } else if (!this.trans.isBlocking() && this.idleParkMicros > 0) {
                    LockSupport.parkNanos(1000L * this.idleParkMicros);
                }
                this.tCheckCounter++;
                // While busy, look at the volatile transport field only every 100000 iterations.
                if (this.tCheckCounter == 100000 || idle || this.spinIdleLoopMicros == 0) {
                    this.tCheckCounter = 0;
                    if (isTerminated()) {
                        break;
                    }
                }
            } catch (Throwable th) {
                FCLog.log(th);
            }
        }
        // Wait until the housekeeping thread has left its loop before releasing resources.
        while (this.terminationCounter < 1) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500L));
        }
        this.alloc.free();
        for (int topicId = 0; topicId < this.receiver.length; topicId++) {
            ReceiveBufferDispatcher dispatcher = this.receiver[topicId];
            if (dispatcher != null) {
                dispatcher.cleanupTopic();
            }
        }
    }

    /**
     * Marks the driver as terminated by installing the no-op transport and closes the previous
     * transport. Both worker loops exit once they observe the swap.
     */
    public void terminate() {
        PhysicalTransport previous = this.trans;
        this.trans = this.emptyTransport;
        previous.close();
    }

    /**
     * Tries to receive one datagram into {@code byteBuffer} and routes it.
     *
     * <ul>
     *   <li>data packets go to the topic's receive dispatcher,</li>
     *   <li>retransmission requests addressed to this node go to the topic's send buffer,</li>
     *   <li>control packets of type 0 addressed to this node (or to nobody) are treated as a drop
     *       notification: the subscriber's {@code dropped()} decides between resync and removal of
     *       the receiver.</li>
     * </ul>
     *
     * @param byteBuffer buffer backed by the array {@code receivedPacket} is based on
     * @param bArr       backing array of {@code byteBuffer}; not used by this method
     * @return {@code false} if nothing was received or the packet was sent by this node,
     *         {@code true} for every other datagram, including ignored ones
     */
    private boolean receiveDatagram(ByteBuffer byteBuffer, byte[] bArr) throws IOException {
        if (!this.trans.receive(byteBuffer) || this.receivedPacket.getSender().equals(this.nodeId)) {
            return false;
        }
        int topic = this.receivedPacket.getTopic();
        // Valid ids are 0 .. length-1; the former '>' test let an id equal to the table size through.
        if (topic < 0 || topic >= this.receiver.length) {
            FCLog.get().warn("foreign traffic");
            return true;
        }
        if (this.receiver[topic] == null && this.sender[topic] == null) {
            return true;
        }
        Class pointedClass = this.receivedPacket.getPointedClass();
        StructString addressee = this.receivedPacket.getReceiver();
        if (pointedClass == DataPacket.class) {
            if (this.receiver[topic] != null) {
                dispatchDataPacket(this.receivedPacket, topic);
            }
            return true;
        }
        if (pointedClass == RetransPacket.class) {
            if (this.sender[topic] != null && addressee != null && addressee.equals(this.nodeId)) {
                dispatchRetransmissionRequest(this.receivedPacket, topic);
            }
            return true;
        }
        if (pointedClass != ControlPacket.class || isForeignReceiver(addressee)
                || ((ControlPacket) this.receivedPacket.cast()).getType() != 0) {
            return true;
        }
        ReceiveBufferDispatcher dispatcher = this.receiver[topic];
        if (dispatcher == null) {
            return true;
        }
        FCLog.get().warn(this.nodeId + " has been dropped by " + this.receivedPacket.getSender() + " on service " + dispatcher.getTopicEntry().getTopicId());
        FCSubscriber subscriber = dispatcher.getTopicEntry().getSubscriber();
        if (subscriber == null) {
            return true;
        }
        if (!subscriber.dropped()) {
            // Subscriber declined to resync: stop receiving on this topic.
            this.receiver[topic] = null;
            dispatcher.cleanupTopic();
            return true;
        }
        FCLog.get().warn("..resyncing..");
        PacketReceiveBuffer buffer = dispatcher.getBuffer(this.receivedPacket.getSender());
        if (buffer != null) {
            buffer.resync();
        } else {
            FCLog.get().warn("unexpected null buffer");
        }
        return true;
    }

    /** @return {@code true} if {@code structString} names a non-empty receiver other than this node */
    private boolean isForeignReceiver(StructString structString) {
        if (structString == null || structString.getLen() <= 0) {
            return false;
        }
        return !structString.equals(this.nodeId);
    }

    /**
     * Hands a data packet to the receive buffer of its sender and, if that buffer answers with a
     * retransmission request, sends the request over the transport.
     */
    private void dispatchDataPacket(Packet packet, int i) throws IOException {
        PacketReceiveBuffer buffer = this.receiver[i].getBuffer(packet.getSender());
        this.tmpP = (DataPacket) packet.cast().detachTo(this.tmpP);
        RetransPacket retransRequest = buffer.receivePacket(this.tmpP);
        if (retransRequest == null) {
            return;
        }
        if (PacketSendBuffer.RETRANSDEBUG) {
            FCLog.get().info("send retrans request " + retransRequest + " " + retransRequest.getClzId());
        }
        int size = retransRequest.getByteSize();
        byte[] payload = retransRequest.getBase().toBytes(retransRequest.getOffset(), size);
        this.trans.send(new DatagramPacket(payload, 0, size));
    }

    /** Passes a detached copy of a retransmission request to the send buffer of topic {@code i}. */
    private void dispatchRetransmissionRequest(Packet packet, int i) throws IOException {
        this.sender[i].addRetransmissionRequest((RetransPacket) packet.cast().detach(), this.trans);
    }

    /**
     * Logs every sender in {@code list} as timed out and asks the receive dispatcher of topic
     * {@code i}, if one is installed, to clean up that sender.
     *
     * @param list node ids of senders that stopped sending heartbeats
     * @param i    topic id
     */
    void cleanup(List<String> list, int i) {
        for (int idx = 0; idx < list.size(); idx++) {
            String senderId = list.get(idx);
            ReceiveBufferDispatcher dispatcher = this.receiver[i];
            FCLog.get().info("stopped receiving heartbeats from " + senderId);
            if (dispatcher != null) {
                dispatcher.cleanup(senderId);
            }
        }
    }

    /**
     * Subscribes using the subscriber configuration that {@link FastCast} holds under {@code str}.
     *
     * @param str          name passed to {@code FastCast.getSubscriberConf}
     * @param fCSubscriber subscriber to register
     */
    public void subscribe(String str, FCSubscriber fCSubscriber) {
        subscribe(FastCast.getFastCast().getSubscriberConf(str), fCSubscriber);
    }

    /**
     * Installs a receiver for the topic id of {@code subscriberConf}.
     *
     * @param subscriberConf subscription settings, including the topic id
     * @param fCSubscriber   subscriber to register
     * @throws RuntimeException if a publisher is already registered for the topic id on this
     *                          driver, or a receiver is already installed for it
     */
    public void subscribe(SubscriberConf subscriberConf, FCSubscriber fCSubscriber) {
        Topic topic = this.topics.get(Integer.valueOf(subscriberConf.getTopicId()));
        if (topic == null) {
            topic = new Topic(null, null);
        }
        // NOTE(review): unlike publish(), a topic created here is never stored in 'topics', and the
        // guard below tests the publisher conf, so subscribing after publishing on the same topic
        // id fails while the reverse order succeeds. Confirm whether this asymmetry is intended.
        if (topic.getPublisherConf() != null) {
            throw new RuntimeException("already a sender registered at " + subscriberConf.getTopicId());
        }
        topic.setSubscriberConf(subscriberConf);
        topic.setChannelDispatcher(this);
        topic.setSubscriber(fCSubscriber);
        installReceiver(topic, fCSubscriber);
    }

    /**
     * Publishes using the publisher configuration that {@link FastCast} holds under {@code str}.
     *
     * @param str name passed to {@code FastCast.getPublisherConf}
     * @return the send buffer installed for the topic
     */
    public FCPublisher publish(String str) {
        return publish(FastCast.getFastCast().getPublisherConf(str));
    }

    /**
     * Registers a topic for {@code publisherConf} and installs its send buffer.
     *
     * @param publisherConf publisher settings, including the topic id
     * @return the send buffer installed for the topic
     * @throws RuntimeException if a publisher is already registered for the topic id
     */
    public FCPublisher publish(PublisherConf publisherConf) {
        Integer topicId = Integer.valueOf(publisherConf.getTopicId());
        Topic topic = this.topics.get(topicId);
        if (topic == null) {
            topic = new Topic(null, null);
        }
        if (topic.getPublisherConf() != null) {
            throw new RuntimeException("already a sender registered at " + publisherConf.getTopicId());
        }
        topic.setChannelDispatcher(this);
        topic.setPublisherConf(publisherConf);
        this.topics.put(topicId, topic);
        return installSender(topic);
    }

    /**
     * @param i topic id
     * @return the receive dispatcher installed for topic {@code i}, or {@code null} if none
     */
    public ReceiveBufferDispatcher getReceiver(int i) {
        return this.receiver[i];
    }
}
