package net.hycube.eventprocessing;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.hycube.core.UnrecoverableRuntimeException;
import net.hycube.environment.Environment;
import net.hycube.logging.LogHelper;
import net.hycube.utils.HashMapUtils;
import org.apache.commons.logging.Log;

/* loaded from: input_file:net/hycube/eventprocessing/EventQueueSchedulerProcessor.class */
public class EventQueueSchedulerProcessor implements EventScheduler, EventQueueProcessor {
    public static int WAKEABLE_SLEEP_TIMEOUT = 1000;
    public static boolean SCHEDULE_AT_FIXED_TIME_INTERVAL = true;
    public static int SCHEDULE_TIME_INTERVAL = 20;
    private static Log userLog = LogHelper.getUserLog();
    private static Log devLog = LogHelper.getDevLog(EventQueueSchedulerProcessor.class);
    protected Environment environment;
    protected EventQueueProcessingInfo[] eventQueuesProcessingInfo;
    protected BlockingQueue<Event>[] queues;
    protected ThreadPoolInfo[] threadPoolInfos;
    protected ExecutorService[] threadPools;
    protected Future<?>[][] futures;
    protected EventQueueProcessorRunnable[] queueProcessorRunnables;
    protected HashMap<BlockingQueue<Event>, EventQueueProcessorRunnable> queueProcessorRunnablesByQueue;
    protected volatile boolean initialized = false;
    protected volatile boolean running = false;
    protected volatile boolean paused = false;
    protected volatile boolean error = false;
    protected Object errorLock = new Object();
    protected EventProcessingErrorCallback errorCallback;
    protected Object errorCallbackArg;

    /* loaded from: input_file:net/hycube/eventprocessing/EventQueueSchedulerProcessor$DummyEvent.class */
    public static class DummyEvent extends Event {
        public DummyEvent() {
            super(0L, EventCategory.undefinedEvent, 0, (ProcessEventProxy) null, (Object[]) null);
        }
    }

    /* loaded from: input_file:net/hycube/eventprocessing/EventQueueSchedulerProcessor$EventQueueProcessorRunnable.class */
    public class EventQueueProcessorRunnable implements Runnable, WakeableManager, EventScheduler {
        protected BlockingQueue<Event> queue;
        protected int queueIndex;
        protected LinkedList<Wakeable> wakeables;
        protected HashMap<Wakeable, Integer> nonWakeablesAfter;
        protected Lock lock;
        protected int availableProcessingResources;
        protected LinkedList<ScheduledEvent> scheduledEvents;
        protected LinkedList<Thread> threadsSleepingSorted;
        protected HashMap<Thread, Long> threadWakeupTimeMap;

        public EventQueueProcessorRunnable(EventQueueSchedulerProcessor eventQueueSchedulerProcessor, BlockingQueue<Event> blockingQueue, int i, int i2) {
            this(blockingQueue, i, null, i2);
        }

        public EventQueueProcessorRunnable(BlockingQueue<Event> blockingQueue, int i, Lock lock, int i2) {
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Creating the event queue processor runnable object.");
            }
            this.queue = blockingQueue;
            this.queueIndex = i;
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Creating wakeable manager with " + i2 + " available slots. The lock object is: " + lock);
            }
            this.wakeables = new LinkedList<>();
            this.nonWakeablesAfter = new HashMap<>();
            this.availableProcessingResources = i2;
            this.lock = new ReentrantLock(true);
            this.scheduledEvents = new LinkedList<>();
            this.threadsSleepingSorted = new LinkedList<>();
            this.threadWakeupTimeMap = new HashMap<>();
        }

        public BlockingQueue<Event> getQueue() {
            return this.queue;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Processing the queue, index: " + this.queueIndex);
            }
            try {
                if (EventQueueSchedulerProcessor.this.running) {
                    if (EventQueueSchedulerProcessor.this.error) {
                        if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                            EventQueueSchedulerProcessor.devLog.debug("An error occured in a different processing thread. Not processing any more queue events. Queue index: " + this.queueIndex);
                            return;
                        }
                        return;
                    }
                    while (EventQueueSchedulerProcessor.this.paused) {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                                EventQueueSchedulerProcessor.devLog.debug("Interrupted on Thread.sleep while in paused state, queue index: " + this.queueIndex);
                            }
                        }
                    }
                    if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                        EventQueueSchedulerProcessor.devLog.debug("Taking the event from the event queue, queue index: " + this.queueIndex);
                    }
                    processScheduler();
                    Event event = null;
                    try {
                        event = this.queue.poll(getNextMaxSleepTime(), TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e2) {
                        if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                            EventQueueSchedulerProcessor.devLog.debug("Interrupted on queue.poll(), queue index: " + this.queueIndex);
                        }
                    }
                    removeCurrThreadSleepInfo();
                    if (EventQueueSchedulerProcessor.this.running) {
                        processScheduler();
                        if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                            EventQueueSchedulerProcessor.devLog.debug("Submitting new task (waiting on the queue for new events) to the thread pool, queue index: " + this.queueIndex);
                        }
                        try {
                            EventQueueSchedulerProcessor.this.threadPools[this.queueIndex].submit(this);
                            if (event != null) {
                                if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                                    EventQueueSchedulerProcessor.devLog.debug("Processing the event..., queue index: " + this.queueIndex);
                                }
                                event.process();
                            }
                            removeCurrThreadSleepInfo();
                        } catch (RejectedExecutionException e3) {
                            return;
                        }
                    }
                }
            } catch (Throwable th) {
                if (EventQueueSchedulerProcessor.devLog.isFatalEnabled()) {
                    EventQueueSchedulerProcessor.devLog.fatal("An exception thrown while processing an event. The thread will be terminated.", th);
                }
                if (EventQueueSchedulerProcessor.userLog.isFatalEnabled()) {
                    EventQueueSchedulerProcessor.userLog.fatal("An error occured while processing an event. The thread will be terminated.");
                }
                synchronized (EventQueueSchedulerProcessor.this.errorLock) {
                    if (!EventQueueSchedulerProcessor.this.error) {
                        EventQueueSchedulerProcessor.this.error = true;
                        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
                        newSingleThreadExecutor.submit(new Runnable() { // from class: net.hycube.eventprocessing.EventQueueSchedulerProcessor.EventQueueProcessorRunnable.1
                            @Override // java.lang.Runnable
                            public void run() {
                                synchronized (EventQueueSchedulerProcessor.this) {
                                    EventQueueSchedulerProcessor.this.stop();
                                    EventQueueSchedulerProcessor.this.errorCallback.errorOccurred(EventQueueSchedulerProcessor.this.errorCallbackArg);
                                }
                            }
                        });
                        newSingleThreadExecutor.shutdown();
                    }
                }
            }
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Processing the queue finished, index: " + this.queueIndex);
            }
        }

        protected void removeCurrThreadSleepInfo() {
            this.lock.lock();
            Thread currentThread = Thread.currentThread();
            try {
                this.threadsSleepingSorted.remove(currentThread);
                this.threadWakeupTimeMap.remove(currentThread);
            } finally {
                this.lock.unlock();
            }
        }

        @Override // net.hycube.eventprocessing.WakeableManager
        public Lock getWakeableManagerLock() {
            return this.lock;
        }

        @Override // net.hycube.eventprocessing.WakeableManager
        public boolean addWakeable(Wakeable wakeable) {
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Registering Wakeable object: " + wakeable.toString());
            }
            this.lock.lock();
            try {
                if (this.wakeables.contains(wakeable)) {
                    return false;
                }
                this.wakeables.add(wakeable);
                this.nonWakeablesAfter.put(wakeable, 0);
                return true;
            } finally {
                this.lock.unlock();
            }
        }

        @Override // net.hycube.eventprocessing.WakeableManager
        public boolean removeWakeable(Wakeable wakeable) {
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Unregistering Wakeable object: " + wakeable.toString());
            }
            this.lock.lock();
            try {
                this.nonWakeablesAfter.remove(wakeable);
                return this.wakeables.remove(wakeable);
            } finally {
                this.lock.unlock();
            }
        }

        protected List<Wakeable> getWakeables() {
            this.lock.lock();
            try {
                return new LinkedList(this.wakeables);
            } finally {
                this.lock.unlock();
            }
        }

        @Override // net.hycube.eventprocessing.WakeableManager
        public void discard() {
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Discarding - unregistering all Wakeable objects...");
            }
            this.lock.lock();
            try {
                this.nonWakeablesAfter.clear();
                this.wakeables.clear();
            } finally {
                this.lock.unlock();
            }
        }

        @Override // net.hycube.eventprocessing.WakeableManager
        public void wakeup() {
            if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                EventQueueSchedulerProcessor.devLog.debug("Wakeup called.");
            }
            this.lock.lock();
            try {
                if (!this.wakeables.isEmpty()) {
                    this.nonWakeablesAfter.put(this.wakeables.getLast(), Integer.valueOf(this.nonWakeablesAfter.get(this.wakeables.getLast()).intValue() + 1));
                    int size = this.wakeables.size();
                    int i = 0;
                    Iterator<Integer> it = this.nonWakeablesAfter.values().iterator();
                    while (it.hasNext()) {
                        i += it.next().intValue();
                    }
                    if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                        EventQueueSchedulerProcessor.devLog.debug("numWakeables: " + size + ", numNonWakeables: " + i + ", availableSlots: " + this.availableProcessingResources);
                    }
                    while (size + i > this.availableProcessingResources && size > 0) {
                        Wakeable first = this.wakeables.getFirst();
                        if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                            EventQueueSchedulerProcessor.devLog.debug("Waking the first enqueued Wakeable: " + first.toString());
                        }
                        size--;
                        i -= this.nonWakeablesAfter.get(first).intValue();
                        first.wakeup();
                        this.wakeables.remove(first);
                        this.nonWakeablesAfter.remove(first);
                        if (EventQueueSchedulerProcessor.devLog.isDebugEnabled()) {
                            EventQueueSchedulerProcessor.devLog.debug("numWakeables: " + size + ", numNonWakeables: " + i + ", availableSlots: " + this.availableProcessingResources);
                        }
                    }
                } else if (EventQueueSchedulerProcessor.devLog.isTraceEnabled()) {
                    EventQueueSchedulerProcessor.devLog.debug("No Wakeable to wake up, do nothing.");
                }
            } finally {
                this.lock.unlock();
            }
        }

        @Override // net.hycube.eventprocessing.WakeableManager
        public int getNextMaxSleepTime() {
            long currentTime;
            long j = 0;
            this.lock.lock();
            try {
                purgeWakeupTimes();
                EventQueueSchedulerProcessor.devLog.debug("Calculating next wakeable sleep time. Scheduled events: " + this.scheduledEvents + ", sleeping threads: " + this.threadsSleepingSorted);
                ListIterator<ScheduledEvent> listIterator = this.scheduledEvents.listIterator();
                ListIterator<Thread> listIterator2 = this.threadsSleepingSorted.listIterator();
                int i = 0;
                boolean z = false;
                int i2 = 1;
                while (true) {
                    if (i2 > this.availableProcessingResources) {
                        break;
                    }
                    if (!listIterator.hasNext()) {
                        z = true;
                        break;
                    }
                    ScheduledEvent next = listIterator.next();
                    if (listIterator2.hasNext()) {
                        long longValue = this.threadWakeupTimeMap.get(listIterator2.next()).longValue();
                        while (longValue < next.getExecutionTime()) {
                            i++;
                            if (!listIterator2.hasNext()) {
                                break;
                            }
                            longValue = this.threadWakeupTimeMap.get(listIterator2.next()).longValue();
                        }
                        listIterator2.previous();
                    }
                    if (((this.availableProcessingResources - 1) - this.threadsSleepingSorted.size()) + i < i2) {
                        j = next.getExecutionTime();
                        break;
                    }
                    i2++;
                }
                if (z) {
                    currentTime = EventQueueSchedulerProcessor.WAKEABLE_SLEEP_TIMEOUT;
                } else {
                    currentTime = j - EventQueueSchedulerProcessor.this.environment.getTimeProvider().getCurrentTime();
                }
                if (currentTime < 0) {
                    currentTime = 0;
                }
                if (currentTime > EventQueueSchedulerProcessor.WAKEABLE_SLEEP_TIMEOUT) {
                    currentTime = EventQueueSchedulerProcessor.WAKEABLE_SLEEP_TIMEOUT;
                }
                ListIterator<Thread> listIterator3 = this.threadsSleepingSorted.listIterator();
                int i3 = 0;
                while (listIterator3.hasNext()) {
                    if (j <= this.threadWakeupTimeMap.get(listIterator3.next()).longValue()) {
                        break;
                    }
                    i3++;
                }
                Thread currentThread = Thread.currentThread();
                this.threadWakeupTimeMap.put(currentThread, Long.valueOf(j));
                this.threadsSleepingSorted.add(i3, currentThread);
                this.lock.unlock();
                EventQueueSchedulerProcessor.devLog.debug("Next wakeable sleep time: " + currentTime);
                return (int) currentTime;
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        }

        protected void purgeWakeupTimes() {
            long currentTime = EventQueueSchedulerProcessor.this.environment.getTimeProvider().getCurrentTime();
            if (this.threadsSleepingSorted.isEmpty()) {
                return;
            }
            Thread first = this.threadsSleepingSorted.getFirst();
            while (true) {
                Thread thread = first;
                if (this.threadWakeupTimeMap.get(thread).longValue() > currentTime) {
                    return;
                }
                this.threadsSleepingSorted.removeFirst();
                this.threadWakeupTimeMap.remove(thread);
                if (this.threadsSleepingSorted.isEmpty()) {
                    return;
                } else {
                    first = this.threadsSleepingSorted.getFirst();
                }
            }
        }

        @Override // net.hycube.eventprocessing.EventScheduler
        public void scheduleEvent(ScheduledEvent scheduledEvent) {
            if (EventQueueSchedulerProcessor.SCHEDULE_AT_FIXED_TIME_INTERVAL) {
                long executionTime = scheduledEvent.getExecutionTime();
                if (executionTime % EventQueueSchedulerProcessor.SCHEDULE_TIME_INTERVAL != 0) {
                    executionTime = (executionTime + EventQueueSchedulerProcessor.SCHEDULE_TIME_INTERVAL) - (executionTime % EventQueueSchedulerProcessor.SCHEDULE_TIME_INTERVAL);
                }
                scheduledEvent.setExecutionTime(executionTime);
            }
            this.lock.lock();
            try {
                ListIterator<ScheduledEvent> listIterator = this.scheduledEvents.listIterator();
                int i = 0;
                while (listIterator.hasNext()) {
                    if (scheduledEvent.getExecutionTime() < listIterator.next().getExecutionTime()) {
                        break;
                    } else {
                        i++;
                    }
                }
                this.scheduledEvents.add(i, scheduledEvent);
                if (!this.threadsSleepingSorted.isEmpty() && this.threadWakeupTimeMap.get(this.threadsSleepingSorted.getFirst()).longValue() > scheduledEvent.getExecutionTime()) {
                    try {
                        this.queue.put(new DummyEvent());
                    } catch (InterruptedException e) {
                        throw new UnrecoverableRuntimeException("A put operation on the event queue threw an InterruptedException.", e);
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }

        @Override // net.hycube.eventprocessing.EventScheduler
        public void scheduleEvent(Event event, Queue<Event> queue, long j) {
            scheduleEvent(new ScheduledEvent(event, queue, j));
        }

        @Override // net.hycube.eventprocessing.EventScheduler
        public void scheduleEventWithDelay(Event event, Queue<Event> queue, long j) {
            scheduleEvent(new ScheduledEvent(event, queue, EventQueueSchedulerProcessor.this.environment.getTimeProvider().getCurrentTime() + j));
        }

        protected void processScheduler() {
            this.lock.lock();
            try {
                long currentTime = EventQueueSchedulerProcessor.this.environment.getTimeProvider().getCurrentTime();
                ListIterator<ScheduledEvent> listIterator = this.scheduledEvents.listIterator();
                while (listIterator.hasNext()) {
                    ScheduledEvent next = listIterator.next();
                    if (next.getExecutionTime() > currentTime) {
                        break;
                    }
                    try {
                        EventQueueSchedulerProcessor.devLog.debug("Enqueuing a scheduled event: " + next.getEvent().hashCode() + ", ex. time: " + next.getExecutionTime());
                        this.queue.put(next.getEvent());
                    } catch (InterruptedException e) {
                        if (EventQueueSchedulerProcessor.this.running) {
                            throw new UnrecoverableRuntimeException("An exception was thrown while inserting an event to an event queue.");
                        }
                    }
                    listIterator.remove();
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    public synchronized void initialize(Environment environment, BlockingQueue<Event> blockingQueue) {
        initialize(environment, new BlockingQueue[]{blockingQueue}, new EventQueueProcessingInfo[]{new EventQueueProcessingInfo(new ThreadPoolInfo(1, 60L), null, true)});
    }

    public synchronized void initialize(Environment environment, BlockingQueue<Event> blockingQueue, EventQueueProcessingInfo eventQueueProcessingInfo) {
        initialize(environment, new BlockingQueue[]{blockingQueue}, new EventQueueProcessingInfo[]{eventQueueProcessingInfo});
    }

    public synchronized void initialize(Environment environment, BlockingQueue<Event>[] blockingQueueArr, EventQueueProcessingInfo[] eventQueueProcessingInfoArr) {
        initialize(environment, blockingQueueArr, eventQueueProcessingInfoArr, null, null);
    }

    public synchronized void initialize(Environment environment, BlockingQueue<Event>[] blockingQueueArr, EventQueueProcessingInfo[] eventQueueProcessingInfoArr, EventProcessingErrorCallback eventProcessingErrorCallback, Object obj) {
        EventQueueProcessorRunnable eventQueueProcessorRunnable;
        if (environment == null || environment.getTimeProvider() == null) {
            throw new IllegalArgumentException("Invalid Environment object specified");
        }
        this.environment = environment;
        if (blockingQueueArr == null || eventQueueProcessingInfoArr == null) {
            throw new IllegalArgumentException("The queues and threadPoolInfos must be not null");
        }
        if (blockingQueueArr.length != eventQueueProcessingInfoArr.length) {
            throw new IllegalArgumentException("The number of queues should be equal to the number of thread pool info objects");
        }
        for (int i = 0; i < blockingQueueArr.length; i++) {
            if (blockingQueueArr[i] == null) {
                throw new IllegalArgumentException("queue [" + i + "] is null");
            }
            if (eventQueueProcessingInfoArr[i] == null) {
                throw new IllegalArgumentException("threadPoolInfo[" + i + "] is null");
            }
            if (eventQueueProcessingInfoArr[i].getThreadPoolInfo() == null) {
                throw new IllegalArgumentException("eventQueuesProcessingInfo[" + i + "].threadPoolInfo is null");
            }
            if (eventQueueProcessingInfoArr[i].getThreadPoolInfo().getPoolSize() < 0) {
                throw new IllegalArgumentException("threadPoolInfos[" + i + "].poolSize is less than 0.");
            }
            if (eventQueueProcessingInfoArr[i].getThreadPoolInfo().getKeepAliveTimeSec() < 0) {
                throw new IllegalArgumentException("threadPoolInfos[" + i + "].keepAliveTimeSec is less than 0.");
            }
        }
        if (devLog.isInfoEnabled()) {
            devLog.info("Initializing the event queue processor.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Initializing the event queue processor.");
        }
        this.errorCallback = eventProcessingErrorCallback;
        this.errorCallbackArg = obj;
        this.queues = new BlockingQueue[blockingQueueArr.length];
        this.queueProcessorRunnables = new EventQueueProcessorRunnable[blockingQueueArr.length];
        this.eventQueuesProcessingInfo = new EventQueueProcessingInfo[eventQueueProcessingInfoArr.length];
        this.threadPoolInfos = new ThreadPoolInfo[eventQueueProcessingInfoArr.length];
        this.queueProcessorRunnablesByQueue = new HashMap<>(HashMapUtils.getHashMapCapacityForElementsNum(eventQueueProcessingInfoArr.length, 0.75f), 0.75f);
        for (int i2 = 0; i2 < blockingQueueArr.length; i2++) {
            this.queues[i2] = blockingQueueArr[i2];
            this.eventQueuesProcessingInfo[i2] = eventQueueProcessingInfoArr[i2];
            this.threadPoolInfos[i2] = eventQueueProcessingInfoArr[i2].getThreadPoolInfo();
            if (eventQueueProcessingInfoArr[i2].getWakeable()) {
                eventQueueProcessorRunnable = new EventQueueProcessorRunnable(this, blockingQueueArr[i2], i2, this.threadPoolInfos[i2].getPoolSize());
                if (!(blockingQueueArr[i2] instanceof NotifyingQueue)) {
                }
                ((NotifyingQueue) blockingQueueArr[i2]).setInsertNotifyLock(eventQueueProcessorRunnable.getWakeableManagerLock());
                ((NotifyingQueue) blockingQueueArr[i2]).addListener(new WakeableManagerQueueListener(eventQueueProcessorRunnable));
            } else {
                eventQueueProcessorRunnable = new EventQueueProcessorRunnable(this, blockingQueueArr[i2], i2, this.threadPoolInfos[i2].getPoolSize());
            }
            this.queueProcessorRunnables[i2] = eventQueueProcessorRunnable;
            this.queueProcessorRunnablesByQueue.put(this.queues[i2], eventQueueProcessorRunnable);
        }
        this.initialized = true;
    }

    public WakeableManager getWakeableManagerByQueue(LinkedBlockingQueue<Event> linkedBlockingQueue) {
        if (this.initialized) {
            return this.queueProcessorRunnablesByQueue.get(linkedBlockingQueue);
        }
        return null;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized boolean isRunning() {
        return this.running;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized boolean isInitialized() {
        return this.initialized;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized boolean isPaused() {
        return this.paused;
    }

    /* JADX WARN: Type inference failed for: r1v9, types: [java.util.concurrent.Future[], java.util.concurrent.Future<?>[][]] */
    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public void start() {
        if (!this.initialized) {
            throw new EventQueueProcessorRuntimeException("The event queue processor has not been initialized.");
        }
        this.running = true;
        this.paused = false;
        if (devLog.isInfoEnabled()) {
            devLog.info("Starting processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Starting processing the event queues.");
        }
        this.threadPools = new ExecutorService[this.queues.length];
        this.futures = new Future[this.queues.length];
        for (int i = 0; i < this.queues.length; i++) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.threadPoolInfos[i].getPoolSize(), this.threadPoolInfos[i].getPoolSize(), this.threadPoolInfos[i].getKeepAliveTimeSec(), TimeUnit.SECONDS, new LinkedBlockingQueue());
            if (this.threadPoolInfos[i].getKeepAliveTimeSec() > 0) {
                threadPoolExecutor.allowCoreThreadTimeOut(true);
            }
            this.threadPools[i] = threadPoolExecutor;
            this.threadPools[i].submit(this.queueProcessorRunnables[i]);
        }
        if (devLog.isInfoEnabled()) {
            devLog.info("Started processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Started processing the event queues.");
        }
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public void stop() {
        if (this.running) {
            if (devLog.isInfoEnabled()) {
                devLog.info("Stopping processing the event queues.");
            }
            if (userLog.isInfoEnabled()) {
                userLog.info("Stopping processing the event queues.");
            }
            this.running = false;
            for (int i = 0; i < this.threadPools.length; i++) {
                this.threadPools[i].shutdownNow();
            }
            for (int i2 = 0; i2 < this.threadPools.length; i2++) {
                while (!this.threadPools[i2].isTerminated()) {
                    try {
                        this.threadPools[i2].awaitTermination(100L, TimeUnit.MILLISECONDS);
                        this.threadPools[i2].shutdownNow();
                    } catch (InterruptedException e) {
                    }
                }
            }
            for (int i3 = 0; i3 < this.threadPools.length; i3++) {
                if (this.eventQueuesProcessingInfo[i3].getWakeable()) {
                    ((NotifyingQueue) this.queues[i3]).discard();
                }
                this.queueProcessorRunnables[i3].discard();
            }
            this.threadPools = null;
            this.paused = false;
            if (devLog.isInfoEnabled()) {
                devLog.info("Stopped processing the event queues.");
            }
            if (userLog.isInfoEnabled()) {
                userLog.info("Stopped processing the event queues.");
            }
        }
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public void pause() {
        if (devLog.isInfoEnabled()) {
            devLog.info("Pausing processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Pausing processing the event queues.");
        }
        this.paused = true;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public void resume() {
        if (devLog.isInfoEnabled()) {
            devLog.info("Resuming processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Resuming processing the event queues.");
        }
        this.paused = false;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public void clear() {
        if (devLog.isInfoEnabled()) {
            devLog.info("Clearing the event queue processor.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Clearing the event queue processor.");
        }
        if (this.running) {
            stop();
        }
        this.queueProcessorRunnables = null;
        this.queues = null;
        this.eventQueuesProcessingInfo = null;
        this.threadPoolInfos = null;
        this.error = false;
        this.errorCallback = null;
        this.errorCallbackArg = null;
        this.initialized = false;
    }

    @Override // net.hycube.eventprocessing.EventScheduler
    public void scheduleEvent(ScheduledEvent scheduledEvent) {
        if (scheduledEvent.getEvent() == null) {
            throw new IllegalArgumentException("The event of the scheduledEvent is not set");
        }
        if (scheduledEvent.getQueue() == null) {
            throw new IllegalArgumentException("The queue is not specified.");
        }
        schedule(scheduledEvent.getEvent(), scheduledEvent.getQueue(), scheduledEvent.getExecutionTime());
    }

    @Override // net.hycube.eventprocessing.EventScheduler
    public void scheduleEvent(Event event, Queue<Event> queue, long j) {
        if (event == null) {
            throw new IllegalArgumentException("The event object is not set");
        }
        if (queue == null) {
            throw new IllegalArgumentException("The queue is not specified.");
        }
        schedule(event, queue, j);
    }

    @Override // net.hycube.eventprocessing.EventScheduler
    public void scheduleEventWithDelay(Event event, Queue<Event> queue, long j) {
        if (event == null) {
            throw new IllegalArgumentException("The event object is not set");
        }
        if (j < 0) {
            throw new IllegalArgumentException("Delay must not be negative");
        }
        long currentTime = this.environment.getTimeProvider().getCurrentTime() + j;
        if (queue == null) {
            throw new IllegalArgumentException("The queue is not specified.");
        }
        schedule(event, queue, currentTime);
    }

    protected void schedule(Event event, Queue<Event> queue, long j) {
        EventQueueProcessorRunnable eventQueueProcessorRunnable = this.queueProcessorRunnablesByQueue.get(queue);
        if (eventQueueProcessorRunnable == null) {
            throw new IllegalArgumentException("Invalid scheduler queue specified");
        }
        eventQueueProcessorRunnable.scheduleEvent(event, queue, j);
    }
}
