package com.baidu.hugegraph.task;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.query.Condition;
import com.baidu.hugegraph.backend.query.ConditionQuery;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.tx.GraphTransaction;
import com.baidu.hugegraph.event.EventListener;
import com.baidu.hugegraph.exception.NotFoundException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
import com.baidu.hugegraph.iterator.MapperIterator;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.Cardinality;
import com.baidu.hugegraph.type.define.DataType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Events;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

/* loaded from: input_file:com/baidu/hugegraph/task/TaskScheduler.class */
public class TaskScheduler {
    /** Graph this scheduler works against; its system store backs the task transaction. */
    private final HugeGraph graph;
    /** Executor that tasks are submitted to by submitTask(). */
    private final ExecutorService taskExecutor;
    /** Executor on which call() runs every task-store operation. */
    private final ExecutorService dbExecutor;
    /** Listener registered on the system store provider by listenChanges(); unregistered in close(). */
    private final EventListener eventListener;
    /** Tasks registered by submitTask() and not yet removed by remove(), keyed by task id. */
    private final Map<Id, HugeTask<?>> tasks;
    /** Created lazily by tx(); volatile because tx() reads it outside the synchronized block. */
    private volatile TaskTransaction taskTx;
    /** Limit value for which queryTask() applies no limit to the query. */
    private static final long NO_LIMIT = -1;
    /** Sleep time, in milliseconds, between two polls in the waitUntil*() methods. */
    private static final long QUERY_INTERVAL = 100;
    /** Upper bound on the size of {@link #tasks}, checked by submitTask(). */
    private static final int MAX_PENDING_TASKS = 10000;
    /** Decompiler-emitted assertion switch; set in the static initializer at the end of the class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baidu/hugegraph/task/TaskScheduler$TaskTransaction.class */
    /**
     * Auto-committing graph transaction used to store tasks as vertices.
     * It also creates the task vertex label, its property keys and the
     * index on the status property when they are missing.
     */
    public static class TaskTransaction extends GraphTransaction {

        /** Name of the vertex label that task vertices are stored under. */
        public static final String TASK = HugeTask.P.TASK;

        /**
         * Creates the transaction on the given store and turns on auto-commit.
         *
         * @param hugeGraph    graph the transaction belongs to
         * @param backendStore store the transaction reads from and writes to
         */
        public TaskTransaction(HugeGraph hugeGraph, BackendStore backendStore) {
            super(hugeGraph, backendStore);
            autoCommit(true);
        }

        /**
         * Builds (without adding) the vertex that represents a task.
         *
         * @param hugeTask task to convert
         * @return vertex built from {@code hugeTask.asArray()}
         * @throws HugeException if the task vertex label does not exist yet
         */
        public HugeVertex constructVertex(HugeTask<?> hugeTask) {
            if (graph().schemaTransaction().getVertexLabel(TASK) == null) {
                throw new HugeException("Schema is missing for task(%s) '%s'",
                                        hugeTask.id(), hugeTask.name());
            }
            return constructVertex(false, hugeTask.asArray());
        }

        /**
         * Tells whether the indexed status property differs between two vertices.
         *
         * @param vertex     previously stored vertex
         * @param hugeVertex vertex to compare it with
         * @return {@code true} if the two status values are not equal
         */
        public boolean indexValueChanged(Vertex vertex, HugeVertex hugeVertex) {
            Object storedStatus = vertex.value(HugeTask.P.STATUS);
            return !storedStatus.equals(hugeVertex.value(HugeTask.P.STATUS));
        }

        /**
         * Removes the stored vertex with the same id as {@code hugeVertex},
         * but only when its status value differs from the given vertex.
         */
        private void deleteIndex(HugeVertex hugeVertex) {
            // The decompiler had renamed this accessor to mo106id(); the real method is id()
            Iterator<Vertex> stored = queryVertices(hugeVertex.id());
            if (!stored.hasNext()) {
                return;
            }
            HugeVertex previous = (HugeVertex) stored.next();
            // An id is expected to match at most one vertex
            assert !stored.hasNext();
            if (indexValueChanged(previous, hugeVertex)) {
                removeVertex(previous);
            }
        }

        /**
         * Creates the task vertex label, its property keys and the status
         * index. Does nothing when the vertex label already exists.
         */
        protected void initSchema() {
            HugeGraph graph = graph();
            if (graph.schemaTransaction().getVertexLabel(TASK) != null) {
                return;
            }
            VertexLabel label = graph.schema().vertexLabel(TASK)
                                     .properties(initProperties())
                                     .useCustomizeNumberId()
                                     .nullableKeys(HugeTask.P.DESCRIPTION,
                                                   HugeTask.P.UPDATE,
                                                   HugeTask.P.INPUT,
                                                   HugeTask.P.RESULT,
                                                   HugeTask.P.DEPENDENCIES)
                                     .enableLabelIndex(true)
                                     .build();
            graph.schemaTransaction().addVertexLabel(label);
            createIndex(label, HugeTask.P.STATUS);
        }

        /**
         * Creates every property key of the task label, in declaration order.
         *
         * @return names of the created property keys
         */
        private String[] initProperties() {
            // Array initializers are evaluated left to right, so keys are created in this order
            return new String[] {
                createPropertyKey(HugeTask.P.TYPE),
                createPropertyKey(HugeTask.P.NAME),
                createPropertyKey(HugeTask.P.CALLABLE),
                createPropertyKey(HugeTask.P.DESCRIPTION),
                createPropertyKey(HugeTask.P.STATUS, DataType.BYTE),
                createPropertyKey(HugeTask.P.PROGRESS, DataType.INT),
                createPropertyKey(HugeTask.P.CREATE, DataType.DATE),
                createPropertyKey(HugeTask.P.UPDATE, DataType.DATE),
                createPropertyKey(HugeTask.P.RETRIES, DataType.INT),
                createPropertyKey(HugeTask.P.INPUT),
                createPropertyKey(HugeTask.P.RESULT),
                createPropertyKey(HugeTask.P.DEPENDENCIES, DataType.LONG, Cardinality.SET)
            };
        }

        /** Creates a single-valued TEXT property key and returns its name. */
        private String createPropertyKey(String str) {
            return createPropertyKey(str, DataType.TEXT);
        }

        /** Creates a single-valued property key of the given type and returns its name. */
        private String createPropertyKey(String str, DataType dataType) {
            return createPropertyKey(str, dataType, Cardinality.SINGLE);
        }

        /** Builds the property key and adds it through the schema transaction; returns its name. */
        private String createPropertyKey(String str, DataType dataType, Cardinality cardinality) {
            HugeGraph graph = graph();
            graph.schemaTransaction().addPropertyKey(
                    graph.schema().propertyKey(str)
                         .dataType(dataType)
                         .cardinality(cardinality)
                         .build());
            return str;
        }

        /**
         * Builds a hidden index label named "task-index-by-&lt;property&gt;" on the
         * task vertex label and adds it through the schema transaction.
         *
         * @return the index label that was added
         */
        private IndexLabel createIndex(VertexLabel vertexLabel, String str) {
            HugeGraph graph = graph();
            String indexName = Graph.Hidden.hide("task-index-by-" + str);
            IndexLabel indexLabel = graph.schema().indexLabel(indexName)
                                         .on(HugeType.VERTEX_LABEL, TASK)
                                         .by(str)
                                         .build();
            graph.schemaTransaction().addIndexLabel(vertexLabel, indexLabel);
            return indexLabel;
        }
    }

    /**
     * Creates a scheduler for the given graph and registers its store listener.
     *
     * @param hugeGraph        graph whose tasks are managed; must not be null
     * @param executorService  executor that runs the tasks; must not be null
     * @param executorService2 executor that runs task-store operations; must not be null
     */
    public TaskScheduler(HugeGraph hugeGraph, ExecutorService executorService, ExecutorService executorService2) {
        E.checkNotNull(hugeGraph, "graph");
        this.graph = hugeGraph;

        E.checkNotNull(executorService, "taskExecutor");
        this.taskExecutor = executorService;

        E.checkNotNull(executorService2, "dbExecutor");
        this.dbExecutor = executorService2;

        this.tasks = new ConcurrentHashMap<>();
        // The transaction is created on first use by tx()
        this.taskTx = null;
        // Must come last: listenChanges() reads this.graph
        this.eventListener = listenChanges();
    }

    /**
     * @return the graph this scheduler was created for
     */
    public HugeGraph graph() {
        return this.graph;
    }

    /**
     * @return the number of tasks currently held in the in-memory task map
     */
    public int pendingTasks() {
        return this.tasks.size();
    }

    /**
     * Returns the task transaction, creating it on the first call.
     * Creation is guarded by a lock on this scheduler and re-checked
     * inside the lock, so only one transaction is ever created.
     *
     * @return the shared task transaction, never null
     */
    private TaskTransaction tx() {
        TaskTransaction current = this.taskTx;
        if (current == null) {
            synchronized (this) {
                current = this.taskTx;
                if (current == null) {
                    current = new TaskTransaction(this.graph, this.graph.loadSystemStore());
                    if (!$assertionsDisabled && this.taskTx != null) {
                        throw new AssertionError();
                    }
                    this.taskTx = current;
                }
            }
        }
        if (!$assertionsDisabled && current == null) {
            throw new AssertionError();
        }
        return current;
    }

    /**
     * Registers a listener on the system store provider that initializes
     * the task schema (on the db executor) when a STORE_INITED event arrives.
     *
     * @return the registered listener, so that it can be unregistered later
     */
    private EventListener listenChanges() {
        ImmutableSet<String> storeEvents = ImmutableSet.of(Events.STORE_INITED);
        EventListener listener = event -> {
            if (storeEvents.contains(event.name())) {
                call(() -> {
                    tx().initSchema();
                });
                return true;
            }
            // Not an event this scheduler cares about
            return false;
        };
        this.graph.loadSystemStore().provider().listen(listener);
        return listener;
    }

    /** Unregisters the listener that listenChanges() added to the system store provider. */
    private void unlistenChanges() {
        BackendStore systemStore = this.graph.loadSystemStore();
        systemStore.provider().unlisten(this.eventListener);
    }

    /**
     * Loads every stored task whose status is one of
     * {@code TaskStatus.PENDING_STATUSES} and restores each of them.
     */
    public <V> void restoreTasks() {
        for (TaskStatus status : TaskStatus.PENDING_STATUSES) {
            Iterator<HugeTask<V>> pending = findTask(status, NO_LIMIT);
            while (pending.hasNext()) {
                restore(pending.next());
            }
        }
    }

    /**
     * Marks a not-yet-done task as RESTORING and submits it for execution.
     *
     * @param hugeTask task to restore; must not be null and must not be done
     * @return the future returned by the task executor
     */
    public <V> Future<?> restore(HugeTask<V> hugeTask) {
        E.checkArgumentNotNull(hugeTask, "Task can't be null");
        boolean unfinished = !hugeTask.isDone();
        E.checkArgument(unfinished, "No need to restore task '%s', it has been completed", hugeTask.id());
        hugeTask.status(TaskStatus.RESTORING);
        return submitTask(hugeTask);
    }

    /**
     * Marks a task as QUEUED and submits it for execution.
     *
     * @param hugeTask task to schedule; must not be null
     * @return the future returned by the task executor
     */
    public <V> Future<?> schedule(HugeTask<V> hugeTask) {
        E.checkArgumentNotNull(hugeTask, "Task can't be null");
        hugeTask.status(TaskStatus.QUEUED);
        Future<?> submitted = submitTask(hugeTask);
        return submitted;
    }

    /**
     * Registers a task in the in-memory map, wires its callable to this
     * scheduler and to the task itself, and hands it to the task executor.
     *
     * <p>If wiring or submission fails (for example the executor rejects
     * the task), the task is taken out of the map again. Otherwise a task
     * that will never run would stay registered forever, counting towards
     * {@code MAX_PENDING_TASKS} and keeping pendingTasks() above zero.
     *
     * @param hugeTask task to submit
     * @return the future returned by the task executor
     * @throws IllegalArgumentException presumably, via E.checkArgument, when the
     *         pending-task limit would be exceeded
     */
    private <V> Future<?> submitTask(HugeTask<V> hugeTask) {
        int size = this.tasks.size() + 1;
        E.checkArgument(size <= MAX_PENDING_TASKS,
                        "Pending tasks size %s has exceeded the max limit %s",
                        size, MAX_PENDING_TASKS);
        Id id = hugeTask.id();
        this.tasks.put(id, hugeTask);
        try {
            hugeTask.callable().scheduler(this);
            hugeTask.callable().task(hugeTask);
            return this.taskExecutor.submit(hugeTask);
        } catch (RuntimeException e) {
            // Only drop the mapping if it still points at this very task
            this.tasks.remove(id, hugeTask);
            throw e;
        }
    }

    /**
     * Cancels a task (interrupting it if running) and drops it from the
     * in-memory map. Does nothing when the task has already completed.
     *
     * @param hugeTask task to cancel; must not be null
     */
    public <V> void cancel(HugeTask<V> hugeTask) {
        E.checkArgumentNotNull(hugeTask, "Task can't be null");
        if (!hugeTask.completed()) {
            hugeTask.cancel(true);
            remove(hugeTask.id());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drops the task with the given id from the in-memory map.
     * With assertions enabled, a removed task must already be completed.
     *
     * @param id id of the task to forget
     */
    public void remove(Id id) {
        HugeTask<?> removed = this.tasks.remove(id);
        if ($assertionsDisabled || removed == null || removed.completed()) {
            return;
        }
        throw new AssertionError();
    }

    /**
     * Stores the task as a vertex; the write runs on the db executor.
     *
     * @param hugeTask task to persist; must not be null
     */
    public <V> void save(HugeTask<V> hugeTask) {
        E.checkArgumentNotNull(hugeTask, "Task can't be null");
        call(() -> {
            TaskTransaction tx = tx();
            HugeVertex vertex = tx().constructVertex(hugeTask);
            return tx.addVertex(vertex);
        });
    }

    /**
     * Unregisters the store listener and, unless the db executor is already
     * shut down, closes the task transaction and the graph transaction on
     * that executor.
     *
     * @return always {@code true}
     */
    public boolean close() {
        unlistenChanges();
        if (!this.dbExecutor.isShutdown()) {
            call(() -> {
                tx().close();
                this.graph.closeTx();
            });
        }
        return true;
    }

    /**
     * Looks a task up by id, first in memory and then in the task store.
     *
     * @param id id of the task; must not be null
     * @return the in-memory task if present, otherwise the stored one
     */
    public <V> HugeTask<V> task(Id id) {
        E.checkArgumentNotNull(id, "Parameter task id can't be null");
        @SuppressWarnings("unchecked")
        HugeTask<V> inMemory = (HugeTask<V>) this.tasks.get(id);
        if (inMemory != null) {
            return inMemory;
        }
        return findTask(id);
    }

    /**
     * Looks up several tasks by id. Ids found in the in-memory map are
     * served from there; all remaining ids are queried from the store.
     *
     * @param list ids of the wanted tasks
     * @return iterator built from the in-memory tasks and extended with the
     *         stored ones
     */
    public <V> Iterator<HugeTask<V>> tasks(List<Id> list) {
        List<Id> missingIds = new ArrayList<>();
        List<HugeTask<V>> cached = new ArrayList<>();
        for (Id id : list) {
            @SuppressWarnings("unchecked")
            HugeTask<V> task = (HugeTask<V>) this.tasks.get(id);
            if (task == null) {
                missingIds.add(id);
            } else {
                cached.add(task);
            }
        }
        ExtendableIterator<HugeTask<V>> result;
        if (cached.isEmpty()) {
            result = new ExtendableIterator<>();
        } else {
            result = new ExtendableIterator<>(cached.iterator());
        }
        result.extend(findTasks(missingIds));
        return result;
    }

    /**
     * Loads a task from the task store by id (the query runs on the db executor).
     *
     * @param id id of the task
     * @return the stored task
     * @throws NotFoundException if no vertex exists for the id
     */
    public <V> HugeTask<V> findTask(Id id) {
        Callable<HugeTask<V>> lookup = () -> {
            Iterator<Vertex> vertices = tx().queryVertices(id);
            if (!vertices.hasNext()) {
                return null;
            }
            HugeTask<V> loaded = HugeTask.fromVertex(vertices.next());
            // An id is expected to match at most one vertex
            if (!$assertionsDisabled && vertices.hasNext()) {
                throw new AssertionError();
            }
            return loaded;
        };
        HugeTask<V> found = call(lookup);
        if (found != null) {
            return found;
        }
        throw new NotFoundException("Can't find task with id '%s'", id);
    }

    /**
     * Loads the tasks with the given ids from the task store.
     *
     * @param list ids of the wanted tasks
     * @return iterator over the tasks that were found
     */
    public <V> Iterator<HugeTask<V>> findTasks(List<Id> list) {
        Iterator<HugeTask<V>> stored = queryTask(list);
        return stored;
    }

    /**
     * Loads tasks from the task store without any property condition.
     *
     * @param j maximum number of tasks to load, or -1 for no limit
     * @return iterator over the loaded tasks
     */
    public <V> Iterator<HugeTask<V>> findAllTask(long j) {
        Map<String, Object> noConditions = ImmutableMap.of();
        return queryTask(noConditions, j);
    }

    /**
     * Loads the stored tasks that have the given status.
     *
     * @param taskStatus status to match; its code is compared with the status property
     * @param j          maximum number of tasks to load, or -1 for no limit
     * @return iterator over the matching tasks
     */
    public <V> Iterator<HugeTask<V>> findTask(TaskStatus taskStatus, long j) {
        Object statusCode = Byte.valueOf(taskStatus.code());
        return queryTask(HugeTask.P.STATUS, statusCode, j);
    }

    /**
     * Deletes a completed task, both from memory and from the task store.
     *
     * @param id id of the task to delete
     * @return the task that was stored under the id, or null if the store had none
     */
    public <V> HugeTask<V> deleteTask(Id id) {
        HugeTask<?> inMemory = this.tasks.get(id);
        if (inMemory != null) {
            E.checkArgument(inMemory.completed(),
                            "Can't delete incomplete task '%s' in status %s, Please try to cancel the task first",
                            id, inMemory.status());
            remove(id);
        }
        Callable<HugeTask<V>> deletion = () -> {
            Iterator<Vertex> vertices = tx().queryVertices(id);
            if (!vertices.hasNext()) {
                return null;
            }
            HugeVertex vertex = (HugeVertex) vertices.next();
            HugeTask<V> stored = HugeTask.fromVertex(vertex);
            E.checkState(stored.completed(),
                         "Can't delete incomplete task '%s' in status %s",
                         id, stored.status());
            tx().removeVertex(vertex);
            // An id is expected to match at most one vertex
            if (!$assertionsDisabled && vertices.hasNext()) {
                throw new AssertionError();
            }
            return stored;
        };
        return call(deletion);
    }

    /**
     * Polls a task every {@code QUERY_INTERVAL} milliseconds until it has
     * completed or the allowed number of polls is used up.
     *
     * <p>An interrupt received while sleeping does not abort the wait, but
     * it is no longer lost: the thread's interrupt status is restored
     * before this method returns or throws.
     *
     * @param id id of the task to wait for
     * @param j  time to wait, in seconds
     * @return the completed task
     * @throws TimeoutException if the task has not completed in time
     */
    public <V> HugeTask<V> waitUntilTaskCompleted(Id id, long j) throws TimeoutException {
        long maxPasses = (j * 1000) / QUERY_INTERVAL;
        boolean interrupted = false;
        try {
            for (long pass = 0; ; pass++) {
                HugeTask<V> current = task(id);
                if (current.completed()) {
                    return current;
                }
                if (pass >= maxPasses) {
                    throw new TimeoutException(String.format(
                            "Task '%s' was not completed in %s seconds", id, j));
                }
                try {
                    Thread.sleep(QUERY_INTERVAL);
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt for the caller
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Polls the number of pending tasks every {@code QUERY_INTERVAL}
     * milliseconds until it reaches zero or the allowed number of polls is
     * used up.
     *
     * <p>An interrupt received while sleeping does not abort the wait, but
     * it is no longer lost: the thread's interrupt status is restored
     * before this method returns or throws.
     *
     * @param j time to wait, in seconds
     * @throws TimeoutException if tasks are still pending after that time
     */
    public void waitUntilAllTasksCompleted(long j) throws TimeoutException {
        long maxPasses = (j * 1000) / QUERY_INTERVAL;
        boolean interrupted = false;
        try {
            for (long pass = 0; ; pass++) {
                int pending = pendingTasks();
                if (pending == 0) {
                    return;
                }
                if (pass >= maxPasses) {
                    throw new TimeoutException(String.format(
                            "There are still %s incomplete tasks after %s seconds", pending, j));
                }
                try {
                    Thread.sleep(QUERY_INTERVAL);
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt for the caller
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Queries stored tasks by a single property condition.
     *
     * @param str name of the property key to match
     * @param obj value the property must equal
     * @param j   maximum number of tasks to load, or -1 for no limit
     * @return iterator over the matching tasks
     */
    private <V> Iterator<HugeTask<V>> queryTask(String str, Object obj, long j) {
        Map<String, Object> condition = ImmutableMap.of(str, obj);
        return queryTask(condition, j);
    }

    /**
     * Queries task vertices whose properties equal all the given values.
     * The query runs on the db executor and its result is collected into a
     * list there, before an iterator over that list is handed back.
     *
     * @param map property key name to required value; may be empty
     * @param j   maximum number of tasks to load, or -1 for no limit
     * @return iterator over the matching tasks
     */
    private <V> Iterator<HugeTask<V>> queryTask(Map<String, Object> map, long j) {
        Callable<List<HugeTask<V>>> query = () -> {
            ConditionQuery conditions = new ConditionQuery(HugeType.VERTEX);
            conditions.eq(HugeKeys.LABEL, this.graph.vertexLabel(TaskTransaction.TASK).id());
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                conditions.query(Condition.eq(this.graph.propertyKey(entry.getKey()).id(),
                                              entry.getValue()));
            }
            conditions.showHidden(true);
            if (j != NO_LIMIT) {
                conditions.limit(j);
            }
            Iterator<Vertex> vertices = tx().queryVertices(conditions);
            Iterator<HugeTask<V>> mapped = new MapperIterator<>(vertices, HugeTask::fromVertex);
            return IteratorUtils.list(mapped);
        };
        List<HugeTask<V>> matched = call(query);
        return matched.iterator();
    }

    /**
     * Queries task vertices by id. The query runs on the db executor and
     * its result is collected into a list there, before an iterator over
     * that list is handed back.
     *
     * @param list ids of the wanted tasks
     * @return iterator over the tasks that were found
     */
    private <V> Iterator<HugeTask<V>> queryTask(List<Id> list) {
        Callable<List<HugeTask<V>>> query = () -> {
            TaskTransaction tx = tx();
            Id[] ids = list.toArray(new Id[list.size()]);
            Iterator<Vertex> vertices = tx.queryVertices(ids);
            Iterator<HugeTask<V>> mapped = new MapperIterator<>(vertices, HugeTask::fromVertex);
            return IteratorUtils.list(mapped);
        };
        List<HugeTask<V>> found = call(query);
        return found.iterator();
    }

    /**
     * Runs an action on the db executor and waits for it to finish.
     *
     * @param runnable action to run
     * @return always null; the type parameter only exists for the caller's convenience
     */
    private <V> V call(Runnable runnable) {
        Callable<V> adapted = Executors.callable(runnable, null);
        return call(adapted);
    }

    /**
     * Runs a computation on the db executor and waits for its result.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is
     * restored before the failure is reported, so callers further up can
     * still see the interrupt.
     *
     * @param callable computation to run
     * @return the value produced by {@code callable}
     * @throws HugeException if submission, execution or waiting fails; the
     *         original exception is kept as the cause
     */
    private <V> V call(Callable<V> callable) {
        try {
            return this.dbExecutor.submit(callable).get();
        } catch (InterruptedException e) {
            // Future.get() cleared the flag when it threw; put it back
            Thread.currentThread().interrupt();
            throw new HugeException("Failed to update/query TaskStore", e);
        } catch (Exception e) {
            throw new HugeException("Failed to update/query TaskStore", e);
        }
    }

    // Decompiler rendering of the assertion switch: the flag is true when
    // assertions are NOT enabled for TaskScheduler, and guards every
    // "throw new AssertionError()" check in this class.
    static {
        $assertionsDisabled = !TaskScheduler.class.desiredAssertionStatus();
    }
}
