package net.ibizsys.dataflow.spark.eai;

import net.ibizsys.dataflow.core.eai.PSSysDataSyncAgentEngineBase;
import net.ibizsys.model.PSModelEnums;
import net.ibizsys.model.engine.PSModelEngineException;
import net.ibizsys.model.engine.util.IAction;
import net.ibizsys.model.res.IPSSysDataSyncAgent;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.DataStreamWriter;

/* loaded from: input_file:net/ibizsys/dataflow/spark/eai/SparkPSSysDataSyncAgentEngine.class */
public class SparkPSSysDataSyncAgentEngine extends PSSysDataSyncAgentEngineBase<IPSSysDataSyncAgent> implements ISparkPSSysDataSyncAgentEngine<IPSSysDataSyncAgent> {
    @Override // net.ibizsys.dataflow.spark.IDataStreamReaderProvider
    public DataStreamReader getDataStreamReader(final SparkSession sparkSession) {
        if (PSModelEnums.DataSyncAgentDir.IN.value.equalsIgnoreCase(getPSSysDataSyncAgent().getSyncDir()) || PSModelEnums.DataSyncAgentDir.INOUT.value.equalsIgnoreCase(getPSSysDataSyncAgent().getSyncDir())) {
            return (DataStreamReader) executeAction("获取数据源读取对象", new IAction<DataStreamReader>() { // from class: net.ibizsys.dataflow.spark.eai.SparkPSSysDataSyncAgentEngine.1
                /* renamed from: execute, reason: merged with bridge method [inline-methods] */
                public DataStreamReader m35execute(Object[] objArr) throws Throwable {
                    return SparkPSSysDataSyncAgentEngine.this.onGetDataStreamReader(sparkSession);
                }
            }, DataStreamReader.class);
        }
        throw new PSModelEngineException(this, String.format("未启用输入代理功能", new Object[0]));
    }

    protected DataStreamReader onGetDataStreamReader(SparkSession sparkSession) throws Throwable {
        if (PSModelEnums.DataSyncAgentType.KAFKA.value.equalsIgnoreCase(getAgentType())) {
            return sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", getServiceUrl()).option("subscribe", getDefaultTopic()).option("kafka.group.id", getGroupId());
        }
        throw new Exception(String.format("未支持代理类型[%1$s]", getAgentType()));
    }

    @Override // net.ibizsys.dataflow.spark.IDataStreamWriterProvider
    public <T> DataStreamWriter<?> getDataStreamWriter(final SparkSession sparkSession, final Dataset<T> dataset, final Class<T> cls) {
        if (PSModelEnums.DataSyncAgentDir.OUT.value.equalsIgnoreCase(getPSSysDataSyncAgent().getSyncDir()) || PSModelEnums.DataSyncAgentDir.INOUT.value.equalsIgnoreCase(getPSSysDataSyncAgent().getSyncDir())) {
            return (DataStreamWriter) executeAction("获取数据源写入对象", new IAction<DataStreamWriter>() { // from class: net.ibizsys.dataflow.spark.eai.SparkPSSysDataSyncAgentEngine.2
                /* renamed from: execute, reason: merged with bridge method [inline-methods] */
                public DataStreamWriter m36execute(Object[] objArr) throws Throwable {
                    return SparkPSSysDataSyncAgentEngine.this.onGetDataStreamWriter(sparkSession, dataset, cls);
                }
            }, DataStreamWriter.class);
        }
        throw new PSModelEngineException(this, String.format("未启用输出功能", new Object[0]));
    }

    protected <T> DataStreamWriter<?> onGetDataStreamWriter(SparkSession sparkSession, Dataset<T> dataset, Class<T> cls) throws Throwable {
        if (!PSModelEnums.DataSyncAgentType.KAFKA.value.equalsIgnoreCase(getAgentType())) {
            throw new Exception(String.format("未支持代理类型[%1$s]", getAgentType()));
        }
        if (dataset.isStreaming()) {
            return dataset.writeStream().format("kafka").option("kafka.bootstrap.servers", getServiceUrl()).option("topic", getDefaultTopic());
        }
        dataset.write().format("kafka").option("kafka.bootstrap.servers", getServiceUrl()).option("topic", getDefaultTopic()).save();
        return null;
    }
}
