package eu.stratosphere.nephele.io.library;

import eu.stratosphere.core.fs.FSDataInputStream;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractFileInputTask;
import eu.stratosphere.nephele.types.FileRecord;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/io/library/DirectoryReader.class */
/**
 * Input task that reads the file input splits assigned to it and emits one
 * {@link FileRecord} per file.
 *
 * <p>Consecutive splits whose path has the same file name are appended to the
 * same record; the record is emitted as soon as a split with a different file
 * name is encountered, and once more after the last split has been consumed.
 */
public class DirectoryReader extends AbstractFileInputTask {
    private static final Log LOG = LogFactory.getLog(DirectoryReader.class);

    /** Writer the assembled file records are emitted to; created in {@link #registerInputOutput()}. */
    private RecordWriter<FileRecord> output = null;

    /** Scratch buffer reused across splits; it only grows, so it may be larger than the current split. */
    private byte[] buffer;

    /**
     * Iterates over all assigned input splits, accumulates the bytes of each
     * file into a {@link FileRecord} and emits the record when the file name
     * changes or the splits are exhausted.
     *
     * @throws Exception if opening, seeking or reading a split fails, if a
     *         split does not fit into a single byte array, or if emitting a
     *         record fails for a reason other than interruption
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void invoke() throws Exception {
        Iterator<FileInputSplit> fileInputSplits = getFileInputSplits();
        FileRecord fileRecord = null;
        while (fileInputSplits.hasNext()) {
            FileInputSplit split = fileInputSplits.next();
            String fileName = split.getPath().getName();
            // A different file name marks the start of a new file: flush the previous record first.
            if (fileRecord == null || !fileName.equals(fileRecord.getFileName())) {
                emitRecord(fileRecord);
                fileRecord = new FileRecord(fileName);
            }
            appendSplit(split, fileRecord);
        }
        emitRecord(fileRecord);
    }

    /**
     * Creates the record writer this task emits its file records to.
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void registerInputOutput() {
        this.output = new RecordWriter<>(this, FileRecord.class);
    }

    /**
     * Reads the bytes covered by {@code split} and appends them to {@code target}.
     *
     * @param split the split to read
     * @param target the record receiving the split's bytes
     * @throws Exception if the split length is negative or exceeds the maximum
     *         array size, or if the underlying file system access fails
     */
    private void appendSplit(FileInputSplit split, FileRecord target) throws Exception {
        long splitLength = split.getLength();
        if (splitLength < 0 || splitLength > Integer.MAX_VALUE) {
            // Fail loudly instead of letting the int cast wrap or truncate the length.
            throw new IllegalStateException("Cannot buffer split of " + splitLength + " bytes for file "
                + split.getPath().getName());
        }
        int expected = (int) splitLength;
        if (this.buffer == null || this.buffer.length < expected) {
            this.buffer = new byte[expected];
        }

        int total = 0;
        FSDataInputStream in = FileSystem.get(split.getPath().toUri()).open(split.getPath());
        try {
            in.seek(split.getStart());
            // A single read() may return fewer bytes than requested, and the reused buffer may be
            // larger than this split, so read in a loop and never request more than the split holds.
            while (total < expected) {
                int count = in.read(this.buffer, total, expected - total);
                if (count < 0) {
                    break; // end of file reached before the split was complete
                }
                total += count;
            }
        } finally {
            in.close();
        }

        if (total > 0) {
            target.append(this.buffer, 0, total);
        }
        if (total != expected) {
            LOG.error("Unexpected number of bytes read! Expected: " + expected + " Read: " + total);
        }
    }

    /**
     * Emits {@code record} to the output writer; does nothing for {@code null}.
     * An interruption during the emit is logged and the thread's interrupt
     * status is restored so that callers further up can observe it.
     *
     * @param record the record to emit, may be {@code null}
     * @throws Exception if the writer fails for a reason other than interruption
     */
    private void emitRecord(FileRecord record) throws Exception {
        if (record == null) {
            return;
        }
        try {
            this.output.emit(record);
        } catch (InterruptedException e) {
            LOG.error(e);
            Thread.currentThread().interrupt();
        }
    }
}
