package net.ibizsys.dataflow.spark.dataentity.dataflow;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import net.ibizsys.model.PSModelEnums;
import net.ibizsys.model.dataentity.dataflow.IPSDEDFJoinCond;
import net.ibizsys.model.dataentity.dataflow.IPSDEDFJoinGroupCond;
import net.ibizsys.model.dataentity.dataflow.IPSDEDFJoinProcessNode;
import net.ibizsys.model.dataentity.dataflow.IPSDEDFJoinSingleCond;
import net.ibizsys.model.engine.IPSModelEngine;
import net.ibizsys.model.engine.IPSModelEngineHolder;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/ibizsys/dataflow/spark/dataentity/dataflow/SparkPSDEDFJoinProcessNodeAddin.class */
/**
 * Process-node addin that joins the two input datasets of a data-flow join node.
 *
 * <p>The node model ({@link IPSDEDFJoinProcessNode}) supplies the join type and a tree of join
 * conditions; this class translates that tree into a Spark {@link Column} expression and calls
 * {@link Dataset#join(Dataset, Column, String)}.
 */
public class SparkPSDEDFJoinProcessNodeAddin extends SparkPSDEDataFlowProcessNodeAddinBase {
    /** Source of the numeric suffix used to alias second-stream columns that collide by name. */
    private static final Random random = new Random();

    /**
     * Validates that the addin data is a join process node, then delegates to the base class.
     *
     * @throws Exception if {@code obj} is not an {@link IPSDEDFJoinProcessNode}
     */
    public void init(IPSModelEngineHolder iPSModelEngineHolder, IPSModelEngine iPSModelEngine, Object obj) throws Exception {
        if (!(obj instanceof IPSDEDFJoinProcessNode)) {
            throw new Exception("模型类型不正确");
        }
        super.init(iPSModelEngineHolder, iPSModelEngine, obj);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getAddinData, reason: merged with bridge method [inline-methods] */
    /** Returns the addin data cast to the join process node model. */
    public IPSDEDFJoinProcessNode m16getAddinData() {
        return (IPSDEDFJoinProcessNode) super.getAddinData();
    }

    /* renamed from: getPSDEDataFlowNode, reason: merged with bridge method [inline-methods] */
    /** Returns the join process node model this addin was initialised with. */
    public IPSDEDFJoinProcessNode m15getPSDEDataFlowNode() {
        return m16getAddinData();
    }

    /**
     * Builds the joined dataset.
     *
     * <p>Columns of the second stream whose name (compared case-insensitively) is already used by
     * the first stream, or by an earlier second-stream column, are aliased to
     * {@code <name>_<random 0..999>} so that the joined result has no clashing names. Columns that
     * do not clash keep their original name. The second dataset is only re-projected when at least
     * one alias was needed.
     *
     * @param iSparkPSDEDataFlowSession session used to obtain the two input datasets
     * @return the first dataset joined with the (possibly re-projected) second dataset
     * @throws Throwable if a stream-qualified column key occurs twice, or if the join condition
     *     cannot be built
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // net.ibizsys.dataflow.spark.dataentity.dataflow.SparkPSDEDataFlowProcessNodeAddinBase
    public Dataset<Row> onGetDataset(ISparkPSDEDataFlowSession iSparkPSDEDataFlowSession) throws Throwable {
        IPSDEDFJoinProcessNode joinNode = m15getPSDEDataFlowNode();
        Dataset<Row> dataset = iSparkPSDEDataFlowSession.getDataset(getDataStream(false));
        Dataset<Row> dataset2 = iSparkPSDEDataFlowSession.getDataset(getDataStream2(false));

        // Upper-cased output column names already taken; only the key set is consulted.
        Map<String, String> usedNames = new LinkedHashMap<>();
        // "<STREAM>.<COLUMN>" (upper-cased) -> actual column name to use on that stream's dataset.
        Map<String, String> columnLookup = new LinkedHashMap<>();

        String[] columns = dataset.columns();
        if (columns != null) {
            for (String name : columns) {
                String key = String.format("%1$s.%2$s", PSModelEnums.DEDataFlowParamValueType.DATASTREAM.value, name).toUpperCase();
                if (columnLookup.containsKey(key)) {
                    throw new Exception(String.format("数据流列[%1$s]重复", name));
                }
                columnLookup.put(key, name);
                usedNames.put(name.toUpperCase(), name);
            }
        }

        boolean aliasNeeded = false;
        // Original second-stream column name -> name it will carry after projection.
        Map<String, String> projection = new LinkedHashMap<>();
        String[] columns2 = dataset2.columns();
        if (columns2 != null) {
            for (String name : columns2) {
                String key = String.format("%1$s.%2$s", PSModelEnums.DEDataFlowParamValueType.DATASTREAM2.value, name).toUpperCase();
                if (columnLookup.containsKey(key)) {
                    throw new Exception(String.format("数据流2列[%1$s]重复", name));
                }
                String outputName = name;
                if (usedNames.containsKey(name.toUpperCase())) {
                    // Name clash: pick a suffixed alias that is still free. The two paths are
                    // mutually exclusive; a non-clashing column must not also go through aliasing,
                    // otherwise its lookup key would be registered twice and rejected as duplicate.
                    do {
                        outputName = String.format("%1$s_%2$s", name, Integer.valueOf(random.nextInt(1000)));
                    } while (usedNames.containsKey(outputName.toUpperCase()));
                    aliasNeeded = true;
                }
                usedNames.put(outputName.toUpperCase(), "");
                columnLookup.put(key, outputName);
                projection.put(name, outputName);
            }
        }

        if (aliasNeeded) {
            List<Column> selected = new ArrayList<>(projection.size());
            for (Map.Entry<String, String> entry : projection.entrySet()) {
                Column source = dataset2.col(entry.getKey());
                selected.add(entry.getKey().equals(entry.getValue()) ? source : source.alias(entry.getValue()));
            }
            dataset2 = dataset2.select(selected.toArray(new Column[0]));
        }
        return dataset.join(dataset2, getJoinCondition(dataset, dataset2, columnLookup, joinNode.getPSDEDFJoinGroupCondMust()), joinNode.getJoinType());
    }

    /**
     * Translates a group condition into a Spark column expression by combining its children with
     * {@code AND} or {@code OR}.
     *
     * @param map stream-qualified, upper-cased column key to actual column name
     * @param iPSDEDFJoinGroupCond the group condition to translate
     * @param z when {@code true} the group's "not" mode is ignored; used by the internal recursion
     *     that wraps the un-negated expression in {@code NOT}
     * @return the combined condition
     * @throws Throwable if the group has no children or its operator is neither AND nor OR
     */
    protected Column getJoinCondition(Dataset<Row> dataset, Dataset<Row> dataset2, Map<String, String> map, IPSDEDFJoinGroupCond iPSDEDFJoinGroupCond, boolean z) throws Throwable {
        if (!z && iPSDEDFJoinGroupCond.isNotMode()) {
            return functions.not(getJoinCondition(dataset, dataset2, map, iPSDEDFJoinGroupCond, true));
        }
        List<?> children = iPSDEDFJoinGroupCond.getPSDEDFJoinConds();
        if (ObjectUtils.isEmpty(children)) {
            throw new Exception("未定义子条件");
        }
        String groupOp = iPSDEDFJoinGroupCond.getCondOp();
        boolean conjunction = "AND".equalsIgnoreCase(groupOp);
        if (!conjunction && !"OR".equalsIgnoreCase(groupOp)) {
            throw new Exception(String.format("无法识别的条件类型[%1$s]", groupOp));
        }
        Column combined = null;
        for (Object child : children) {
            Column part = getJoinCondition(dataset, dataset2, map, (IPSDEDFJoinCond) child);
            if (combined == null) {
                combined = part;
            } else {
                combined = conjunction ? combined.and(part) : combined.or(part);
            }
        }
        if (combined == null) {
            throw new Exception("未定义子条件");
        }
        return combined;
    }

    /**
     * Translates a single join condition node. Group conditions are delegated to the group
     * overload; single conditions are turned into a comparison on the referenced column.
     *
     * @param map stream-qualified, upper-cased column key to actual column name
     * @param iPSDEDFJoinCond the condition to translate
     * @return the condition as a Spark column expression
     * @throws Throwable if the condition kind, field scope or operator is not recognised, if the
     *     referenced column is unknown, or if a LIKE/ILIKE/REGEXLIKE value is not a string
     */
    protected Column getJoinCondition(Dataset<Row> dataset, Dataset<Row> dataset2, Map<String, String> map, IPSDEDFJoinCond iPSDEDFJoinCond) throws Throwable {
        if (iPSDEDFJoinCond instanceof IPSDEDFJoinGroupCond) {
            return getJoinCondition(dataset, dataset2, map, (IPSDEDFJoinGroupCond) iPSDEDFJoinCond, false);
        }
        if (!(iPSDEDFJoinCond instanceof IPSDEDFJoinSingleCond)) {
            throw new Exception(String.format("无法识别的条件类型[%1$s]", iPSDEDFJoinCond.getCondType()));
        }
        IPSDEDFJoinSingleCond single = (IPSDEDFJoinSingleCond) iPSDEDFJoinCond;

        // Resolve the left-hand column on whichever stream the condition names.
        Column left;
        String scope = single.getJoinFieldScope();
        if (PSModelEnums.DEDataFlowFieldScope.DATASTREAM.value.equalsIgnoreCase(scope)) {
            String actual = map.get(String.format("%1$s.%2$s", PSModelEnums.DEDataFlowParamValueType.DATASTREAM.value, single.getJoinField()).toUpperCase());
            if (!StringUtils.hasLength(actual)) {
                throw new Exception(String.format("数据流未存在列[%1$s]", single.getJoinField()));
            }
            left = dataset.col(actual);
        } else if (PSModelEnums.DEDataFlowFieldScope.DATASTREAM2.value.equalsIgnoreCase(scope)) {
            String actual = map.get(String.format("%1$s.%2$s", PSModelEnums.DEDataFlowParamValueType.DATASTREAM2.value, single.getJoinField()).toUpperCase());
            if (!StringUtils.hasLength(actual)) {
                throw new Exception(String.format("数据流2未存在列[%1$s]", single.getJoinField()));
            }
            left = dataset2.col(actual);
        } else {
            throw new Exception(String.format("无法识别的连接属性归属[%1$s]", scope));
        }

        String op = single.getCondOp();
        // Null checks take no right-hand value, so they are handled before it is resolved.
        if ("ISNULL".equalsIgnoreCase(op)) {
            return left.isNull();
        }
        if ("ISNOTNULL".equalsIgnoreCase(op)) {
            return left.isNotNull();
        }

        Object right = getConditionValue(dataset, dataset2, map, single);
        if ("EQ".equalsIgnoreCase(op)) {
            return left.equalTo(right);
        }
        if ("NOTEQ".equalsIgnoreCase(op)) {
            return left.notEqual(right);
        }
        if ("GT".equalsIgnoreCase(op)) {
            return left.gt(right);
        }
        if ("GTANDEQ".equalsIgnoreCase(op)) {
            return left.geq(right);
        }
        if ("LT".equalsIgnoreCase(op)) {
            return left.lt(right);
        }
        if ("LTANDEQ".equalsIgnoreCase(op)) {
            return left.leq(right);
        }

        boolean like = "LIKE".equalsIgnoreCase(op);
        boolean ilike = "ILIKE".equalsIgnoreCase(op);
        if (!like && !ilike && !"REGEXLIKE".equalsIgnoreCase(op)) {
            throw new Exception(String.format("无法识别的条件操作[%1$s]", op));
        }
        // The three pattern operators only accept a literal string pattern.
        if (!(right instanceof String)) {
            throw new Exception(String.format("条件操作[%1$s]仅支持字符串条件值", op));
        }
        String pattern = (String) right;
        if (like) {
            return left.like(pattern);
        }
        if (ilike) {
            return left.ilike(pattern);
        }
        return left.rlike(pattern);
    }

    /**
     * Resolves the right-hand side of a single condition.
     *
     * @param map stream-qualified, upper-cased column key to actual column name
     * @param iPSDEDFJoinSingleCond the condition whose value is resolved
     * @return a {@link Column} of the first or second dataset when the value type names a stream,
     *     or the raw configured condition value when the value type is SRCVALUE
     * @throws Throwable if the value type is not recognised, the column name is missing, or the
     *     named column does not exist on the stream
     */
    protected Object getConditionValue(Dataset<Row> dataset, Dataset<Row> dataset2, Map<String, String> map, IPSDEDFJoinSingleCond iPSDEDFJoinSingleCond) throws Throwable {
        String valueType = iPSDEDFJoinSingleCond.getCondValueType();
        if (PSModelEnums.DEDataFlowCondValueType.DATASTREAM.value.equalsIgnoreCase(valueType)) {
            if (!StringUtils.hasLength(iPSDEDFJoinSingleCond.getCondValue())) {
                throw new Exception(String.format("条件项[%1$s]未指定数据流列", iPSDEDFJoinSingleCond.getName()));
            }
            String actual = map.get(String.format("%1$s.%2$s", PSModelEnums.DEDataFlowParamValueType.DATASTREAM.value, iPSDEDFJoinSingleCond.getCondValue()).toUpperCase());
            if (!StringUtils.hasLength(actual)) {
                throw new Exception(String.format("数据流未存在列[%1$s]", iPSDEDFJoinSingleCond.getCondValue()));
            }
            return dataset.col(actual);
        }
        if (PSModelEnums.DEDataFlowCondValueType.DATASTREAM2.value.equalsIgnoreCase(valueType)) {
            if (!StringUtils.hasLength(iPSDEDFJoinSingleCond.getCondValue())) {
                throw new Exception(String.format("条件项[%1$s]未指定数据流2列", iPSDEDFJoinSingleCond.getName()));
            }
            String actual = map.get(String.format("%1$s.%2$s", PSModelEnums.DEDataFlowParamValueType.DATASTREAM2.value, iPSDEDFJoinSingleCond.getCondValue()).toUpperCase());
            if (!StringUtils.hasLength(actual)) {
                throw new Exception(String.format("数据流2未存在列[%1$s]", iPSDEDFJoinSingleCond.getCondValue()));
            }
            return dataset2.col(actual);
        }
        if (PSModelEnums.DEDataFlowCondValueType.SRCVALUE.value.equalsIgnoreCase(valueType)) {
            return iPSDEDFJoinSingleCond.getCondValue();
        }
        throw new Exception(String.format("无法识别的条件值类型[%1$s]", valueType));
    }
}
