package uk.gov.gchq.gaffer.parquetstore.operation.handler.spark;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.exception.SerialisationException;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.parquetstore.ParquetStore;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.spark.utilities.WriteData;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities.AggregateDataForGroup;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities.CalculatePartitioner;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities.SortFullGroup;
import uk.gov.gchq.gaffer.parquetstore.partitioner.GraphPartitioner;
import uk.gov.gchq.gaffer.parquetstore.partitioner.serialisation.GraphPartitionerSerialiser;
import uk.gov.gchq.gaffer.parquetstore.utils.SchemaUtils;
import uk.gov.gchq.gaffer.parquetstore.utils.SparkParquetUtils;
import uk.gov.gchq.gaffer.spark.SparkContextUtil;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.schema.Schema;

/* loaded from: input_file:uk/gov/gchq/gaffer/parquetstore/operation/handler/spark/AddElementsFromRDD.class */
/**
 * Adds the elements of a Spark RDD to a {@link ParquetStore}.
 *
 * <p>The work is performed as a fixed sequence of steps (see {@link #addElementsFromRDD(JavaRDD)}):
 * the input is written under the store's temporary files directory, groups that the schema marks
 * as aggregated are aggregated together with the files the store already holds for them, every
 * group is sorted, edge groups are additionally sorted in reversed form, a new
 * {@link GraphPartitioner} is calculated and written, the results are moved into a new snapshot
 * directory which is then set as the store's latest snapshot, and finally the temporary
 * directory is deleted.
 */
public class AddElementsFromRDD {
    private static final Logger LOGGER = LoggerFactory.getLogger(AddElementsFromRDD.class);
    /** Suffix used to pick out Parquet files when listing a directory. */
    private static final String PARQUET_SUFFIX = ".parquet";

    private final ParquetStore store;
    private final Schema schema;
    private final SchemaUtils schemaUtils;
    private final FileSystem fs;
    private final SparkSession spark;
    private final String tempDir;

    /**
     * Captures the schema, schema utilities, file system and temporary files directory from the
     * given store, obtains a {@link SparkSession} for the context and configures it for adding
     * elements.
     *
     * @param context      the context used to obtain the Spark session
     * @param parquetStore the store the elements will be added to
     */
    public AddElementsFromRDD(Context context, ParquetStore parquetStore) {
        this.store = parquetStore;
        this.schema = parquetStore.getSchema();
        this.schemaUtils = parquetStore.getSchemaUtils();
        this.fs = parquetStore.getFS();
        this.spark = SparkContextUtil.getSparkSession(context, parquetStore.m3getProperties());
        SparkParquetUtils.configureSparkForAddElements(this.spark, parquetStore.m3getProperties());
        this.tempDir = parquetStore.m3getProperties().getTempFilesDir();
    }

    /**
     * Adds the elements in the given RDD to the store by running each step of the process in
     * order. If a step throws, the remaining steps (including deletion of the temporary
     * directory) are not run.
     *
     * @param javaRDD the elements to add
     * @throws OperationException if any step fails
     */
    public void addElementsFromRDD(JavaRDD<Element> javaRDD) throws OperationException {
        writeInputData(javaRDD);
        aggregateNewAndOldData();
        sort();
        sortEdgeGroupsByDestination();
        calculateAndWritePartitioner();
        createNewSnapshotDirectory();
        deleteTempDirectory();
    }

    /**
     * Converts the given Scala RDD to a {@link JavaRDD} and delegates to
     * {@link #addElementsFromRDD(JavaRDD)}.
     *
     * @param rdd the elements to add
     * @throws OperationException if any step fails
     */
    public void addElementsFromRDD(RDD<Element> rdd) throws OperationException {
        addElementsFromRDD(rdd.toJavaRDD());
    }

    /**
     * Builds the path of a working directory beneath the temporary files directory. The result
     * has the form
     * {@code <tempDir>/AddElementsFromRDDTemp/<sorted|unsorted>_<aggregated|unaggregated>_new/[reversed-][group=<group>]/}.
     *
     * @param group      the group name, or {@code null} to address the parent of the group
     *                   directories
     * @param sorted     whether the directory holds sorted data
     * @param aggregated whether the directory holds aggregated data
     * @param reversed   whether the directory holds the reversed form of the data
     * @return the directory path, always ending in {@code /}
     */
    private String getDirectory(String group, boolean sorted, boolean aggregated, boolean reversed) {
        return this.tempDir
                + "/AddElementsFromRDDTemp/"
                + (sorted ? "sorted" : "unsorted")
                + "_"
                + (aggregated ? "aggregated" : "unaggregated")
                + "_new/"
                + (reversed ? "reversed-" : "")
                + (null != group ? "group=" + group : "")
                + "/";
    }

    /**
     * Lists the files directly inside {@code directory} whose names end in {@code .parquet}.
     *
     * @param directory the directory to list
     * @return the matching file paths as strings, in the order returned by the file system
     * @throws IOException if the file system cannot list the directory
     */
    private List<String> listParquetFiles(String directory) throws IOException {
        final FileStatus[] statuses = this.fs.listStatus(
                new Path(directory), path -> path.getName().endsWith(PARQUET_SUFFIX));
        final List<String> files = new ArrayList<>(statuses.length);
        for (final FileStatus status : statuses) {
            files.add(status.getPath().toString());
        }
        return files;
    }

    /**
     * Appends the paths of the files the store currently holds for {@code group} to
     * {@code files}.
     *
     * @param group the group whose existing files are required
     * @param files the list to append to
     * @throws IOException if the store cannot determine the files for the group
     */
    private void addExistingFilesForGroup(String group, List<String> files) throws IOException {
        this.store.getFilesForGroup(group).stream().map(Path::toString).forEach(files::add);
    }

    /**
     * Writes every partition of the input RDD using {@link WriteData}, directing each group to
     * its unsorted, unaggregated working directory.
     *
     * @param javaRDD the elements to write
     */
    private void writeInputData(JavaRDD<Element> javaRDD) {
        LOGGER.info("Writing data for input RDD");
        javaRDD.foreachPartition(new WriteData(
                group -> getDirectory(group, false, false, false),
                this.schema,
                this.store.m3getProperties().getCompressionCodecName()));
    }

    /**
     * For each group the schema reports as aggregated, runs an {@link AggregateDataForGroup}
     * task over the newly written Parquet files plus the files the store already holds for that
     * group, writing to the group's unsorted, aggregated working directory.
     *
     * @throws OperationException if the input files cannot be found or the task cannot be created
     */
    private void aggregateNewAndOldData() throws OperationException {
        LOGGER.info("Creating AggregateDataForGroup tasks for groups that require aggregation");
        for (final String group : this.schema.getAggregatedGroups()) {
            LOGGER.info("Creating AggregateDataForGroup task for group {}", group);

            final String newDataDir = getDirectory(group, false, false, false);
            final List<String> inputFiles;
            try {
                inputFiles = listParquetFiles(newDataDir);
            } catch (final IOException e) {
                throw new OperationException("IOException finding Parquet files in " + newDataDir, e);
            }

            final String outputDir = getDirectory(group, false, true, false);
            try {
                addExistingFilesForGroup(group, inputFiles);
                final AggregateDataForGroup task = new AggregateDataForGroup(
                        this.fs, this.schemaUtils, group, inputFiles, outputDir, this.spark);
                LOGGER.info("AggregateDataForGroup task for group {} is being called ({} files as input, outputting to {})",
                        group, inputFiles.size(), outputDir);
                task.call();
            } catch (final SerialisationException e) {
                throw new OperationException("SerialisationException creating AggregateDataForGroup task", e);
            } catch (final IOException e) {
                throw new OperationException("IOException finding files for group " + group, e);
            }
        }
    }

    /**
     * Sorts every group into its sorted, aggregated working directory. Aggregated groups are
     * sorted from the output of the aggregation step; all other groups are sorted from the newly
     * written files plus the files the store already holds for the group.
     *
     * @throws OperationException if the input files cannot be listed or a sort fails
     */
    private void sort() throws OperationException {
        try {
            for (final String group : this.schemaUtils.getGroups()) {
                final String outputDir = getDirectory(group, true, true, false);
                final List<String> inputFiles;
                if (this.schemaUtils.getGafferSchema().getAggregatedGroups().contains(group)) {
                    // Aggregation already merged the existing data with the new data.
                    inputFiles = listParquetFiles(getDirectory(group, false, true, false));
                } else {
                    inputFiles = listParquetFiles(getDirectory(group, false, false, false));
                    addExistingFilesForGroup(group, inputFiles);
                }
                sort(group, false, inputFiles, outputDir);
            }
        } catch (final IOException e) {
            throw new OperationException("IOException sorting data", e);
        }
    }

    /**
     * For each edge group, sorts the already sorted data again in reversed form, writing to the
     * group's reversed working directory.
     *
     * @throws OperationException if the input files cannot be listed or a sort fails
     */
    private void sortEdgeGroupsByDestination() throws OperationException {
        try {
            for (final String group : this.schemaUtils.getEdgeGroups()) {
                final List<String> inputFiles = listParquetFiles(getDirectory(group, true, true, false));
                sort(group, true, inputFiles, getDirectory(group, true, true, true));
            }
        } catch (final IOException e) {
            throw new OperationException("IOException sorting edge groups by destination", e);
        }
    }

    /**
     * Runs a {@link SortFullGroup} task for one group.
     *
     * @param group      the group to sort
     * @param reversed   whether to sort using the reversed column ordering
     * @param inputFiles the files to read
     * @param outputDir  the directory to write the sorted files to
     * @throws OperationException if the sort fails with an {@link IOException}
     */
    private void sort(String group, boolean reversed, List<String> inputFiles, String outputDir) throws OperationException {
        try {
            final List<String> sortColumns = this.schemaUtils.columnsToSortBy(group, reversed);
            LOGGER.info("SortFullGroup task for {} group {} is being called ({} files as input, outputting to {})",
                    reversed ? "reversed" : "", group, inputFiles.size(), outputDir);
            new SortFullGroup(
                    group,
                    this.schemaUtils.getEntityGroups().contains(group),
                    reversed,
                    this.schemaUtils,
                    sortColumns,
                    inputFiles,
                    outputDir,
                    this.store.m3getProperties().getAddElementsOutputFilesPerGroup(),
                    this.store.m3getProperties().getCompressionCodecName(),
                    this.spark,
                    this.fs).call();
        } catch (final IOException e) {
            // Keep the cause and say which sort failed, rather than a placeholder message.
            throw new OperationException(
                    "IOException sorting " + (reversed ? "reversed " : "") + "group " + group
                            + " to " + outputDir, e);
        }
    }

    /**
     * Calculates a new {@link GraphPartitioner} from the sorted, aggregated working directory
     * and writes it to a file named {@code graphPartitioner} in that directory.
     *
     * @throws OperationException if the partitioner cannot be calculated or written
     */
    private void calculateAndWritePartitioner() throws OperationException {
        LOGGER.info("Calculating new GraphPartitioner");
        final String sortedDir = getDirectory(null, true, true, false);

        final GraphPartitioner partitioner;
        try {
            partitioner = new CalculatePartitioner(new Path(sortedDir), this.store.getSchema(), this.fs).call();
        } catch (final IOException e) {
            throw new OperationException("IOException calculating new graph partitioner", e);
        }
        LOGGER.info("New GraphPartitioner has partitions for {} groups, {} reversed edge groups",
                partitioner.getGroups().size(), partitioner.getGroupsForReversedEdges().size());

        final Path partitionerPath = new Path(sortedDir + "graphPartitioner");
        // try-with-resources so the stream is closed even when serialisation fails.
        try (DataOutputStream out = this.fs.create(partitionerPath)) {
            LOGGER.info("Writing graph partitioner to {}", partitionerPath);
            new GraphPartitionerSerialiser().write(partitioner, out);
        } catch (final IOException e) {
            throw new OperationException("IOException writing out graph partitioner to " + partitionerPath, e);
        }
    }

    /**
     * Moves the contents of the sorted, aggregated working directory into a new
     * {@code <snapshot>-tmp} directory under the store's data directory, renames that directory
     * to drop the {@code -tmp} suffix, and then sets the snapshot as the store's latest.
     *
     * @throws OperationException if any file system operation fails or the store rejects the new
     *                            snapshot
     */
    private void createNewSnapshotDirectory() throws OperationException {
        final long snapshot = System.currentTimeMillis();
        final String snapshotDir = this.store.getDataDir() + "/" + ParquetStore.getSnapshotPath(snapshot);
        final String tempSnapshotDir = snapshotDir + "-tmp/";
        LOGGER.info("Moving aggregated and sorted data to new snapshot-tmp directory {}", tempSnapshotDir);
        try {
            this.fs.mkdirs(new Path(tempSnapshotDir));
            final FileStatus[] sortedEntries = this.fs.listStatus(new Path(getDirectory(null, true, true, false)));
            for (final FileStatus entry : sortedEntries) {
                final Path source = entry.getPath();
                final Path destination = new Path(tempSnapshotDir, source.getName());
                renameOrFail(source, destination);
                LOGGER.debug("Renamed {} to {}", source, destination);
            }
            LOGGER.info("Renaming {} to {}", tempSnapshotDir, snapshotDir);
            renameOrFail(new Path(tempSnapshotDir), new Path(snapshotDir));
        } catch (final IOException e) {
            throw new OperationException("IOException moving files to new snapshot directory", e);
        }

        LOGGER.info("Updating latest snapshot on store to {}", snapshot);
        try {
            this.store.setLatestSnapshot(snapshot);
        } catch (final StoreException e) {
            throw new OperationException("StoreException setting the latest snapshot on the store to " + snapshot, e);
        }
    }

    /**
     * Renames {@code source} to {@code destination}, treating a {@code false} result from the
     * file system as a failure so that an incomplete snapshot is never published.
     *
     * @param source      the path to rename
     * @param destination the new path
     * @throws IOException if the file system throws or reports that the rename did not happen
     */
    private void renameOrFail(Path source, Path destination) throws IOException {
        if (!this.fs.rename(source, destination)) {
            throw new IOException("Failed to rename " + source + " to " + destination);
        }
    }

    /**
     * Recursively deletes the store's temporary files directory.
     *
     * @throws OperationException if the file system throws while deleting
     */
    private void deleteTempDirectory() throws OperationException {
        final Path tempPath = new Path(this.tempDir);
        LOGGER.info("Deleting temporary directory {}", tempPath);
        try {
            this.fs.delete(tempPath, true);
        } catch (final IOException e) {
            throw new OperationException("IOException deleting temporary directory " + this.tempDir, e);
        }
    }
}
