package uk.gov.gchq.gaffer.parquetstore.operation.handler;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.parquetstore.ParquetStore;
import uk.gov.gchq.gaffer.parquetstore.ParquetStoreProperties;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities.AggregateAndSortData;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities.CallableResult;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities.WriteUnsortedData;
import uk.gov.gchq.gaffer.parquetstore.partitioner.GraphPartitioner;
import uk.gov.gchq.gaffer.parquetstore.partitioner.Partition;
import uk.gov.gchq.gaffer.parquetstore.partitioner.serialisation.GraphPartitionerSerialiser;
import uk.gov.gchq.gaffer.parquetstore.utils.SchemaUtils;
import uk.gov.gchq.gaffer.parquetstore.utils.SparkParquetUtils;
import uk.gov.gchq.gaffer.spark.SparkContextUtil;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;
import uk.gov.gchq.gaffer.store.schema.Schema;

/* loaded from: input_file:uk/gov/gchq/gaffer/parquetstore/operation/handler/AddElementsHandler.class */
public class AddElementsHandler implements OperationHandler<AddElements> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) AddElementsHandler.class);

    @Override // uk.gov.gchq.gaffer.store.operation.handler.OperationHandler
    public Void doOperation(AddElements addElements, Context context, Store store) throws OperationException {
        addElements(addElements, context, (ParquetStore) store);
        return null;
    }

    private void addElements(AddElements addElements, Context context, ParquetStore parquetStore) throws OperationException {
        FileSystem fs = parquetStore.getFS();
        Schema schema = parquetStore.getSchema();
        SchemaUtils schemaUtils = parquetStore.getSchemaUtils();
        SparkSession sparkSession = SparkContextUtil.getSparkSession(context, parquetStore.getProperties());
        ExecutorService createThreadPool = createThreadPool(sparkSession, parquetStore.getProperties());
        GraphPartitioner graphPartitioner = parquetStore.getGraphPartitioner();
        SparkParquetUtils.configureSparkForAddElements(sparkSession, parquetStore.getProperties());
        String tempFilesDir = parquetStore.getTempFilesDir();
        BiFunction biFunction = (str, num) -> {
            return tempFilesDir + "/unsorted_unaggregated_new/group=" + str + "/partition=" + num;
        };
        BiFunction biFunction2 = (str2, num2) -> {
            return tempFilesDir + "/unsorted_unaggregated_new/reversed-group=" + str2 + "/partition=" + num2;
        };
        LOGGER.info("Calling WriteUnsortedData to add elements");
        LOGGER.trace("currentGraphPartitioner is {}", graphPartitioner);
        new WriteUnsortedData(parquetStore, graphPartitioner, biFunction, biFunction2).writeElements(addElements.getInput());
        BiFunction biFunction3 = (str3, num3) -> {
            return tempFilesDir + "/sorted_new_old_merged/group=" + str3 + "/partition=" + num3;
        };
        BiFunction biFunction4 = (str4, num4) -> {
            return tempFilesDir + "/sorted_new_old_merged/REVERSED-group=" + str4 + "/partition=" + num4;
        };
        ArrayList arrayList = new ArrayList();
        for (String str5 : schema.getGroups()) {
            for (Partition partition : graphPartitioner.getGroupPartitioner(str5).getPartitions()) {
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(biFunction.apply(str5, Integer.valueOf(partition.getPartitionId())));
                arrayList2.add(parquetStore.getFile(str5, partition));
                arrayList.add(new AggregateAndSortData(schemaUtils, fs, arrayList2, (String) biFunction3.apply(str5, Integer.valueOf(partition.getPartitionId())), str5, str5 + "-" + partition.getPartitionId(), false, parquetStore.getProperties().getCompressionCodecName(), sparkSession));
                LOGGER.info("Created AggregateAndSortData task for group {}, partition {}", str5, Integer.valueOf(partition.getPartitionId()));
            }
        }
        for (String str6 : schema.getEdgeGroups()) {
            for (Partition partition2 : graphPartitioner.getGroupPartitionerForReversedEdges(str6).getPartitions()) {
                ArrayList arrayList3 = new ArrayList();
                arrayList3.add(biFunction2.apply(str6, Integer.valueOf(partition2.getPartitionId())));
                arrayList3.add(parquetStore.getFileForReversedEdges(str6, partition2));
                arrayList.add(new AggregateAndSortData(schemaUtils, fs, arrayList3, (String) biFunction4.apply(str6, Integer.valueOf(partition2.getPartitionId())), str6, "reversed-" + str6 + "-" + partition2.getPartitionId(), true, parquetStore.getProperties().getCompressionCodecName(), sparkSession));
                LOGGER.info("Created AggregateAndSortData task for reversed edge group {}, partition {}", str6, Integer.valueOf(partition2.getPartitionId()));
            }
        }
        try {
            LOGGER.info("Invoking {} AggregateAndSortData tasks", Integer.valueOf(arrayList.size()));
            Iterator it = createThreadPool.invokeAll(arrayList).iterator();
            while (it.hasNext()) {
                LOGGER.info("Result {} from task", (CallableResult) ((Future) it.next()).get());
            }
            try {
                long currentTimeMillis = System.currentTimeMillis();
                String str7 = parquetStore.getDataDir() + "/" + ParquetStore.getSnapshotPath(currentTimeMillis) + "-tmp";
                LOGGER.info("Moving aggregated and sorted data to new snapshot directory {}", str7);
                fs.mkdirs(new Path(str7));
                Iterator<String> it2 = schema.getGroups().iterator();
                while (it2.hasNext()) {
                    Path path = new Path(str7, "group=" + it2.next());
                    fs.mkdirs(path);
                    LOGGER.info("Created directory {}", path);
                }
                Iterator<String> it3 = schema.getEdgeGroups().iterator();
                while (it3.hasNext()) {
                    Path path2 = new Path(str7, "reversed-group=" + it3.next());
                    fs.mkdirs(path2);
                    LOGGER.info("Created directory {}", path2);
                }
                for (String str8 : schema.getGroups()) {
                    String str9 = str7 + "/group=" + str8;
                    for (Partition partition3 : graphPartitioner.getGroupPartitioner(str8).getPartitions()) {
                        Path path3 = new Path((String) biFunction3.apply(str8, Integer.valueOf(partition3.getPartitionId())));
                        if (fs.exists(path3)) {
                            FileStatus[] listStatus = fs.listStatus(path3, path4 -> {
                                return path4.getName().endsWith(".parquet");
                            });
                            if (1 != listStatus.length) {
                                LOGGER.error("Didn't find one Parquet file in path {} (found {} files)", path3, Integer.valueOf(listStatus.length));
                                throw new OperationException("Expected to find one Parquet file in path " + path3 + " (found " + listStatus.length + " files)");
                            }
                            Path path5 = new Path(str9, ParquetStore.getFile(Integer.valueOf(partition3.getPartitionId())));
                            LOGGER.info("Renaming {} to {}", listStatus[0].getPath(), path5);
                            fs.rename(listStatus[0].getPath(), path5);
                        } else {
                            LOGGER.info("Not moving data for group {}, partition id {} as the outputDir {} does not exist", str8, Integer.valueOf(partition3.getPartitionId()), path3);
                        }
                    }
                }
                for (String str10 : schema.getEdgeGroups()) {
                    String str11 = str7 + "/reversed-group=" + str10;
                    for (Partition partition4 : graphPartitioner.getGroupPartitionerForReversedEdges(str10).getPartitions()) {
                        Path path6 = new Path((String) biFunction4.apply(str10, Integer.valueOf(partition4.getPartitionId())));
                        if (fs.exists(path6)) {
                            FileStatus[] listStatus2 = fs.listStatus(path6, path7 -> {
                                return path7.getName().endsWith(".parquet");
                            });
                            if (1 != listStatus2.length) {
                                LOGGER.error("Didn't find one Parquet file in path {} (found {} files)", path6, Integer.valueOf(listStatus2.length));
                                throw new OperationException("Expected to find one Parquet file in path " + path6 + " (found " + listStatus2.length + " files)");
                            }
                            Path path8 = new Path(str11, ParquetStore.getFile(Integer.valueOf(partition4.getPartitionId())));
                            LOGGER.info("Renaming {} to {}", listStatus2[0].getPath(), path8);
                            fs.rename(listStatus2[0].getPath(), path8);
                        } else {
                            LOGGER.info("Not moving data for reversed edge group {}, partition id {} as the outputDir {} does not exist", str10, Integer.valueOf(partition4.getPartitionId()), path6);
                        }
                    }
                }
                LOGGER.info("Deleting temporary directory {}", tempFilesDir);
                fs.delete(new Path(tempFilesDir), true);
                Path path9 = new Path(str7 + "/graphPartitioner");
                DataOutputStream create = fs.create(path9);
                LOGGER.info("Writing graph partitioner to {}", path9);
                new GraphPartitionerSerialiser().write(graphPartitioner, create);
                create.close();
                String substring = str7.substring(0, str7.lastIndexOf("-tmp"));
                LOGGER.info("Renaming {} to {}", str7, substring);
                fs.rename(new Path(str7), new Path(substring));
                LOGGER.info("Updating latest snapshot on store to {}", Long.valueOf(currentTimeMillis));
                parquetStore.setLatestSnapshot(currentTimeMillis);
            } catch (IOException | StoreException e) {
                throw new OperationException("IOException moving results files into new snapshot directory", e);
            }
        } catch (InterruptedException e2) {
            throw new OperationException("InterruptedException running AggregateAndSortData tasks", e2);
        } catch (ExecutionException e3) {
            throw new OperationException("ExecutionException running AggregateAndSortData tasks", e3);
        }
    }

    private static ExecutorService createThreadPool(SparkSession sparkSession, ParquetStoreProperties parquetStoreProperties) {
        Option option = sparkSession.conf().getOption("spark.driver.cores");
        int parseInt = option.nonEmpty() ? Integer.parseInt((String) option.get()) : parquetStoreProperties.getThreadsAvailable().intValue();
        LOGGER.debug("Created thread pool of size {}", Integer.valueOf(parseInt));
        return Executors.newFixedThreadPool(parseInt);
    }
}
