package com.digitalpebble.stormcrawler.spout;

import com.digitalpebble.stormcrawler.Constants;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.util.StringTabScheme;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/spout/FileSpout.class */
public class FileSpout extends BaseRichSpout {
    public static final int BATCH_SIZE = 10000;
    public static final Logger LOG = LoggerFactory.getLogger(FileSpout.class);
    protected SpoutOutputCollector _collector;
    private final Queue<String> _inputFiles;
    private BufferedReader currentBuffer;
    protected Scheme _scheme;
    protected LinkedList<byte[]> buffer;
    protected boolean active;
    private boolean withDiscoveredStatus;

    public FileSpout(String str, String str2) {
        this(str, str2, false);
    }

    public FileSpout(String... strArr) {
        this(false, strArr);
    }

    public FileSpout(String str, String str2, boolean z) {
        this._scheme = new StringTabScheme();
        this.buffer = new LinkedList<>();
        this.withDiscoveredStatus = false;
        this.withDiscoveredStatus = z;
        Path path = Paths.get(str, new String[0]);
        this._inputFiles = new LinkedList();
        LOG.info("Reading directory: {} (filter: {})", path, str2);
        try {
            DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(path, str2);
            try {
                Iterator<Path> it = newDirectoryStream.iterator();
                while (it.hasNext()) {
                    String path2 = it.next().toAbsolutePath().toString();
                    this._inputFiles.add(path2);
                    LOG.info("Input : {}", path2);
                }
                if (newDirectoryStream != null) {
                    newDirectoryStream.close();
                }
            } finally {
            }
        } catch (IOException e) {
            LOG.error("IOException: %s%n", e);
        }
    }

    public FileSpout(boolean z, String... strArr) {
        this._scheme = new StringTabScheme();
        this.buffer = new LinkedList<>();
        this.withDiscoveredStatus = false;
        this.withDiscoveredStatus = z;
        if (strArr.length == 0) {
            throw new IllegalArgumentException("Must configure at least one inputFile");
        }
        this._inputFiles = new LinkedList();
        Collections.addAll(this._inputFiles, strArr);
    }

    public void setScheme(Scheme scheme) {
        this._scheme = scheme;
    }

    protected void populateBuffer() throws IOException {
        if (this.currentBuffer == null) {
            String poll = this._inputFiles.poll();
            if (poll == null) {
                return;
            } else {
                this.currentBuffer = new BufferedReader(new InputStreamReader(new FileInputStream(Paths.get(poll, new String[0]).toFile()), StandardCharsets.UTF_8));
            }
        }
        String str = null;
        int i = 0;
        while (i < 10000) {
            String readLine = this.currentBuffer.readLine();
            str = readLine;
            if (readLine == null) {
                break;
            }
            if (!StringUtils.isBlank(str) && !str.startsWith("#")) {
                this.buffer.add(str.trim().getBytes(StandardCharsets.UTF_8));
                i++;
            }
        }
        if (str == null) {
            this.currentBuffer.close();
            this.currentBuffer = null;
        }
    }

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        try {
            populateBuffer();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void nextTuple() {
        if (this.active) {
            if (this.buffer.isEmpty()) {
                try {
                    populateBuffer();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (this.buffer.isEmpty()) {
                return;
            }
            byte[] removeFirst = this.buffer.removeFirst();
            List deserialize = this._scheme.deserialize(ByteBuffer.wrap(removeFirst));
            if (!this.withDiscoveredStatus) {
                this._collector.emit(deserialize, removeFirst);
            } else {
                deserialize.add(Status.DISCOVERED);
                this._collector.emit(Constants.StatusStreamName, deserialize, removeFirst);
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this._scheme.getOutputFields());
        if (this.withDiscoveredStatus) {
            List list = this._scheme.getOutputFields().toList();
            list.add(Constants.StatusStreamName);
            outputFieldsDeclarer.declareStream(Constants.StatusStreamName, new Fields(list));
        }
    }

    public void close() {
    }

    public void activate() {
        super.activate();
        this.active = true;
    }

    public void deactivate() {
        super.deactivate();
        this.active = false;
    }

    public void ack(Object obj) {
    }

    public void fail(Object obj) {
        if (!(obj instanceof byte[])) {
            LOG.error("Failed - unknown message ID type `{}': {}", obj.getClass().getCanonicalName(), obj);
            throw new IllegalStateException("Unknown message ID type: " + obj.getClass().getCanonicalName());
        }
        LOG.error("Failed - adding back to the queue: {}", new String((byte[]) obj));
        this.buffer.add((byte[]) obj);
    }
}
