package eu.stratosphere.api.common.io;

import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FSDataOutputStream;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/api/common/io/FileOutputFormat.class */
public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
    private static final long serialVersionUID = 1;
    private static final Log LOG = LogFactory.getLog(FileOutputFormat.class);
    public static final String FILE_PARAMETER_KEY = "pact.output.file";
    public static final String OUTPUT_STREAM_OPEN_TIMEOUT_KEY = "pact.output.file.timeout";
    protected Path outputFilePath;
    protected FSDataOutputStream stream;
    private long openTimeout;

    /* loaded from: input_file:eu/stratosphere/api/common/io/FileOutputFormat$AbstractConfigBuilder.class */
    /**
     * Base class for fluent configuration builders of file output formats.
     *
     * <p>The type parameter is the self type of the builder: each setter returns {@code T} so that
     * calls can be chained on the concrete subclass (see {@code ConfigBuilder}, which extends
     * {@code AbstractConfigBuilder<ConfigBuilder>}).
     *
     * @param <T> the concrete builder type returned by the fluent setters
     */
    public static abstract class AbstractConfigBuilder<T> {
        /** The configuration that the builder's setters write into. */
        protected final Configuration config;

        /**
         * Creates a builder that writes into the given configuration.
         *
         * @param configuration the configuration to populate
         */
        protected AbstractConfigBuilder(Configuration configuration) {
            this.config = configuration;
        }

        /**
         * Stores the timeout for opening the output stream under
         * {@link FileOutputFormat#OUTPUT_STREAM_OPEN_TIMEOUT_KEY}.
         *
         * @param i the timeout value to store (widened to {@code long})
         * @return this builder, typed as the concrete builder type, to allow call chaining
         */
        public T openingTimeout(int i) {
            this.config.setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT_KEY, i);
            // Self-type cast: safe as long as subclasses bind T to their own type.
            @SuppressWarnings("unchecked")
            T self = (T) this;
            return self;
        }
    }

    /* loaded from: input_file:eu/stratosphere/api/common/io/FileOutputFormat$ConfigBuilder.class */
    /**
     * Concrete fluent builder for the parameters of a file output format. It adds no settings of
     * its own; it only binds the self type of {@link AbstractConfigBuilder} to itself.
     */
    public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {

        /**
         * Creates a builder that writes into the given configuration.
         *
         * @param targetConfig the configuration to populate
         */
        protected ConfigBuilder(final Configuration targetConfig) {
            super(targetConfig);
        }
    }

    /* loaded from: input_file:eu/stratosphere/api/common/io/FileOutputFormat$OutputPathOpenThread.class */
    /**
     * Helper thread that opens the output stream in the background, so that the requesting thread
     * can bound the time it waits for the file system via {@link #waitForCompletion()}.
     *
     * <p>The opener and the waiting thread communicate through the volatile fields {@code fdos},
     * {@code error} and {@code aborted}. Once the wait is aborted, whichever side still sees the
     * stream closes it, because nobody will pick it up anymore.
     */
    private static final class OutputPathOpenThread extends Thread {
        private final Path path;
        private final int taskIndex;
        private final long timeoutMillies;
        private volatile FSDataOutputStream fdos;
        private volatile Throwable error;
        private volatile boolean aborted;

        /**
         * @param path the configured output path (a file, or a directory to write a per-task file into)
         * @param taskIndex index used as the file name when {@code path} is an existing directory
         * @param timeoutMillies how long {@link #waitForCompletion()} waits, in milliseconds
         */
        public OutputPathOpenThread(Path path, int taskIndex, long timeoutMillies) {
            this.path = path;
            this.timeoutMillies = timeoutMillies;
            this.taskIndex = taskIndex;
        }

        /**
         * Resolves the target file, removes an existing file at that location and creates the
         * output stream. Any failure is recorded in {@code error} instead of being thrown.
         */
        @Override
        public void run() {
            try {
                Path target = this.path;
                FileSystem fileSystem = target.getFileSystem();
                // An existing directory means: write one file per task, named by the task index.
                if (fileSystem.exists(target) && fileSystem.getFileStatus(target).isDir()) {
                    target = target.suffix(Path.SEPARATOR + this.taskIndex);
                }
                if (fileSystem.exists(target)) {
                    fileSystem.delete(target, false);
                }
                this.fdos = fileSystem.create(target, true);
                // The waiter gave up in the meantime: nobody will obtain the stream, so close it here.
                if (this.aborted) {
                    FSDataOutputStream unclaimed = this.fdos;
                    this.fdos = null;
                    // abortWait() may have taken (and closed) the stream concurrently; without this
                    // guard the resulting NullPointerException would be recorded as a bogus error.
                    if (unclaimed != null) {
                        unclaimed.close();
                    }
                }
            } catch (Throwable th) {
                this.error = th;
            }
        }

        /**
         * Waits until the stream is open, opening failed, or the timeout elapsed.
         *
         * @return the opened output stream
         * @throws IOException if opening failed (the failure is attached as cause) or timed out
         * @throws InterruptedException if the waiting thread was interrupted; the wait is aborted first
         */
        public FSDataOutputStream waitForCompletion() throws Exception {
            final long start = System.currentTimeMillis();
            long remaining = this.timeoutMillies;
            do {
                try {
                    join(remaining);
                } catch (InterruptedException e) {
                    abortWait();
                    throw e;
                }
                if (this.error != null || this.fdos != null) {
                    break;
                }
                // join() may return early; recompute how much of the timeout budget is left.
                remaining = (this.timeoutMillies + start) - System.currentTimeMillis();
            } while (remaining > 0);

            final Throwable failure = this.error;
            if (failure != null) {
                final String detail = failure.getMessage();
                // NOTE(review): Path.CUR_DIR is presumably "." and serves as the sentence terminator - confirm.
                throw new IOException("Opening the file output stream failed" + (detail == null ? Path.CUR_DIR : ": " + detail), failure);
            }
            final FSDataOutputStream opened = this.fdos;
            if (opened != null) {
                return opened;
            }

            // Timed out: tell the opener to discard its result and report where it is stuck.
            abortWait();
            final boolean alive = isAlive();
            final StringBuilder trace = new StringBuilder(256);
            for (StackTraceElement frame : getStackTrace()) {
                trace.append("\tat ").append(frame.toString()).append('\n');
            }
            throw new IOException("Output opening request timed out. Opener was " + (alive ? "" : "NOT ") + "alive. Stack:\n" + trace.toString());
        }

        /**
         * Marks the wait as aborted and closes the stream if the opener already produced one.
         */
        private void abortWait() {
            this.aborted = true;
            final FSDataOutputStream pending = this.fdos;
            this.fdos = null;
            if (pending != null) {
                try {
                    pending.close();
                } catch (Throwable ignored) {
                    // Best-effort cleanup of a stream nobody will use; the caller is already failing.
                }
            }
        }
    }

    /**
     * Reads the output path and the stream opening timeout from the given configuration.
     *
     * <p>A timeout of zero is interpreted as "wait indefinitely" and stored as
     * {@code Long.MAX_VALUE}; a negative timeout is rejected with a warning and replaced by
     * {@code FileInputFormat.DEFAULT_OPENING_TIMEOUT}.
     *
     * @param configuration the configuration holding {@link #FILE_PARAMETER_KEY} and, optionally,
     *        {@link #OUTPUT_STREAM_OPEN_TIMEOUT_KEY}
     * @throws IllegalArgumentException if the configuration contains no file path
     * @throws RuntimeException if the file path cannot be turned into a {@code Path}
     */
    @Override // eu.stratosphere.api.common.io.OutputFormat
    public void configure(Configuration configuration) {
        final String filePath = configuration.getString(FILE_PARAMETER_KEY, null);
        if (filePath == null) {
            throw new IllegalArgumentException("Configuration file FileOutputFormat does not contain the file path.");
        }
        // Only the path construction is guarded, so the URI message is never attached to unrelated failures.
        try {
            this.outputFilePath = new Path(filePath);
        } catch (RuntimeException e) {
            // Keep the original exception as cause so the stack trace of the real problem is not lost.
            throw new RuntimeException("Could not create a valid URI from the given file path name: " + e.getMessage(), e);
        }

        final long configuredTimeout = configuration.getLong(OUTPUT_STREAM_OPEN_TIMEOUT_KEY, FileInputFormat.DEFAULT_OPENING_TIMEOUT);
        if (configuredTimeout < 0) {
            this.openTimeout = FileInputFormat.DEFAULT_OPENING_TIMEOUT;
            if (LOG.isWarnEnabled()) {
                // Report the rejected value, not the default that replaces it.
                LOG.warn("Ignoring invalid parameter for stream opening timeout (requires a positive value or zero=infinite): " + configuredTimeout);
            }
        } else if (configuredTimeout == 0) {
            this.openTimeout = Long.MAX_VALUE;
        } else {
            this.openTimeout = configuredTimeout;
        }
    }

    /**
     * Opens the output stream for the given task by running an {@code OutputPathOpenThread} and
     * waiting for it with the configured opening timeout.
     *
     * @param i the index of the task, forwarded to the opener thread
     * @throws IOException declared by the interface; failures to open the stream are currently
     *         reported as {@link RuntimeException} with the underlying exception as cause
     * @throws RuntimeException if the stream could not be opened, the request timed out, or the
     *         calling thread was interrupted while waiting
     */
    @Override // eu.stratosphere.api.common.io.OutputFormat
    public void open(int i) throws IOException {
        final OutputPathOpenThread opener = new OutputPathOpenThread(this.outputFilePath, i, this.openTimeout);
        opener.start();
        try {
            this.stream = opener.waitForCompletion();
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Stream to output file could not be opened: " + e.getMessage(), e);
        } catch (Exception e) {
            throw new RuntimeException("Stream to output file could not be opened: " + e.getMessage(), e);
        }
    }

    /**
     * Closes the output stream if one is currently open.
     *
     * <p>The field is cleared before the stream is closed, so a repeated call does nothing even
     * when the first {@code close()} attempt threw.
     *
     * @throws IOException if closing the underlying stream fails
     */
    @Override // eu.stratosphere.api.common.io.OutputFormat
    public void close() throws IOException {
        final FSDataOutputStream current = this.stream;
        if (current == null) {
            return;
        }
        this.stream = null;
        current.close();
    }

    /**
     * Creates a configuration builder that writes into the parameters of the given file data sink.
     *
     * @param fileDataSink the sink whose parameters are to be configured
     * @return a new builder operating on the sink's parameters
     */
    public static ConfigBuilder configureFileFormat(FileDataSink fileDataSink) {
        final Configuration sinkParameters = fileDataSink.getParameters();
        return new ConfigBuilder(sinkParameters);
    }
}
