package org.apache.flink.client.python;

import java.io.File;
import java.io.FileOutputStream;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.python.PythonEnvUtils;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.util.FileUtils;

/* loaded from: input_file:org/apache/flink/client/python/PythonFunctionFactoryTest.class */
public class PythonFunctionFactoryTest {
    private static String tmpdir = "";
    private static BatchTableEnvironment flinkTableEnv;
    private static StreamTableEnvironment blinkTableEnv;
    private static Table flinkSourceTable;
    private static Table blinkSourceTable;

    public static void main(String[] strArr) throws Exception {
        prepareEnvironment();
        testPythonFunctionFactory();
        cleanEnvironment();
    }

    public static void prepareEnvironment() throws Exception {
        tmpdir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()).getAbsolutePath();
        new File(tmpdir).mkdir();
        File file = new File(tmpdir, "test1.py");
        FileOutputStream fileOutputStream = new FileOutputStream(file);
        Throwable th = null;
        try {
            fileOutputStream.write("from pyflink.table.udf import udf\nfrom pyflink.table import DataTypes\n@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())\ndef func1(str):\n    return str + str\n".getBytes());
            if (fileOutputStream != null) {
                if (0 != 0) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            flinkTableEnv = BatchTableEnvironment.create(executionEnvironment);
            flinkTableEnv.getConfig().getConfiguration().set(PythonOptions.PYTHON_FILES, file.getAbsolutePath());
            flinkTableEnv.getConfig().getConfiguration().setString(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), "80mb");
            StreamExecutionEnvironment executionEnvironment2 = StreamExecutionEnvironment.getExecutionEnvironment();
            blinkTableEnv = StreamTableEnvironment.create(executionEnvironment2, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
            blinkTableEnv.getConfig().getConfiguration().set(PythonOptions.PYTHON_FILES, file.getAbsolutePath());
            blinkTableEnv.getConfig().getConfiguration().setString(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), "80mb");
            flinkSourceTable = flinkTableEnv.fromDataSet(executionEnvironment.fromElements(new String[]{"1", "2", "3"})).as("str", new String[0]);
            blinkSourceTable = blinkTableEnv.fromDataStream(executionEnvironment2.fromElements(new String[]{"1", "2", "3"})).as("str", new String[0]);
        } catch (Throwable th3) {
            if (fileOutputStream != null) {
                if (0 != 0) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            throw th3;
        }
    }

    public static void cleanEnvironment() throws Exception {
        closeStartedPythonProcess();
        FileUtils.deleteDirectory(new File(tmpdir));
    }

    public static void testPythonFunctionFactory() {
        flinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
        verifyPlan(flinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), flinkTableEnv);
        flinkTableEnv.executeSql("alter function func1 as 'test1.func1' language python");
        verifyPlan(flinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), flinkTableEnv);
        flinkTableEnv.executeSql("create temporary function func1 as 'test1.func1' language python");
        verifyPlan(flinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), flinkTableEnv);
        flinkTableEnv.executeSql("create temporary system function func1 as 'test1.func1' language python");
        verifyPlan(flinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), flinkTableEnv);
        blinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
        verifyPlan(blinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), blinkTableEnv);
        blinkTableEnv.executeSql("alter function func1 as 'test1.func1' language python");
        verifyPlan(blinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), blinkTableEnv);
        blinkTableEnv.executeSql("create temporary function func1 as 'test1.func1' language python");
        verifyPlan(blinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), blinkTableEnv);
        blinkTableEnv.executeSql("create temporary system function func1 as 'test1.func1' language python");
        verifyPlan(blinkSourceTable.select(new Expression[]{Expressions.call("func1", new Object[]{Expressions.$("str")})}), blinkTableEnv);
    }

    private static void verifyPlan(Table table, TableEnvironment tableEnvironment) {
        String explain = table.explain(new ExplainDetail[0]);
        if (!explain.contains("PythonCalc(select=[func1(f0) AS _c0])")) {
            throw new AssertionError(String.format("This plan does not contains \"%s\":\n%s", "PythonCalc(select=[func1(f0) AS _c0])", explain));
        }
    }

    private static void closeStartedPythonProcess() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
        Field declaredField = Class.forName("java.lang.ApplicationShutdownHooks").getDeclaredField("hooks");
        declaredField.setAccessible(true);
        Map map = (Map) declaredField.get(null);
        PythonEnvUtils.PythonProcessShutdownHook pythonProcessShutdownHook = null;
        Iterator it = map.keySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Thread thread = (Thread) it.next();
            if (thread instanceof PythonEnvUtils.PythonProcessShutdownHook) {
                pythonProcessShutdownHook = (PythonEnvUtils.PythonProcessShutdownHook) thread;
                break;
            }
        }
        if (pythonProcessShutdownHook != null) {
            pythonProcessShutdownHook.run();
            map.remove(pythonProcessShutdownHook);
        }
    }
}
