package cn.mzhong.janytask.executor;

import cn.mzhong.janytask.core.TaskContext;
import cn.mzhong.janytask.queue.Message;
import cn.mzhong.janytask.queue.MessageDao;
import cn.mzhong.janytask.queue.QueueInfo;
import cn.mzhong.janytask.util.ThreadUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/mzhong/janytask/executor/TaskExecutor.class */
public abstract class TaskExecutor<A extends Annotation> implements Runnable {
    static final Logger Log = LoggerFactory.getLogger(TaskExecutor.class);
    protected String ID;
    protected QueueInfo<A> queueInfo;
    protected TaskContext context;
    protected MessageDao messageDao;
    protected Method method;
    protected Object consumer;
    protected long idleInterval;
    protected long sleepInterval;
    protected long cnt = 0;

    public TaskExecutor(TaskContext taskContext, QueueInfo<A> queueInfo) {
        this.context = taskContext;
        this.queueInfo = queueInfo;
        this.messageDao = queueInfo.getMessageDao();
        this.ID = this.messageDao.ID();
        this.method = queueInfo.getConsumerMethod();
        this.consumer = queueInfo.getConsumer();
        this.idleInterval = taskContext.getQueueConfig().getIdleInterval();
        this.sleepInterval = taskContext.getQueueConfig().getSleepInterval();
    }

    protected abstract void invoke(Message message);

    protected void invoke() {
        long j;
        Message poll;
        long length = this.messageDao.length();
        long j2 = 0;
        if (Log.isDebugEnabled()) {
            Log.debug("'{}'：第{}轮消息处理开始, 本次目标长度:{}", new Object[]{this.ID, Long.valueOf(this.cnt), Long.valueOf(length)});
            j2 = System.currentTimeMillis();
        }
        long j3 = 0;
        while (true) {
            j = j3;
            if (j >= length || this.context.isShutdown() || (poll = this.messageDao.poll()) == null) {
                break;
            }
            invoke(poll);
            j3 = j + 1;
        }
        int currentTimeMillis = (int) (System.currentTimeMillis() - j2);
        if (Log.isDebugEnabled()) {
            int i = 0;
            float f = currentTimeMillis + 1;
            int round = Math.round(f / 1000.0f);
            if (j > 0) {
                i = (int) (f / ((float) j));
            }
            Log.debug("'{}'：第{}轮消息处理完毕，数量:{}，总耗时:{}秒，单条耗时:{}毫秒", new Object[]{this.ID, Long.valueOf(this.cnt), Long.valueOf(j), Integer.valueOf(round), Integer.valueOf(i)});
        }
        if (currentTimeMillis < this.idleInterval) {
            ThreadUtils.sleep(this.idleInterval - currentTimeMillis);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            this.cnt++;
            if (this.context.isShutdown()) {
                return;
            }
            try {
                invoke();
            } catch (Exception e) {
                Log.error(e.getLocalizedMessage(), e);
                ThreadUtils.sleep(15000L);
            }
        }
    }
}
