package uk.gov.gchq.gaffer.parquetstore.operation.handler.utilities;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.commonutil.iterable.CloseableIterable;
import uk.gov.gchq.gaffer.data.element.Edge;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.parquetstore.ParquetStore;
import uk.gov.gchq.gaffer.parquetstore.io.writer.ParquetElementWriter;
import uk.gov.gchq.gaffer.parquetstore.partitioner.GraphPartitioner;
import uk.gov.gchq.gaffer.parquetstore.partitioner.PartitionKey;
import uk.gov.gchq.gaffer.parquetstore.utils.SchemaUtils;

/* loaded from: input_file:uk/gov/gchq/gaffer/parquetstore/operation/handler/utilities/WriteUnsortedData.class */
/**
 * Writes an unsorted stream of {@link Element}s to Parquet files, routing each element to a
 * writer chosen by its group and by the partition id that the {@link GraphPartitioner} assigns
 * to it. Edges whose source and destination differ are written a second time through the
 * "reversed edges" partitioners and writers.
 *
 * <p>Writers are created lazily, one per (group, partition id) pair, and are all closed before
 * {@link #writeElements(Iterable)} returns, whether it completes normally or throws.
 *
 * <p>The writer maps are plain {@link HashMap}s with no synchronisation in this class.
 */
public class WriteUnsortedData {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteUnsortedData.class);
    private final CompressionCodecName compressionCodecName;
    private final String tempFilesDir;
    private final SchemaUtils schemaUtils;
    private final Map<String, Map<Integer, ParquetWriter<Element>>> groupToPartitionIdToWriter;
    private final Map<String, Map<Integer, ParquetWriter<Element>>> groupToPartitionIdToWriterForReversedEdges;
    private final GraphPartitioner graphPartitioner;
    private final BiFunction<String, Integer, String> fileNameForGroupAndPartitionId;
    private final BiFunction<String, Integer, String> fileNameForGroupAndPartitionIdForReversedEdges;

    /**
     * Convenience constructor that takes the temporary directory, compression codec and schema
     * utilities from the given store.
     *
     * @param parquetStore the store supplying the temp directory, codec and schema utilities
     * @param graphPartitioner supplies the per-group partitioners
     * @param fileNameForGroupAndPartitionId maps (group, partition id) to an output directory name
     * @param fileNameForGroupAndPartitionIdForReversedEdges as above, for the reversed-edge copies
     * @throws OperationException if a group has no partitioner in {@code graphPartitioner}
     */
    public WriteUnsortedData(final ParquetStore parquetStore,
                             final GraphPartitioner graphPartitioner,
                             final BiFunction<String, Integer, String> fileNameForGroupAndPartitionId,
                             final BiFunction<String, Integer, String> fileNameForGroupAndPartitionIdForReversedEdges) throws OperationException {
        // NOTE(review): "m3getProperties" looks like a decompiler-renamed accessor; confirm the
        // real method name on ParquetStore before relying on it.
        this(parquetStore.getTempFilesDir(), parquetStore.m3getProperties().getCompressionCodecName(), parquetStore.getSchemaUtils(), graphPartitioner, fileNameForGroupAndPartitionId, fileNameForGroupAndPartitionIdForReversedEdges);
    }

    /**
     * Creates a writer and verifies that {@code graphPartitioner} has a partitioner for every
     * group returned by {@code schemaUtils.getGroups()} and a reversed-edge partitioner for every
     * group returned by {@code schemaUtils.getEdgeGroups()}.
     *
     * @param tempFilesDir directory name used only in the error message for failed writes
     * @param compressionCodecName codec passed to every Parquet writer that is built
     * @param schemaUtils source of the groups, converters and schemas
     * @param graphPartitioner supplies the per-group partitioners
     * @param fileNameForGroupAndPartitionId maps (group, partition id) to an output directory name
     * @param fileNameForGroupAndPartitionIdForReversedEdges as above, for the reversed-edge copies
     * @throws OperationException if any required partitioner is missing
     */
    public WriteUnsortedData(final String tempFilesDir,
                             final CompressionCodecName compressionCodecName,
                             final SchemaUtils schemaUtils,
                             final GraphPartitioner graphPartitioner,
                             final BiFunction<String, Integer, String> fileNameForGroupAndPartitionId,
                             final BiFunction<String, Integer, String> fileNameForGroupAndPartitionIdForReversedEdges) throws OperationException {
        this.tempFilesDir = tempFilesDir;
        this.compressionCodecName = compressionCodecName;
        this.schemaUtils = schemaUtils;
        this.graphPartitioner = graphPartitioner;
        this.fileNameForGroupAndPartitionId = fileNameForGroupAndPartitionId;
        this.fileNameForGroupAndPartitionIdForReversedEdges = fileNameForGroupAndPartitionIdForReversedEdges;
        this.groupToPartitionIdToWriter = new HashMap<>();
        this.groupToPartitionIdToWriterForReversedEdges = new HashMap<>();
        for (final String group : schemaUtils.getGroups()) {
            if (null == this.graphPartitioner.getGroupPartitioner(group)) {
                throw new OperationException("graphPartitioner does not contain a partitioner for group " + group);
            }
        }
        for (final String edgeGroup : schemaUtils.getEdgeGroups()) {
            if (null == this.graphPartitioner.getGroupPartitionerForReversedEdges(edgeGroup)) {
                throw new OperationException("graphPartitioner does not contain a partitioner for the reversed version of edge group " + edgeGroup);
            }
        }
    }

    /**
     * Writes every element of {@code iterable} whose group is in the schema, then closes all
     * writers that were opened.
     *
     * <p>If writing fails, the writers are still closed and the write failure is the exception
     * that propagates; any failure while closing is attached to it as a suppressed exception.
     *
     * @param iterable the elements to write; closed afterwards if it is a {@link CloseableIterable}
     * @throws OperationException if writing an element or closing a writer throws an
     *         {@link IOException}
     */
    public void writeElements(final Iterable<? extends Element> iterable) throws OperationException {
        // Start each call with empty per-group writer maps.
        for (final String group : this.schemaUtils.getGroups()) {
            this.groupToPartitionIdToWriter.put(group, new HashMap<>());
        }
        for (final String edgeGroup : this.schemaUtils.getEdgeGroups()) {
            this.groupToPartitionIdToWriterForReversedEdges.put(edgeGroup, new HashMap<>());
        }

        try {
            LOGGER.info("Writing unsorted elements");
            _writeElements(iterable);
            LOGGER.info("Finished writing unsorted elements");
        } catch (final IOException e) {
            final OperationException failure =
                    new OperationException("Exception writing elements to temporary directory: " + this.tempFilesDir, e);
            closeAllWritersAfterFailure(failure);
            throw failure;
        } catch (final RuntimeException | Error e) {
            closeAllWritersAfterFailure(e);
            throw e;
        }

        // Normal completion: a close failure is now the only failure, so report it directly.
        try {
            closeAllWriters();
        } catch (final IOException e) {
            throw new OperationException("IOException closing writers", e);
        }
    }

    /**
     * Closes every writer after a write failure, attaching any close failure to
     * {@code primary} so that the original cause is not lost.
     */
    private void closeAllWritersAfterFailure(final Throwable primary) {
        try {
            closeAllWriters();
        } catch (final IOException closeFailure) {
            primary.addSuppressed(closeFailure);
        }
    }

    /**
     * Closes the writers of both the normal and the reversed-edge maps. All writers are
     * attempted; the first {@link IOException} is thrown at the end with the rest suppressed.
     */
    private void closeAllWriters() throws IOException {
        IOException failure = closeWriterMaps(this.groupToPartitionIdToWriter.values(), null);
        failure = closeWriterMaps(this.groupToPartitionIdToWriterForReversedEdges.values(), failure);
        if (null != failure) {
            throw failure;
        }
    }

    /**
     * Closes the writers in each of the given maps, accumulating failures.
     *
     * @param writerMaps the per-group writer maps to close
     * @param earlierFailure a failure already recorded by the caller, or {@code null}
     * @return the first failure seen (including {@code earlierFailure}), or {@code null} if none
     */
    private IOException closeWriterMaps(final Iterable<Map<Integer, ParquetWriter<Element>>> writerMaps,
                                        final IOException earlierFailure) {
        IOException failure = earlierFailure;
        for (final Map<Integer, ParquetWriter<Element>> writers : writerMaps) {
            try {
                closeWriters(writers);
            } catch (final IOException e) {
                if (null == failure) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        return failure;
    }

    /**
     * Closes every writer in {@code map} and then empties the map so that no writer can be
     * closed twice. A failing writer does not prevent the remaining writers from being closed.
     *
     * @throws IOException the first close failure, with any later ones attached as suppressed
     */
    private void closeWriters(final Map<Integer, ParquetWriter<Element>> map) throws IOException {
        IOException failure = null;
        for (final ParquetWriter<Element> writer : map.values()) {
            LOGGER.debug("Closing writer {}", writer);
            try {
                writer.close();
            } catch (final IOException e) {
                if (null == failure) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        map.clear();
        if (null != failure) {
            throw failure;
        }
    }

    /**
     * Iterates over {@code iterable}, writing each element whose group is in the schema and,
     * for edges whose source differs from their destination, the reversed copy as well.
     * Elements of unknown groups are logged and skipped. The iterable is closed in a
     * {@code finally} block if it is a {@link CloseableIterable}.
     */
    private void _writeElements(final Iterable<? extends Element> iterable) throws IOException {
        try {
            for (final Element element : iterable) {
                if (!this.schemaUtils.getGroups().contains(element.getGroup())) {
                    LOGGER.warn("Skipped the addition of an Element of group {} as that group does not exist in the schema.", element.getGroup());
                    continue;
                }
                writeElement(element);
                if (element instanceof Edge) {
                    final Edge edge = (Edge) element;
                    // A self-loop is identical when reversed, so it is written only once.
                    if (!edge.getSource().equals(edge.getDestination())) {
                        writeEdgeReversed(edge);
                    }
                }
            }
        } finally {
            if (iterable instanceof CloseableIterable) {
                ((CloseableIterable<?>) iterable).close();
            }
        }
    }

    /**
     * Writes {@code element} to the writer for its group and for the partition id computed
     * from its core properties by the group's partitioner.
     */
    private void writeElement(final Element element) throws IOException {
        final String group = element.getGroup();
        final Object[] coreProperties = this.schemaUtils.getConverter(group).corePropertiesToParquetObjects(element);
        final int partitionId = this.graphPartitioner.getGroupPartitioner(group).getPartitionId(new PartitionKey(coreProperties));
        final ParquetWriter<Element> writer = getWriter(partitionId, group, false);
        if (null != writer) {
            writer.write(element);
        } else {
            LOGGER.warn("Skipped the addition of an Element of group {} as that group does not exist in the schema.", group);
        }
    }

    /**
     * Writes {@code edge} to the reversed-edge writer for its group and for the partition id
     * computed from its reversed core properties by the group's reversed-edge partitioner.
     */
    private void writeEdgeReversed(final Edge edge) throws IOException {
        final String group = edge.getGroup();
        final Object[] reversedCoreProperties = this.schemaUtils.getConverter(group).corePropertiesToParquetObjectsForReversedEdge(edge);
        final int partitionId = this.graphPartitioner.getGroupPartitionerForReversedEdges(group).getPartitionId(new PartitionKey(reversedCoreProperties));
        final ParquetWriter<Element> writer = getWriter(partitionId, group, true);
        if (null != writer) {
            writer.write(edge);
        } else {
            LOGGER.warn("Skipped the addition of an Element of group {} as that group does not exist in the schema.", group);
        }
    }

    /**
     * Returns the writer for the given group and partition id, building and caching it on
     * first use.
     *
     * @param partitionId the partition id the element was assigned
     * @param group the element's group
     * @param reversed {@code true} to use the reversed-edge writers
     * @return the writer, or {@code null} if no writer map is registered for {@code group}
     */
    private ParquetWriter<Element> getWriter(final int partitionId, final String group, final boolean reversed) throws IOException {
        final Map<Integer, ParquetWriter<Element>> writers = reversed
                ? this.groupToPartitionIdToWriterForReversedEdges.get(group)
                : this.groupToPartitionIdToWriter.get(group);
        if (null == writers) {
            // Callers log and skip the element when no writer is available.
            return null;
        }
        final Integer key = partitionId;
        ParquetWriter<Element> writer = writers.get(key);
        if (null == writer) {
            writer = buildWriter(group, reversed, key);
            writers.put(key, writer);
        }
        return writer;
    }

    /**
     * Builds a new Parquet writer for the given group and partition id. The output path is the
     * directory name produced by the relevant file-name function, followed by
     * {@code /part-<TaskContext.getPartitionId()>.parquet}.
     */
    private ParquetWriter<Element> buildWriter(final String group, final boolean reversed, final Integer partitionId) throws IOException {
        final BiFunction<String, Integer, String> directoryNamer = reversed
                ? this.fileNameForGroupAndPartitionIdForReversedEdges
                : this.fileNameForGroupAndPartitionId;
        final Path path = new Path(directoryNamer.apply(group, partitionId) + "/part-" + TaskContext.getPartitionId() + ".parquet");
        LOGGER.info("Creating a new writer for group {}, partition id {} in path {}", group, partitionId, path);
        return ((ParquetElementWriter.Builder) new ParquetElementWriter.Builder(path).withType(this.schemaUtils.getParquetSchema(group)).usingConverter(this.schemaUtils.getConverter(group)).withCompressionCodec(this.compressionCodecName)).withSparkSchema(this.schemaUtils.getSparkSchema(group)).build();
    }
}
