package com.digitalpebble.stormcrawler.spout;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.util.StringTabScheme;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/spout/MemorySpout.class */
public class MemorySpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    private StringTabScheme scheme = new StringTabScheme();
    private boolean active = true;
    private String[] startingURLs;
    private static final Logger LOG = LoggerFactory.getLogger(MemorySpout.class);
    private static PriorityQueue<ScheduledURL> queue = new PriorityQueue<>();

    public MemorySpout(String... strArr) {
        this.startingURLs = strArr;
    }

    public static void add(String str, Metadata metadata, Date date) {
        LOG.debug("Adding {} with md {} and nextFetch {}", new Object[]{str, metadata, date});
        ScheduledURL scheduledURL = new ScheduledURL(str, metadata, date);
        synchronized (queue) {
            queue.add(scheduledURL);
        }
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        if (topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size() > 1) {
            throw new RuntimeException("Can't have more than one instance of the MemorySpout");
        }
        Date date = new Date();
        for (String str : this.startingURLs) {
            LOG.debug("About to deserialize {} ", str);
            List<Object> deserialize = this.scheme.deserialize(ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)));
            add((String) deserialize.get(0), (Metadata) deserialize.get(1), date);
        }
        topologyContext.registerMetric("queue_size", new IMetric() { // from class: com.digitalpebble.stormcrawler.spout.MemorySpout.1
            public Object getValueAndReset() {
                return Integer.valueOf(MemorySpout.queue.size());
            }
        }, 10);
    }

    public void nextTuple() {
        if (this.active) {
            synchronized (queue) {
                ScheduledURL poll = queue.poll();
                if (poll == null) {
                    return;
                }
                if (poll.nextFetchDate.after(new Date())) {
                    LOG.debug("Tuple {} not ready for fetching", poll.URL);
                    queue.add(poll);
                } else {
                    LinkedList linkedList = new LinkedList();
                    linkedList.add(poll.URL);
                    linkedList.add(poll.m);
                    this._collector.emit(linkedList, poll.URL);
                }
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.scheme.getOutputFields());
    }

    public void activate() {
        super.activate();
        this.active = true;
    }

    public void deactivate() {
        super.deactivate();
        this.active = false;
    }
}
