package uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.exception.SerialisationException;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.parquetstore.ParquetStore;
import uk.gov.gchq.gaffer.parquetstore.ParquetStoreProperties;
import uk.gov.gchq.gaffer.parquetstore.index.GraphIndex;
import uk.gov.gchq.gaffer.parquetstore.utils.ParquetStoreConstants;
import uk.gov.gchq.gaffer.parquetstore.utils.SchemaUtils;
import uk.gov.gchq.gaffer.store.StoreException;

/* loaded from: input_file:uk/gov/gchq/gaffer/parquetstore/operation/handler/utilities/AggregateAndSortTempData.class */
/**
 * Aggregates and then sorts the temporary data of a {@link ParquetStore}.
 * <p>
 * All of the work happens in the constructor: aggregation tasks are built for every edge and
 * entity group that has an entry in the supplied split map, run on the supplied executor, and
 * followed by a second batch of sort tasks. When the store is configured to sort by splits on
 * ingest, the single sorted file of every split is finally renamed into the group's
 * {@code /sorted} directory.
 */
public class AggregateAndSortTempData {
    private static final Logger LOGGER = LoggerFactory.getLogger(AggregateAndSortTempData.class);
    private static final String SORTED = "/sorted";
    private static final String SPLIT = "/split";

    /**
     * Runs the aggregation phase followed by the sort phase.
     *
     * @param parquetStore    store supplying the schema utilities, graph index, directories,
     *                        file system and properties
     * @param sparkSession    Spark session handed to every aggregation and sort task
     * @param map             group name to a map whose values are the split numbers to process;
     *                        groups without an entry are skipped
     * @param executorService executor the tasks are run on; it is shut down once all of the work
     *                        has completed successfully (it is left running if an exception is thrown)
     * @throws OperationException     if any aggregation or sort task reports a failure, if moving the
     *                                sorted files fails, or if the calling thread is interrupted
     * @throws SerialisationException if a task cannot be constructed
     */
    public AggregateAndSortTempData(ParquetStore parquetStore, SparkSession sparkSession, Map<String, Map<Object, Integer>> map, ExecutorService executorService) throws OperationException, SerialisationException {
        final List<Callable<OperationException>> tasks = new ArrayList<>();
        final SchemaUtils schemaUtils = parquetStore.getSchemaUtils();
        final GraphIndex graphIndex = parquetStore.getGraphIndex();
        // Without a graph index there is no existing snapshot directory to aggregate against.
        final String currentDataDir = null != graphIndex ? parquetStore.getDataDir() + "/" + graphIndex.getSnapshotTimestamp() : null;

        LOGGER.debug("Starting to aggregate the input data with existing data");
        for (final String group : schemaUtils.getEdgeGroups()) {
            addAggregationTask(group, ParquetStoreConstants.SOURCE, currentDataDir, map, tasks, parquetStore, sparkSession);
        }
        for (final String group : schemaUtils.getEntityGroups()) {
            addAggregationTask(group, ParquetStoreConstants.VERTEX, currentDataDir, map, tasks, parquetStore, sparkSession);
        }

        try {
            runTasks(executorService, tasks);
            LOGGER.debug("Finished aggregating the data");
            tasks.clear();

            LOGGER.debug("Starting to sort the aggregated data");
            final ParquetStoreProperties properties = parquetStore.m3getProperties();
            addSortTasks(schemaUtils, properties, map, tasks, parquetStore, sparkSession);
            // Sort tasks report failures the same way aggregation tasks do, so they are checked too.
            runTasks(executorService, tasks);
            LOGGER.debug("Finished sorting the aggregated data");

            if (properties.getSortBySplitsOnIngest()) {
                moveSortedSplits(schemaUtils.getEdgeGroups(), ParquetStoreConstants.SOURCE, map, parquetStore);
                moveSortedSplits(schemaUtils.getEntityGroups(), ParquetStoreConstants.VERTEX, map, parquetStore);
            }
            executorService.shutdown();
        } catch (StoreException e) {
            throw new OperationException("AggregateAndSortData had a store exception thrown", e);
        } catch (IOException e) {
            throw new OperationException("AggregateAndSortData had an IO exception thrown", e);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new OperationException("AggregateAndSortData was interrupted", e);
        } catch (ExecutionException e) {
            throw new OperationException("AggregateAndSortData had an execution exception thrown", e);
        }
    }

    /**
     * Runs every task on the executor, waits for all of them and rethrows the first
     * {@link OperationException} that a task handed back as its result.
     */
    private void runTasks(final ExecutorService executorService, final List<Callable<OperationException>> tasks) throws InterruptedException, ExecutionException, OperationException {
        for (final Future<OperationException> outcome : executorService.invokeAll(tasks)) {
            final OperationException failure = outcome.get();
            if (null != failure) {
                throw failure;
            }
        }
    }

    /**
     * Appends the sort tasks for every edge and entity group that has an entry in {@code splits}.
     * Edge groups are sorted by both the source and the destination column; entity groups by vertex.
     */
    private void addSortTasks(final SchemaUtils schemaUtils, final ParquetStoreProperties properties, final Map<String, Map<Object, Integer>> splits, final List<Callable<OperationException>> tasks, final ParquetStore parquetStore, final SparkSession sparkSession) throws OperationException, SerialisationException, StoreException, IOException {
        for (final String group : schemaUtils.getEdgeGroups()) {
            if (!splits.containsKey(group)) {
                continue;
            }
            final Collection<Integer> splitNumbers = splits.get(group).values();
            if (properties.getSortBySplitsOnIngest()) {
                // The destination column is sorted as a whole group, sized by the number of splits.
                tasks.add(new SortFullGroup(group, ParquetStoreConstants.DESTINATION, parquetStore, sparkSession, splitNumbers.size()));
                for (final int split : splitNumbers) {
                    tasks.add(new SortGroupSplit(group, ParquetStoreConstants.SOURCE, parquetStore, sparkSession, split));
                }
            } else {
                tasks.add(new SortFullGroup(group, ParquetStoreConstants.SOURCE, parquetStore, sparkSession, properties.getAddElementsOutputFilesPerGroup()));
                tasks.add(new SortFullGroup(group, ParquetStoreConstants.DESTINATION, parquetStore, sparkSession, properties.getAddElementsOutputFilesPerGroup()));
            }
        }
        for (final String group : schemaUtils.getEntityGroups()) {
            if (!splits.containsKey(group)) {
                continue;
            }
            final Collection<Integer> splitNumbers = splits.get(group).values();
            if (properties.getSortBySplitsOnIngest()) {
                for (final int split : splitNumbers) {
                    tasks.add(new SortGroupSplit(group, ParquetStoreConstants.VERTEX, parquetStore, sparkSession, split));
                }
            } else {
                tasks.add(new SortFullGroup(group, ParquetStoreConstants.VERTEX, parquetStore, sparkSession, properties.getAddElementsOutputFilesPerGroup()));
            }
        }
    }

    /**
     * Moves the sorted file of every split of every listed group that has an entry in
     * {@code splits} into that group's sorted directory.
     */
    private void moveSortedSplits(final Iterable<String> groups, final String column, final Map<String, Map<Object, Integer>> splits, final ParquetStore parquetStore) throws StoreException, IOException, OperationException {
        for (final String group : groups) {
            if (splits.containsKey(group)) {
                for (final int split : splits.get(group).values()) {
                    moveData(parquetStore.getFS(), parquetStore.getTempFilesDir(), group, column, String.valueOf(split));
                }
            }
        }
    }

    /**
     * Appends one aggregation task per split of {@code group}; does nothing when the group has no
     * entry in {@code splits}.
     *
     * @param group          group to aggregate
     * @param column         column name passed on to the task and used to locate the group directory
     * @param currentDataDir root of the existing data, or {@code null} when there is none
     * @param splits         group name to a map whose values are the split numbers
     * @param tasks          list the new tasks are appended to
     * @param parquetStore   store passed on to every task
     * @param sparkSession   Spark session passed on to every task
     * @throws SerialisationException if a task cannot be constructed
     */
    private void addAggregationTask(final String group, final String column, final String currentDataDir, final Map<String, Map<Object, Integer>> splits, final List<Callable<OperationException>> tasks, final ParquetStore parquetStore, final SparkSession sparkSession) throws SerialisationException {
        if (!splits.containsKey(group)) {
            return;
        }
        final String existingGroupDir = null != currentDataDir ? ParquetStore.getGroupDirectory(group, column, currentDataDir) : null;
        for (final int split : splits.get(group).values()) {
            // File patterns of the existing data for this split; left empty when there is no existing data.
            final HashSet<String> existingFiles = new HashSet<>();
            if (null != existingGroupDir) {
                existingFiles.add(existingGroupDir + "/part-" + zeroPad(String.valueOf(split), 5) + "*.parquet");
            }
            tasks.add(new AggregateGroupSplit(group, column, parquetStore, existingFiles, sparkSession, split));
        }
    }

    /**
     * Renames the single {@code part-00000-*.parquet} file found in a split's sorted directory to
     * {@code part-<zero padded split>.gz.parquet} in the group's sorted directory. Nothing is done
     * when no file matches the pattern.
     *
     * @param fileSystem file system holding the temporary files
     * @param tempDir    root directory of the temporary files
     * @param group      group the split belongs to
     * @param column     column the group was sorted by
     * @param split      split number, as text
     * @throws OperationException if more than one file matches the pattern or the rename is refused
     * @throws IOException        if the file system reports an error
     * @throws StoreException     declared for callers; not raised by the visible code
     */
    private void moveData(final FileSystem fileSystem, final String tempDir, final String group, final String column, final String split) throws StoreException, IOException, OperationException {
        final String sourcePattern = ParquetStore.getGroupDirectory(group, column, tempDir) + SORTED + SPLIT + split + "/part-00000-*.parquet";
        final FileStatus[] matches = fileSystem.globStatus(new Path(sourcePattern));
        if (null == matches || matches.length == 0) {
            // No file matched the pattern, so there is nothing to move for this split.
            return;
        }
        if (matches.length > 1) {
            throw new OperationException("Expected to get only one file which matched the file pattern " + sourcePattern);
        }
        final Path source = matches[0].getPath();
        final Path destination = new Path(ParquetStore.getGroupDirectory(group, column, tempDir + SORTED) + "/part-" + zeroPad(split, 5) + ".gz.parquet");
        fileSystem.mkdirs(destination.getParent());
        // rename signals some failures through its return value rather than an exception.
        if (!fileSystem.rename(source, destination)) {
            throw new OperationException("Failed to move " + source + " to " + destination);
        }
    }

    /**
     * Left-pads {@code input} with {@code '0'} characters until it is at least {@code length}
     * characters long; longer inputs are returned unchanged.
     */
    private String zeroPad(final String input, final int length) {
        final StringBuilder padded = new StringBuilder(Math.max(length, input.length()));
        for (int i = input.length(); i < length; i++) {
            padded.append('0');
        }
        return padded.append(input).toString();
    }
}
