package xin.bluesky.leiothrix.worker.report;

import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.bluesky.leiothrix.model.msg.WorkerMessage;
import xin.bluesky.leiothrix.model.task.partition.PartitionTaskProgress;
import xin.bluesky.leiothrix.worker.WorkerProcessor;
import xin.bluesky.leiothrix.worker.client.ServerChannel;
import xin.bluesky.leiothrix.worker.conf.Settings;

/* loaded from: input_file:xin/bluesky/leiothrix/worker/report/WorkerProgressReporter.class */
public class WorkerProgressReporter implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(WorkerProgressReporter.class);
    private static ExecutorService reporter = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("worker-status-report").build());
    private static BlockingQueue<PartitionTaskProgress> processingPartitionTaskQueue = new LinkedBlockingDeque();

    public void start() {
        logger.info("启动向server上报worker执行进度的线程");
        reporter.submit(this);
    }

    public void reportProgress(PartitionTaskProgress partitionTaskProgress) {
        processingPartitionTaskQueue.offer(partitionTaskProgress);
    }

    @Override // java.lang.Runnable
    public void run() {
        while (WorkerProcessor.getProcessor().isRunning()) {
            try {
                PartitionTaskProgress take = processingPartitionTaskQueue.take();
                String workerIp = Settings.getWorkerIp();
                ServerChannel.send(new WorkerMessage("workerProgressReport", JSON.toJSONString(take), workerIp));
                logger.debug("worker:{}执行任务片[taskId={},tableName={},rangeName={}],当前执行到{}", new Object[]{workerIp, take.getPartitionTask().getTaskId(), take.getPartitionTask().getTableName(), take.getPartitionTask().getRangeName(), Long.valueOf(take.getEndIndex())});
            } catch (InterruptedException e) {
            }
        }
    }

    public void shutdown() {
        reporter.shutdownNow();
        waitTerminated();
        logger.info("成功关闭向server上报worker执行进度的线程");
    }

    private void waitTerminated() {
        while (!reporter.isTerminated()) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                logger.error("向server上报worker执行进度的线程在关闭的时候被中断");
            }
        }
    }
}
