package net.ibizsys.dataflow.spark.dataentity.dataflow;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import net.ibizsys.dataflow.spark.util.GroovyExpression;
import net.ibizsys.dataflow.spark.util.SparkDataTypeUtil;
import net.ibizsys.model.PSModelEnums;
import net.ibizsys.model.dataentity.dataflow.IPSDEDFPrepareProcessNode;
import net.ibizsys.model.dataentity.dataflow.IPSDEDataFlowFilterCond;
import net.ibizsys.model.dataentity.dataflow.IPSDEDataFlowNodeParam;
import net.ibizsys.model.engine.IPSModelEngine;
import net.ibizsys.model.engine.IPSModelEngineHolder;
import net.ibizsys.model.util.DataTypeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/ibizsys/dataflow/spark/dataentity/dataflow/SparkPSDEDFPrepareProcessNodeAddin.class */
/**
 * Spark add-in for a "prepare" data-flow process node.
 *
 * <p>The node reads its primary input stream, optionally cross-joins a second
 * input stream (renaming second-stream columns that collide with the first),
 * builds one output column per configured node parameter, projects the result
 * and finally applies the node filter when one is configured.
 */
public class SparkPSDEDFPrepareProcessNodeAddin extends SparkPSDEDataFlowProcessNodeAddinBase {
    private static final Log log = LogFactory.getLog(SparkPSDEDFPrepareProcessNodeAddin.class);

    /** Format used to rename a second-stream column whose name collides with an already known column. */
    private static final String STREAM2_ALIAS_FORMAT = "s2__%1$s";

    /**
     * Initializes the add-in after verifying that the supplied model object is a prepare process node.
     *
     * @param iPSModelEngineHolder holder passed through to the base class
     * @param iPSModelEngine engine passed through to the base class
     * @param obj model object; must be an {@link IPSDEDFPrepareProcessNode}
     * @throws Exception if {@code obj} is not an {@link IPSDEDFPrepareProcessNode}, or if the base class fails
     */
    public void init(IPSModelEngineHolder iPSModelEngineHolder, IPSModelEngine iPSModelEngine, Object obj) throws Exception {
        if (!(obj instanceof IPSDEDFPrepareProcessNode)) {
            throw new Exception("模型类型不正确");
        }
        super.init(iPSModelEngineHolder, iPSModelEngine, obj);
    }

    /**
     * Returns the add-in data narrowed to the prepare-node model type.
     *
     * <p>Decompiler note: renamed from {@code getAddinData} (merged with bridge method);
     * the access modifier was changed from {@code protected}.
     */
    public IPSDEDFPrepareProcessNode m21getAddinData() {
        return (IPSDEDFPrepareProcessNode) super.getAddinData();
    }

    /**
     * Returns the data-flow node model backing this add-in.
     *
     * <p>Decompiler note: renamed from {@code getPSDEDataFlowNode} (merged with bridge method).
     */
    public IPSDEDFPrepareProcessNode m20getPSDEDataFlowNode() {
        return m21getAddinData();
    }

    /** Delegates to the base class; no node-specific initialization is performed. */
    @Override
    protected void onInit() throws Exception {
        super.onInit();
    }

    /**
     * Builds the output dataset of this prepare node.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     *
     * @param iSparkPSDEDataFlowSession session used to resolve input datasets and context values
     * @return the projected (and, when a node filter is configured, filtered) dataset
     * @throws Throwable if no node parameters are configured, a column name is duplicated,
     *         a parameter is misconfigured, or an underlying call fails
     */
    @Override // net.ibizsys.dataflow.spark.dataentity.dataflow.SparkPSDEDataFlowProcessNodeAddinBase
    public Dataset<Row> onGetDataset(ISparkPSDEDataFlowSession iSparkPSDEDataFlowSession) throws Throwable {
        IPSDEDFPrepareProcessNode node = m20getPSDEDataFlowNode();
        List<IPSDEDataFlowNodeParam> nodeParams = node.getPSDEDataFlowNodeParams();
        if (ObjectUtils.isEmpty(nodeParams)) {
            throw new Exception("未指定节点参数");
        }

        Dataset<Row> dataset = iSparkPSDEDataFlowSession.getDataset(getDataStream(false));

        // Upper-cased column name -> actual column name, for every column visible to the parameters.
        Map<String, String> knownColumns = new LinkedHashMap<>();
        String[] primaryColumns = dataset.columns();
        if (primaryColumns != null) {
            for (String name : primaryColumns) {
                if (knownColumns.put(toColumnKey(name), name) != null) {
                    throw new Exception(String.format("数据流列[%1$s]重复", name));
                }
            }
        }

        // NOTE(review): the boolean flag presumably makes the lookup tolerant of a missing
        // second stream — confirm against the base class.
        if (getDataStream2(true) != null) {
            Dataset<Row> secondStream = iSparkPSDEDataFlowSession.getDataset(getDataStream2(false));
            dataset = dataset.crossJoin(escapeSecondStreamColumns(secondStream, knownColumns));
        }

        // Upper-cased destination name -> column expression, in parameter order.
        Map<String, Column> paramColumns = new LinkedHashMap<>();
        for (IPSDEDataFlowNodeParam param : nodeParams) {
            if (!PSModelEnums.DEDataFlowNodeParamType.PREPAREPARAM.value.equals(param.getNodeParamType())) {
                throw new Exception(String.format("节点参数[%1$s]类型不正确", param.getName()));
            }
            String dstField = resolveTargetField(param);
            String dstKey = toColumnKey(dstField);
            if (paramColumns.containsKey(dstKey)) {
                throw new Exception(String.format("节点参数[%1$s]指定数据流列[%2$s]重复", param.getName(), dstField));
            }
            paramColumns.put(dstKey, buildParamColumn(iSparkPSDEDataFlowSession, dataset, knownColumns, param, dstField));
        }

        List<Column> projection = new ArrayList<>();
        String[] currentColumns = dataset.columns();
        if (node.isReselectColumn() || currentColumns == null || currentColumns.length == 0) {
            // Only the parameter-defined columns are kept.
            projection.addAll(paramColumns.values());
        } else {
            // Keep existing columns in place, substituting those a parameter redefines,
            // then append the parameter columns that are new.
            for (String name : currentColumns) {
                Column replacement = paramColumns.remove(toColumnKey(name));
                projection.add(replacement != null ? replacement : dataset.col(name).alias(name));
            }
            projection.addAll(paramColumns.values());
        }
        if (!projection.isEmpty()) {
            dataset = dataset.select(projection.toArray(new Column[0]));
        }

        if (node.getPSDEDataFlowNodeFilter() != null) {
            dataset = dataset.where(getFilterCondition((ISparkPSDEDataFlowSession<?>) iSparkPSDEDataFlowSession, dataset, (IPSDEDataFlowFilterCond) node.getPSDEDataFlowNodeFilter()));
        }
        return dataset;
    }

    /** Returns the locale-independent, case-insensitive lookup key for a column name. */
    private static String toColumnKey(String name) {
        // Locale.ROOT keeps "id"/"ID" matching even under locales with special casing rules (e.g. Turkish).
        return name.toUpperCase(Locale.ROOT);
    }

    /**
     * Registers the second stream's columns in {@code knownColumns}, renaming any column whose
     * name collides with an already known one, and returns the (possibly re-projected) stream.
     */
    private static Dataset<Row> escapeSecondStreamColumns(Dataset<Row> secondStream, Map<String, String> knownColumns) throws Exception {
        String[] columns = secondStream.columns();
        if (columns == null || columns.length == 0) {
            return secondStream;
        }

        // Original second-stream name -> escaped name, only for the columns that collided.
        Map<String, String> renamed = new LinkedHashMap<>();
        for (String name : columns) {
            String key = toColumnKey(name);
            if (!knownColumns.containsKey(key)) {
                knownColumns.put(key, name);
                continue;
            }
            String alias = String.format(STREAM2_ALIAS_FORMAT, name);
            String aliasKey = toColumnKey(alias);
            if (knownColumns.containsKey(aliasKey)) {
                throw new Exception(String.format("数据流列[%1$s]重复", alias));
            }
            log.warn(String.format("数据流2列[%1$s]转义为[%2$s]", name, alias));
            knownColumns.put(aliasKey, alias);
            renamed.put(name, alias);
        }
        if (renamed.isEmpty()) {
            return secondStream;
        }

        List<Column> projection = new ArrayList<>(columns.length);
        for (String name : columns) {
            String alias = renamed.get(name);
            projection.add(secondStream.col(name).alias(StringUtils.hasLength(alias) ? alias : name));
        }
        return secondStream.select(projection.toArray(new Column[0]));
    }

    /** Returns the parameter's destination field, falling back to its source field when none is set. */
    private static String resolveTargetField(IPSDEDataFlowNodeParam param) throws Throwable {
        String dstField = param.getDstField();
        if (StringUtils.hasLength(dstField)) {
            return dstField;
        }
        String srcField = param.getSrcField();
        if (!StringUtils.hasLength(srcField)) {
            throw new Exception(String.format("节点参数[%1$s]未指定目标列名", param.getName()));
        }
        return srcField;
    }

    /** Builds the aliased column expression for one node parameter according to its source value type. */
    private static Column buildParamColumn(ISparkPSDEDataFlowSession session, Dataset<Row> dataset, Map<String, String> knownColumns, IPSDEDataFlowNodeParam param, String dstField) throws Throwable {
        String srcValueType = param.getSrcValueType();
        if (PSModelEnums.DEDataFlowParamValueType.DATASTREAM.value.equals(srcValueType)) {
            String srcField = param.getSrcField();
            // Fail with a descriptive message instead of an opaque error from Dataset.col(null/"").
            if (!StringUtils.hasLength(srcField)) {
                throw new Exception(String.format("节点参数[%1$s]未指定源列", param.getName()));
            }
            return dataset.col(srcField).alias(dstField);
        }
        if (PSModelEnums.DEDataFlowParamValueType.EXPRESSION.value.equals(srcValueType)) {
            return buildExpressionColumn(dataset, knownColumns, param).alias(dstField);
        }
        if (PSModelEnums.DEDataFlowParamValueType.SRCVALUE.value.equals(srcValueType)) {
            int stdDataType = param.getSrcValueStdDataType();
            Object literal = stdDataType != 0 ? DataTypeUtils.parse(stdDataType, param.getSrcValue()) : param.getSrcValue();
            return functions.lit(literal).alias(dstField);
        }
        if (PSModelEnums.DEDataFlowParamValueType.DATACONTEXT.value.equals(srcValueType) || PSModelEnums.DEDataFlowParamValueType.SESSION.value.equals(srcValueType)) {
            return buildContextColumn(session, param, srcValueType).alias(dstField);
        }
        throw new Exception(String.format("未支持的源值类型[%1$s]", srcValueType));
    }

    /**
     * Builds a UDF column that evaluates the parameter's expression over a struct of its source columns.
     * A source field of {@code *} selects every known column; otherwise the field is a
     * comma- or semicolon-separated list of column names.
     */
    private static Column buildExpressionColumn(Dataset<Row> dataset, Map<String, String> knownColumns, IPSDEDataFlowNodeParam param) throws Throwable {
        String srcField = param.getSrcField();
        if (!StringUtils.hasLength(srcField)) {
            throw new Exception(String.format("节点参数[%1$s]未指定源列", param.getName()));
        }
        String[] inputNames = srcField.equals("*")
                ? knownColumns.values().toArray(new String[0])
                : srcField.replace(";", ",").split("[,]");
        UserDefinedFunction udf = functions.udf(GroovyExpression.from(inputNames, param.getExpression(), param.getSrcValueStdDataType()), SparkDataTypeUtil.fromStdDataType(param.getSrcValueStdDataType()));
        Column[] inputs = new Column[inputNames.length];
        for (int i = 0; i < inputNames.length; i++) {
            if (!knownColumns.containsKey(toColumnKey(inputNames[i]))) {
                throw new Exception(String.format("节点参数[%1$s]指定列[%2$s]不存在", param.getName(), inputNames[i]));
            }
            inputs[i] = dataset.col(inputNames[i]);
        }
        return udf.apply(new Column[]{functions.struct(inputs)});
    }

    /** Builds a literal column from a data-context or session value, converted to the parameter's standard data type when one is set. */
    private static Column buildContextColumn(ISparkPSDEDataFlowSession session, IPSDEDataFlowNodeParam param, String srcValueType) throws Throwable {
        if (!StringUtils.hasLength(param.getSrcField())) {
            throw new Exception(String.format("节点参数[%1$s]未指定上下文属性", param.getName()));
        }
        // Look the value up under the parameter's declared source type so that SESSION
        // parameters are not silently resolved against the data context.
        Object contextValue = session.getContextValue(srcValueType, param.getSrcField());
        int stdDataType = param.getSrcValueStdDataType();
        if (stdDataType == 0) {
            return functions.lit(contextValue);
        }
        if (contextValue instanceof String) {
            return functions.lit(DataTypeUtils.parse(stdDataType, (String) contextValue));
        }
        return functions.lit(DataTypeUtils.convert(stdDataType, contextValue));
    }
}
