package uk.gov.gchq.gaffer.parquetstore.operation.addelements.handler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.parquetstore.ParquetStore;
import uk.gov.gchq.gaffer.parquetstore.ParquetStoreProperties;
import uk.gov.gchq.gaffer.parquetstore.index.GraphIndex;
import uk.gov.gchq.gaffer.parquetstore.operation.addelements.impl.AggregateAndSortTempData;
import uk.gov.gchq.gaffer.parquetstore.operation.addelements.impl.CalculateSplitPointsFromIndex;
import uk.gov.gchq.gaffer.parquetstore.operation.addelements.impl.GenerateIndices;
import uk.gov.gchq.gaffer.parquetstore.operation.addelements.impl.rdd.CalculateSplitPointsFromJavaRDD;
import uk.gov.gchq.gaffer.parquetstore.operation.addelements.impl.rdd.WriteUnsortedDataFunction;
import uk.gov.gchq.gaffer.parquetstore.utils.ParquetStoreConstants;
import uk.gov.gchq.gaffer.parquetstore.utils.ParquetStoreUtils;
import uk.gov.gchq.gaffer.parquetstore.utils.SparkParquetUtils;
import uk.gov.gchq.gaffer.spark.SparkContextUtil;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.schema.Schema;

/* loaded from: input_file:uk/gov/gchq/gaffer/parquetstore/operation/addelements/handler/AddElementsFromRDD.class */
public class AddElementsFromRDD {
    private static final Logger LOGGER = LoggerFactory.getLogger(AddElementsFromRDD.class);

    /* JADX INFO: Access modifiers changed from: protected */
    public void addElementsFromRDD(JavaRDD<Element> javaRDD, Context context, ParquetStore parquetStore) throws OperationException {
        Map<String, Map<Object, Integer>> apply;
        try {
            FileSystem fs = parquetStore.getFS();
            ParquetStoreProperties m3getProperties = parquetStore.m3getProperties();
            String tempFilesDir = parquetStore.getTempFilesDir();
            Path path = new Path(tempFilesDir);
            String dataDir = parquetStore.getDataDir();
            if (fs.exists(path)) {
                fs.delete(path, true);
                LOGGER.warn("Temp data directory '{}' has been deleted.", tempFilesDir);
            }
            SparkSession sparkSession = SparkContextUtil.getSparkSession(context, parquetStore.m3getProperties());
            SparkParquetUtils.configureSparkForAddElements(sparkSession, m3getProperties);
            ExecutorService createThreadPool = ParquetStoreUtils.createThreadPool(sparkSession, m3getProperties);
            LOGGER.debug("Starting to write the new unsorted Parquet data after aggregation to {} split by group", tempFilesDir);
            Schema schema = parquetStore.getSchema();
            ArrayList arrayList = new ArrayList();
            GraphIndex graphIndex = parquetStore.getGraphIndex();
            if (null == graphIndex) {
                apply = new HashMap();
                Iterator it = schema.getEdgeGroups().iterator();
                while (it.hasNext()) {
                    arrayList.add(new CalculateSplitPointsFromJavaRDD(m3getProperties.getSampleRate().intValue(), m3getProperties.getAddElementsOutputFilesPerGroup() - 1, javaRDD, (String) it.next(), false));
                }
                Iterator it2 = schema.getEntityGroups().iterator();
                while (it2.hasNext()) {
                    arrayList.add(new CalculateSplitPointsFromJavaRDD(m3getProperties.getSampleRate().intValue(), m3getProperties.getAddElementsOutputFilesPerGroup() - 1, javaRDD, (String) it2.next(), true));
                }
                ParquetStoreUtils.invokeSplitPointCalculations(createThreadPool, arrayList, apply);
            } else {
                apply = CalculateSplitPointsFromIndex.apply(graphIndex, parquetStore.getSchemaUtils(), m3getProperties, javaRDD, createThreadPool);
            }
            javaRDD.foreachPartition(new WriteUnsortedDataFunction(parquetStore.getTempFilesDir(), parquetStore.getSchemaUtils(), apply));
            LOGGER.debug("Finished writing the unsorted Parquet data to {}", tempFilesDir);
            LOGGER.debug("Starting to write the sorted and aggregated Parquet data to {} split by group", tempFilesDir);
            new AggregateAndSortTempData(parquetStore, sparkSession, apply, createThreadPool);
            createThreadPool.shutdown();
            LOGGER.debug("Finished writing the sorted and aggregated Parquet data to {}", tempFilesDir);
            LOGGER.debug("Starting to write the indexes");
            GraphIndex graphIndex2 = new GenerateIndices(parquetStore, sparkSession).getGraphIndex();
            LOGGER.debug("Finished writing the indexes");
            try {
                moveDataToDataDir(parquetStore, fs, dataDir, tempFilesDir, graphIndex2);
                tidyUp(fs, tempFilesDir);
            } catch (IOException e) {
                throw new OperationException("Failed to move data from temporary files directory to the data directory.", e);
            } catch (StoreException e2) {
                throw new OperationException("Failed to reload the indices", e2);
            }
        } catch (IOException e3) {
            throw new OperationException("IOException: Failed to connect to the file system", e3);
        } catch (StoreException e4) {
            throw new OperationException(e4.getMessage(), e4);
        }
    }

    private void moveDataToDataDir(ParquetStore parquetStore, FileSystem fileSystem, String str, String str2, GraphIndex graphIndex) throws StoreException, IOException {
        long currentTimeMillis = System.currentTimeMillis();
        String str3 = str + "/" + currentTimeMillis;
        fileSystem.mkdirs(new Path(str3).getParent());
        fileSystem.rename(new Path(str2 + "/" + ParquetStoreConstants.SORTED), new Path(str3));
        graphIndex.setSnapshotTimestamp(currentTimeMillis);
        parquetStore.setGraphIndex(graphIndex);
    }

    private void tidyUp(FileSystem fileSystem, String str) throws IOException {
        Path path = new Path(str);
        fileSystem.delete(path, true);
        LOGGER.debug("Temp data directory '{}' has been deleted.", str);
        while (fileSystem.listStatus(path.getParent()).length == 0) {
            path = path.getParent();
            LOGGER.debug("Empty directory '{}' has been deleted.", str);
            fileSystem.delete(path, true);
        }
    }
}
