package net.ibizsys.dataflow.spark.dataentity.dataflow;

import java.lang.invoke.SerializedLambda;
import java.sql.Timestamp;
import java.util.LinkedHashMap;
import net.ibizsys.dataflow.spark.dataentity.service.ISparkPSDEMethodDTOEngine;
import net.ibizsys.dataflow.spark.eai.ISparkPSSysDataSyncAgentEngine;
import net.ibizsys.model.PSModelEnums;
import net.ibizsys.model.dataentity.dataflow.IPSDEDFSysDataSyncAgentSinkNode;
import net.ibizsys.model.engine.util.JsonUtils;
import net.ibizsys.model.engine.util.domain.DataSyncOut;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/ibizsys/dataflow/spark/dataentity/dataflow/SparkPSDEDFSysDataSyncAgentSinkNodeAddin.class */
/**
 * Data-flow sink node add-in that converts the node's input dataset into JSON
 * strings and hands them to the stream writer of the configured system
 * data-sync agent engine.
 *
 * <p>Two sub types are handled by {@link #toStringDataset}:
 * <ul>
 *   <li>{@code RAW} - every row is emitted as its own JSON text;</li>
 *   <li>{@code DEDATASYNC} - every row is wrapped in a {@link DataSyncOut}
 *       message, which is in turn wrapped in a map keyed by
 *       {@link #PACKAGE_AGENT} and {@link #PACKAGE_DATA}.</li>
 * </ul>
 *
 * <p>The lambdas passed to {@code Dataset.map} only capture local values and
 * static members, never {@code this}.
 */
public class SparkPSDEDFSysDataSyncAgentSinkNodeAddin extends SparkPSDEDataFlowSinkNodeAddinBase<ISparkPSDEDataFlowEngine<?>, IPSDEDFSysDataSyncAgentSinkNode> {
    private static final Log log = LogFactory.getLog(SparkPSDEDFSysDataSyncAgentSinkNodeAddin.class);

    /** Key of the outgoing package entry holding the agent code name. */
    public static final String PACKAGE_AGENT = "agent";

    /** Key of the outgoing package entry holding the serialized {@link DataSyncOut}. */
    public static final String PACKAGE_DATA = "data";

    /** Output format of this sink; stays {@code RAW} unless the node model declares a sub type. */
    private PSModelEnums.DEDataFlowSysDataSyncAgentSinkType subType = PSModelEnums.DEDataFlowSysDataSyncAgentSinkType.RAW;

    /**
     * Reads the sub type from the node model (when it is a non-empty string)
     * before delegating to the base class preparation.
     *
     * @throws Exception propagated from the base class or the model accessors
     */
    protected void onPrepareSetting() throws Exception {
        if (StringUtils.hasLength(getPSDEDataFlowNode().getSubType())) {
            setSubType(PSModelEnums.DEDataFlowSysDataSyncAgentSinkType.from(getPSDEDataFlowNode().getSubType()));
        }
        super.onPrepareSetting();
    }

    /**
     * @return the sub type currently used to format outgoing rows
     */
    public PSModelEnums.DEDataFlowSysDataSyncAgentSinkType getSubType() {
        return this.subType;
    }

    /**
     * @param dEDataFlowSysDataSyncAgentSinkType the sub type used to format outgoing rows
     */
    protected void setSubType(PSModelEnums.DEDataFlowSysDataSyncAgentSinkType dEDataFlowSysDataSyncAgentSinkType) {
        this.subType = dEDataFlowSysDataSyncAgentSinkType;
    }

    /**
     * Builds the string dataset for this node, asks the data-sync agent engine
     * for a stream writer over it and starts that writer.
     *
     * @param iSparkPSDEDataFlowSession session supplying the input dataset and the Spark session
     * @return the started query, or {@code null} when the agent engine returns no writer
     * @throws Throwable propagated from dataset conversion, engine lookup or query start
     */
    @Override
    protected StreamingQuery onGetStreamingQuery(ISparkPSDEDataFlowSession<?> iSparkPSDEDataFlowSession) throws Throwable {
        IPSDEDFSysDataSyncAgentSinkNode sinkNode = getPSDEDataFlowNode();
        Dataset<Row> input = iSparkPSDEDataFlowSession.getDataset(getDataStream(false));
        ISparkPSSysDataSyncAgentEngine agentEngine = (ISparkPSSysDataSyncAgentEngine) getPSModelEngineHolder().getPSModelEngine(sinkNode.getPSSysDataSyncAgentMust(), ISparkPSSysDataSyncAgentEngine.class);
        DataStreamWriter<?> writer = agentEngine.getDataStreamWriter(iSparkPSDEDataFlowSession.getSparkSession(), toStringDataset(iSparkPSDEDataFlowSession, input), String.class);
        if (writer == null) {
            return null;
        }
        return writer.start();
    }

    /**
     * Converts the row dataset into the JSON strings sent to the agent,
     * according to {@link #getSubType()}.
     *
     * <p>NOTE(review): the compiled form of this class bound both lambdas to
     * Spark's {@code MapFunction}; if the compiler reports the
     * {@code Dataset.map} call as ambiguous, an explicit
     * {@code MapFunction<Row, String>} cast is required - confirm against the
     * Spark version in use.
     *
     * @param iSparkPSDEDataFlowSession current data-flow session (not used by this implementation)
     * @param dataset rows to convert
     * @return a dataset holding one JSON string per input row
     * @throws Exception when the sub type is neither {@code RAW} nor {@code DEDATASYNC},
     *         or when a model accessor fails
     */
    protected Dataset<String> toStringDataset(ISparkPSDEDataFlowSession<?> iSparkPSDEDataFlowSession, Dataset<Row> dataset) throws Exception {
        PSModelEnums.DEDataFlowSysDataSyncAgentSinkType type = getSubType();
        if (type == PSModelEnums.DEDataFlowSysDataSyncAgentSinkType.RAW) {
            return dataset.map(row -> row.json(), Encoders.STRING());
        }
        if (type != PSModelEnums.DEDataFlowSysDataSyncAgentSinkType.DEDATASYNC) {
            // Guard against a null sub type so the intended error is not masked by an NPE.
            throw new Exception(String.format("未支持的类型[%1$s]", type == null ? null : type.value));
        }

        // Let the destination entity's default method DTO engine transform the rows first.
        Dataset<Row> dtoDataset = getPSModelEngineHolder().getPSModelEngine(getPSDEDataFlowNode().getDstPSDataEntityMust().getDefaultPSDEMethodDTOMust(), ISparkPSDEMethodDTOEngine.class).getDataset(dataset);
        String id = getPSDEDataFlowNode().getDstPSDataEntityMust().getId();
        String name = getPSDEDataFlowNode().getDstPSDataEntityMust().getName();
        // Result intentionally unused. NOTE(review): presumably kept so that a missing
        // key field fails here rather than later - confirm what the *Must accessor does.
        getPSDEDataFlowNode().getDstPSDataEntityMust().getKeyPSDEFieldMust().getName();
        int eventType = getPSDEDataFlowNode().getEventType();
        String codeName = getPSDEDataFlowNode().getPSSysDataSyncAgentMust().getCodeName();

        // Only locals are captured; the helper is static so `this` stays out of the closure.
        return dtoDataset.map(row -> toSyncPackage(eventType, id, name, codeName, row.json()), Encoders.STRING());
    }

    /**
     * Wraps one row's JSON text in a {@link DataSyncOut} message and then in the
     * agent/data package, returning the package as JSON text.
     *
     * @param eventType event type stored on the message
     * @param deId id of the destination data entity
     * @param deName name of the destination data entity
     * @param agentCodeName code name stored under {@link #PACKAGE_AGENT}
     * @param rowJson JSON text of the row, stored as the message data
     * @return JSON text of the map {@code {agent, data}}, in that insertion order
     */
    private static String toSyncPackage(int eventType, String deId, String deName, String agentCodeName, String rowJson) {
        DataSyncOut message = new DataSyncOut();
        message.setEventType(Integer.valueOf(eventType));
        // The data key is explicitly left unset.
        message.setDataKey((String) null);
        message.setData(rowJson);
        message.setDEId(deId);
        message.setDEName(deName);
        message.setCreateDate(new Timestamp(System.currentTimeMillis()));

        LinkedHashMap<String, String> envelope = new LinkedHashMap<>();
        envelope.put(PACKAGE_AGENT, agentCodeName);
        envelope.put(PACKAGE_DATA, JsonUtils.toString(message));
        return JsonUtils.toString(envelope);
    }
}
