package it.agilelab.bigdata.wasp.consumers.spark.plugins.http;

import it.agilelab.bigdata.wasp.consumers.spark.utils.CompressExpression$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.HttpCompression;
import it.agilelab.bigdata.wasp.models.HttpCompression$Disabled$;
import it.agilelab.bigdata.wasp.models.HttpCompression$Gzip$;
import it.agilelab.bigdata.wasp.models.HttpCompression$Lz4$;
import it.agilelab.bigdata.wasp.models.HttpCompression$Snappy$;
import it.agilelab.bigdata.wasp.models.HttpModel;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: HttpWaspWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001=4Aa\u0002\u0005\u00013!Aa\u0006\u0001B\u0001B\u0003%q\u0006C\u00036\u0001\u0011\u0005a\u0007C\u0004;\u0001\t\u0007I\u0011B\u001e\t\r\u0011\u0003\u0001\u0015!\u0003=\u0011\u0015)\u0005\u0001\"\u0011G\u0011\u0019a\u0007\u0001\"\u0005\u0011[\nq\u0001\n\u001e;q/\u0006\u001c\bo\u0016:ji\u0016\u0014(BA\u0005\u000b\u0003\u0011AG\u000f\u001e9\u000b\u0005-a\u0011a\u00029mk\u001eLgn\u001d\u0006\u0003\u001b9\tQa\u001d9be.T!a\u0004\t\u0002\u0013\r|gn];nKJ\u001c(BA\t\u0013\u0003\u00119\u0018m\u001d9\u000b\u0005M!\u0012a\u00022jO\u0012\fG/\u0019\u0006\u0003+Y\t\u0001\"Y4jY\u0016d\u0017M\u0019\u0006\u0002/\u0005\u0011\u0011\u000e^\u0002\u0001'\u0011\u0001!\u0004\t\u0014\u0011\u0005mqR\"\u0001\u000f\u000b\u0003u\tQa]2bY\u0006L!a\b\u000f\u0003\r\u0005s\u0017PU3g!\t\tC%D\u0001#\u0015\t\u0019C\"A\u0004xe&$XM]:\n\u0005\u0015\u0012#AH*qCJ\\7\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO^\u0013\u0018\u000e^3s!\t9C&D\u0001)\u0015\tI#&A\u0004m_\u001e<\u0017N\\4\u000b\u0005-\u0002\u0012\u0001B2pe\u0016L!!\f\u0015\u0003\u000f1{wmZ5oO\u0006I\u0001\u000e\u001e;q\u001b>$W\r\u001c\t\u0003aMj\u0011!\r\u0006\u0003eA\ta!\\8eK2\u001c\u0018B\u0001\u001b2\u0005%AE\u000f\u001e9N_\u0012,G.\u0001\u0004=S:LGO\u0010\u000b\u0003oe\u0002\"\u0001\u000f\u0001\u000e\u0003!AQA\f\u0002A\u0002=\n!B^1m\u0007>dg*Y7f+\u0005a\u0004CA\u001fC\u001b\u0005q$BA 
A\u0003\u0011a\u0017M\\4\u000b\u0003\u0005\u000bAA[1wC&\u00111I\u0010\u0002\u0007'R\u0014\u0018N\\4\u0002\u0017Y\fGnQ8m\u001d\u0006lW\rI\u0001\u0006oJLG/\u001a\u000b\u0003\u000fb\u00032\u0001\u0013*U\u001b\u0005I%B\u0001&L\u0003%\u0019HO]3b[&twM\u0003\u0002M\u001b\u0006\u00191/\u001d7\u000b\u00055q%BA(Q\u0003\u0019\t\u0007/Y2iK*\t\u0011+A\u0002pe\u001eL!aU%\u0003!\u0011\u000bG/Y*ue\u0016\fWn\u0016:ji\u0016\u0014\bCA+W\u001b\u0005Y\u0015BA,L\u0005\r\u0011vn\u001e\u0005\u00063\u0016\u0001\rAW\u0001\u0007gR\u0014X-Y7\u0011\u0005mKgB\u0001/h\u001d\tifM\u0004\u0002_K:\u0011q\f\u001a\b\u0003A\u000el\u0011!\u0019\u0006\u0003Eb\ta\u0001\u0010:p_Rt\u0014\"A)\n\u0005=\u0003\u0016BA\u0007O\u0013\taU*\u0003\u0002i\u0017\u00069\u0001/Y2lC\u001e,\u0017B\u00016l\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0002i\u0017\u0006I\u0001O]3qCJ,GI\u0012\u000b\u00035:DQ!\u0017\u0004A\u0002i\u0003")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/http/HttpWaspWriter.class */
/*
 * Structured-streaming writer that turns every row of the incoming data frame into an HTTP
 * payload: the value fields are serialised to JSON, cast to binary, optionally gzip-compressed
 * and exposed in a column named "value"; when the model declares a header column it is selected
 * alongside (cast to Map[String, String] when needed). The prepared frame is handed to the
 * foreach writer built by HttpWriter.
 */
public class HttpWaspWriter implements SparkStructuredStreamingWriter, Logging {
    private final HttpModel httpModel;
    private final String valColName;
    // Not final: the Logging trait initialiser assigns it through the synthetic setter below.
    private WaspLogger logger;

    /** Returns the logger installed by the {@code Logging} trait initialiser. */
    public WaspLogger logger() {
        return this.logger;
    }

    /** Synthetic setter invoked by {@code Logging.$init$} to install the logger. */
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /** Name of the output column that carries the (possibly compressed) request body. */
    private String valColName() {
        return this.valColName;
    }

    /**
     * Prepares the stream with {@link #prepareDF(Dataset)} and attaches the HTTP foreach writer.
     *
     * @param dataset the streaming data frame to send
     * @return the configured stream writer
     */
    public DataStreamWriter<Row> write(Dataset<Row> dataset) {
        Dataset<Row> prepared = prepareDF(dataset);
        return prepared.writeStream().foreach(HttpWriter$.MODULE$.apply(this.httpModel, valColName()));
    }

    /**
     * Projects the data frame onto the optional header column followed by the body column.
     *
     * @param dataset the data frame to transform
     * @return a data frame with {@code [header column,] value}
     * @throws IllegalArgumentException if the configured compression is snappy or lz4, or if the
     *         single configured value field does not exist in the schema
     * @throws RuntimeException if the header column is missing or has a type that cannot be
     *         converted to a string-to-string map
     */
    public Dataset<Row> prepareDF(Dataset<Row> dataset) {
        // Evaluation order mirrors the original expression: codec, fields, hadoop conf,
        // JSON body, header column and finally the compression wrapper.
        Option<String> codec = compressionCodec();
        List<String> valueFields = resolveValueFields(dataset);
        Configuration hadoopConfiguration = dataset.sparkSession().sparkContext().hadoopConfiguration();
        Column jsonBytes = valueColumn(dataset, valueFields).cast(BinaryType$.MODULE$);

        Option<String> headerName = this.httpModel.headersFieldName();
        Column header = headerName.isEmpty() ? null : headerColumn(dataset, headerName.get());

        Column body = codec.isEmpty()
                ? jsonBytes
                : CompressExpression$.MODULE$.compress(jsonBytes, codec.get(), hadoopConfiguration);
        Column value = body.as(valColName());

        Column[] selected = header == null ? new Column[] {value} : new Column[] {header, value};
        return dataset.select(Predef$.MODULE$.wrapRefArray(selected));
    }

    /** Maps the model's compression setting to a codec name; only gzip is supported. */
    private Option<String> compressionCodec() {
        HttpCompression compression = this.httpModel.compression();
        if (HttpCompression$Disabled$.MODULE$.equals(compression)) {
            return Option$.MODULE$.empty();
        }
        if (HttpCompression$Gzip$.MODULE$.equals(compression)) {
            return new Some<>("gzip");
        }
        if (HttpCompression$Snappy$.MODULE$.equals(compression)) {
            throw new IllegalArgumentException("Unsupported compression format: snappy");
        }
        if (HttpCompression$Lz4$.MODULE$.equals(compression)) {
            throw new IllegalArgumentException("Unsupported compression format: lz4");
        }
        throw new MatchError(compression);
    }

    /**
     * Returns the configured value fields, or, when none are configured, every column of the
     * data frame except the header column (if one is declared).
     */
    private List<String> resolveValueFields(Dataset<Row> dataset) {
        List<String> configured = this.httpModel.valueFieldsNames();
        if (!configured.isEmpty()) {
            return configured;
        }
        Option<String> headerName = this.httpModel.headersFieldName();
        String[] names = dataset.schema().fieldNames();
        List<String> selected = List$.MODULE$.empty();
        // Prepend while walking backwards so the resulting list keeps schema order.
        for (int i = names.length - 1; i >= 0; i--) {
            boolean isHeader = !headerName.isEmpty() && $anonfun$prepareDF$3(headerName.get(), names[i]);
            if (!isHeader) {
                selected = selected.$colon$colon(names[i]);
            }
        }
        return selected;
    }

    /** Builds the JSON expression for the body from the selected value fields. */
    private Column valueColumn(Dataset<Row> dataset, List<String> fields) {
        if (fields.lengthCompare(1) != 0) {
            // Zero or several fields: wrap them all in one struct.
            Column[] columns = new Column[fields.length()];
            int i = 0;
            for (List<String> rest = fields; !rest.isEmpty(); rest = rest.tail()) {
                columns[i++] = functions$.MODULE$.col(rest.head());
            }
            return functions$.MODULE$.to_json(functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray(columns)));
        }

        String field = fields.head();
        boolean found = false;
        DataType fieldType = null;
        for (StructField candidate : dataset.schema().fields()) {
            if ($anonfun$prepareDF$4(field, candidate)) {
                found = true;
                fieldType = candidate.dataType();
                break;
            }
        }
        if (!found) {
            throw new IllegalArgumentException("Cannot find column " + field + " inside data frame, columns are "
                    + new ArrayOps.ofRef<>(Predef$.MODULE$.refArrayOps(dataset.schema().fields())).mkString("[", ",", "]"));
        }
        if (fieldType instanceof MapType || fieldType instanceof ArrayType) {
            // Maps and arrays are serialised bare unless the model asks for a wrapping struct.
            return this.httpModel.structured() ? toJsonOfStruct(field) : functions$.MODULE$.to_json(functions$.MODULE$.col(field));
        }
        if (fieldType instanceof StructType) {
            return functions$.MODULE$.to_json(functions$.MODULE$.col(field));
        }
        // Any other type is wrapped in a single-field struct before serialisation.
        return toJsonOfStruct(field);
    }

    /** {@code to_json(struct(field))} for a single column name. */
    private static Column toJsonOfStruct(String field) {
        return functions$.MODULE$.to_json(functions$.MODULE$.struct(field, Predef$.MODULE$.wrapRefArray(new String[0])));
    }

    /**
     * Resolves the header column and converts it to {@code Map[String, String]} when necessary.
     * Accepted shapes: a map (cast if keys/values are not strings) or an array of two-field
     * structs (converted with {@code map_from_entries}); anything else is rejected.
     */
    private Column headerColumn(Dataset<Row> dataset, String headerName) {
        StructField headerField = null;
        for (StructField candidate : dataset.schema().fields()) {
            if ($anonfun$prepareDF$10(headerName, candidate)) {
                headerField = candidate;
                break;
            }
        }
        if (headerField == null) {
            throw new RuntimeException("Cannot find header column: " + headerName);
        }

        DataType dataType = headerField.dataType();
        MapType stringMap = MapType$.MODULE$.apply(StringType$.MODULE$, StringType$.MODULE$);

        if (dataType instanceof MapType) {
            MapType mapType = (MapType) dataType;
            DataType keyType = mapType.keyType();
            DataType valueType = mapType.valueType();
            if (StringType$.MODULE$.equals(keyType) && StringType$.MODULE$.equals(valueType)) {
                return functions$.MODULE$.col(headerName);
            }
            logger().warn(() -> "header column " + headerName + " is not of type Map[String, String] but it is "
                    + "Map[" + keyType + ", " + valueType + "] a cast will be performed");
            return functions$.MODULE$.col(headerName).cast(stringMap).as(headerName);
        }

        if (dataType instanceof ArrayType) {
            DataType elementType = ((ArrayType) dataType).elementType();
            if (elementType instanceof StructType) {
                StructType entryType = (StructType) elementType;
                StructField[] entryFields = entryType.fields();
                if (entryFields != null && entryFields.length == 2) {
                    logger().warn(() -> "header column " + headerName + " is not of type Map[String, String] but it is "
                            + "Array[" + entryType + "] a cast will be performed");
                    // Return the converted column; the decompiled code fell through to the
                    // exception below, rejecting every array-of-entries header column.
                    return functions$.MODULE$.map_from_entries(functions$.MODULE$.col(headerName)).cast(stringMap).as(headerName);
                }
            }
        }

        throw new RuntimeException("column " + headerName + " is of type " + dataType + " which cannot "
                + "be used as http headers");
    }

    /** Synthetic predicate: null-safe equality between a field name ({@code str2}) and the header name ({@code str}). */
    public static final /* synthetic */ boolean $anonfun$prepareDF$3(String str, String str2) {
        return str2 != null ? str2.equals(str) : str == null;
    }

    /** Synthetic predicate: true when the schema field is named {@code str} (null-safe). */
    public static final /* synthetic */ boolean $anonfun$prepareDF$4(String str, StructField structField) {
        String name = structField.name();
        return name != null ? name.equals(str) : str == null;
    }

    /** Synthetic predicate: true when the schema field is named {@code str} (null-safe). */
    public static final /* synthetic */ boolean $anonfun$prepareDF$10(String str, StructField structField) {
        String name = structField.name();
        return name != null ? name.equals(str) : str == null;
    }

    /**
     * @param httpModel the HTTP sink description (compression, header column, value fields)
     */
    public HttpWaspWriter(HttpModel httpModel) {
        this.httpModel = httpModel;
        Logging.$init$(this);
        this.valColName = "value";
    }
}
