package uk.gov.gchq.palisade.service.data.avro;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.gchq.palisade.data.serialise.Serialiser;

/* loaded from: input_file:uk/gov/gchq/palisade/service/data/avro/AvroSerialiser.class */
public class AvroSerialiser<O> implements Serialiser<O> {
    private final Schema schema;
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerialiser.class);
    private static final int PARALLELISM = 1;
    private static final Executor EXECUTOR = Executors.newFixedThreadPool(PARALLELISM);

    @JsonCreator
    public AvroSerialiser(@JsonProperty("domainClass") Class<O> cls) {
        Objects.requireNonNull(cls, "domainClass is required");
        this.schema = ReflectData.AllowNull.get().getSchema(cls);
    }

    public Stream<O> deserialise(InputStream inputStream) {
        try {
            return StreamSupport.stream(new DataFileStream(inputStream, new ReflectDatumReader(this.schema)).spliterator(), false);
        } catch (IOException e) {
            throw new UncheckedIOException("An error occurred during deserialisaton", e);
        }
    }

    public InputStream serialise(Stream<O> stream) {
        PipedInputStream pipedInputStream = new PipedInputStream();
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        try {
            pipedOutputStream.connect(pipedInputStream);
            EXECUTOR.execute(() -> {
                try {
                    try {
                        DataFileWriter dataFileWriter = new DataFileWriter(new ReflectDatumWriter(this.schema));
                        try {
                            if (Objects.nonNull(stream)) {
                                LOGGER.debug("Creating data file writer");
                                dataFileWriter.create(this.schema, pipedOutputStream);
                                Iterator it = stream.iterator();
                                while (it.hasNext()) {
                                    dataFileWriter.append(it.next());
                                }
                            }
                            dataFileWriter.close();
                        } catch (Throwable th) {
                            try {
                                dataFileWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                            throw th;
                        }
                    } finally {
                        try {
                            pipedOutputStream.flush();
                            pipedOutputStream.close();
                        } catch (IOException e) {
                            LOGGER.warn("Failed to close {}", pipedOutputStream.getClass(), e);
                        }
                    }
                } catch (IOException e2) {
                    throw new UncheckedIOException("An error occurred during serialisation", e2);
                }
            });
            return pipedInputStream;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to connect input and output stream pipes", e);
        }
    }
}
