package org.nustaq.kontraktor.impl;

import io.jaq.mpsc.MpscConcurrentQueue;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.remoting.http.rest.HttpObjectSocket;
import org.nustaq.kontraktor.util.Log;

/* loaded from: input_file:org/nustaq/kontraktor/impl/DispatcherThread.class */
public class DispatcherThread extends Thread {
    // Scheduler this dispatcher reports to; used for yielding, backoff, rebalancing and stop requests.
    private Scheduler scheduler;
    // Value of scheduler.getMaxThreads() captured in the constructor; profiling only happens when > 1.
    private int maxThreads;
    // run() merges pending adds once more than this many consecutive polls were successful.
    public static int NUMBER_OF_MESSAGES_TO_PROCESS_PER_CHECK_FOR_NEW_ADDS = HttpObjectSocket.MAX_BATCHED_REQUESTS;
    // Base value for nextProfile; profiledCall() adds a random amount below 13 to it.
    public static int PROFILE_INTERVAL = 255;
    // checkForSplit() is invoked once schedCounter exceeds this number of profiled calls.
    public static int SCHEDULE_PER_PROFILE = 32;
    // getLoad() percentage that must be exceeded before checkForSplit() counts a check as overloaded.
    public static int QUEUE_PERCENTAGE_TRIGGERING_REBALANCE = 80;
    // Minimum age in milliseconds (relative to 'created') before checkForSplit() counts a check as overloaded.
    public static int MILLIS_AFTER_CREATION_BEFORE_REBALANCING = 1000;
    // Number of overloaded checks that must be exceeded before scheduler.rebalance(this) is called.
    public static int TRIGGER_REBALANCE_COUNTER = 2;
    // Source of the running number appended to each dispatcher thread's name.
    static AtomicInteger dtcount = new AtomicInteger(0);
    // Actors polled by this thread. Reassigned by schedulePendingAdds(), removeActorImmediate() and
    // splitTo(); splitTo() additionally sorts the current array in place.
    private Actor[] actors = new Actor[0];
    // Actors handed in via addActor() that schedulePendingAdds() has not yet merged into 'actors'.
    ConcurrentLinkedQueue<Actor> toAdd = new ConcurrentLinkedQueue<>();
    // Set by shutDown(); run() leaves its main loop after an unsuccessful poll once this is true.
    // NOTE(review): not volatile although shutDown() is public - confirm cross-thread visibility is guaranteed elsewhere.
    protected boolean shutDown = false;
    // Not referenced anywhere else in this class as shown.
    int stackDepth = 0;
    // Index into the actor array of the next actor pollQueues() will look at.
    int count = 0;
    // Calls dispatched since the last profiled call (only counted when maxThreads > 1).
    int profileCounter = 0;
    // Profiled calls since the last checkForSplit().
    int schedCounter = 0;
    // Overloaded checks seen by checkForSplit() since the last rebalance request.
    int loadCounter = 0;
    // profileCounter must exceed this before the next call is profiled; re-randomized by profiledCall().
    int nextProfile = 511;
    // Creation timestamp in milliseconds; reset by splitTo(). Reference point for the rebalance and idle-stop age checks.
    long created = System.currentTimeMillis();

    public DispatcherThread(Scheduler scheduler) {
        this.scheduler = scheduler;
        this.maxThreads = scheduler.getMaxThreads();
        setName("DispatcherThread " + dtcount.incrementAndGet());
    }

    /**
     * @return a short description containing the thread name
     */
    @Override // java.lang.Thread
    public String toString() {
        final StringBuilder text = new StringBuilder("DispatcherThread{ name:");
        text.append(getName());
        text.append('}');
        return text.toString();
    }

    /**
     * Queues an actor for this dispatcher. The actor's reference (as returned by
     * {@code getActorRef()}) is placed in the pending-add queue; it is merged into the
     * polled actor array later by the dispatcher loop.
     *
     * @param actor actor to hand over to this dispatcher
     */
    public void addActor(Actor actor) {
        final Actor actorRef = actor.getActorRef();
        this.toAdd.offer(actorRef);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the given actor from this dispatcher's actor array. Must be called from the
     * dispatcher thread itself.
     *
     * <p>The actor has to be present exactly once. Previously a missing actor (or an empty
     * actor array) surfaced as an {@code ArrayIndexOutOfBoundsException} /
     * {@code NegativeArraySizeException} before the intended check could run; the array is now
     * validated first so the documented error is raised instead.
     *
     * @param actor the actor reference to remove (compared by identity)
     * @throws RuntimeException "wrong thread" if called from another thread, or
     *                          "could not remove actor" if the actor is not contained exactly once
     */
    public void removeActorImmediate(Actor actor) {
        if (Thread.currentThread() != this) {
            throw new RuntimeException("wrong thread");
        }
        final Actor[] current = this.actors;
        // Count the entries that stay before allocating, so a missing actor cannot overrun the new array.
        int survivors = 0;
        for (Actor candidate : current) {
            if (candidate != actor) {
                survivors++;
            }
        }
        if (survivors != current.length - 1) {
            throw new RuntimeException("could not remove actor");
        }
        final Actor[] remaining = new Actor[survivors];
        int next = 0;
        for (Actor candidate : current) {
            if (candidate != actor) {
                remaining[next++] = candidate;
            }
        }
        this.actors = remaining;
    }

    /**
     * Dispatcher main loop.
     *
     * <p>Repeatedly polls the actors' queues via {@link #pollQs(Actor)}. After a run of
     * successful polls longer than {@code NUMBER_OF_MESSAGES_TO_PROCESS_PER_CHECK_FOR_NEW_ADDS}
     * pending actor adds are merged. After an unsuccessful poll the thread yields through the
     * scheduler, leaves the loop if shut down was requested, and - once the backoff strategy
     * reports the sleeping state - merges pending adds and, if older than 3000 ms, either shuts
     * itself down (no actors, nothing pending) or asks the scheduler to stop it.
     *
     * <p>After the loop the scheduler is notified, then the thread waits until 100 consecutive
     * 5 ms checks found no actors left before terminating.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        int emptyPolls = 0;
        int handledSinceAddCheck = 0;
        boolean terminated = false;
        while (!terminated) {
            if (pollQs(null)) {
                emptyPolls = 0;
                handledSinceAddCheck++;
                if (handledSinceAddCheck > NUMBER_OF_MESSAGES_TO_PROCESS_PER_CHECK_FOR_NEW_ADDS) {
                    handledSinceAddCheck = 0;
                    schedulePendingAdds();
                }
                continue;
            }

            // Nothing was dispatched: back off.
            emptyPolls++;
            this.scheduler.yield(emptyPolls);
            if (this.shutDown) {
                terminated = true;
            }
            if (!this.scheduler.getBackoffStrategy().isSleeping(emptyPolls)) {
                continue;
            }

            // Backoff reached the sleeping state: use the idle time for housekeeping.
            handledSinceAddCheck = 0;
            schedulePendingAdds();
            if (System.currentTimeMillis() - this.created <= 3000) {
                continue;
            }
            final boolean nothingToDo = this.actors.length == 0 && this.toAdd.peek() == null;
            if (nothingToDo) {
                shutDown();
            } else {
                this.scheduler.tryStopThread(this);
            }
        }

        this.scheduler.threadStopped(this);

        // Linger until 100 consecutive checks saw an empty actor array; any hit restarts the count.
        for (int quietChecks = 0; quietChecks < 100; quietChecks++) {
            LockSupport.parkNanos(5000000L);
            if (this.actors.length > 0) {
                if (ElasticScheduler.DEBUG_SCHEDULING) {
                    Log.Lg.warn(this, "Severe: zombie dispatcher thread detected");
                }
                this.scheduler.tryStopThread(this);
                quietChecks = 0;
            }
        }
        if (ElasticScheduler.DEBUG_SCHEDULING) {
            Log.Info(this, "dipatcher thread terminated");
        }
    }

    /**
     * Drains the pending-add queue and appends the drained actors, in queue order, to the
     * actor array. Does nothing when the queue is empty.
     */
    private void schedulePendingAdds() {
        final ArrayList<Actor> pending = new ArrayList<>();
        for (Actor next = this.toAdd.poll(); next != null; next = this.toAdd.poll()) {
            pending.add(next);
        }
        if (pending.isEmpty()) {
            return;
        }
        // Existing actors keep their positions; new ones go to the tail.
        final Actor[] merged = Arrays.copyOf(this.actors, this.actors.length + pending.size());
        final int offset = merged.length - pending.size();
        for (int k = 0; k < pending.size(); k++) {
            merged[offset + k] = pending.get(k);
        }
        this.actors = merged;
    }

    /**
     * Polls the next actor in round-robin order for one queued call.
     *
     * <p>The actor at the current position is inspected; its callback queue is tried first,
     * then its mailbox. The position advances by one after each inspected actor and wraps
     * to zero at the end of the array.
     *
     * <p>If the actor at the current position is the one to ignore, it is skipped and the next
     * actor is polled instead. The entry obtained from that next actor is returned; earlier the
     * result of this nested poll was discarded, which dropped an already dequeued call.
     *
     * @param actorArr actors to poll
     * @param actor    actor whose queues must not be polled, or {@code null} to poll all
     * @return the polled entry, or {@code null} if the inspected actor had nothing queued,
     *         the array is empty, or the only actor is the ignored one
     */
    protected CallEntry pollQueues(Actor[] actorArr, Actor actor) {
        if (this.count >= actorArr.length) {
            this.count = 0;
            if (actorArr.length == 0) {
                return null;
            }
        }
        final Actor candidate = actorArr[this.count];
        if (candidate == actor) {
            if (actorArr.length <= 1) {
                return null;
            }
            this.count++;
            // Hand the entry polled from the next actor back to the caller instead of losing it.
            return pollQueues(actorArr, actor);
        }
        CallEntry entry = (CallEntry) candidate.__cbQueue.poll();
        if (entry == null) {
            entry = (CallEntry) candidate.__mailbox.poll();
        }
        this.count++;
        return entry;
    }

    /**
     * Polls one queued call and executes it.
     *
     * <p>Before the call, {@code Actor.sender} is set to the entry's target actor. When more
     * than one thread is allowed, every call increments the profile counter and a call on an
     * {@code Actor} target is timed via {@code profiledCall} once the counter exceeds
     * {@code nextProfile}. If the entry carries a future, the call's result is treated as a
     * {@code Promise} whose outcome is forwarded to that future.
     *
     * <p>If the call fails with an {@code InvocationTargetException} wrapping
     * {@code ActorStoppedException.Instance}, the target actor is marked stopped and removed
     * from this dispatcher. Any other exception is passed to the entry's future (if present)
     * and logged.
     *
     * @param actor actor to exclude from polling, or {@code null}
     * @return {@code false} if nothing was polled or the call failed with an exception other
     *         than the actor-stopped signal; {@code true} otherwise
     */
    public boolean pollQs(Actor actor) {
        final CallEntry entry = pollQueues(this.actors, actor);
        if (entry == null) {
            return false;
        }
        try {
            Actor.sender.set(entry.getTargetActor());

            boolean profileThisCall = false;
            if (this.maxThreads > 1) {
                this.profileCounter++;
                profileThisCall = this.profileCounter > this.nextProfile && entry.getTarget() instanceof Actor;
            }

            final Object result;
            if (profileThisCall) {
                this.profileCounter = 0;
                result = profiledCall(entry);
            } else {
                result = invoke(entry);
            }

            if (entry.getFutureCB() != null) {
                final Future futureCB = entry.getFutureCB();
                ((Promise) result).then(new Callback() { // from class: org.nustaq.kontraktor.impl.DispatcherThread.1
                    @Override // org.nustaq.kontraktor.Callback
                    public void receive(Object obj, Object obj2) {
                        futureCB.receive(obj, obj2);
                    }
                });
            }
            return true;
        } catch (Exception e) {
            final boolean actorStopped = (e instanceof InvocationTargetException)
                    && ((InvocationTargetException) e).getTargetException() == ActorStoppedException.Instance;
            if (!actorStopped) {
                if (entry.getFutureCB() != null) {
                    entry.getFutureCB().receive(null, e);
                }
                Log.Warn(this, e, "");
                return false;
            }
            // The actor signalled its own stop: flag it and take it off this dispatcher.
            final Actor stoppedActor = (Actor) entry.getTarget();
            stoppedActor.__stopped = true;
            removeActorImmediate(stoppedActor.getActorRef());
            return true;
        }
    }

    /**
     * Reflectively executes the call described by the entry: its method is invoked on its
     * target with its arguments.
     *
     * @param callEntry the queued call
     * @return whatever the invoked method returned
     * @throws IllegalAccessException    propagated from the reflective invocation
     * @throws InvocationTargetException if the invoked method itself threw
     */
    private Object invoke(CallEntry callEntry) throws IllegalAccessException, InvocationTargetException {
        final java.lang.reflect.Method method = callEntry.getMethod();
        return method.invoke(callEntry.getTarget(), callEntry.getArgs());
    }

    /**
     * Executes the call like {@code invoke} while measuring its duration.
     *
     * <p>The measured nanoseconds are folded into the target actor's {@code __nanos} as a
     * weighted average (31 parts old value, 1 part new sample). The next profiling threshold
     * is re-randomized first, and after more than {@code SCHEDULE_PER_PROFILE} profiled calls
     * {@code checkForSplit()} is run. The entry's target is cast to {@code Actor}.
     *
     * @param callEntry the queued call
     * @return whatever the invoked method returned
     * @throws IllegalAccessException    propagated from the reflective invocation
     * @throws InvocationTargetException if the invoked method itself threw
     */
    private Object profiledCall(CallEntry callEntry) throws IllegalAccessException, InvocationTargetException {
        final double jitter = Math.random() * 13.0d;
        this.nextProfile = (int) (PROFILE_INTERVAL + jitter);
        this.schedCounter++;

        final long startedAt = System.nanoTime();
        final Object result = invoke(callEntry);
        final Actor target = (Actor) callEntry.getTarget();
        final long elapsed = System.nanoTime() - startedAt;
        target.__nanos = ((target.__nanos * 31) + elapsed) / 32;

        if (this.schedCounter > SCHEDULE_PER_PROFILE) {
            this.schedCounter = 0;
            checkForSplit();
        }
        return result;
    }

    /**
     * Asks the scheduler to rebalance this dispatcher when it keeps being overloaded.
     *
     * <p>A check counts as overloaded when the load exceeds
     * {@code QUEUE_PERCENTAGE_TRIGGERING_REBALANCE}, more than one actor is assigned, and the
     * dispatcher is older than {@code MILLIS_AFTER_CREATION_BEFORE_REBALANCING}. Once the
     * number of such checks exceeds {@code TRIGGER_REBALANCE_COUNTER}, the counter is reset and
     * {@code scheduler.rebalance(this)} is called.
     */
    private void checkForSplit() {
        final boolean overloaded = getLoad() > QUEUE_PERCENTAGE_TRIGGERING_REBALANCE
                && this.actors.length > 1
                && System.currentTimeMillis() - this.created > MILLIS_AFTER_CREATION_BEFORE_REBALANCING;
        if (overloaded) {
            this.loadCounter++;
            if (this.loadCounter > TRIGGER_REBALANCE_COUNTER) {
                this.loadCounter = 0;
                this.scheduler.rebalance(this);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Splits this dispatcher's actors between itself and another dispatcher.
     *
     * <p>The actors are sorted by their recorded {@code __nanos} in descending order and then
     * handed out one by one: an actor goes to the other dispatcher whenever the nanos moved so
     * far are below the nanos kept so far, otherwise it stays here. Moved actors get their
     * {@code __currentDispatcher} set to the other dispatcher. Afterwards both dispatchers'
     * actor arrays are replaced and this dispatcher's creation timestamp is reset.
     *
     * <p>The ordering uses {@code Long.compare}, which satisfies the {@code Comparator}
     * contract. The former comparator never reported equality and returned 1 for both
     * orderings of equal values, which {@code Arrays.sort} may reject with
     * "Comparison method violates its general contract!".
     *
     * @param dispatcherThread dispatcher receiving the split-off actors; its actor array is overwritten
     */
    public void splitTo(DispatcherThread dispatcherThread) {
        if (ElasticScheduler.DEBUG_SCHEDULING) {
            Log.Info(this, "SPLIT " + this.scheduler.getMaxThreads());
        }
        Arrays.sort(this.actors, new Comparator<Actor>() { // from class: org.nustaq.kontraktor.impl.DispatcherThread.2
            @Override // java.util.Comparator
            public int compare(Actor first, Actor second) {
                // Descending by measured nanos.
                return Long.compare(second.__nanos, first.__nanos);
            }
        });

        long movedNanos = 0;
        long keptNanos = 0;
        final ArrayList<Actor> kept = new ArrayList<>();
        final ArrayList<Actor> moved = new ArrayList<>();
        for (Actor actor : this.actors) {
            final long nanos = actor.__nanos;
            if (movedNanos < keptNanos) {
                moved.add(actor);
                movedNanos += nanos;
                actor.__currentDispatcher = dispatcherThread;
            } else {
                kept.add(actor);
                keptNanos += nanos;
            }
        }

        this.actors = kept.toArray(new Actor[kept.size()]);
        dispatcherThread.actors = moved.toArray(new Actor[moved.size()]);
        if (ElasticScheduler.DEBUG_SCHEDULING) {
            Log.Info(this, "split distribution " + keptNanos + ":" + movedNanos + " actors " + this.actors.length);
        }
        // Restart the age clock so the rebalance grace period applies again.
        this.created = System.currentTimeMillis();
    }

    /**
     * Computes the highest mailbox fill level among this dispatcher's actors.
     *
     * <p>Each actor's mailbox is cast to {@code MpscConcurrentQueue}; its fill level is
     * {@code size * 100 / capacity} using integer arithmetic.
     *
     * @return the largest fill percentage found, or 0 when there are no actors
     */
    public int getLoad() {
        final Actor[] snapshot = this.actors;
        int highest = 0;
        for (int k = 0; k < snapshot.length; k++) {
            final MpscConcurrentQueue mailbox = (MpscConcurrentQueue) snapshot[k].__mailbox;
            final int percent = (mailbox.size() * 100) / mailbox.getCapacity();
            highest = Math.max(highest, percent);
        }
        return highest;
    }

    /**
     * @return the sum of the {@code __nanos} values of all actors assigned to this dispatcher
     */
    public long getLoadNanos() {
        final Actor[] snapshot = this.actors;
        long total = 0;
        for (int k = 0; k < snapshot.length; k++) {
            total += snapshot[k].__nanos;
        }
        return total;
    }

    /**
     * @return the combined number of entries in the mailboxes and callback queues of all
     *         actors assigned to this dispatcher
     */
    public int getQSize() {
        final Actor[] snapshot = this.actors;
        int total = 0;
        for (int k = 0; k < snapshot.length; k++) {
            final Actor current = snapshot[k];
            final int mailboxSize = current.__mailbox.size();
            final int callbackSize = current.__cbQueue.size();
            total += mailboxSize + callbackSize;
        }
        return total;
    }

    /**
     * @return {@code true} while the shut down flag has NOT been set, {@code false} once
     *         {@link #shutDown()} was called
     */
    // NOTE(review): the result is the negation of what the method name suggests. Callers outside
    // this file may rely on the current behavior - confirm before changing.
    public boolean isShutDown() {
        return !this.shutDown;
    }

    /**
     * Sets the shut down flag. The dispatcher loop in {@code run()} checks the flag after an
     * unsuccessful poll and leaves its loop once it is set.
     */
    public void shutDown() {
        this.shutDown = true;
    }

    /**
     * Not implemented.
     *
     * @throws RuntimeException always, with the message "unimplemented"
     */
    public void shutDownImmediate() {
        throw new RuntimeException("unimplemented");
    }

    /**
     * @return {@code true} if the mailbox and the callback queue of every assigned actor are
     *         empty (also when no actors are assigned), {@code false} otherwise
     */
    public boolean isEmpty() {
        int idx = 0;
        while (idx < this.actors.length) {
            final Actor current = this.actors[idx];
            final boolean idle = current.__mailbox.isEmpty() && current.__cbQueue.isEmpty();
            if (!idle) {
                return false;
            }
            idx++;
        }
        return true;
    }

    /**
     * Blocks the calling thread until {@link #isEmpty()} reports {@code true}, parking
     * between checks.
     *
     * @param j nanoseconds to park between two checks
     */
    public void waitEmpty(long j) {
        for (;;) {
            if (isEmpty()) {
                return;
            }
            LockSupport.parkNanos(j);
        }
    }

    /**
     * @return the scheduler passed to the constructor
     */
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * @return a new array holding the actors currently assigned to this dispatcher; changes to
     *         the returned array do not affect the dispatcher
     */
    public Actor[] getActors() {
        final Actor[] snapshot = this.actors;
        return Arrays.copyOf(snapshot, snapshot.length);
    }

    /**
     * @return the internal actor array itself, without copying; see {@link #getActors()} for a
     *         copy
     */
    Actor[] getActorsNoCopy() {
        return this.actors;
    }
}
