package cn.tenmg.flink.jobs.operator;

import cn.tenmg.flink.jobs.context.FlinkJobsContext;
import cn.tenmg.flink.jobs.model.DataSync;
import cn.tenmg.flink.jobs.model.data.sync.Column;
import cn.tenmg.flink.jobs.operator.data.sync.MetaDataGetter;
import cn.tenmg.flink.jobs.operator.data.sync.MetaDataGetterFactory;
import cn.tenmg.flink.jobs.utils.ConfigurationUtils;
import cn.tenmg.flink.jobs.utils.MapUtils;
import cn.tenmg.flink.jobs.utils.SQLUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:cn/tenmg/flink/jobs/operator/DataSyncOperator.class */
/**
 * Operator that synchronizes a single table from one datasource to another by
 * generating and executing three Flink SQL statements: a {@code CREATE TABLE}
 * for the source side, a {@code CREATE TABLE} for the target side and an
 * {@code INSERT INTO ... SELECT ... FROM ...} that moves the data.
 *
 * <p>
 * When "smart" mode is enabled (per job via {@link DataSync#getSmart()} or
 * globally via the {@code data.sync.smart} property) the column list and the
 * primary key are completed from the metadata of the target table.
 */
public class DataSyncOperator extends AbstractOperator<DataSync> {
    private static final Logger log = LogManager.getLogger(DataSyncOperator.class);

    /** Property holding the global default for smart mode. */
    private static final String SMART_KEY = "data.sync.smart";

    /** Property holding the prefix prepended to the table name to form the source table name. */
    private static final String FROM_TABLE_PREFIX_KEY = "data.sync.from_table_prefix";

    /** Source table option that receives the configured topic. */
    private static final String TOPIC_KEY = "topic";

    /** Source table option that receives the generated consumer group id. */
    private static final String GROUP_ID_KEY = "properties.group.id";

    /** Property holding the prefix prepended to the table name to form the group id. */
    private static final String GROUP_ID_PREFIX_KEY = "data.sync.group_id_prefix";

    /** Target table option that receives the table name. */
    private static final String TABLE_NAME_KEY = "table-name";

    /**
     * Runs the data synchronization described by {@code dataSync}.
     *
     * <p>
     * This method carries the real implementation of {@code execute}; it was
     * given a distinct name to avoid a signature collision with the bridge
     * method at the bottom of this class.
     *
     * @param streamExecutionEnvironment the environment used to obtain the table environment
     * @param dataSync                   the synchronization configuration
     * @param map                        operator parameters (not used by this operator)
     * @return the result returned by {@code executeSql} for the INSERT statement
     * @throws IllegalArgumentException if {@code from}, {@code to} or
     *                                  {@code table} is blank, or if no columns
     *                                  are available to synchronize
     * @throws Exception                if building or executing a statement fails
     */
    Object execute2(StreamExecutionEnvironment streamExecutionEnvironment, DataSync dataSync, Map<String, Object> map) throws Exception {
        String from = dataSync.getFrom();
        String to = dataSync.getTo();
        String table = dataSync.getTable();
        if (StringUtils.isBlank(from) || StringUtils.isBlank(to) || StringUtils.isBlank(table)) {
            throw new IllegalArgumentException("The property 'from', 'to' or 'table' cannot be blank.");
        }
        StreamTableEnvironment tableEnv = FlinkJobsContext.getOrCreateStreamTableEnvironment(streamExecutionEnvironment);
        String primaryKey = dataSync.getPrimaryKey();
        String topic = dataSync.getTopic();
        String currentCatalog = tableEnv.getCurrentCatalog();
        String defaultCatalog = FlinkJobsContext.getDefaultCatalog(tableEnv);
        String fromTable = FlinkJobsContext.getProperty(FROM_TABLE_PREFIX_KEY) + table;
        String fromConfig = dataSync.getFromConfig();
        // The generated tables are created in the default catalog.
        if (!defaultCatalog.equals(currentCatalog)) {
            tableEnv.useCatalog(defaultCatalog);
        }
        Map<String, String> fromDataSource = FlinkJobsContext.getDatasource(from);
        Map<String, String> toDataSource = FlinkJobsContext.getDatasource(to);
        List<Column> columns = dataSync.getColumns();
        Boolean smart = dataSync.getSmart();
        if (smart == null) {
            // No per-job setting: fall back to the global property.
            smart = Boolean.valueOf(FlinkJobsContext.getProperty(SMART_KEY));
        }
        if (Boolean.TRUE.equals(smart)) {
            MetaDataGetter.TableMetaData tableMetaData = MetaDataGetterFactory.getMetaDataGetter(toDataSource).getTableMetaData(toDataSource, table);
            Set<String> primaryKeys = tableMetaData.getPrimaryKeys();
            // An explicitly configured primary key always wins over the metadata.
            // NOTE(review): the metadata keys are target column names; if a key column
            // is renamed between source and target the source DDL may need its own key.
            if (primaryKey == null && primaryKeys != null && !primaryKeys.isEmpty()) {
                primaryKey = String.join(",", primaryKeys);
            }
            columns = completeColumns(columns, tableMetaData.getColumns());
        }
        // Fail early with a clear message instead of an index/null failure while
        // assembling the SQL text.
        if (columns == null || columns.isEmpty()) {
            throw new IllegalArgumentException("No columns available to synchronize table '" + table
                    + "': configure the columns explicitly or enable smart mode for a target table that has columns.");
        }

        String fromCreateTableSQL = fromCreateTableSQL(fromDataSource, topic, table, fromTable, columns, primaryKey, fromConfig);
        log.info(fromCreateTableSQL);
        tableEnv.executeSql(fromCreateTableSQL);

        String createTableSQL = toCreateTableSQL(toDataSource, table, columns, primaryKey, dataSync.getToConfig());
        log.info(createTableSQL);
        tableEnv.executeSql(createTableSQL);

        String insertSQL = insertSQL(table, fromTable, columns);
        log.info(insertSQL);
        return tableEnv.executeSql(insertSQL);
    }

    /**
     * Completes the configured columns with the column metadata of the target
     * table: missing types of configured columns are filled in, and every
     * metadata column that was not matched by a configured column is appended.
     *
     * @param columns     the configured columns, may be {@code null} or empty
     * @param metaColumns target column name to type, may be {@code null}
     * @return the completed list; the given list itself when it was not {@code null}
     */
    private static List<Column> completeColumns(List<Column> columns, Map<String, String> metaColumns) {
        List<Column> completed = columns;
        if (completed == null) {
            completed = new ArrayList<Column>();
        }
        if (metaColumns == null) {
            // Nothing to complete from; the caller validates the resulting list.
            return completed;
        }
        for (Column column : completed) {
            String targetName = targetName(column);
            String metaType = metaColumns.get(targetName);
            if (metaType == null) {
                continue;
            }
            if (StringUtils.isBlank(column.getFromType())) {
                column.setFromType(metaType);
            }
            if (StringUtils.isBlank(column.getToType())) {
                column.setToType(metaType);
            }
            // Matched columns must not be appended a second time below.
            // NOTE(review): this removes the entry from the map handed out by the
            // metadata getter - confirm that map is not shared or cached.
            metaColumns.remove(targetName);
        }
        addColumns(completed, metaColumns);
        return completed;
    }

    /**
     * Appends one column per entry of {@code map} (name to type) to {@code list}.
     */
    private static void addColumns(List<Column> list, Map<String, String> map) {
        for (Map.Entry<String, String> entry : map.entrySet()) {
            addColumn(list, entry.getKey(), entry.getValue());
        }
    }

    /**
     * Appends a column having only its source name and source type set.
     */
    private static void addColumn(List<Column> list, String name, String type) {
        Column column = new Column();
        column.setFromName(name);
        column.setFromType(type);
        list.add(column);
    }

    /**
     * Returns the name of the column on the target side: the configured target
     * name, or the source name when no target name is given.
     */
    private static String targetName(Column column) {
        String toName = column.getToName();
        return StringUtils.isBlank(toName) ? column.getFromName() : toName;
    }

    /**
     * Returns the type of the column on the target side: the configured target
     * type, or the source type when no target type is given.
     */
    private static String targetType(Column column) {
        String toType = column.getToType();
        return StringUtils.isBlank(toType) ? column.getFromType() : toType;
    }

    /**
     * Returns the expression selected from the source table for the column: its
     * script, or the source column name when no script is given.
     */
    private static String selectExpression(Column column) {
        String script = column.getScript();
        return StringUtils.isBlank(script) ? column.getFromName() : script;
    }

    /**
     * Appends the {@code PRIMARY KEY (...) NOT ENFORCED} clause when a primary
     * key is configured; appends nothing otherwise.
     */
    private static void appendPrimaryKey(StringBuffer sql, String primaryKey) {
        if (StringUtils.isNotBlank(primaryKey)) {
            sql.append(", ").append("PRIMARY KEY (").append(primaryKey).append(") NOT ENFORCED");
        }
    }

    /**
     * Builds the {@code CREATE TABLE} statement of the source table.
     *
     * @param dataSource the source datasource options
     * @param topic      the topic to read from, may be {@code null}
     * @param table      the synchronized table name, used to derive the group id
     * @param fromTable  the name of the source table to create
     * @param columns    the columns to declare, must not be empty
     * @param primaryKey the primary key column list, may be blank
     * @param fromConfig extra options appended verbatim to the WITH clause, may be blank
     * @return the statement text
     * @throws IOException if {@code fromConfig} cannot be loaded
     */
    private static String fromCreateTableSQL(Map<String, String> dataSource, String topic, String table, String fromTable, List<Column> columns, String primaryKey, String fromConfig) throws IOException {
        StringBuffer sql = new StringBuffer();
        sql.append("CREATE TABLE ").append(fromTable).append("(");
        for (int i = 0, size = columns.size(); i < size; i++) {
            Column column = columns.get(i);
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(column.getFromName()).append(' ').append(column.getFromType());
        }
        appendPrimaryKey(sql, primaryKey);
        sql.append(") ").append("WITH (");

        HashMap<String, String> options = MapUtils.newHashMap(dataSource);
        boolean customized = !StringUtils.isBlank(fromConfig);
        boolean groupIdCustomized = false;
        boolean topicCustomized = false;
        if (customized) {
            // Options given in the custom configuration replace the datasource ones.
            Map<String, String> custom = ConfigurationUtils.load(fromConfig);
            MapUtils.removeAll(options, custom.keySet());
            groupIdCustomized = custom.containsKey(GROUP_ID_KEY);
            topicCustomized = custom.containsKey(TOPIC_KEY);
        }
        if (!groupIdCustomized) {
            options.put(GROUP_ID_KEY, FlinkJobsContext.getProperty(GROUP_ID_PREFIX_KEY) + table);
        }
        if (topic != null && !topicCustomized) {
            options.put(TOPIC_KEY, topic);
        }
        SQLUtils.appendDataSource(sql, options);
        if (customized) {
            sql.append(", ").append(fromConfig);
        }
        sql.append(")");
        return sql.toString();
    }

    /**
     * Builds the {@code CREATE TABLE} statement of the target table.
     *
     * @param dataSource the target datasource options
     * @param table      the name of the target table
     * @param columns    the columns to declare, must not be empty
     * @param primaryKey the primary key column list, may be blank
     * @param toConfig   extra options appended verbatim to the WITH clause, may be blank
     * @return the statement text
     * @throws IOException if {@code toConfig} cannot be loaded
     */
    private static String toCreateTableSQL(Map<String, String> dataSource, String table, List<Column> columns, String primaryKey, String toConfig) throws IOException {
        StringBuffer sql = new StringBuffer();
        sql.append("CREATE TABLE ").append(table).append("(");
        for (int i = 0, size = columns.size(); i < size; i++) {
            Column column = columns.get(i);
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(targetName(column)).append(' ').append(targetType(column));
        }
        appendPrimaryKey(sql, primaryKey);
        sql.append(") ").append("WITH (");

        HashMap<String, String> options = MapUtils.newHashMap(dataSource);
        options.put(TABLE_NAME_KEY, table);
        if (StringUtils.isBlank(toConfig)) {
            SQLUtils.appendDataSource(sql, options);
        } else {
            // Options given in the custom configuration replace the datasource ones.
            MapUtils.removeAll(options, ConfigurationUtils.load(toConfig).keySet());
            SQLUtils.appendDataSource(sql, options);
            sql.append(", ").append(toConfig);
        }
        sql.append(")");
        return sql.toString();
    }

    /**
     * Builds the {@code INSERT INTO ... SELECT ... FROM ...} statement copying
     * the rows of the source table into the target table.
     *
     * @param table     the name of the target table
     * @param fromTable the name of the source table
     * @param columns   the synchronized columns, must not be empty
     * @return the statement text
     */
    private static String insertSQL(String table, String fromTable, List<Column> columns) {
        StringBuilder targets = new StringBuilder();
        StringBuilder selections = new StringBuilder();
        for (int i = 0, size = columns.size(); i < size; i++) {
            Column column = columns.get(i);
            if (i > 0) {
                targets.append(", ");
                selections.append(", ");
            }
            targets.append(targetName(column));
            selections.append(selectExpression(column));
        }
        return new StringBuilder("INSERT INTO ").append(table).append(" (").append(targets)
                .append(") SELECT ").append(selections)
                .append(" FROM ").append(fromTable).toString();
    }

    /**
     * Bridge to {@link #execute2(StreamExecutionEnvironment, DataSync, Map)}
     * satisfying the {@code execute} contract of {@link AbstractOperator}.
     */
    @Override // cn.tenmg.flink.jobs.operator.AbstractOperator
    // The raw Map is narrowed to Map<String, Object>; execute2 never reads it.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    /* bridge */ /* synthetic */ Object execute(StreamExecutionEnvironment streamExecutionEnvironment, DataSync dataSync, Map map) throws Exception {
        return execute2(streamExecutionEnvironment, dataSync, (Map<String, Object>) map);
    }
}
