package net.hycube.eventprocessing;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.hycube.logging.LogHelper;
import org.apache.commons.logging.Log;

/* loaded from: input_file:net/hycube/eventprocessing/ThreadPoolEventQueueProcessor.class */
public class ThreadPoolEventQueueProcessor implements EventQueueProcessor {
    private static Log userLog = LogHelper.getUserLog();
    private static Log devLog = LogHelper.getDevLog(ThreadPoolEventQueueProcessor.class);
    protected BlockingQueue<Event>[] queues;
    protected ThreadPoolInfo[] threadPoolInfos;
    protected ExecutorService[] threadPools;
    protected Future<?>[][] futures;
    protected EventQueueProcessorRunnable[] queueProcessorRunnables;
    protected volatile boolean initialized = false;
    protected volatile boolean running = false;
    protected volatile boolean paused = false;
    protected volatile boolean error = false;
    protected Object errorLock = new Object();
    protected EventProcessingErrorCallback errorCallback;
    protected Object errorCallbackArg;

    /* loaded from: input_file:net/hycube/eventprocessing/ThreadPoolEventQueueProcessor$EventQueueProcessorRunnable.class */
    public class EventQueueProcessorRunnable implements Runnable {
        protected BlockingQueue<Event> queue;
        protected int queueIndex;

        public EventQueueProcessorRunnable(BlockingQueue<Event> blockingQueue, int i) {
            this.queue = blockingQueue;
            this.queueIndex = i;
        }

        public BlockingQueue<Event> getQueue() {
            return this.queue;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                ThreadPoolEventQueueProcessor.devLog.debug("Processing the queue, index: " + this.queueIndex);
            }
            try {
                if (ThreadPoolEventQueueProcessor.this.running) {
                    if (ThreadPoolEventQueueProcessor.this.error) {
                        if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                            ThreadPoolEventQueueProcessor.devLog.debug("An error occured in a different processing thread. Not processing any more queue events. Queue index: " + this.queueIndex);
                            return;
                        }
                        return;
                    }
                    while (ThreadPoolEventQueueProcessor.this.paused) {
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                                ThreadPoolEventQueueProcessor.devLog.debug("Interrupted on Thread.sleep while in paused state, queue index: " + this.queueIndex);
                            }
                        }
                    }
                    Event event = null;
                    try {
                        if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                            ThreadPoolEventQueueProcessor.devLog.debug("Taking the event from the event queue, queue index: " + this.queueIndex);
                        }
                        event = this.queue.take();
                    } catch (InterruptedException e2) {
                        if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                            ThreadPoolEventQueueProcessor.devLog.debug("Interrupted on queue.take(), queue index: " + this.queueIndex);
                        }
                    }
                    if (ThreadPoolEventQueueProcessor.this.running) {
                        if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                            ThreadPoolEventQueueProcessor.devLog.debug("Submitting new task (waiting on the queue for new events) to the thread pool, queue index: " + this.queueIndex);
                        }
                        try {
                            ThreadPoolEventQueueProcessor.this.threadPools[this.queueIndex].submit(this);
                            if (event != null) {
                                if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                                    ThreadPoolEventQueueProcessor.devLog.debug("Processing the event..., queue index: " + this.queueIndex);
                                }
                                event.process();
                            }
                        } catch (RejectedExecutionException e3) {
                            return;
                        }
                    }
                }
            } catch (Throwable th) {
                if (ThreadPoolEventQueueProcessor.devLog.isFatalEnabled()) {
                    ThreadPoolEventQueueProcessor.devLog.fatal("An exception thrown while processing an event. The thread will be terminated.", th);
                }
                if (ThreadPoolEventQueueProcessor.userLog.isFatalEnabled()) {
                    ThreadPoolEventQueueProcessor.userLog.fatal("An error occured while processing an event. The thread will be terminated.");
                }
                synchronized (ThreadPoolEventQueueProcessor.this.errorLock) {
                    if (!ThreadPoolEventQueueProcessor.this.error) {
                        ThreadPoolEventQueueProcessor.this.error = true;
                        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
                        newSingleThreadExecutor.submit(new Runnable() { // from class: net.hycube.eventprocessing.ThreadPoolEventQueueProcessor.EventQueueProcessorRunnable.1
                            @Override // java.lang.Runnable
                            public void run() {
                                synchronized (ThreadPoolEventQueueProcessor.this) {
                                    ThreadPoolEventQueueProcessor.this.stop();
                                    ThreadPoolEventQueueProcessor.this.errorCallback.errorOccurred(ThreadPoolEventQueueProcessor.this.errorCallbackArg);
                                }
                            }
                        });
                        newSingleThreadExecutor.shutdown();
                    }
                }
            }
            if (ThreadPoolEventQueueProcessor.devLog.isDebugEnabled()) {
                ThreadPoolEventQueueProcessor.devLog.debug("Processing the queue finished, index: " + this.queueIndex);
            }
        }
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized boolean isRunning() {
        return this.running;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized boolean isInitialized() {
        return this.initialized;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized boolean isPaused() {
        return this.paused;
    }

    /* JADX WARN: Type inference failed for: r1v9, types: [java.util.concurrent.Future[], java.util.concurrent.Future<?>[][]] */
    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized void start() {
        if (!this.initialized) {
            throw new EventQueueProcessorRuntimeException("The event queue processor has not been initialized.");
        }
        this.running = true;
        this.paused = false;
        if (devLog.isInfoEnabled()) {
            devLog.info("Starting processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Starting processing the event queues.");
        }
        this.threadPools = new ExecutorService[this.queues.length];
        this.futures = new Future[this.queues.length];
        for (int i = 0; i < this.queues.length; i++) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.threadPoolInfos[i].getPoolSize(), this.threadPoolInfos[i].getPoolSize(), this.threadPoolInfos[i].getKeepAliveTimeSec(), TimeUnit.SECONDS, new LinkedBlockingQueue());
            if (this.threadPoolInfos[i].getKeepAliveTimeSec() > 0) {
                threadPoolExecutor.allowCoreThreadTimeOut(true);
            }
            this.threadPools[i] = threadPoolExecutor;
            this.threadPools[i].submit(this.queueProcessorRunnables[i]);
        }
        if (devLog.isInfoEnabled()) {
            devLog.info("Started processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Started processing the event queues.");
        }
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized void stop() {
        if (this.running) {
            if (devLog.isInfoEnabled()) {
                devLog.info("Stopping processing the event queues.");
            }
            if (userLog.isInfoEnabled()) {
                userLog.info("Stopping processing the event queues.");
            }
            this.running = false;
            for (int i = 0; i < this.threadPools.length; i++) {
                this.threadPools[i].shutdownNow();
            }
            for (int i2 = 0; i2 < this.threadPools.length; i2++) {
                while (!this.threadPools[i2].isTerminated()) {
                    try {
                        this.threadPools[i2].awaitTermination(1000L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                }
            }
            this.threadPools = null;
            this.paused = false;
            if (devLog.isInfoEnabled()) {
                devLog.info("Stopped processing the event queues.");
            }
            if (userLog.isInfoEnabled()) {
                userLog.info("Stopped processing the event queues.");
            }
        }
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized void pause() {
        if (devLog.isInfoEnabled()) {
            devLog.info("Pausing processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Pausing processing the event queues.");
        }
        this.paused = true;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized void resume() {
        if (devLog.isInfoEnabled()) {
            devLog.info("Resuming processing the event queues.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Resuming processing the event queues.");
        }
        this.paused = false;
    }

    @Override // net.hycube.eventprocessing.EventQueueProcessor
    public synchronized void clear() {
        if (devLog.isInfoEnabled()) {
            devLog.info("Clearing the event queue processor.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Clearing the event queue processor.");
        }
        if (this.running) {
            stop();
        }
        this.queueProcessorRunnables = null;
        this.queues = null;
        this.threadPoolInfos = null;
        this.error = false;
        this.errorCallback = null;
        this.errorCallbackArg = null;
        this.initialized = false;
    }

    public synchronized void initialize(BlockingQueue<Event> blockingQueue) {
        initialize(new BlockingQueue[]{blockingQueue}, new ThreadPoolInfo[]{new ThreadPoolInfo(1, 60L)});
    }

    public synchronized void initialize(BlockingQueue<Event> blockingQueue, ThreadPoolInfo threadPoolInfo) {
        initialize(new BlockingQueue[]{blockingQueue}, new ThreadPoolInfo[]{threadPoolInfo});
    }

    public synchronized void initialize(BlockingQueue<Event>[] blockingQueueArr, ThreadPoolInfo[] threadPoolInfoArr) {
        initialize(blockingQueueArr, threadPoolInfoArr, null, null);
    }

    public synchronized void initialize(BlockingQueue<Event>[] blockingQueueArr, ThreadPoolInfo[] threadPoolInfoArr, EventProcessingErrorCallback eventProcessingErrorCallback, Object obj) {
        if (blockingQueueArr == null || threadPoolInfoArr == null) {
            throw new IllegalArgumentException("The queues and threadPoolInfos must be not null");
        }
        if (blockingQueueArr.length != threadPoolInfoArr.length) {
            throw new IllegalArgumentException("The number of queues should be equal to the number of thread pool info objects");
        }
        for (int i = 0; i < blockingQueueArr.length; i++) {
            if (blockingQueueArr[i] == null) {
                throw new IllegalArgumentException("queue [" + i + "] is null");
            }
            if (threadPoolInfoArr == null) {
                throw new IllegalArgumentException("threadPoolInfos[" + i + "] is null");
            }
            if (threadPoolInfoArr[i].getPoolSize() < 0) {
                throw new IllegalArgumentException("threadPoolInfos[" + i + "].poolSize is less than 0.");
            }
            if (threadPoolInfoArr[i].getKeepAliveTimeSec() < 0) {
                throw new IllegalArgumentException("threadPoolInfos[" + i + "].keepAliveTimeSec is less than 0.");
            }
        }
        if (devLog.isInfoEnabled()) {
            devLog.info("Initializing the event queue processor.");
        }
        if (userLog.isInfoEnabled()) {
            userLog.info("Initializing the event queue processor.");
        }
        this.errorCallback = eventProcessingErrorCallback;
        this.errorCallbackArg = obj;
        this.queues = new BlockingQueue[blockingQueueArr.length];
        this.queueProcessorRunnables = new EventQueueProcessorRunnable[blockingQueueArr.length];
        this.threadPoolInfos = new ThreadPoolInfo[threadPoolInfoArr.length];
        for (int i2 = 0; i2 < blockingQueueArr.length; i2++) {
            this.queues[i2] = blockingQueueArr[i2];
            this.threadPoolInfos[i2] = threadPoolInfoArr[i2];
            this.queueProcessorRunnables[i2] = new EventQueueProcessorRunnable(blockingQueueArr[i2], i2);
        }
        this.initialized = true;
    }
}
