package net.ibizsys.dataflow.spark.ba;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import net.ibizsys.dataflow.core.ba.PSSysBDSchemeEngineBase;
import net.ibizsys.model.PSModelEnums;
import net.ibizsys.model.ba.IPSSysBDScheme;
import net.ibizsys.model.ba.IPSSysBDTable;
import net.ibizsys.model.dataentity.IPSDataEntity;
import net.ibizsys.model.dataentity.defield.IPSDEFGroup;
import net.ibizsys.model.dataentity.defield.IPSDEField;
import net.ibizsys.model.dataentity.der.IPSDER1N;
import net.ibizsys.model.dataentity.der.IPSDERBase;
import net.ibizsys.model.engine.util.IAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/ibizsys/dataflow/spark/ba/SparkPSSysBDSchemeEngine.class */
/**
 * Big-data scheme engine that hands out Spark readers and writers configured for the
 * database described by the scheme.
 *
 * <p>Batch access is implemented for the {@code MONGODB} and {@code NEO4J} database types.
 * Structured-streaming access is not implemented here: {@link #isEnableDataStream()} returns
 * {@code false} and the stream hooks always throw, so a subclass has to override them to
 * provide streaming support.
 *
 * <p>The {@code obj} argument accepted by the factory methods identifies the target
 * collection / label and may be {@code null}, a {@link String} name, an
 * {@link IPSSysBDTable}, an {@link IPSDataEntity} or an {@link IPSDEFGroup} (resolved to its
 * parent data entity). Any other type is rejected.
 *
 * @param <M> concrete scheme model type
 */
public class SparkPSSysBDSchemeEngine<M extends IPSSysBDScheme> extends PSSysBDSchemeEngineBase<M> implements ISparkPSSysBDSchemeEngine<M> {
    private static final Log log = LogFactory.getLog(SparkPSSysBDSchemeEngine.class);

    /** Spark data source name used for MongoDB. */
    private static final String MONGODB_FORMAT = "mongodb";

    /** Spark data source name used for Neo4j. */
    private static final String NEO4J_FORMAT = "org.neo4j.spark.DataSource";

    /** Role key (and matching code/field name) of the relationship start node. */
    private static final String ROLE_SOURCE = "SOURCE";

    /** Role key (and matching code/field name) of the relationship end node. */
    private static final String ROLE_TARGET = "TARGET";

    /**
     * Reports whether this engine supports structured streaming.
     *
     * @return always {@code false} in this implementation
     */
    @Override
    public boolean isEnableDataStream() {
        return false;
    }

    /**
     * Returns a batch reader configured for the scheme's database.
     *
     * @param sparkSession session used to create the reader
     * @param obj          target object, see the class documentation for accepted types
     * @return the reader produced by {@link #onGetDataFrameReader(SparkSession, Object)},
     *         as passed back by {@code executeAction}
     */
    @Override
    public DataFrameReader getDataFrameReader(final SparkSession sparkSession, final Object obj) {
        return (DataFrameReader) executeAction("获取数据源读取对象", new IAction<DataFrameReader>() {
            @Override
            public DataFrameReader execute(Object[] objArr) throws Throwable {
                return SparkPSSysBDSchemeEngine.this.onGetDataFrameReader(sparkSession, obj);
            }
        }, DataFrameReader.class);
    }

    /**
     * Builds the batch reader.
     *
     * <p>For MongoDB the connection URI, database and (when a target name is available)
     * collection are set. For Neo4j only the URL and optional basic-auth credentials are set;
     * no label or query option is added here, so the caller has to supply one.
     *
     * @param sparkSession session used to create the reader
     * @param obj          target object, see the class documentation for accepted types
     * @return the configured reader
     * @throws Throwable if no database name is configured, {@code obj} has an unsupported
     *                   type, or the database type is neither MongoDB nor Neo4j
     */
    protected DataFrameReader onGetDataFrameReader(SparkSession sparkSession, Object obj) throws Throwable {
        String dbName = requireConfiguredDBName();
        IPSDataEntity dataEntity = resolveTargetDataEntity(obj);
        String realObjName = getRealDBObjName(resolveTargetObjectName(obj, dataEntity));
        String dbType = getDBType();

        if (PSModelEnums.BDType.MONGODB.value.equalsIgnoreCase(dbType)) {
            DataFrameReader reader = sparkSession.read().format(MONGODB_FORMAT);
            reader.option("connection.uri", getServiceUrl());
            reader.option("database", dbName);
            if (StringUtils.hasLength(realObjName)) {
                reader.option("collection", realObjName);
            }
            return reader;
        }
        if (PSModelEnums.BDType.NEO4J.value.equalsIgnoreCase(dbType)) {
            DataFrameReader reader = sparkSession.read().format(NEO4J_FORMAT);
            reader.options(buildNeo4jConnectionOptions());
            return reader;
        }
        throw new Exception(String.format("无法识别的大数据库类型[%1$s]", dbType));
    }

    /**
     * Returns a batch writer for {@code dataset} configured for the scheme's database.
     *
     * @param sparkSession session the dataset belongs to
     * @param dataset      rows to be written
     * @param obj          target object, see the class documentation for accepted types
     * @param saveMode     save mode to apply, or {@code null} to keep Spark's default
     * @return the writer produced by
     *         {@link #onGetDataFrameWriter(SparkSession, Dataset, Object, SaveMode)}
     */
    @Override
    // The class token handed to executeAction is necessarily the raw DataFrameWriter type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public DataFrameWriter<Row> getDataFrameWriter(final SparkSession sparkSession, final Dataset<Row> dataset, final Object obj, final SaveMode saveMode) {
        return (DataFrameWriter) executeAction("获取数据源写入对象", new IAction<DataFrameWriter>() {
            @Override
            public DataFrameWriter execute(Object[] objArr) throws Throwable {
                return SparkPSSysBDSchemeEngine.this.onGetDataFrameWriter(sparkSession, dataset, obj, saveMode);
            }
        }, DataFrameWriter.class);
    }

    /**
     * Builds the batch writer.
     *
     * <p>For MongoDB the connection URI, database and (when available) collection are set.
     * For Neo4j the URL and optional credentials are set; when the target resolves to a data
     * entity the writer is additionally configured either as a relationship writer (entity
     * type {@code RELATED}) or as a node writer keyed by the entity's key field.
     *
     * <p>A non-null {@code saveMode} is applied for every database type and every kind of
     * target, so the caller's explicit choice is never dropped.
     *
     * @param sparkSession session the dataset belongs to
     * @param dataset      rows to be written
     * @param obj          target object, see the class documentation for accepted types
     * @param saveMode     save mode to apply, or {@code null} to keep Spark's default
     * @return the configured writer
     * @throws Throwable if no database name is configured, {@code obj} has an unsupported
     *                   type, the database type is neither MongoDB nor Neo4j, or a
     *                   relationship entity does not provide a source and a target relation
     */
    protected DataFrameWriter<Row> onGetDataFrameWriter(SparkSession sparkSession, Dataset<Row> dataset, Object obj, SaveMode saveMode) throws Throwable {
        String dbName = requireConfiguredDBName();
        IPSDataEntity dataEntity = resolveTargetDataEntity(obj);
        String realObjName = getRealDBObjName(resolveTargetObjectName(obj, dataEntity));
        String dbType = getDBType();

        DataFrameWriter<Row> writer;
        if (PSModelEnums.BDType.MONGODB.value.equalsIgnoreCase(dbType)) {
            writer = dataset.write().format(MONGODB_FORMAT);
            writer.option("connection.uri", getServiceUrl());
            writer.option("database", dbName);
            if (StringUtils.hasLength(realObjName)) {
                writer.option("collection", realObjName);
            }
        } else if (PSModelEnums.BDType.NEO4J.value.equalsIgnoreCase(dbType)) {
            writer = dataset.write().format(NEO4J_FORMAT);
            writer.options(buildNeo4jConnectionOptions());
            if (dataEntity != null) {
                if (dataEntity.getDEType() == PSModelEnums.DEType.RELATED.value) {
                    configureNeo4jRelationshipWriter(writer, dataEntity, realObjName);
                } else {
                    writer.option("labels", realObjName);
                    writer.option("node.keys", getRealDBObjName(dataEntity.getKeyPSDEFieldMust().getName()));
                }
            }
        } else {
            throw new Exception(String.format("无法识别的大数据库类型[%1$s]", dbType));
        }

        if (saveMode != null) {
            writer.mode(saveMode);
        }
        return writer;
    }

    /**
     * Returns a streaming reader configured for the scheme's database.
     *
     * @param sparkSession session used to create the reader
     * @param obj          target object
     * @return the reader produced by {@link #onGetDataStreamReader(SparkSession, Object)}
     */
    @Override
    public DataStreamReader getDataStreamReader(final SparkSession sparkSession, final Object obj) {
        return (DataStreamReader) executeAction("获取数据源读取对象", new IAction<DataStreamReader>() {
            @Override
            public DataStreamReader execute(Object[] objArr) throws Throwable {
                return SparkPSSysBDSchemeEngine.this.onGetDataStreamReader(sparkSession, obj);
            }
        }, DataStreamReader.class);
    }

    /**
     * Streaming-reader hook. This implementation supports no database type and always throws.
     *
     * @param sparkSession session used to create the reader
     * @param obj          target object
     * @return never returns normally
     * @throws Throwable always, reporting the configured database type as unrecognised
     */
    protected DataStreamReader onGetDataStreamReader(SparkSession sparkSession, Object obj) throws Throwable {
        throw new Exception(String.format("无法识别的大数据库类型[%1$s]", getDBType()));
    }

    /**
     * Returns a streaming writer for {@code dataset} configured for the scheme's database.
     *
     * @param sparkSession session the dataset belongs to
     * @param dataset      rows to be written
     * @param obj          target object
     * @param saveMode     requested save mode, may be {@code null}
     * @return the writer produced by
     *         {@link #onGetDataStreamWriter(SparkSession, Dataset, Class, Object, SaveMode)}
     */
    @Override
    // The class token handed to executeAction is necessarily the raw DataStreamWriter type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public DataStreamWriter<Row> getDataStreamWriter(final SparkSession sparkSession, final Dataset<Row> dataset, final Object obj, final SaveMode saveMode) {
        return (DataStreamWriter) executeAction("获取数据源写入对象", new IAction<DataStreamWriter>() {
            @Override
            public DataStreamWriter execute(Object[] objArr) throws Throwable {
                return SparkPSSysBDSchemeEngine.this.onGetDataStreamWriter(sparkSession, dataset, Row.class, obj, saveMode);
            }
        }, DataStreamWriter.class);
    }

    /**
     * Streaming-writer hook. This implementation supports no database type and always throws.
     *
     * @param sparkSession session the dataset belongs to
     * @param dataset      data to be written
     * @param cls          element type of {@code dataset}
     * @param obj          target object
     * @param saveMode     requested save mode, may be {@code null}
     * @param <T>          element type of the dataset
     * @return never returns normally
     * @throws Throwable always, reporting the configured database type as unrecognised
     */
    protected <T> DataStreamWriter<T> onGetDataStreamWriter(SparkSession sparkSession, Dataset<T> dataset, Class<T> cls, Object obj, SaveMode saveMode) throws Throwable {
        throw new Exception(String.format("无法识别的大数据库类型[%1$s]", getDBType()));
    }

    /** Returns the configured database name, failing when it is null or empty. */
    private String requireConfiguredDBName() throws Throwable {
        String dbName = getDBName();
        if (!StringUtils.hasLength(dbName)) {
            throw new Exception("未指定数据库名称");
        }
        return dbName;
    }

    /**
     * Resolves the data entity behind a target object: the object itself, the parent entity
     * of a field group, or {@code null} for null / String / table targets. The type checks
     * run in the same order as {@link #resolveTargetObjectName(Object, IPSDataEntity)}
     * expects, so an object is never treated as both a table and an entity.
     */
    private static IPSDataEntity resolveTargetDataEntity(Object obj) throws Throwable {
        if (obj == null || obj instanceof String || obj instanceof IPSSysBDTable) {
            return null;
        }
        if (obj instanceof IPSDataEntity) {
            return (IPSDataEntity) obj;
        }
        if (obj instanceof IPSDEFGroup) {
            return (IPSDataEntity) ((IPSDEFGroup) obj).getParentPSModelObject(IPSDataEntity.class);
        }
        throw new Exception(String.format("无法识别的目标对象[%1$s]", obj));
    }

    /**
     * Returns the logical (not yet translated) target name: the entity's table name when an
     * entity was resolved, otherwise the String itself or the table's name, otherwise
     * {@code null}.
     */
    private static String resolveTargetObjectName(Object obj, IPSDataEntity dataEntity) throws Throwable {
        if (dataEntity != null) {
            return dataEntity.getTableName();
        }
        if (obj instanceof String) {
            return (String) obj;
        }
        if (obj instanceof IPSSysBDTable) {
            return ((IPSSysBDTable) obj).getName();
        }
        return null;
    }

    /** Collects the Neo4j URL and, when configured, the basic-auth user name and password. */
    private LinkedHashMap<String, String> buildNeo4jConnectionOptions() throws Throwable {
        LinkedHashMap<String, String> options = new LinkedHashMap<>();
        options.put("url", getServiceUrl());
        String userName = getClientId();
        if (StringUtils.hasLength(userName)) {
            options.put("authentication.basic.username", userName);
        }
        String password = getClientSecret();
        if (StringUtils.hasLength(password)) {
            options.put("authentication.basic.password", password);
        }
        return options;
    }

    /**
     * Configures {@code writer} to store rows as Neo4j relationships of the given type, using
     * the entity's minor relations to describe the source and target nodes and all of the
     * entity's fields as relationship properties.
     */
    private void configureNeo4jRelationshipWriter(DataFrameWriter<Row> writer, IPSDataEntity relationEntity, String relationshipType) throws Throwable {
        writer.option("relationship", relationshipType);
        writer.option("relationship.save.strategy", "keys");

        LinkedHashMap<String, IPSDER1N> endpoints = resolveRelationshipEndpoints(relationEntity.getMinorPSDERs());
        applyRelationshipEndpoint(writer, "source", endpoints.get(ROLE_SOURCE));
        applyRelationshipEndpoint(writer, "target", endpoints.get(ROLE_TARGET));

        List<String> propertyNames = new ArrayList<>();
        for (IPSDEField field : relationEntity.getAllPSDEFields()) {
            propertyNames.add(getRealDBObjName(field.getName()));
        }
        writer.option("relationship.properties", StringUtils.collectionToDelimitedString(propertyNames, ","));
    }

    /**
     * Picks the source and target relations of a relationship entity.
     *
     * <p>A relation is matched by its code name first and by the name of its pickup field
     * second (both case-insensitively against {@code SOURCE} / {@code TARGET}). When that
     * does not yield exactly one of each, the first two relations in declaration order are
     * used as source and target respectively.
     *
     * @throws Exception if fewer than two relations are available
     */
    private static LinkedHashMap<String, IPSDER1N> resolveRelationshipEndpoints(List<? extends IPSDERBase> minorRelations) throws Throwable {
        LinkedHashMap<String, IPSDER1N> endpoints = new LinkedHashMap<>();
        if (ObjectUtils.isEmpty(minorRelations)) {
            // Without any relation neither endpoint can be described; fail here instead of
            // handing back a writer that lacks the source/target options.
            throw new Exception("未指定源及目标关系");
        }

        for (IPSDERBase candidate : minorRelations) {
            IPSDER1N relation = (IPSDER1N) candidate;
            String role = matchEndpointRole(relation.getCodeName());
            if (role == null) {
                role = matchEndpointRole(relation.getPSPickupDEFieldMust().getName());
            }
            if (role != null) {
                endpoints.put(role, relation);
            }
        }

        if (endpoints.size() != 2) {
            // Naming convention not followed: fall back to positional assignment.
            endpoints.clear();
            if (minorRelations.size() >= 2) {
                endpoints.put(ROLE_SOURCE, (IPSDER1N) minorRelations.get(0));
                endpoints.put(ROLE_TARGET, (IPSDER1N) minorRelations.get(1));
            }
        }
        if (endpoints.size() != 2) {
            throw new Exception("未指定源及目标关系");
        }
        return endpoints;
    }

    /** Maps a name to {@code SOURCE} / {@code TARGET} (case-insensitive), or {@code null}. */
    private static String matchEndpointRole(String name) {
        if (ROLE_SOURCE.equalsIgnoreCase(name)) {
            return ROLE_SOURCE;
        }
        if (ROLE_TARGET.equalsIgnoreCase(name)) {
            return ROLE_TARGET;
        }
        return null;
    }

    /**
     * Sets the {@code relationship.<side>.*} options for one end of the relationship: the
     * node label comes from the relation's major entity table, and the node key maps the
     * pickup field to the field it relates to.
     *
     * @param side either {@code "source"} or {@code "target"}
     */
    private void applyRelationshipEndpoint(DataFrameWriter<Row> writer, String side, IPSDER1N relation) throws Throwable {
        String prefix = "relationship." + side;
        String nodeLabel = getRealDBObjName(relation.getMajorPSDataEntityMust().getTableName());
        String pickupColumn = getRealDBObjName(relation.getPSPickupDEFieldMust().getName());
        String nodeKey = getRealDBObjName(relation.getPSPickupDEFieldMust().getRelatedPSDEFieldMust().getName());

        writer.option(prefix + ".labels", nodeLabel);
        writer.option(prefix + ".node.keys", String.format("%1$s:%2$s", pickupColumn, nodeKey));
        writer.option(prefix + ".save.mode", "Overwrite");
    }
}
