package org.nustaq.kontraktor.impl;

import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.monitoring.Monitorable;
import org.nustaq.kontraktor.util.Log;

/* loaded from: input_file:org/nustaq/kontraktor/impl/ElasticScheduler.class */
public class ElasticScheduler implements Scheduler, Monitorable {
    /** Nesting limit for synchronous queue polling; {@code put2QueuePolling} compares a dispatcher's {@code stackDepth} against this value. */
    public static final int MAX_STACK_ON_SYNC_CBDISPATCH = 200000;
    /** Thread limit passed to the constructor; also the length of {@link #threads}. */
    int maxThread;
    /** Strategy consulted while a queue is full (yield / isYielding / isSleeping). */
    protected BackOffStrategy backOffStrategy;
    /** Dispatcher slots; a {@code null} entry is a free slot. */
    final DispatcherThread[] threads;
    /** Default queue size reported by {@link #getDefaultQSize()}; falls back to {@link #DEFQSIZE} when the constructor argument is {@code <= 1}. */
    int defQSize;
    /** Executor used by {@code runBlockingCall} to run callables off the dispatcher threads. */
    protected ExecutorService exec;
    /** Counter incremented in {@code tryIsolate} and decremented in {@code threadStopped} for isolated threads. */
    private AtomicInteger isolateCount;
    /** Monitor held while dispatchers are assigned, rebalanced, isolated or stopped. */
    final Object balanceLock;
    /** Queue size used when none (or a value {@code <= 1}) is given to the constructor. */
    public static int DEFQSIZE = 32768;
    /** Enables the warn/info log output about blocked senders and thread isolation. */
    public static boolean DEBUG_SCHEDULING = true;
    /** Enables the additional per-actor "move ..." log output. */
    public static boolean REALLY_DEBUG_SCHEDULING = false;
    /** Number of failed offers after which a dispatcher thread may start polling the receiver's queues itself. */
    public static int RECURSE_ON_BLOCK_THRESHOLD = 2;
    /** Timer shared by all schedulers for {@code delayedCall}. */
    public static Timer delayedCalls = new Timer();

    /**
     * Invocation handler backing the proxies created by {@code inThread}: every interface
     * call on the proxy is turned into a {@link CallEntry} and put on the callback queue of
     * {@code targetActor} instead of being executed on the calling thread.
     */
    class CallbackInvokeHandler implements InvocationHandler {
        final Object target;
        final Actor targetActor;

        /**
         * @param obj   the object whose methods are finally invoked (may be {@code null})
         * @param actor the actor whose callback queue receives the calls
         */
        public CallbackInvokeHandler(Object obj, Actor actor) {
            this.target = obj;
            this.targetActor = actor;
        }

        /**
         * Handles a call made on the proxy.
         *
         * <p>{@code hashCode}, {@code equals} and {@code toString} are answered synchronously.
         * They must NOT be invoked on the proxy ({@code obj}) itself: a dynamic proxy routes
         * exactly these three methods back into this handler, so that would recurse until the
         * stack overflows. They are delegated to {@code target} instead (or to this handler
         * when there is no target).
         *
         * @return the result for {@code Object} methods, otherwise always {@code null}
         */
        @Override
        public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // keep equals reflexive for the proxy instance itself
                if ("equals".equals(method.getName()) && objArr != null && objArr.length == 1 && objArr[0] == obj) {
                    return Boolean.TRUE;
                }
                Object receiver = this.target != null ? this.target : this;
                try {
                    return method.invoke(receiver, objArr);
                } catch (java.lang.reflect.InvocationTargetException e) {
                    // surface the real failure instead of the reflection wrapper
                    Throwable cause = e.getCause();
                    throw cause != null ? cause : e;
                }
            }
            if (this.target == null) {
                return null;
            }
            ElasticScheduler.this.put2QueuePolling(this.targetActor.__cbQueue, true, new CallEntry(this.target, method, objArr, Actor.sender.get(), this.targetActor, true), this.targetActor);
            return null;
        }
    }

    /**
     * Serializable snapshot of the scheduler state handed out by {@code $getReport()}:
     * number of occupied dispatcher slots, default queue size and isolated-thread count.
     */
    public static class SchedulingReport implements Serializable {
        int numDispatchers;
        int defQSize;
        int isolatedThreads;

        /** Creates an empty report (all values zero). */
        public SchedulingReport() {
        }

        /**
         * @param i  number of dispatchers
         * @param i2 default queue size
         * @param i3 number of isolated threads
         */
        public SchedulingReport(int i, int i2, int i3) {
            numDispatchers = i;
            defQSize = i2;
            isolatedThreads = i3;
        }

        /** @return the number of dispatchers recorded in this report */
        public int getNumDispatchers() {
            return numDispatchers;
        }

        /** @return the default queue size recorded in this report */
        public int getDefQSize() {
            return defQSize;
        }
    }

    /**
     * Creates a scheduler with {@code i} dispatcher slots and the default queue size
     * {@link #DEFQSIZE}.
     */
    public ElasticScheduler(int i) {
        this(i, DEFQSIZE);
    }

    /**
     * Creates a scheduler.
     *
     * @param i  maximum number of dispatcher threads (size of the slot array)
     * @param i2 default queue size; values {@code <= 1} select {@link #DEFQSIZE}
     */
    public ElasticScheduler(int i, int i2) {
        backOffStrategy = new BackOffStrategy();
        exec = Executors.newCachedThreadPool();
        isolateCount = new AtomicInteger(0);
        balanceLock = new Object();
        maxThread = i;
        defQSize = (i2 > 1) ? i2 : DEFQSIZE;
        threads = new DispatcherThread[i];
    }

    /** @return the number of dispatcher slots that currently hold a thread */
    public int getActiveThreads() {
        int occupied = 0;
        for (DispatcherThread slot : threads) {
            if (slot != null) {
                occupied++;
            }
        }
        return occupied;
    }

    /** @return the thread limit this scheduler was constructed with */
    @Override
    public int getMaxThreads() {
        return maxThread;
    }

    /** @return the default queue size configured for this scheduler */
    @Override
    public int getDefaultQSize() {
        return defQSize;
    }

    /**
     * Enqueues a call entry on its target actor: callbacks go to the actor's callback
     * queue, everything else to its mailbox.
     *
     * <p>If the entry has a future result whose callback is not already a
     * {@link CallbackWrapper}, a fresh {@link Promise} is created and the entry's future
     * callback is replaced by a wrapper that forwards the result into that promise.
     *
     * @return the newly created promise, or {@code null} when none was created
     */
    public Future put2QueuePolling(CallEntry callEntry) {
        final Promise promise;
        boolean wrapResult = callEntry.hasFutureResult() && !(callEntry.getFutureCB() instanceof CallbackWrapper);
        if (wrapResult) {
            promise = new Promise();
            callEntry.setFutureCB(new CallbackWrapper(callEntry.getSendingActor(), new Callback() {
                @Override
                public void receive(Object obj, Object obj2) {
                    promise.receive(obj, obj2);
                }
            }));
        } else {
            promise = null;
        }
        Actor receiver = callEntry.getTargetActor();
        Queue destination = callEntry.isCallback() ? receiver.__cbQueue : receiver.__mailbox;
        put2QueuePolling(destination, false, callEntry, receiver);
        return promise;
    }

    /** Backs off once by delegating to the configured back-off strategy with retry count {@code i}. */
    @Override
    public void yield(int i) {
        backOffStrategy.yield(i);
    }

    /**
     * Offers {@code obj} to {@code queue}, backing off and retrying until the queue accepts it.
     *
     * <p>While the queue stays full:
     * <ul>
     *   <li>after {@link #RECURSE_ON_BLOCK_THRESHOLD} failed attempts, a dispatcher thread
     *       sending with {@code z == true} polls the receiver's queues itself if it also
     *       schedules the receiver;</li>
     *   <li>once the back-off strategy reports "yielding", a stopped receiver causes a dead
     *       letter plus {@link StoppedActorTargetedException}, and a sender flagged with
     *       {@code __throwExAtBlock} gets {@link ActorBlockedException};</li>
     *   <li>once it reports "sleeping", the block is logged once and a dispatcher thread
     *       hosting more than one actor tries to isolate the sending actor.</li>
     * </ul>
     *
     * @param queue the queue to put the message on
     * @param z     whether the calling dispatcher thread may poll the receiver's queues itself
     * @param obj   the message to enqueue
     * @param obj2  the receiver; cast to {@link Actor} when {@code z} is true
     * @throws StoppedActorTargetedException if the receiver is a stopped actor
     * @throws ActorBlockedException         if the sending actor asked to fail instead of block
     */
    @Override
    public void put2QueuePolling(Queue queue, boolean z, Object obj, Object obj2) {
        int attempts = 0;
        boolean blockReported = false;
        while (!queue.offer(obj)) {
            // must be qualified: an unqualified call to a method named "yield" is rejected by javac 14+
            this.yield(attempts++);
            if (attempts > RECURSE_ON_BLOCK_THRESHOLD && z && (Thread.currentThread() instanceof DispatcherThread)) {
                DispatcherThread current = (DispatcherThread) Thread.currentThread();
                if (current.stackDepth < MAX_STACK_ON_SYNC_CBDISPATCH && current.getActorsNoCopy().length > 1) {
                    Actor targetRef = ((Actor) obj2).getActorRef();
                    if (current.schedules(targetRef)) {
                        current.stackDepth++;
                        try {
                            if (current.pollQs(new Actor[]{targetRef})) {
                                attempts = 0;
                            }
                        } finally {
                            // keep the depth counter balanced even if polling throws
                            current.stackDepth--;
                        }
                    }
                }
            }
            if (!this.backOffStrategy.isYielding(attempts)) {
                continue;
            }
            Actor sender = Actor.sender.get();
            if ((obj2 instanceof Actor) && ((Actor) obj2).__stopped) {
                String name = obj instanceof CallEntry ? ((CallEntry) obj).getMethod().getName() : "" + obj;
                // sender is null when the call does not originate from an actor
                if (sender != null) {
                    sender.__addDeadLetter((Actor) obj2, name);
                }
                throw new StoppedActorTargetedException(name);
            }
            if (sender != null && sender.__throwExAtBlock) {
                throw ActorBlockedException.Instance;
            }
            if (!this.backOffStrategy.isSleeping(attempts)) {
                continue;
            }
            if (!blockReported) {
                blockReported = true;
                String queueName = describeBlockedQueue(queue, obj2);
                String senderName = sender != null ? ", sender:" + sender.getActor().getClass().getSimpleName() : "";
                if (DEBUG_SCHEDULING) {
                    Log.Lg.warn(this, "Warning: Thread " + Thread.currentThread().getName() + " blocked trying to put message on " + queueName + senderName + " msg:" + obj);
                }
            }
            if (sender != null && (Thread.currentThread() instanceof DispatcherThread)) {
                DispatcherThread blocked = (DispatcherThread) Thread.currentThread();
                blocked.schedulePendingAdds();
                if (blocked.getActors().length > 1) {
                    if (DEBUG_SCHEDULING) {
                        Log.Lg.warn(this, "  try unblock Thread " + Thread.currentThread().getName() + " actors:" + blocked.getActors().length);
                    }
                    blocked.getScheduler().tryIsolate(blocked, sender.getActorRef());
                    if (DEBUG_SCHEDULING) {
                        Log.Lg.warn(this, "  unblock done Thread " + Thread.currentThread().getName() + " actors:" + blocked.getActors().length);
                    }
                }
            }
        }
        if (blockReported && DEBUG_SCHEDULING) {
            Log.Lg.warn(this, "Thread " + Thread.currentThread().getName() + " continued");
        }
    }

    /** Builds the queue description used in the "blocked" warning. */
    private static String describeBlockedQueue(Queue queue, Object receiver) {
        if (!(receiver instanceof Actor)) {
            return "" + receiver;
        }
        Actor target = (Actor) receiver;
        String simpleName = receiver.getClass().getSimpleName();
        if (queue == target.__cbQueue) {
            return simpleName + " callbackQ";
        }
        if (queue == target.__mailbox) {
            return simpleName + " mailbox";
        }
        return simpleName + " unknown queue";
    }

    /**
     * Builds a {@link CallEntry} for method {@code str} on the actor behind {@code actor2}
     * and enqueues it.
     *
     * <p>Every {@link Callback} argument in {@code objArr} is replaced in place by a
     * {@link CallbackWrapper} bound to {@code actor}.
     *
     * @param actor  the actor the wrapped callbacks are bound to
     * @param actor2 the receiver; its {@code getActor()} is the call target
     * @param str    name of the method to call
     * @param objArr call arguments (modified in place)
     * @param z      forwarded unchanged as the last {@link CallEntry} constructor argument
     * @return whatever {@link #put2QueuePolling(CallEntry)} returns
     */
    @Override
    public Object enqueueCall(Actor actor, Actor actor2, String str, Object[] objArr, boolean z) {
        Actor target = actor2.getActor();
        Method method = target.__getCachedMethod(str, target);
        int index = 0;
        while (index < objArr.length) {
            if (objArr[index] instanceof Callback) {
                objArr[index] = new CallbackWrapper(actor, (Callback) objArr[index]);
            }
            index++;
        }
        CallEntry entry = new CallEntry(target, method, objArr, Actor.sender.get(), target, z);
        return put2QueuePolling(entry);
    }

    /**
     * Bookkeeping for a terminated dispatcher thread: frees its slot if it still occupies
     * one; otherwise, if the thread is flagged as isolated, decrements the isolate counter.
     */
    @Override
    public void threadStopped(DispatcherThread dispatcherThread) {
        // NOTE(review): this locks the slot array while the other methods that write slots
        // hold balanceLock - confirm the two monitors are meant to differ.
        synchronized (threads) {
            int slot = 0;
            while (slot < threads.length && threads[slot] != dispatcherThread) {
                slot++;
            }
            if (slot < threads.length) {
                threads[slot] = null;
                return;
            }
            if (dispatcherThread.isIsolated()) {
                if (DEBUG_SCHEDULING) {
                    Log.Info(this, "  was decoupled one.");
                }
                isolateCount.decrementAndGet();
            }
        }
    }

    /**
     * @return a new {@link CallbackInvokeHandler} that enqueues calls for {@code obj} on the
     *         callback queue of {@code actor}
     */
    @Override
    public InvocationHandler getInvoker(Actor actor, Object obj) {
        InvocationHandler handler = new CallbackInvokeHandler(obj, actor);
        return handler;
    }

    /**
     * Wraps {@code t} in a dynamic proxy that implements the interfaces declared directly
     * by {@code t}'s class and routes calls through the invoker supplied by the actor's
     * scheduler.
     *
     * @return the proxy, or {@code t} itself when the scheduler supplies no invoker
     */
    @Override
    public <T> T inThread(Actor actor, T t) {
        Class<?>[] proxiedInterfaces = t.getClass().getInterfaces();
        InvocationHandler handler = actor.__scheduler.getInvoker(actor, t);
        if (handler == null) {
            return t;
        }
        ClassLoader loader = t.getClass().getClassLoader();
        return (T) Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
    }

    /**
     * Runs {@code runnable} on the shared {@link #delayedCalls} timer after {@code j}
     * milliseconds.
     *
     * <p>Failures of the runnable are caught and logged. An exception escaping a
     * {@link TimerTask} terminates the timer thread, and because the timer is static and
     * shared, every later {@code delayedCall} on any scheduler would then fail with
     * {@link IllegalStateException}.
     *
     * @param j        delay in milliseconds
     * @param runnable the action to run
     */
    @Override
    public void delayedCall(long j, final Runnable runnable) {
        delayedCalls.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } catch (Throwable th) {
                    // must not propagate: it would kill the shared timer thread
                    Log.Lg.warn(ElasticScheduler.this, "exception in delayed call: " + th);
                }
            }
        }, j);
    }

    /**
     * Executes {@code callable} on the scheduler's executor and reports the outcome through
     * {@code callback}, wrapped in a {@link CallbackWrapper} bound to {@code actor}.
     *
     * <p>On success the wrapper receives {@code (result, null)}; any {@link Throwable}
     * raised inside the try block is reported as {@code (null, throwable)}.
     */
    @Override
    public <T> void runBlockingCall(Actor actor, Callable<T> callable, Callback<T> callback) {
        CallbackWrapper resultSink = new CallbackWrapper(actor, callback);
        exec.execute(() -> {
            try {
                T outcome = callable.call();
                // NOTE(review): a failure of this receive is also caught below and reported
                // through a second receive call - confirm that is intended.
                resultSink.receive(outcome, null);
            } catch (Throwable failure) {
                resultSink.receive(null, failure);
            }
        });
    }

    /** Delegates to {@code Actors.yield(Future...)} for the given futures. */
    @Override
    public Future<Future[]> yield(Future... futureArr) {
        Future<Future[]> combined = Actors.yield(futureArr);
        return combined;
    }

    /** Delegates to {@code Actors.yield(List)} for the given list of futures. */
    @Override
    public Future<List<Future>> yield(List<Future> list) {
        Future<List<Future>> combined = Actors.yield(list);
        return combined;
    }

    /**
     * Picks the dispatcher thread a new or moved actor should run on.
     *
     * <ol>
     *   <li>the least loaded existing thread whose load is below {@code i};</li>
     *   <li>otherwise a newly created and started thread, if a slot is free;</li>
     *   <li>otherwise the least loaded existing thread regardless of {@code i}.</li>
     * </ol>
     *
     * @param i load threshold an existing thread must stay below to be chosen in step 1
     * @return the chosen dispatcher, or {@code null} only if no thread exists and none can
     *         be created
     */
    @Override
    public DispatcherThread assignDispatcher(int i) {
        synchronized (this.balanceLock) {
            DispatcherThread belowThreshold = findMinLoadThread(i, null);
            if (belowThreshold != null) {
                return belowThreshold;
            }
            DispatcherThread created = createNewThreadIfPossible();
            if (created != null) {
                created.start();
                return created;
            }
            // Integer.MAX_VALUE, not MIN_VALUE: the search only accepts "load < bound",
            // so a MIN_VALUE bound can never match and the fallback would always be null.
            return findMinLoadThread(Integer.MAX_VALUE, null);
        }
    }

    /**
     * Finds the registered dispatcher with the smallest load that is strictly below
     * {@code i}, skipping {@code dispatcherThread}.
     *
     * @param i                exclusive upper bound for an acceptable load
     * @param dispatcherThread thread to leave out of the search (may be {@code null})
     * @return the least loaded matching thread, or {@code null} if none qualifies
     */
    private DispatcherThread findMinLoadThread(int i, DispatcherThread dispatcherThread) {
        synchronized (balanceLock) {
            DispatcherThread best = null;
            int bestLoad = i;
            for (DispatcherThread candidate : threads) {
                if (candidate == null || candidate == dispatcherThread) {
                    continue;
                }
                int load = candidate.getLoad();
                if (load < bestLoad) {
                    bestLoad = load;
                    best = candidate;
                }
            }
            return best;
        }
    }

    /**
     * Puts a freshly created (not yet started) dispatcher into the first free slot.
     *
     * @return the new dispatcher, or {@code null} when every slot is occupied
     */
    private DispatcherThread createNewThreadIfPossible() {
        int slot = 0;
        while (slot < threads.length && threads[slot] != null) {
            slot++;
        }
        if (slot == threads.length) {
            return null;
        }
        DispatcherThread created = createDispatcherThread();
        threads[slot] = created;
        return created;
    }

    /** Factory hook: creates a new, unstarted dispatcher thread bound to this scheduler. */
    protected DispatcherThread createDispatcherThread() {
        DispatcherThread created = new DispatcherThread(this);
        return created;
    }

    /**
     * Tries to shift actors from {@code dispatcherThread} to the dispatcher returned by
     * {@link #assignDispatcher(int)} for the thread's current load.
     *
     * <p>Nothing happens when no other dispatcher is returned, or when 4/3 of the other
     * dispatcher's accumulated queue sizes already exceeds this thread's. Otherwise each
     * actor is moved if, after the move, the other side would still hold less than this
     * side. The receiving dispatcher is started if it is not alive.
     */
    @Override
    public void rebalance(DispatcherThread dispatcherThread) {
        synchronized (balanceLock) {
            DispatcherThread receiver = assignDispatcher(dispatcherThread.getLoad());
            boolean hasOtherThread = receiver != null && receiver != dispatcherThread;
            if (!hasOtherThread) {
                return;
            }
            int ownQSizes = dispatcherThread.getAccumulatedQSizes();
            Actor[] hosted = dispatcherThread.getActors();
            long otherQSizes = receiver.getAccumulatedQSizes();
            boolean worthMoving = (4 * otherQSizes) / 3 <= ownQSizes;
            if (!worthMoving) {
                if (REALLY_DEBUG_SCHEDULING) {
                    Log.Info(this, "no payoff, skip rebalance load:" + ownQSizes + " other:" + otherQSizes);
                }
                return;
            }
            for (int idx = 0; idx < hosted.length; idx++) {
                Actor candidate = hosted[idx];
                // skip actors whose move would not leave the other side lighter than this one
                if (otherQSizes + candidate.getQSizes() >= ownQSizes - candidate.getQSizes()) {
                    continue;
                }
                otherQSizes += candidate.getQSizes();
                ownQSizes -= candidate.getQSizes();
                if (REALLY_DEBUG_SCHEDULING) {
                    Log.Info(this, "move " + candidate.getQSizes() + " myload " + ownQSizes + " otherload " + otherQSizes + " from " + dispatcherThread.getName() + " to " + receiver.getName());
                }
                dispatcherThread.removeActorImmediate(candidate);
                receiver.addActor(candidate);
            }
            if (!receiver.isAlive()) {
                receiver.start();
            }
        }
    }

    /**
     * Moves every actor except {@code actor} off the calling dispatcher thread, so the
     * remaining actor no longer shares it with the others.
     *
     * <p>If the calling thread still occupies a slot, it is flagged and renamed as isolated
     * and a freshly started dispatcher takes over its slot and its other actors. If it
     * occupies no slot, the actors go to the least loaded other dispatcher; when there is
     * none, a new dispatcher is created outside the slot array.
     *
     * @param dispatcherThread the calling dispatcher thread
     * @param actor            the actor that stays on {@code dispatcherThread}
     * @throws RuntimeException         if not called from {@code dispatcherThread}, or if an
     *                                  actor is not its own actor ref
     * @throws IllegalArgumentException if {@code actor} is {@code null}
     */
    @Override
    public void tryIsolate(DispatcherThread dispatcherThread, Actor actor) {
        if (dispatcherThread != Thread.currentThread()) {
            throw new RuntimeException("bad error");
        }
        if (actor == null) {
            throw new IllegalArgumentException("excluderef should not be null");
        }
        synchronized (this.balanceLock) {
            Actor[] actors = dispatcherThread.getActors();
            DispatcherThread receiver = findMinLoadThread(Integer.MAX_VALUE, dispatcherThread);
            for (int i = 0; i < this.threads.length; i++) {
                if (this.threads[i] == dispatcherThread) {
                    this.threads[i] = createDispatcherThread();
                    dispatcherThread.setName(dispatcherThread.getName() + " (isolated)");
                    dispatcherThread.setIsolated(true);
                    this.isolateCount.incrementAndGet();
                    receiver = this.threads[i];
                    receiver.start();
                    if (DEBUG_SCHEDULING) {
                        Log.Info(this, "created new thread to unblock " + dispatcherThread.getName());
                    }
                    // the slot now holds the replacement, no further match is possible
                    break;
                }
            }
            if (receiver == null) {
                receiver = createDispatcherThread();
                receiver.setName(dispatcherThread.getName() + " (isolated)");
                receiver.setIsolated(true);
                this.isolateCount.incrementAndGet();
                // start it like the replacement thread above; nothing else in this class
                // starts it, so without this the moved actors would never be processed
                receiver.start();
                if (DEBUG_SCHEDULING) {
                    Log.Info(this, "created new thread to unblock already isolated " + dispatcherThread.getName());
                }
            }
            for (Actor hosted : actors) {
                if (hosted.getActorRef() != hosted) {
                    throw new RuntimeException("this should not happen ever");
                }
                if (actor.getActorRef() != actor) {
                    throw new RuntimeException("this also");
                }
                if (hosted != actor) {
                    dispatcherThread.removeActorImmediate(hosted);
                    receiver.addActor(hosted);
                }
                if (REALLY_DEBUG_SCHEDULING) {
                    Log.Info(this, "move for unblock " + hosted.getQSizes() + " myload " + dispatcherThread.getAccumulatedQSizes() + " actors " + actors.length);
                }
            }
        }
    }

    /**
     * Called by a dispatcher thread that wants to wind down: clears its slot and hands a
     * batch of its actors to the least loaded other dispatcher.
     *
     * <p>Does nothing when no other dispatcher exists. Each call moves at most
     * {@code actors.length / 5 + 1} actors.
     *
     * @throws RuntimeException if not called from {@code dispatcherThread}, or if an actor
     *                          is not its own actor ref
     */
    @Override
    public void tryStopThread(DispatcherThread dispatcherThread) {
        if (Thread.currentThread() != dispatcherThread) {
            throw new RuntimeException("bad one");
        }
        synchronized (balanceLock) {
            DispatcherThread receiver = findMinLoadThread(Integer.MAX_VALUE, dispatcherThread);
            if (receiver == null) {
                return;
            }
            Actor[] hosted = dispatcherThread.getActors();
            for (int slot = 0; slot < threads.length; slot++) {
                if (threads[slot] == dispatcherThread) {
                    threads[slot] = null;
                }
            }
            int batchSize = Math.min(hosted.length, (hosted.length / 5) + 1);
            int moved = 0;
            while (moved < batchSize) {
                Actor candidate = hosted[moved];
                moved++;
                if (candidate.getActorRef() != candidate) {
                    throw new RuntimeException("this should not happen ever");
                }
                dispatcherThread.removeActorImmediate(candidate);
                receiver.addActor(candidate);
                if (REALLY_DEBUG_SCHEDULING) {
                    Log.Info(this, "move for idle " + candidate.getQSizes() + " myload " + dispatcherThread.getAccumulatedQSizes() + " actors " + hosted.length);
                }
            }
        }
    }

    /** @return the back-off strategy this scheduler uses while queues are full */
    @Override
    public BackOffStrategy getBackoffStrategy() {
        return backOffStrategy;
    }

    /**
     * @return a completed promise holding a {@link SchedulingReport} with the number of
     *         occupied dispatcher slots, the default queue size and the isolate counter
     */
    @Override
    public Future $getReport() {
        int occupied = 0;
        for (DispatcherThread slot : threads) {
            if (slot != null) {
                occupied++;
            }
        }
        SchedulingReport report = new SchedulingReport(occupied, defQSize, isolateCount.get());
        return new Promise(report);
    }

    /**
     * @return a completed promise holding the dispatcher threads that currently occupy a
     *         slot, in slot order
     */
    @Override
    public Future<Monitorable[]> $getSubMonitorables() {
        // Work on a snapshot: the slot array is modified by other threads, and counting
        // and filling against the live array could disagree (index overflow or a trailing
        // null entry) if a slot changed between the two passes.
        DispatcherThread[] snapshot = this.threads.clone();
        int occupied = 0;
        for (DispatcherThread slot : snapshot) {
            if (slot != null) {
                occupied++;
            }
        }
        Monitorable[] monitorables = new Monitorable[occupied];
        int next = 0;
        for (DispatcherThread slot : snapshot) {
            if (slot != null) {
                monitorables[next++] = slot;
            }
        }
        return new Promise(monitorables);
    }
}
