package com.baidu.hugegraph.loader.task;

import com.baidu.hugegraph.loader.builder.Record;
import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.executor.LoadContext;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.mapping.ElementMapping;
import com.baidu.hugegraph.loader.mapping.InputStruct;
import com.baidu.hugegraph.loader.metrics.LoadSummary;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/loader/task/TaskManager.class */
public final class TaskManager {
    private static final Logger LOG = Log.logger(TaskManager.class);
    private final LoadContext context;
    private final LoadOptions options;
    private final Semaphore batchSemaphore = new Semaphore(batchSemaphoreNum());
    private final Semaphore singleSemaphore = new Semaphore(singleSemaphoreNum());
    private final ExecutorService batchService;
    private final ExecutorService singleService;

    public TaskManager(LoadContext loadContext) {
        this.context = loadContext;
        this.options = loadContext.options();
        this.batchService = ExecutorUtil.newFixedThreadPool(this.options.batchInsertThreads, Constants.BATCH_WORKER);
        this.singleService = ExecutorUtil.newFixedThreadPool(this.options.singleInsertThreads, Constants.SINGLE_WORKER);
    }

    private int batchSemaphoreNum() {
        return 1 + this.options.batchInsertThreads;
    }

    private int singleSemaphoreNum() {
        return 2 * this.options.singleInsertThreads;
    }

    public void waitFinished() {
        waitFinished("insert tasks");
    }

    public void waitFinished(String str) {
        LOG.info("Waiting for the {} to finish", str);
        try {
            this.batchSemaphore.acquire(batchSemaphoreNum());
        } catch (InterruptedException e) {
            LOG.error("Interrupted while waiting batch-mode tasks");
        } finally {
            this.batchSemaphore.release(batchSemaphoreNum());
        }
        try {
            try {
                this.singleSemaphore.acquire(singleSemaphoreNum());
                this.singleSemaphore.release(singleSemaphoreNum());
            } catch (InterruptedException e2) {
                LOG.error("Interrupted while waiting single-mode tasks");
                this.singleSemaphore.release(singleSemaphoreNum());
            }
            LOG.info("All the {} finished", str);
        } catch (Throwable th) {
            this.singleSemaphore.release(singleSemaphoreNum());
            throw th;
        }
    }

    public void shutdown() {
        long j = this.options.shutdownTimeout;
        try {
            try {
                this.batchService.shutdown();
                this.batchService.awaitTermination(j, TimeUnit.SECONDS);
                LOG.info("The batch-mode tasks service executor shutdown");
                if (!this.batchService.isTerminated()) {
                    LOG.error("The unfinished batch-mode tasks will be cancelled");
                }
                this.batchService.shutdownNow();
            } catch (InterruptedException e) {
                LOG.error("The batch-mode tasks are interrupted");
                if (!this.batchService.isTerminated()) {
                    LOG.error("The unfinished batch-mode tasks will be cancelled");
                }
                this.batchService.shutdownNow();
            }
            try {
                try {
                    this.singleService.shutdown();
                    this.singleService.awaitTermination(j, TimeUnit.SECONDS);
                    LOG.info("The single-mode tasks service executor shutdown");
                    if (!this.singleService.isTerminated()) {
                        LOG.error("The unfinished single-mode tasks will be cancelled");
                    }
                    this.singleService.shutdownNow();
                } catch (InterruptedException e2) {
                    LOG.error("The single-mode tasks are interrupted");
                    if (!this.singleService.isTerminated()) {
                        LOG.error("The unfinished single-mode tasks will be cancelled");
                    }
                    this.singleService.shutdownNow();
                }
            } catch (Throwable th) {
                if (!this.singleService.isTerminated()) {
                    LOG.error("The unfinished single-mode tasks will be cancelled");
                }
                this.singleService.shutdownNow();
                throw th;
            }
        } catch (Throwable th2) {
            if (!this.batchService.isTerminated()) {
                LOG.error("The unfinished batch-mode tasks will be cancelled");
            }
            this.batchService.shutdownNow();
            throw th2;
        }
    }

    public void submitBatch(InputStruct inputStruct, ElementMapping elementMapping, List<Record> list) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.batchSemaphore.acquire();
            LoadSummary summary = this.context.summary();
            summary.metrics(inputStruct).plusFlighting(list.size());
            CompletableFuture.runAsync(new BatchInsertTask(this.context, inputStruct, elementMapping, list), this.batchService).whenComplete((r14, th) -> {
                if (th != null) {
                    LOG.warn("Batch insert {} error, try single insert", elementMapping.type(), th);
                    submitInSingle(inputStruct, elementMapping, list);
                } else {
                    summary.metrics(inputStruct).minusFlighting(list.size());
                }
                this.batchSemaphore.release();
                this.context.summary().addTimeRange(elementMapping.type(), currentTimeMillis, System.currentTimeMillis());
            });
        } catch (InterruptedException e) {
            throw new LoadException("Interrupted while waiting to submit %s batch in batch mode", e, elementMapping.type());
        }
    }

    private void submitInSingle(InputStruct inputStruct, ElementMapping elementMapping, List<Record> list) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.singleSemaphore.acquire();
            LoadSummary summary = this.context.summary();
            CompletableFuture.runAsync(new SingleInsertTask(this.context, inputStruct, elementMapping, list), this.singleService).whenComplete((r14, th) -> {
                summary.metrics(inputStruct).minusFlighting(list.size());
                this.singleSemaphore.release();
                this.context.summary().addTimeRange(elementMapping.type(), currentTimeMillis, System.currentTimeMillis());
            });
        } catch (InterruptedException e) {
            throw new LoadException("Interrupted while waiting to submit %s batch in single mode", e, elementMapping.type());
        }
    }
}
