package uk.gov.gchq.gaffer.parquetstore.operation.addelements.impl;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.mutable.Builder;
import uk.gov.gchq.gaffer.exception.SerialisationException;
import uk.gov.gchq.gaffer.jsonserialisation.JSONSerialiser;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.parquetstore.ParquetStore;
import uk.gov.gchq.gaffer.parquetstore.utils.GafferGroupObjectConverter;
import uk.gov.gchq.gaffer.parquetstore.utils.ParquetStoreConstants;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.store.schema.SchemaElementDefinition;
import uk.gov.gchq.gaffer.store.schema.SchemaEntityDefinition;
import uk.gov.gchq.gaffer.store.util.AggregatorUtil;

/* loaded from: input_file:uk/gov/gchq/gaffer/parquetstore/operation/addelements/impl/AggregateAndSortGroup.class */
/**
 * Task that prepares the Parquet data of a single group for the graph.
 *
 * <p>When run, it reads the group's Parquet files from the temporary files directory (plus the
 * current graph directory for the group, if one was supplied and exists), optionally merges rows
 * that share the same ingest group-by key, sorts the rows by the group's identifier columns and
 * writes the result as gzip-compressed Parquet into the {@code /sorted} sub-directory of the
 * temporary files directory.
 *
 * <p>{@link #call()} reports an {@link IOException} by <em>returning</em> an
 * {@link OperationException}; a {@code null} return value means no such error occurred.
 */
public class AggregateAndSortGroup implements Callable<OperationException>, Serializable {
    private static final JSONSerialiser JSON_SERIALISER = new JSONSerialiser();
    private static final Logger LOGGER = LoggerFactory.getLogger(AggregateAndSortGroup.class);
    private static final long serialVersionUID = -7828247145178905841L;
    /** Suffix appended to the temporary files directory to form the root of the sorted output. */
    private static final String SORTED = "/sorted";
    private final String group;
    private final String tempFileDir;
    private final SparkSession spark;
    private final Map<String, String[]> columnToPaths;
    private final StructType sparkSchema;
    private final GafferGroupObjectConverter gafferGroupObjectConverter;
    private final String currentGraphDir;
    private final String inputDir;
    private final String outputDir;
    private final int filesPerGroup;
    private final Boolean isEntity;
    private final String[] gafferProperties;
    private final byte[] aggregatorJson;
    private final Set<String> groupByColumns;
    private final boolean aggregate;

    /**
     * Captures everything needed to aggregate and sort one group.
     *
     * @param str the name of the group to process
     * @param str2 the column name passed to {@code ParquetStore.getGroupDirectory} when resolving
     *     the output directory beneath the {@code /sorted} temporary directory
     * @param parquetStore the store supplying the schema utilities, temporary files directory and
     *     store properties
     * @param str3 directory holding the group's data in the current graph; may be {@code null},
     *     in which case only the temporary input directory is read
     * @param sparkSession the Spark session used to read, transform and write the data
     * @throws SerialisationException if the group's ingest aggregator cannot be serialised to JSON
     */
    public AggregateAndSortGroup(String str, String str2, ParquetStore parquetStore, String str3, SparkSession sparkSession) throws SerialisationException {
        this.group = str;
        this.tempFileDir = parquetStore.getTempFilesDir();
        final Schema gafferSchema = parquetStore.getSchemaUtils().getGafferSchema();
        final SchemaElementDefinition element = gafferSchema.getElement(str);
        this.isEntity = element instanceof SchemaEntityDefinition;
        this.aggregate = element.isAggregate();
        this.groupByColumns = new HashSet<>(AggregatorUtil.getIngestGroupBy(str, gafferSchema));
        // The aggregator is held as JSON bytes rather than as an object so that it can travel with
        // this Serializable task.
        this.aggregatorJson = JSON_SERIALISER.serialise(element.getIngestAggregator(), new String[0]);
        this.gafferProperties = new String[element.getProperties().size()];
        element.getProperties().toArray(this.gafferProperties);
        this.spark = sparkSession;
        this.columnToPaths = parquetStore.getSchemaUtils().getColumnToPaths(str);
        this.sparkSchema = parquetStore.getSchemaUtils().getSparkSchema(str);
        this.gafferGroupObjectConverter = parquetStore.getSchemaUtils().getConverter(str);
        this.currentGraphDir = str3;
        // Entities are read from the vertex directory and edges from the source directory; the
        // output directory is resolved identically for both.
        final String inputColumn = this.isEntity ? ParquetStoreConstants.VERTEX : ParquetStoreConstants.SOURCE;
        this.inputDir = ParquetStore.getGroupDirectory(str, inputColumn, this.tempFileDir);
        this.outputDir = ParquetStore.getGroupDirectory(str, str2, this.tempFileDir + SORTED);
        // NOTE(review): "m3getProperties" looks like a decompiler-generated alias for
        // getProperties() - confirm against the real ParquetStore API.
        this.filesPerGroup = parquetStore.m3getProperties().getAddElementsOutputFilesPerGroup();
    }

    /**
     * Aggregates (when enabled for the group), sorts and writes the group's data.
     *
     * <p>If the group's temporary input directory does not exist, nothing is written.
     *
     * @return {@code null} when the work completed or was skipped; an {@link OperationException}
     *     wrapping the cause when an {@link IOException} occurred
     */
    @Override // java.util.concurrent.Callable
    public OperationException call() {
        try {
            final FileSystem fileSystem = FileSystem.get(new Configuration());
            if (!fileSystem.exists(new Path(this.inputDir))) {
                LOGGER.debug("Skipping the sorting and aggregation of group: {}, due to no data existing in the temporary files directory: {}", this.group, this.tempFileDir);
                return null;
            }

            final List<String> inputPaths = new ArrayList<>();
            inputPaths.add(this.inputDir);
            // Include the data already in the graph so that new and existing rows are merged and
            // sorted together.
            if (this.currentGraphDir != null && fileSystem.exists(new Path(this.currentGraphDir))) {
                inputPaths.add(this.currentGraphDir);
            }
            LOGGER.debug("Aggregating and sorting the data for group {} stored in directories {}", this.group, StringUtils.join(inputPaths, ','));

            final Dataset<Row> loaded = this.spark.read().parquet(JavaConversions.asScalaBuffer(inputPaths));
            final Dataset<Row> prepared = this.aggregate ? aggregateRows(loaded) : loaded;
            sortByIdentifiers(prepared)
                    .coalesce(this.filesPerGroup)
                    .write()
                    .option("compression", "gzip")
                    .parquet(this.outputDir);
            return null;
        } catch (final IOException e) {
            return new OperationException("IOException occurred during aggregation and sorting of data", e);
        }
    }

    /** Merges rows sharing the same group-by key and rebuilds a data frame with the group's Spark schema. */
    private Dataset<Row> aggregateRows(final Dataset<Row> rows) {
        final boolean entity = this.isEntity;
        return this.spark.createDataFrame(
                rows.javaRDD()
                        .keyBy(new ExtractKeyFromRow(this.groupByColumns, this.columnToPaths, entity))
                        .reduceByKey(new AggregateGafferRowsFunction(this.gafferProperties, entity, this.groupByColumns, this.columnToPaths, this.aggregatorJson, this.gafferGroupObjectConverter))
                        .values(),
                this.sparkSchema);
    }

    /**
     * Sorts by the vertex paths for an entity group, or by the source paths, then the destination
     * paths, then the directed column for an edge group.
     */
    private Dataset<Row> sortByIdentifiers(final Dataset<Row> rows) {
        final boolean entity = this.isEntity;
        final String[] leadingPaths = this.columnToPaths.get(entity ? ParquetStoreConstants.VERTEX : ParquetStoreConstants.SOURCE);
        // The first path is the primary sort column; every other column goes into the Scala Seq of
        // secondary sort columns expected by Dataset.sort.
        final Builder<String, Seq<String>> secondaryColumns = Seq$.MODULE$.newBuilder();
        for (int i = 1; i < leadingPaths.length; i++) {
            secondaryColumns.$plus$eq(leadingPaths[i]);
        }
        if (!entity) {
            for (final String destinationPath : this.columnToPaths.get(ParquetStoreConstants.DESTINATION)) {
                secondaryColumns.$plus$eq(destinationPath);
            }
            secondaryColumns.$plus$eq(ParquetStoreConstants.DIRECTED);
        }
        return rows.sort(leadingPaths[0], secondaryColumns.result());
    }
}
