package org.nustaq.kontraktor.impl;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.Scheduler;
import org.nustaq.kontraktor.util.Log;

/* loaded from: input_file:org/nustaq/kontraktor/impl/ElasticScheduler.class */
public class ElasticScheduler implements Scheduler {
    int maxThread;
    protected BackOffStrategy backOffStrategy;
    volatile DispatcherThread[] threads;
    int defQSize;
    protected ExecutorService exec;
    Object balanceLock;
    public static int DEFQSIZE = 16384;
    public static boolean DEBUG_SCHEDULING = true;
    public static int BLOCK_COUNT_WARNING_THRESHOLD = 10000;
    public static int RECURSE_ON_BLOCK_THRESHOLD = 1023;
    protected static Timer delayedCalls = new Timer();

    /* loaded from: input_file:org/nustaq/kontraktor/impl/ElasticScheduler$CallbackInvokeHandler.class */
    class CallbackInvokeHandler implements InvocationHandler {
        final Object target;
        final Actor targetActor;

        public CallbackInvokeHandler(Object obj, Actor actor) {
            this.target = obj;
            this.targetActor = actor;
        }

        @Override // java.lang.reflect.InvocationHandler
        public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(obj, objArr);
            }
            if (this.target == null) {
                return null;
            }
            ElasticScheduler.this.put2QueuePolling(this.targetActor.__cbQueue, new CallEntry(this.target, method, objArr, Actor.sender.get(), this.targetActor), this.targetActor);
            return null;
        }
    }

    public ElasticScheduler(int i) {
        this(i, DEFQSIZE);
    }

    public ElasticScheduler(int i, int i2) {
        this.maxThread = Runtime.getRuntime().availableProcessors();
        this.backOffStrategy = new BackOffStrategy();
        this.defQSize = DEFQSIZE;
        this.exec = Executors.newCachedThreadPool();
        this.balanceLock = new Object();
        this.maxThread = i;
        this.defQSize = i2;
        if (i2 <= 1) {
            this.defQSize = DEFQSIZE;
        }
        this.threads = new DispatcherThread[i];
    }

    public int getActiveThreads() {
        int i = 0;
        for (int i2 = 0; i2 < this.threads.length; i2++) {
            if (this.threads[i2] != null) {
                i++;
            }
        }
        return i;
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public int getMaxThreads() {
        return this.maxThread;
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public int getDefaultQSize() {
        return this.defQSize;
    }

    public Future put2QueuePolling(CallEntry callEntry) {
        final Promise promise;
        if (callEntry.hasFutureResult()) {
            promise = new Promise();
            callEntry.setFutureCB(new CallbackWrapper(callEntry.getSendingActor(), new Callback() { // from class: org.nustaq.kontraktor.impl.ElasticScheduler.1
                @Override // org.nustaq.kontraktor.Callback
                public void receive(Object obj, Object obj2) {
                    promise.receive(obj, obj2);
                }
            }));
        } else {
            promise = null;
        }
        Actor targetActor = callEntry.getTargetActor();
        put2QueuePolling(targetActor.__mailbox, callEntry, targetActor);
        return promise;
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public void yield(int i) {
        this.backOffStrategy.yield(i);
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public void put2QueuePolling(Queue queue, Object obj, Object obj2) {
        int i = 0;
        boolean z = false;
        while (!queue.offer(obj)) {
            int i2 = i;
            i++;
            yield(i2);
            if (i <= RECURSE_ON_BLOCK_THRESHOLD || (Thread.currentThread() instanceof DispatcherThread)) {
            }
            if (i > BLOCK_COUNT_WARNING_THRESHOLD && !z) {
                z = true;
                String str = obj2 instanceof Actor ? queue == ((Actor) obj2).__cbQueue ? obj2.getClass().getSimpleName() + " callbackQ" : queue == ((Actor) obj2).__mailbox ? obj2.getClass().getSimpleName() + " mailbox" : obj2.getClass().getSimpleName() + " unknown queue" : "" + obj2;
                Actor actor = Actor.sender.get();
                Log.Lg.warn(this, "Warning: Thread " + Thread.currentThread().getName() + " blocked trying to put message on " + str + (actor != null ? ", sender:" + actor.getActor().getClass().getSimpleName() : ""));
            }
        }
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public Object enqueueCall(Actor actor, Actor actor2, String str, Object[] objArr) {
        Actor actor3 = actor2.getActor();
        Method __getCachedMethod = actor3.__getCachedMethod(str, actor3);
        for (int i = 0; i < objArr.length; i++) {
            Object obj = objArr[i];
            if (obj instanceof Callback) {
                objArr[i] = new CallbackWrapper(actor, (Callback) obj);
            }
        }
        return put2QueuePolling(new CallEntry(actor3, __getCachedMethod, objArr, Actor.sender.get(), actor3));
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public void threadStopped(DispatcherThread dispatcherThread) {
        synchronized (this.threads) {
            for (int i = 0; i < this.threads.length; i++) {
                if (this.threads[i] == dispatcherThread) {
                    this.threads[i] = null;
                    return;
                }
            }
            throw new RuntimeException("Oops. Unknown Thread");
        }
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public InvocationHandler getInvoker(Actor actor, Object obj) {
        return new CallbackInvokeHandler(obj, actor);
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public <T> T inThread(Actor actor, T t) {
        Class<?>[] interfaces = t.getClass().getInterfaces();
        InvocationHandler invoker = actor.__scheduler.getInvoker(actor, t);
        return invoker == null ? t : (T) Proxy.newProxyInstance(t.getClass().getClassLoader(), interfaces, invoker);
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public void delayedCall(long j, final Runnable runnable) {
        delayedCalls.schedule(new TimerTask() { // from class: org.nustaq.kontraktor.impl.ElasticScheduler.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                runnable.run();
            }
        }, j);
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public <T> void runBlockingCall(Actor actor, Callable<T> callable, Callback<T> callback) {
        CallbackWrapper callbackWrapper = new CallbackWrapper(actor, callback);
        this.exec.execute(() -> {
            try {
                callbackWrapper.receive(callable.call(), null);
            } catch (Throwable th) {
                callbackWrapper.receive(null, th);
            }
        });
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public Future<Future[]> yield(Future... futureArr) {
        return Actors.yield(futureArr);
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public <T> Future<List<Future<T>>> yield(List<Future<T>> list) {
        return Actors.yield(list);
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public DispatcherThread assignDispatcher() {
        synchronized (this.threads) {
            DispatcherThread findMinLoadThread = findMinLoadThread(Integer.MAX_VALUE, null);
            if (findMinLoadThread != null) {
                return findMinLoadThread;
            }
            DispatcherThread createNewThreadIfPossible = createNewThreadIfPossible();
            if (createNewThreadIfPossible == null) {
                throw new RuntimeException("could not assign thread. This is a severe error");
            }
            createNewThreadIfPossible.start();
            return createNewThreadIfPossible;
        }
    }

    private DispatcherThread findMinLoadThread(long j, DispatcherThread dispatcherThread) {
        DispatcherThread dispatcherThread2;
        synchronized (this.threads) {
            DispatcherThread dispatcherThread3 = null;
            for (int i = 0; i < this.threads.length; i++) {
                DispatcherThread dispatcherThread4 = this.threads[i];
                if (dispatcherThread4 != null && dispatcherThread4 != dispatcherThread) {
                    long loadNanos = dispatcherThread4.getLoadNanos();
                    if (loadNanos < j) {
                        j = loadNanos;
                        dispatcherThread3 = dispatcherThread4;
                    }
                }
            }
            dispatcherThread2 = dispatcherThread3;
        }
        return dispatcherThread2;
    }

    private DispatcherThread createNewThreadIfPossible() {
        synchronized (this.threads) {
            for (int i = 0; i < this.threads.length; i++) {
                if (this.threads[i] == null) {
                    DispatcherThread createDispatcherThread = createDispatcherThread();
                    this.threads[i] = createDispatcherThread;
                    return createDispatcherThread;
                }
            }
            return null;
        }
    }

    protected DispatcherThread createDispatcherThread() {
        return new DispatcherThread(this);
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public void rebalance(DispatcherThread dispatcherThread) {
        synchronized (this.balanceLock) {
            long loadNanos = dispatcherThread.getLoadNanos();
            DispatcherThread createNewThreadIfPossible = createNewThreadIfPossible();
            if (createNewThreadIfPossible != null) {
                dispatcherThread.splitTo(createNewThreadIfPossible);
                createNewThreadIfPossible.start();
                return;
            }
            DispatcherThread findMinLoadThread = findMinLoadThread(loadNanos, dispatcherThread);
            if (findMinLoadThread == null) {
                return;
            }
            Actor[] actors = dispatcherThread.getActors();
            long loadNanos2 = findMinLoadThread.getLoadNanos();
            for (Actor actor : actors) {
                if (loadNanos2 + actor.__nanos < loadNanos - actor.__nanos) {
                    loadNanos2 += actor.__nanos;
                    loadNanos -= actor.__nanos;
                    if (DEBUG_SCHEDULING) {
                        Log.Info(this, "move " + actor.__nanos + " myload " + loadNanos + " otherlOad " + loadNanos2);
                    }
                    dispatcherThread.removeActorImmediate(actor);
                    findMinLoadThread.addActor(actor);
                }
            }
        }
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public void tryStopThread(DispatcherThread dispatcherThread) {
        synchronized (this.balanceLock) {
            Actor[] actors = dispatcherThread.getActors();
            DispatcherThread findMinLoadThread = findMinLoadThread(Long.MAX_VALUE, dispatcherThread);
            if (findMinLoadThread == null) {
                return;
            }
            for (Actor actor : actors) {
                dispatcherThread.removeActorImmediate(actor);
                findMinLoadThread.addActor(actor);
                if (DEBUG_SCHEDULING) {
                    Log.Info(this, "move for idle " + actor.__nanos + " myload " + dispatcherThread.getLoadNanos() + " actors " + actors.length);
                }
            }
        }
    }

    @Override // org.nustaq.kontraktor.Scheduler
    public BackOffStrategy getBackoffStrategy() {
        return this.backOffStrategy;
    }
}
