package uk.gov.gchq.gaffer.parquetstore;

import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.commonutil.iterable.CloseableIterable;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.data.element.IdentifierType;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.AddElementsHandler;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.GetAdjacentIdsHandler;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.GetAllElementsHandler;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.GetElementsHandler;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.spark.ImportJavaRDDOfElementsHandler;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.spark.ImportRDDOfElementsHandler;
import uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities.CalculatePartitioner;
import uk.gov.gchq.gaffer.parquetstore.partitioner.GraphPartitioner;
import uk.gov.gchq.gaffer.parquetstore.partitioner.GroupPartitioner;
import uk.gov.gchq.gaffer.parquetstore.partitioner.Partition;
import uk.gov.gchq.gaffer.parquetstore.partitioner.serialisation.GraphPartitionerSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.ArrayListStringParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.BooleanParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.ByteParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.DateParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.DoubleParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.FloatParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.FreqMapParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.HashSetStringParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.InLineHyperLogLogPlusParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.IntegerParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.LongParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.ShortParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.StringParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.TreeSetStringParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.TypeSubTypeValueParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.serialisation.impl.TypeValueParquetSerialiser;
import uk.gov.gchq.gaffer.parquetstore.utils.SchemaUtils;
import uk.gov.gchq.gaffer.serialisation.Serialiser;
import uk.gov.gchq.gaffer.serialisation.implementation.JavaSerialiser;
import uk.gov.gchq.gaffer.spark.operation.javardd.ImportJavaRDDOfElements;
import uk.gov.gchq.gaffer.spark.operation.scalardd.ImportRDDOfElements;
import uk.gov.gchq.gaffer.store.SerialisationFactory;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.StoreProperties;
import uk.gov.gchq.gaffer.store.StoreTrait;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;
import uk.gov.gchq.gaffer.store.operation.handler.OutputOperationHandler;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.store.schema.SchemaElementDefinition;
import uk.gov.gchq.gaffer.store.schema.SchemaOptimiser;
import uk.gov.gchq.koryphe.ValidationResult;

/**
 * A Gaffer {@link Store} that keeps its data as Parquet files on a Hadoop {@link FileSystem}.
 * <p>
 * Files are laid out under the configured data directory as
 * {@code <dataDir>/snapshot=<id>/group=<group>/partition-NNNNNNN.parquet}. Edge groups additionally get
 * {@code reversed-group=<group>} directories. Each snapshot directory holds a serialised
 * {@link GraphPartitioner} in a file named {@code graphPartitioner}.
 */
public class ParquetStore extends Store {
    public static final String GROUP = "group";
    public static final String SNAPSHOT = "snapshot";
    public static final String REVERSED_GROUP = "reversed-group";
    public static final String PARTITION = "partition";
    /** Number of digits the partition number is zero-padded to in partition file names. */
    public static final int LENGTH_OF_PARTITION_NUMBER_IN_FILENAME = 7;
    public static final String VERTEX = IdentifierType.VERTEX.name();
    public static final String SOURCE = IdentifierType.SOURCE.name();
    public static final String DESTINATION = IdentifierType.DESTINATION.name();
    public static final String DIRECTED = IdentifierType.DIRECTED.name();

    /** Serialisers made available to the schema optimiser (see {@link #createSchemaOptimiser()}). */
    @SuppressFBWarnings({"MS_MUTABLE_ARRAY"})
    public static final Serialiser[] SERIALISERS = {new StringParquetSerialiser(), new ByteParquetSerialiser(), new IntegerParquetSerialiser(), new LongParquetSerialiser(), new BooleanParquetSerialiser(), new DateParquetSerialiser(), new DoubleParquetSerialiser(), new FloatParquetSerialiser(), new InLineHyperLogLogPlusParquetSerialiser(), new ShortParquetSerialiser(), new TypeValueParquetSerialiser(), new FreqMapParquetSerialiser(), new TreeSetStringParquetSerialiser(), new TypeSubTypeValueParquetSerialiser(), new ArrayListStringParquetSerialiser(), new HashSetStringParquetSerialiser(), new JavaSerialiser()};

    private static final Logger LOGGER = LoggerFactory.getLogger(ParquetStore.class);
    private static final Set<StoreTrait> TRAITS = Collections.unmodifiableSet(Sets.newHashSet(StoreTrait.ORDERED, StoreTrait.INGEST_AGGREGATION, StoreTrait.PRE_AGGREGATION_FILTERING));
    /** Directory-name prefix of every snapshot directory, i.e. {@code "snapshot="}. */
    private static final String SNAPSHOT_PREFIX = SNAPSHOT + "=";
    private static final String GRAPH_PARTITIONER_FILE_NAME = "graphPartitioner";
    private static final String PARQUET_FILE_EXTENSION = ".parquet";

    private GraphPartitioner graphPartitioner;
    private long currentSnapshot;
    private SchemaUtils schemaUtils;
    private FileSystem fs;

    /**
     * Initialises the store: validates the properties, connects to the file system, then either
     * validates an existing data directory or creates a fresh one. Finally loads the graph partitioner
     * of the latest snapshot.
     *
     * @param graphId    the graph id
     * @param schema     the graph schema
     * @param properties the store properties; must be {@link ParquetStoreProperties} with non-null
     *                   data and temporary-files directories
     * @throws StoreException if the properties are invalid, the directory layout is inconsistent,
     *                        or the file system cannot be accessed
     */
    @Override
    public void initialise(final String graphId, final Schema schema, final StoreProperties properties) throws StoreException {
        if (!(properties instanceof ParquetStoreProperties)) {
            throw new StoreException("ParquetStore must be initialised with properties of class ParquetStoreProperties");
        }
        final ParquetStoreProperties parquetProperties = (ParquetStoreProperties) properties;
        if (null == parquetProperties.getDataDir()) {
            throw new StoreException("The ParquetStoreProperties must contain a non-null data directory (parquet.data.dir)");
        }
        if (null == parquetProperties.getTempFilesDir()) {
            throw new StoreException("The ParquetStoreProperties must contain a non-null temporary data directory (parquet.temp_data.dir)");
        }
        LOGGER.info("Initialising ParquetStore for graph id {}", graphId);
        super.initialise(graphId, schema, parquetProperties);
        try {
            fs = FileSystem.get(new Configuration());
            schemaUtils = new SchemaUtils(getSchema());
            initialiseDataDir();
            loadGraphPartitioner();
        } catch (final IOException e) {
            throw new StoreException("Could not connect to the file system", e);
        }
    }

    /**
     * Returns the directory name of a snapshot.
     *
     * @param snapshot the snapshot id
     * @return {@code "snapshot=" + snapshot}
     */
    public static String getSnapshotPath(final long snapshot) {
        return SNAPSHOT_PREFIX + snapshot;
    }

    /** Validates a non-empty data directory, or creates the initial layout if it is missing/empty. */
    private void initialiseDataDir() throws IOException, StoreException {
        final Path dataDirPath = new Path(getDataDir());
        if (fs.exists(dataDirPath) && 0 != fs.listStatus(dataDirPath).length) {
            validateExistingDataDir(dataDirPath);
        } else {
            createDataDirStructure(dataDirPath);
        }
    }

    /** Checks that the latest snapshot contains a directory for every group and reversed edge group. */
    private void validateExistingDataDir(final Path dataDirPath) throws IOException, StoreException {
        LOGGER.info("Data directory {} exists and is non-empty, validating a snapshot directory exists", dataDirPath);
        final boolean hasSnapshotDir = Arrays.stream(fs.listStatus(dataDirPath, p -> p.getName().startsWith(SNAPSHOT_PREFIX)))
                .anyMatch(FileStatus::isDirectory);
        if (!hasSnapshotDir) {
            LOGGER.error("Data directory {} should contain a snapshot directory", dataDirPath);
            throw new StoreException("Data directory should contain a snapshot directory");
        }
        final long latestSnapshot = getLatestSnapshot();
        final String snapshotDir = dataDirPath + "/" + getSnapshotPath(latestSnapshot);
        LOGGER.info("Latest snapshot directory in data directory {} is {}", dataDirPath, latestSnapshot);
        LOGGER.info("Verifying snapshot directory contains the correct directories");
        for (final String group : getSchema().getGroups()) {
            checkGroupDirExists(new Path(snapshotDir, GROUP + "=" + group), snapshotDir);
        }
        for (final String edgeGroup : getSchema().getEdgeGroups()) {
            checkGroupDirExists(new Path(snapshotDir, REVERSED_GROUP + "=" + edgeGroup), snapshotDir);
        }
    }

    /** Throws if the given group directory does not exist. */
    private void checkGroupDirExists(final Path groupDir, final String snapshotDir) throws IOException, StoreException {
        if (!fs.exists(groupDir)) {
            LOGGER.error("Directory {} should exist", groupDir);
            throw new StoreException("Group directory " + groupDir + " should exist in snapshot directory " + snapshotDir);
        }
    }

    /**
     * Creates a new snapshot (id = current time in millis) with empty group and reversed-group
     * directories. Writes a graph partitioner with no split points into it.
     */
    private void createDataDirStructure(final Path dataDirPath) throws IOException {
        LOGGER.info("Data directory {} doesn't exist or is empty so initialising directory structure", dataDirPath);
        currentSnapshot = System.currentTimeMillis();
        LOGGER.info("Initialising snapshot id to {}", currentSnapshot);
        final Path snapshotDir = new Path(dataDirPath, getSnapshotPath(currentSnapshot));
        LOGGER.info("Creating snapshot directory {}", snapshotDir);
        fs.mkdirs(snapshotDir);
        LOGGER.info("Creating group directories under {}", snapshotDir);
        for (final String group : getSchema().getGroups()) {
            final Path groupDir = new Path(snapshotDir, GROUP + "=" + group);
            fs.mkdirs(groupDir);
            LOGGER.info("Created directory {}", groupDir);
        }
        LOGGER.info("Creating group directories for reversed edges under {}", snapshotDir);
        for (final String edgeGroup : getSchema().getEdgeGroups()) {
            final Path reversedGroupDir = new Path(snapshotDir, REVERSED_GROUP + "=" + edgeGroup);
            fs.mkdirs(reversedGroupDir);
            LOGGER.info("Created directory {}", reversedGroupDir);
        }
        LOGGER.info("Creating GraphPartitioner with 0 split points for each group");
        graphPartitioner = new GraphPartitioner();
        for (final String group : getSchema().getGroups()) {
            graphPartitioner.addGroupPartitioner(group, new GroupPartitioner(group, new ArrayList<>()));
        }
        for (final String edgeGroup : getSchema().getEdgeGroups()) {
            graphPartitioner.addGroupPartitionerForReversedEdges(edgeGroup, new GroupPartitioner(edgeGroup, new ArrayList<>()));
        }
        LOGGER.info("Writing GraphPartitioner to snapshot directory");
        writeGraphPartitioner(graphPartitioner, getGraphPartitionerPath());
        LOGGER.info("Wrote GraphPartitioner to file {}", getGraphPartitionerPath());
    }

    /**
     * Returns the path of the serialised graph partitioner for the current snapshot.
     *
     * @return {@code <dataDir>/snapshot=<currentSnapshot>/graphPartitioner}
     */
    public Path getGraphPartitionerPath() {
        return new Path(getDataDir() + "/" + getSnapshotPath(currentSnapshot), GRAPH_PARTITIONER_FILE_NAME);
    }

    /** Points {@code currentSnapshot} at the newest snapshot and loads its graph partitioner. */
    private void loadGraphPartitioner() throws StoreException {
        final String dataDir = getDataDir();
        final boolean dataDirExists;
        try {
            dataDirExists = fs.exists(new Path(dataDir));
        } catch (final IOException e) {
            throw new StoreException(e.getMessage(), e);
        }
        if (!dataDirExists) {
            throw new StoreException("Data directory " + dataDir + " does not exist - store is in an inconsistent state");
        }
        currentSnapshot = getLatestSnapshot(dataDir);
        LOGGER.info("Setting currentSnapshot to {}", currentSnapshot);
        loadGraphPartitionerForCurrentSnapshot();
    }

    /**
     * Loads the graph partitioner of {@code currentSnapshot}. If no partitioner file exists yet, it is
     * first calculated from the snapshot's data and written.
     */
    private void loadGraphPartitionerForCurrentSnapshot() throws StoreException {
        final Path graphPartitionerPath = getGraphPartitionerPath();
        try {
            if (!fs.exists(graphPartitionerPath)) {
                LOGGER.info("Graph partitioner does not exist in {} so creating it", graphPartitionerPath);
                final GraphPartitioner calculated = new CalculatePartitioner(
                        new Path(getDataDir() + "/" + getSnapshotPath(currentSnapshot)), getSchema(), fs).call();
                LOGGER.info("Writing graph partitioner to {}", graphPartitionerPath);
                writeGraphPartitioner(calculated, graphPartitionerPath);
            }
            LOGGER.info("Loading graph partitioner from path {}", graphPartitionerPath);
            loadGraphPartitioner(graphPartitionerPath);
        } catch (final IOException e) {
            throw new StoreException(e.getMessage(), e);
        }
    }

    /** Serialises a partitioner to the given path; the stream is closed even if writing fails. */
    private void writeGraphPartitioner(final GraphPartitioner partitioner, final Path path) throws IOException {
        try (DataOutputStream out = fs.create(path)) {
            new GraphPartitionerSerialiser().write(partitioner, out);
        }
    }

    /** Reads the partitioner at the given path into {@code graphPartitioner}; the stream is always closed. */
    private void loadGraphPartitioner(final Path path) throws IOException {
        try (DataInputStream in = fs.open(path)) {
            graphPartitioner = new GraphPartitionerSerialiser().read(in);
        }
    }

    /** @return the Hadoop file system this store reads and writes */
    public FileSystem getFS() {
        return fs;
    }

    /** @return the schema utilities built from this store's schema during initialisation */
    public SchemaUtils getSchemaUtils() {
        return schemaUtils;
    }

    @Override
    public Set<StoreTrait> getTraits() {
        return TRAITS;
    }

    /** @return the data directory from the store properties */
    public String getDataDir() {
        return getProperties().getDataDir();
    }

    /** @return the temporary-files directory from the store properties */
    public String getTempFilesDir() {
        return getProperties().getTempFilesDir();
    }

    /**
     * Returns the full path of the Parquet file for a partition of a group in the current snapshot.
     *
     * @param group     the group name
     * @param partition the partition
     * @return the file path as a string
     */
    public String getFile(final String group, final Partition partition) {
        return getFile(group, Integer.valueOf(partition.getPartitionId()));
    }

    /**
     * Returns the full path of the Parquet file for a partition of a group in the current snapshot.
     *
     * @param group       the group name
     * @param partitionId the partition number
     * @return {@code <dataDir>/snapshot=<id>/group=<group>/partition-NNNNNNN.parquet}
     */
    public String getFile(final String group, final Integer partitionId) {
        return getDataDir() + "/" + getSnapshotPath(currentSnapshot) + "/" + GROUP + "=" + group + "/" + getFile(partitionId);
    }

    /**
     * Returns the file name for a partition number.
     *
     * @param partitionId the partition number
     * @return e.g. {@code partition-0000012.parquet} for 12
     */
    public static String getFile(final Integer partitionId) {
        return PARTITION + "-" + zeroPad(String.valueOf(partitionId)) + PARQUET_FILE_EXTENSION;
    }

    /** Left-pads with zeros to {@link #LENGTH_OF_PARTITION_NUMBER_IN_FILENAME} characters; longer input is unchanged. */
    private static String zeroPad(final String value) {
        final StringBuilder padded = new StringBuilder(value);
        while (padded.length() < LENGTH_OF_PARTITION_NUMBER_IN_FILENAME) {
            padded.insert(0, '0');
        }
        return padded.toString();
    }

    /**
     * Returns the full path of the reversed-edges Parquet file for a partition of a group in the
     * current snapshot.
     *
     * @param group     the edge group name
     * @param partition the partition
     * @return the file path as a string
     */
    public String getFileForReversedEdges(final String group, final Partition partition) {
        return getFileForReversedEdges(group, Integer.valueOf(partition.getPartitionId()));
    }

    /**
     * Returns the full path of the reversed-edges Parquet file for a partition of a group in the
     * current snapshot.
     *
     * @param group       the edge group name
     * @param partitionId the partition number
     * @return {@code <dataDir>/snapshot=<id>/reversed-group=<group>/partition-NNNNNNN.parquet}
     */
    public String getFileForReversedEdges(final String group, final Integer partitionId) {
        return getDataDir() + "/" + getSnapshotPath(currentSnapshot) + "/" + REVERSED_GROUP + "=" + group + "/" + getFile(partitionId);
    }

    /**
     * Lists the {@code .parquet} files in the group directory of the current snapshot.
     *
     * @param group the group name
     * @return paths of the Parquet files found
     * @throws IOException if the directory cannot be listed
     */
    public List<Path> getFilesForGroup(final String group) throws IOException {
        final Path groupDir = new Path(getDataDir() + "/" + getSnapshotPath(currentSnapshot) + "/" + GROUP + "=" + group);
        return Arrays.stream(fs.listStatus(groupDir, p -> p.getName().endsWith(PARQUET_FILE_EXTENSION)))
                .map(FileStatus::getPath)
                .collect(Collectors.toList());
    }

    @Override
    @SuppressFBWarnings(value = {"BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"}, justification = "The properties should always be ParquetStoreProperties")
    public ParquetStoreProperties getProperties() {
        return (ParquetStoreProperties) super.getProperties();
    }

    @Override
    protected Class<ParquetStoreProperties> getPropertiesClass() {
        return ParquetStoreProperties.class;
    }

    @Override
    protected OutputOperationHandler<GetElements, CloseableIterable<? extends Element>> getGetElementsHandler() {
        return new GetElementsHandler();
    }

    // NOTE(review): the decompiler reported this was originally protected; kept public so existing callers still compile.
    @Override
    public GetAllElementsHandler getGetAllElementsHandler() {
        return new GetAllElementsHandler();
    }

    // NOTE(review): the decompiler reported this was originally protected; kept public so existing callers still compile.
    @Override
    public GetAdjacentIdsHandler getAdjacentIdsHandler() {
        return new GetAdjacentIdsHandler();
    }

    @Override
    protected OperationHandler<? extends AddElements> getAddElementsHandler() {
        return new AddElementsHandler();
    }

    /** Registers the Spark RDD import handlers. */
    @Override
    protected void addAdditionalOperationHandlers() {
        addOperationHandler(ImportJavaRDDOfElements.class, new ImportJavaRDDOfElementsHandler());
        addOperationHandler(ImportRDDOfElements.class, new ImportRDDOfElementsHandler());
    }

    @Override
    protected Class<? extends Serialiser> getRequiredParentSerialiserClass() {
        return Serialiser.class;
    }

    /** Builds a schema optimiser that chooses from {@link #SERIALISERS}. */
    @Override
    protected SchemaOptimiser createSchemaOptimiser() {
        return new SchemaOptimiser(new SerialisationFactory(SERIALISERS));
    }

    /** Runs the standard schema validation plus the consistent-vertex check. */
    @Override
    public void validateSchemas() {
        super.validateSchemas();
        validateConsistentVertex();
    }

    // NOTE(review): the decompiler reported this was originally protected; kept public so existing callers still compile.
    /** Runs the standard element-definition validation plus the consistent group-by-properties check. */
    @Override
    public void validateSchemaElementDefinition(final Map.Entry<String, SchemaElementDefinition> schemaElementDefinitionEntry, final ValidationResult validationResult) {
        super.validateSchemaElementDefinition(schemaElementDefinitionEntry, validationResult);
        validateConsistentGroupByProperties(schemaElementDefinitionEntry, validationResult);
    }

    /**
     * Returns the id of the newest snapshot directory in the data directory.
     *
     * @return the largest snapshot id, or 0 if there is none
     * @throws StoreException if the data directory cannot be listed
     */
    public long getLatestSnapshot() throws StoreException {
        return getLatestSnapshot(getDataDir());
    }

    /**
     * Switches the store to the given snapshot and reloads that snapshot's graph partitioner.
     * <p>
     * Previously the reload reset {@code currentSnapshot} to the newest snapshot, which silently
     * discarded the argument. The store now really uses {@code snapshot}.
     *
     * @param snapshot id of an existing snapshot directory
     * @throws StoreException if the snapshot directory does not exist or cannot be checked, or the
     *                        partitioner cannot be loaded
     */
    public void setLatestSnapshot(final long snapshot) throws StoreException {
        final Path snapshotPath = new Path(getDataDir(), getSnapshotPath(snapshot));
        final boolean exists;
        try {
            exists = fs.exists(snapshotPath);
        } catch (final IOException e) {
            throw new StoreException("IOException checking Path: ", e);
        }
        if (!exists) {
            throw new StoreException(String.format("Failed setting currentSnapshot: '%s' does not exist", snapshotPath.toString()));
        }
        LOGGER.info("Setting currentSnapshot to {} and reloading graph partitioner", snapshot);
        currentSnapshot = snapshot;
        loadGraphPartitionerForCurrentSnapshot();
    }

    /**
     * Finds the largest id among {@code snapshot=<id>} directories in {@code dataDir}.
     * Other entries, and snapshot-prefixed names whose suffix is not a number, are ignored.
     */
    private long getLatestSnapshot(final String dataDir) throws StoreException {
        final FileStatus[] entries;
        try {
            entries = fs.listStatus(new Path(dataDir));
        } catch (final IOException e) {
            throw new StoreException(e.getMessage(), e);
        }
        long latest = 0L;
        for (final FileStatus entry : entries) {
            final String name = entry.getPath().getName();
            if (!entry.isDirectory() || !name.startsWith(SNAPSHOT_PREFIX)) {
                continue;
            }
            try {
                latest = Math.max(latest, Long.parseLong(name.substring(SNAPSHOT_PREFIX.length())));
            } catch (final NumberFormatException e) {
                LOGGER.warn("Ignoring directory {} as its name does not contain a valid snapshot id", entry.getPath());
            }
        }
        return latest;
    }

    /** @return the graph partitioner of the current snapshot */
    public GraphPartitioner getGraphPartitioner() {
        return graphPartitioner;
    }
}
