package cn.tenmg.flink.jobs;

import cn.tenmg.flink.jobs.context.FlinkJobsContext;
import cn.tenmg.flink.jobs.model.Arguments;
import cn.tenmg.flink.jobs.utils.OperatorUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:cn/tenmg/flink/jobs/BasicFlinkJobsRunner.class */
/**
 * Base class for job runners: parses the single command-line argument into
 * {@link Arguments}, prepares the {@link StreamExecutionEnvironment}, executes
 * the configured operators and finally delegates to the subclass-specific
 * {@link #run(StreamExecutionEnvironment, Arguments)}.
 */
public abstract class BasicFlinkJobsRunner {
    /**
     * Runs the concrete job once the environment has been prepared and all
     * configured operators have been executed.
     *
     * @param streamExecutionEnvironment the environment obtained from {@link FlinkJobsContext}
     * @param arguments the parsed job arguments; its params map is never {@code null} here
     * @throws Exception if the job fails
     */
    protected abstract void run(StreamExecutionEnvironment streamExecutionEnvironment, Arguments arguments) throws Exception;

    /**
     * Entry point taking the raw program arguments.
     *
     * @param strArr exactly one element: either a JSON document, or the path of a
     *               file whose name ends with {@code .json}
     * @throws IllegalArgumentException if zero or more than one argument is given,
     *                                  or if the JSON content yields no arguments object
     * @throws Exception if reading the file, an operator, or the job itself fails
     */
    public void run(String[] strArr) throws Exception {
        if (strArr == null || strArr.length < 1) {
            throw new IllegalArgumentException("You must provide a parameter in JSON format or the path of json file");
        }
        if (strArr.length > 1) {
            throw new IllegalArgumentException("Too many parameters. You must provide a parameter in JSON format or the path of json file");
        }
        Arguments arguments = parseArguments(strArr[0]);
        if (arguments == null) {
            // The parser returned no object (e.g. the JSON literal null); fail with a
            // clear message instead of a NullPointerException further down.
            throw new IllegalArgumentException("The JSON content provided does not describe any arguments");
        }
        StreamExecutionEnvironment executionEnvironment = FlinkJobsContext.getExecutionEnvironment(arguments.getConfiguration());
        try {
            RuntimeExecutionMode runtimeMode = arguments.getRuntimeMode();
            if (runtimeMode != null) {
                // Only override the environment's runtime mode when one was configured.
                executionEnvironment.setRuntimeMode(runtimeMode);
            }
            Map<String, Object> params = arguments.getParams();
            if (params == null) {
                // Guarantee a mutable, non-null params map for operators and the job.
                params = new HashMap<>();
                arguments.setParams(params);
            }
            operates(executionEnvironment, arguments.getOperates(), params);
            run(executionEnvironment, arguments);
        } finally {
            // Release the context even when an operator or the job throws.
            // NOTE(review): presumably this clears per-thread state held by
            // FlinkJobsContext - confirm against its implementation.
            FlinkJobsContext.remove();
        }
    }

    /**
     * Parses the single program argument into {@link Arguments}.
     *
     * @param source a path ending with {@code .json}, or the JSON text itself
     * @return the parsed arguments, possibly {@code null} if the parser yields none
     * @throws Exception if the file cannot be opened, read or closed
     */
    private Arguments parseArguments(String source) throws Exception {
        if (source.endsWith(".json")) {
            // try-with-resources so the file handle is always released.
            try (FileInputStream input = new FileInputStream(source)) {
                return JSON.parseObject(input, Arguments.class, new Feature[0]);
            }
        }
        return JSON.parseObject(source, Arguments.class);
    }

    /**
     * Executes each configured operator in list order. Every entry is a JSON
     * string whose {@code type} property selects the operator; the full JSON
     * string is passed on to that operator.
     *
     * @param streamExecutionEnvironment the environment handed to each operator
     * @param list the operator configurations; may be {@code null}, in which case nothing runs
     * @param map the shared params map handed to each operator
     * @throws Exception if any operator fails
     */
    private void operates(StreamExecutionEnvironment streamExecutionEnvironment, List<String> list, Map<String, Object> map) throws Exception {
        if (list == null) {
            return;
        }
        for (String config : list) {
            String type = JSON.parseObject(config).getString("type");
            OperatorUtils.getOperator(type).execute(streamExecutionEnvironment, config, map);
        }
    }
}
