package uk.gov.gchq.gaffer.parquetstore.operation.handler.spark.utilities;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.parquetstore.io.writer.ParquetElementWriter;
import uk.gov.gchq.gaffer.parquetstore.utils.SchemaUtils;
import uk.gov.gchq.gaffer.store.schema.Schema;

/* loaded from: input_file:uk/gov/gchq/gaffer/parquetstore/operation/handler/spark/utilities/WriteData.class */
public class WriteData implements VoidFunction<Iterator<Element>> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) WriteData.class);
    private final HashMap<String, String> groupToDirectory = new HashMap<>();
    private final byte[] schemaAsJson;
    private final CompressionCodecName compressionCodecName;

    public WriteData(Function<String, String> function, Schema schema, CompressionCodecName compressionCodecName) {
        for (String str : schema.getGroups()) {
            this.groupToDirectory.put(str, function.apply(str));
        }
        this.schemaAsJson = schema.toCompactJson();
        this.compressionCodecName = compressionCodecName;
    }

    public void call(Iterator<Element> it) throws Exception {
        TaskContext taskContext = TaskContext.get();
        if (null == taskContext) {
            throw new OperationException("Method call(Iterator<Element>) should only be called from within a Spark job");
        }
        call(it, taskContext.partitionId(), taskContext.taskAttemptId());
    }

    /* JADX WARN: Type inference failed for: r2v1, types: [byte[], byte[][]] */
    void call(Iterator<Element> it, int i, long j) throws Exception {
        SchemaUtils schemaUtils = new SchemaUtils(Schema.fromJson((byte[][]) new byte[]{this.schemaAsJson}));
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (String str : schemaUtils.getGroups()) {
            hashMap2.put(str, new Path(this.groupToDirectory.get(str) + "/input-" + i + "-" + j + ".parquet"));
            hashMap.put(str, buildWriter(str, hashMap2.get(str), schemaUtils));
        }
        writeData(it, i, j, hashMap);
        renameFiles(i, j, schemaUtils.getGroups(), hashMap2);
    }

    private void writeData(Iterator<Element> it, int i, long j, Map<String, ParquetWriter<Element>> map) throws IOException {
        while (it.hasNext()) {
            Element next = it.next();
            map.get(next.getGroup()).write(next);
        }
        LOGGER.info("Finished writing elements for partition id {} and task attempt id {} to {}", Integer.valueOf(i), Long.valueOf(j), this.groupToDirectory);
        for (ParquetWriter<Element> parquetWriter : map.values()) {
            LOGGER.debug("Closing writer {}", parquetWriter);
            parquetWriter.close();
        }
    }

    private void renameFiles(int i, long j, Set<String> set, Map<String, Path> map) throws Exception {
        LOGGER.info("Renaming output files from {} to {}", "input-" + i + "-" + j + ".parquet", "input-" + i);
        FileContext fileContext = FileContext.getFileContext(new Configuration());
        for (String str : set) {
            Path path = map.get(str);
            Path path2 = new Path(this.groupToDirectory.get(str) + "/" + ("input-" + i + ".parquet"));
            try {
                fileContext.rename(path, path2, new Options.Rename[]{Options.Rename.NONE});
                LOGGER.debug("Renamed {} to {}", path, path2);
            } catch (FileAlreadyExistsException e) {
                LOGGER.debug("Not renaming {} to {} as the destination already exists", path, path2);
            }
        }
    }

    private ParquetWriter<Element> buildWriter(String str, Path path, SchemaUtils schemaUtils) throws IOException {
        LOGGER.info("Creating a new writer for group {} at path {}", str, path);
        return ((ParquetElementWriter.Builder) new ParquetElementWriter.Builder(path).withType(schemaUtils.getParquetSchema(str)).usingConverter(schemaUtils.getConverter(str)).withCompressionCodec(this.compressionCodecName)).withSparkSchema(schemaUtils.getSparkSchema(str)).build();
    }
}
