package cn.tenmg.clink.operator.job.generator;

import cn.tenmg.clink.context.ClinkContext;
import cn.tenmg.clink.datasource.DataSourceConverter;
import cn.tenmg.clink.exception.IllegalConfigurationException;
import cn.tenmg.clink.exception.IllegalJobConfigException;
import cn.tenmg.clink.metadata.MetaDataGetter;
import cn.tenmg.clink.metadata.MetaDataGetterFactory;
import cn.tenmg.clink.model.DataSync;
import cn.tenmg.clink.model.data.sync.Column;
import cn.tenmg.clink.operator.job.generator.AbstractDataSyncJobGenerator;
import cn.tenmg.clink.source.SourceFactory;
import cn.tenmg.clink.table.functions.MultiTablesRowsFilterFunction;
import cn.tenmg.clink.utils.ConfigurationUtils;
import cn.tenmg.clink.utils.DataTypeUtils;
import cn.tenmg.clink.utils.SQLUtils;
import cn.tenmg.dsl.utils.CollectionUtils;
import cn.tenmg.dsl.utils.MapUtils;
import cn.tenmg.dsl.utils.ObjectUtils;
import cn.tenmg.dsl.utils.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;

/* loaded from: input_file:cn/tenmg/clink/operator/job/generator/MultiTablesDataSyncJobGenerator.class */
public class MultiTablesDataSyncJobGenerator extends AbstractDataSyncJobGenerator {
    private static final MultiTablesDataSyncJobGenerator INSTANCE = new MultiTablesDataSyncJobGenerator();
    private static final Map<String, DataSourceConverter> converters = MapUtils.newHashMap();
    private static volatile Map<String, SourceFactory<Source<Tuple2<String, Row>, ?, ?>>> factories = MapUtils.newHashMap();
    private static final Pattern METADATA_PATTERN;

    private MultiTablesDataSyncJobGenerator() {
    }

    public static MultiTablesDataSyncJobGenerator getInstance() {
        return INSTANCE;
    }

    @Override // cn.tenmg.clink.operator.job.generator.AbstractDataSyncJobGenerator
    Object generate(StreamExecutionEnvironment streamExecutionEnvironment, StreamTableEnvironment streamTableEnvironment, DataSync dataSync, Map<String, String> map, Map<String, String> map2, Map<String, Object> map3) throws Exception {
        String topic;
        String str;
        String str2 = map.get("connector");
        SourceFactory<Source<Tuple2<String, Row>, ?, ?>> sourceFactory = getSourceFactory(str2);
        Map<String, Set<String>> parseTableConfigs = parseTableConfigs(dataSync.getPrimaryKey());
        Map<String, Set<String>> parseTableConfigs2 = parseTableConfigs(dataSync.getAutoColumns());
        String defaultAutoColumns = getDefaultAutoColumns();
        Map<String, String> emptyMap = StringUtils.isBlank(defaultAutoColumns) ? Collections.emptyMap() : toMap(TO_LOWERCASE, defaultAutoColumns.split(","));
        String[] split = dataSync.getTable().split(",");
        String currentDatabase = streamTableEnvironment.getCurrentDatabase();
        final HashMap newHashMap = MapUtils.newHashMap();
        HashMap newHashMap2 = MapUtils.newHashMap(split.length);
        HashMap newHashMap3 = MapUtils.newHashMap();
        HashMap newHashMap4 = MapUtils.newHashMap();
        HashMap newHashMap5 = MapUtils.newHashMap();
        int i = 0;
        while (true) {
            int i2 = 0;
            if (i >= split.length) {
                break;
            }
            final String trim = split[i].trim();
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            Set<String> set = parseTableConfigs.get(trim);
            Set<String> set2 = parseTableConfigs2.get(trim);
            if (CollectionUtils.isEmpty(set)) {
                set = parseTableConfigs.get(null);
            }
            if (CollectionUtils.isNotEmpty(set)) {
                hashSet.addAll(set);
            }
            if (CollectionUtils.isEmpty(set2)) {
                set2 = parseTableConfigs2.get(null);
            }
            if (CollectionUtils.isNotEmpty(set2)) {
                hashSet2.addAll(set2);
            }
            List<Column> collation = collation(dataSync, map3, map2, trim, hashSet, CollectionUtils.isEmpty(hashSet2) ? MapUtils.toHashMap(emptyMap) : toMap(TO_LOWERCASE, hashSet2));
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            HashMap newHashMap6 = MapUtils.newHashMap();
            for (Column column : collation) {
                if (!AbstractDataSyncJobGenerator.Strategy.TO.equals(column.getStrategy())) {
                    arrayList.add(decodeKeyword(column.getFromName()));
                    String fromType = column.getFromType();
                    Matcher matcher = METADATA_PATTERN.matcher(fromType);
                    if (matcher.find()) {
                        int start = matcher.start();
                        int end = matcher.end();
                        fromType = end < fromType.length() - 1 ? StringUtils.concat(new String[]{fromType.substring(0, start), fromType.substring(end + 1)}) : fromType.substring(0, matcher.start());
                        String group = matcher.group();
                        newHashMap6.put(Integer.valueOf(i2), group.substring(group.indexOf(SQLUtils.SINGLE_QUOTATION_MARK) + 1, group.lastIndexOf(SQLUtils.SINGLE_QUOTATION_MARK)));
                    }
                    LogicalType logicalType = DataTypeUtils.fromFlinkSQLType(fromType).getLogicalType();
                    arrayList2.add(logicalType);
                    arrayList3.add(InternalTypeInfo.of(logicalType));
                    i2++;
                }
            }
            newHashMap2.put(trim, collation);
            newHashMap5.put(trim, newHashMap6);
            String[] split2 = trim.split("\\.", 2);
            if (split2.length > 1) {
                str = split2[1];
                streamTableEnvironment.executeSql("CREATE DATABASE IF NOT EXISTS ".concat(split2[0]));
                streamTableEnvironment.executeSql("USE ".concat(split2[0]));
            } else {
                streamTableEnvironment.executeSql("USE ".concat(currentDatabase));
                str = trim;
            }
            DataSourceConverter dataSourceConverter = converters.get(map2.get("connector"));
            AbstractDataSyncJobGenerator.PrimaryKeysCollector primaryKeysCollector = new AbstractDataSyncJobGenerator.PrimaryKeysCollector() { // from class: cn.tenmg.clink.operator.job.generator.MultiTablesDataSyncJobGenerator.1
                @Override // cn.tenmg.clink.operator.job.generator.AbstractDataSyncJobGenerator.PrimaryKeysCollector
                public void collect(Set<String> set3) {
                    newHashMap.put(trim, set3.toArray(new String[0]));
                }
            };
            String sinkTableSQL = dataSourceConverter == null ? sinkTableSQL(map2, str, collation, hashSet, map3, primaryKeysCollector) : sinkTableSQL(dataSourceConverter.convert(map2, trim), str, collation, hashSet, map3, primaryKeysCollector);
            if (this.log.isInfoEnabled()) {
                this.log.info("Create sink table by Flink SQL: " + SQLUtils.hiddePassword(sinkTableSQL));
            }
            streamTableEnvironment.executeSql(sinkTableSQL);
            int size = arrayList2.size();
            String[] strArr = (String[]) arrayList.toArray(new String[size]);
            newHashMap4.put(trim, RowType.of((LogicalType[]) arrayList2.toArray(new LogicalType[size]), strArr));
            newHashMap3.put(trim, new RowTypeInfo((TypeInformation[]) arrayList3.toArray(new TypeInformation[0]), strArr));
            i++;
        }
        if (ConfigurationUtils.isKafka(map) && (topic = dataSync.getTopic()) != null) {
            map.put("topic", topic);
        }
        SingleOutputStreamOperator name = streamExecutionEnvironment.fromSource(sourceFactory.create(map, newHashMap4, newHashMap5), WatermarkStrategy.noWatermarks(), str2).disableChaining().name(str2);
        StreamStatementSet createStatementSet = streamTableEnvironment.createStatementSet();
        String property = ClinkContext.getProperty("data.sync.from-table-prefix");
        for (Map.Entry entry : newHashMap3.entrySet()) {
            String str3 = (String) entry.getKey();
            String[] split3 = str3.split("\\.", 2);
            String concat = split3.length > 1 ? StringUtils.concat(new String[]{split3[0], ClinkContext.CONFIG_SPLITER, property, split3[1]}) : property + str3;
            SingleOutputStreamOperator flatMap = name.flatMap(new MultiTablesRowsFilterFunction(str3), (TypeInformation) entry.getValue());
            String[] strArr2 = (String[]) newHashMap.get(str3);
            if (strArr2 == null) {
                streamTableEnvironment.createTemporaryView(concat, streamTableEnvironment.fromChangelogStream(flatMap));
            } else {
                streamTableEnvironment.createTemporaryView(concat, streamTableEnvironment.fromChangelogStream(flatMap, Schema.newBuilder().primaryKey(strArr2).build()));
            }
            String insertSQL = insertSQL(str3, concat, (List) newHashMap2.get(str3), map3);
            if (this.log.isInfoEnabled()) {
                this.log.info("Execute Flink SQL: " + SQLUtils.hiddePassword(insertSQL));
            }
            createStatementSet.addInsertSql(insertSQL);
        }
        return createStatementSet.execute();
    }

    private Map<String, Set<String>> parseTableConfigs(String str) {
        String str2;
        HashMap newHashMap = MapUtils.newHashMap();
        if (StringUtils.isNotBlank(str)) {
            for (String str3 : str.split(",")) {
                String trim = str3.trim();
                int lastIndexOf = trim.lastIndexOf(ClinkContext.CONFIG_SPLITER);
                if (lastIndexOf > 0) {
                    str2 = trim.substring(0, lastIndexOf);
                    trim = trim.substring(lastIndexOf + 1);
                } else {
                    str2 = null;
                }
                Set set = (Set) newHashMap.get(str2);
                if (set == null) {
                    set = new LinkedHashSet();
                    newHashMap.put(str2, set);
                }
                set.add(trim);
            }
        }
        return newHashMap;
    }

    private static List<Column> collation(DataSync dataSync, Map<String, Object> map, Map<String, String> map2, String str, Set<String> set, Map<String, String> map3) throws Exception {
        List<Column> list;
        Set<String> primaryKeys;
        List<Column> columns = dataSync.getColumns();
        if (CollectionUtils.isEmpty(columns)) {
            list = new ArrayList();
        } else {
            list = (List) ObjectUtils.clone(columns);
            String concat = str.concat(ClinkContext.CONFIG_SPLITER);
            Iterator<Column> it = list.iterator();
            while (it.hasNext()) {
                Column next = it.next();
                String fromName = next.getFromName();
                String toName = next.getToName();
                if (belongTo(concat, fromName) && belongTo(concat, toName)) {
                    next.setFromName(fromName.replace(concat, ""));
                    if (StringUtils.isNotBlank(toName)) {
                        next.setToName(toName.replace(concat, ""));
                    }
                } else {
                    it.remove();
                }
            }
        }
        Boolean smart = dataSync.getSmart();
        if (smart == null) {
            smart = Boolean.valueOf(ClinkContext.getProperty(ClinkContext.SMART_MODE_CONFIG_KEY));
        }
        if (Boolean.TRUE.equals(smart)) {
            MetaDataGetter.TableMetaData tableMetaData = MetaDataGetterFactory.getMetaDataGetter(map2).getTableMetaData(map2, str);
            if (set.isEmpty() && (primaryKeys = tableMetaData.getPrimaryKeys()) != null) {
                set.addAll(primaryKeys);
            }
            String str2 = map2.get("connector");
            Map<String, MetaDataGetter.TableMetaData.ColumnType> columns2 = tableMetaData.getColumns();
            if (list.isEmpty()) {
                addSmartLoadColumns(str2, list, columns2, map, map3);
            } else {
                collationPartlyCustom(str2, list, map, columns2, map3);
            }
            for (String str3 : map3.values()) {
                Column column = new Column();
                column.setFromName(str3);
                column.setToName(str3);
                String lowerCase = TO_LOWERCASE ? str3.toLowerCase() : str3;
                String defaultStrategy = getDefaultStrategy(lowerCase);
                if (AbstractDataSyncJobGenerator.Strategy.FROM.equals(defaultStrategy)) {
                    column.setStrategy(defaultStrategy);
                    column.setFromType(getDefaultFromType(lowerCase));
                    column.setToType(getDefaultToType(lowerCase));
                    column.setScript(getDefaultScript(lowerCase));
                    list.add(column);
                }
            }
        } else {
            if (list.isEmpty()) {
                throw new IllegalJobConfigException("At least one column must be configured in manual mode, or set the configuration 'clink.smart=true' to enable automatic column acquisition in smart mode");
            }
            collationCustom(list, map, map3);
            for (String str4 : map3.values()) {
                Column column2 = new Column();
                column2.setFromName(str4);
                column2.setToName(str4);
                String lowerCase2 = TO_LOWERCASE ? str4.toLowerCase() : str4;
                column2.setStrategy(getDefaultStrategy(lowerCase2));
                column2.setFromType(getDefaultFromType(lowerCase2));
                column2.setToType(getDefaultToType(lowerCase2));
                column2.setScript(getDefaultScript(lowerCase2));
                list.add(column2);
            }
        }
        return list;
    }

    private static final Map<String, String> toMap(boolean z, Set<String> set) {
        HashMap newHashMap = MapUtils.newHashMap();
        if (!CollectionUtils.isEmpty(set)) {
            if (z) {
                for (String str : set) {
                    newHashMap.put(str.toLowerCase(), str);
                }
            } else {
                for (String str2 : set) {
                    newHashMap.put(str2, str2);
                }
            }
        }
        return newHashMap;
    }

    private static boolean belongTo(String str, String str2) {
        return StringUtils.isBlank(str2) || !str2.contains(ClinkContext.CONFIG_SPLITER) || str2.startsWith(str);
    }

    private static SourceFactory<Source<Tuple2<String, Row>, ?, ?>> getSourceFactory(String str) {
        SourceFactory<Source<Tuple2<String, Row>, ?, ?>> sourceFactory = factories.get(str);
        if (sourceFactory == null) {
            throw new IllegalConfigurationException("Cannot find source factory for connector " + str);
        }
        return sourceFactory;
    }

    private static String decodeKeyword(String str) {
        int lastIndexOf;
        int indexOf = str.indexOf("`") + 1;
        if (indexOf > 0 && indexOf < str.length() && (lastIndexOf = str.lastIndexOf("`")) > indexOf) {
            str = str.substring(indexOf, lastIndexOf);
        }
        return str;
    }

    static {
        Iterator it = ServiceLoader.load(SourceFactory.class).iterator();
        while (it.hasNext()) {
            SourceFactory<Source<Tuple2<String, Row>, ?, ?>> sourceFactory = (SourceFactory) it.next();
            factories.put(sourceFactory.factoryIdentifier(), sourceFactory);
        }
        METADATA_PATTERN = Pattern.compile("METADATA[\\s]+FROM[\\s]+'[\\S]+'[\\s]+VIRTUAL");
        Iterator it2 = ServiceLoader.load(DataSourceConverter.class).iterator();
        while (it2.hasNext()) {
            DataSourceConverter dataSourceConverter = (DataSourceConverter) it2.next();
            converters.put(dataSourceConverter.connector(), dataSourceConverter);
        }
    }
}
