package com.digitalpebble.storm.crawler.bolt;

import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.metric.api.MultiReducedMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.digitalpebble.storm.crawler.Constants;
import com.digitalpebble.storm.crawler.Metadata;
import com.digitalpebble.storm.crawler.filtering.URLFilters;
import com.digitalpebble.storm.crawler.persistence.Status;
import com.digitalpebble.storm.crawler.protocol.HttpHeaders;
import com.digitalpebble.storm.crawler.protocol.Protocol;
import com.digitalpebble.storm.crawler.protocol.ProtocolFactory;
import com.digitalpebble.storm.crawler.protocol.ProtocolResponse;
import com.digitalpebble.storm.crawler.util.ConfUtils;
import com.digitalpebble.storm.crawler.util.MetadataTransfer;
import com.digitalpebble.storm.crawler.util.PerSecondReducer;
import com.digitalpebble.storm.crawler.util.URLUtil;
import crawlercommons.robots.BaseRobotRules;
import crawlercommons.url.PaidLevelDomain;
import java.io.IOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.guava.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/storm/crawler/bolt/FetcherBolt.class */
public class FetcherBolt extends BaseRichBolt {
    /** Logger shared by the bolt and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger(FetcherBolt.class);
    /** Per-key fetch queues; created in prepare(), filled by execute() and polled by the fetcher threads. */
    private FetchItemQueues fetchQueues;
    /** Storm collector; only used from execute()/flushQueues(), never from the fetcher threads. */
    private OutputCollector _collector;
    /** Counters registered as "fetcher_counter" (scopes "fetched" and "exception"). */
    private MultiCountMetric eventCounter;
    /** Mean-reduced metrics registered as "fetcher_average_perdoc". */
    private MultiReducedMetric averagedMetrics;
    /** Resolves the Protocol implementation used to fetch a given URL. */
    private ProtocolFactory protocolFactory;
    /** Filters applied to redirect targets; may be null, in which case no filtering is done. */
    private URLFilters urlFilters;
    /** Whether redirect targets are emitted as DISCOVERED URLs. */
    private boolean allowRedirs;
    /** Builds the metadata attached to redirect targets. */
    private MetadataTransfer metadataTransfer;
    /** Per-second metrics registered as "fetcher_average_persec". */
    private MultiReducedMetric perSecMetrics;
    /** Number of fetcher threads currently processing an item. */
    private final AtomicInteger activeThreads = new AtomicInteger(0);
    /** Number of fetcher threads currently sleeping because no item was available. */
    private final AtomicInteger spinWaiting = new AtomicInteger(0);
    /** Tuples queued by the fetcher threads; acked and cleared by flushQueues(). */
    private final List<Tuple> ackQueue = Collections.synchronizedList(new LinkedList());
    /** Pending emissions as {stream id, anchor tuple, values}; emitted and cleared by flushQueues(). */
    private final List<Object[]> emitQueue = Collections.synchronizedList(new LinkedList());
    /** Index of this task within the component; -1 until prepare() assigns it. */
    private int taskIndex = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/digitalpebble/storm/crawler/bolt/FetcherBolt$FetchItem.class */
    /**
     * A URL waiting to be fetched, together with the tuple it came from and
     * the identifier of the queue it belongs to.
     */
    public static class FetchItem {
        String queueID;
        String url;
        URL u;
        Tuple t;

        /**
         * @param str    string form of the URL
         * @param url    parsed form of the URL
         * @param tuple  tuple the URL was received in
         * @param str2   identifier of the queue this item is assigned to
         */
        public FetchItem(String str, URL url, Tuple tuple, String str2) {
            this.t = tuple;
            this.queueID = str2;
            this.u = url;
            this.url = str;
        }

        /**
         * Builds a FetchItem, choosing its queue identifier. A non-blank "key"
         * field on the tuple takes precedence; otherwise the identifier is
         * derived from the URL according to the queue mode. The identifier is
         * always lower-cased with the ROOT locale.
         *
         * @param url    URL to fetch
         * @param tuple  tuple the URL was received in
         * @param str    queue mode ("byIP", "byDomain", anything else means by host)
         * @return the new item
         */
        public static FetchItem create(URL url, Tuple tuple, String str) {
            final String urlString = url.toExternalForm();

            final String explicitKey = tuple.contains("key") ? tuple.getStringByField("key") : null;
            if (!StringUtils.isBlank(explicitKey)) {
                return new FetchItem(urlString, url, tuple, explicitKey.toLowerCase(Locale.ROOT));
            }

            String key = deriveKey(url, urlString, str);
            if (key == null) {
                FetcherBolt.LOG.warn("Unknown host for url: {}, using URL string as key", urlString);
                key = urlString;
            }
            return new FetchItem(urlString, url, tuple, key.toLowerCase(Locale.ROOT));
        }

        /** Derives the queue key from the URL for the given mode; falls back to the host name. */
        private static String deriveKey(URL url, String urlString, String mode) {
            if ("byIP".equalsIgnoreCase(mode)) {
                try {
                    return InetAddress.getByName(url.getHost()).getHostAddress();
                } catch (UnknownHostException e) {
                    FetcherBolt.LOG.warn("Unable to resolve IP for {}, using hostname as key.", url.getHost());
                    return url.getHost();
                }
            }
            if ("byDomain".equalsIgnoreCase(mode)) {
                String domain = PaidLevelDomain.getPLD(url.getHost());
                if (domain != null) {
                    return domain;
                }
                FetcherBolt.LOG.warn("Unknown domain for url: {}, using hostname as key", urlString);
                return url.getHost();
            }
            return url.getHost();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/digitalpebble/storm/crawler/bolt/FetcherBolt$FetchItemQueue.class */
    /**
     * Queue of items sharing the same key (host, domain or IP). Tracks how
     * many of its items are currently being fetched and the earliest time at
     * which the next item may be handed out.
     */
    public static class FetchItemQueue {
        Deque<FetchItem> queue = new LinkedBlockingDeque<FetchItem>();
        AtomicInteger inProgress = new AtomicInteger();
        AtomicLong nextFetchTime = new AtomicLong();
        /*
         * Written by fetcher threads without holding any lock and read by
         * other threads in setEndTime(): volatile gives cross-thread
         * visibility and atomic 64-bit writes.
         */
        volatile long crawlDelay;
        final long minCrawlDelay;
        final int maxThreads;

        /**
         * @param config  configuration (not used by this class)
         * @param i       maximum number of items of this queue fetched concurrently
         * @param j       delay in milliseconds applied between fetches when i is 1
         * @param j2      delay in milliseconds applied between fetches when i is greater than 1
         */
        public FetchItemQueue(Config config, int i, long j, long j2) {
            this.maxThreads = i;
            this.crawlDelay = j;
            this.minCrawlDelay = j2;
            // back-date the "last fetch" by the crawl delay
            setEndTime(System.currentTimeMillis() - j);
        }

        /** @return number of items waiting in this queue */
        public int getQueueSize() {
            return this.queue.size();
        }

        /** @return number of items of this queue currently being fetched */
        public int getInProgressSize() {
            return this.inProgress.get();
        }

        /**
         * Marks an item as done and computes the next allowed fetch time.
         *
         * @param fetchItem  the finished item; ignored when null
         * @param z          true to allow the next fetch immediately, false to apply the delay
         */
        public void finishFetchItem(FetchItem fetchItem, boolean z) {
            if (fetchItem == null) {
                return;
            }
            this.inProgress.decrementAndGet();
            setEndTime(System.currentTimeMillis(), z);
        }

        /** Appends an item to the queue. */
        public void addFetchItem(FetchItem fetchItem) {
            this.queue.add(fetchItem);
        }

        /**
         * Hands out the next item if the queue is allowed to fetch now.
         *
         * @return the next item, or null when the concurrency limit is reached,
         *         the delay has not elapsed yet or the queue is empty
         */
        public FetchItem getFetchItem() {
            if (this.inProgress.get() >= this.maxThreads) {
                return null;
            }
            if (this.nextFetchTime.get() > System.currentTimeMillis()) {
                return null;
            }
            // pollFirst() returns null on an empty deque instead of throwing,
            // so no exception handling is needed for the empty case.
            FetchItem next = this.queue.pollFirst();
            if (next != null) {
                this.inProgress.incrementAndGet();
            }
            return next;
        }

        private void setEndTime(long j) {
            setEndTime(j, false);
        }

        /**
         * Sets the next allowed fetch time to j when z is true, otherwise to j
         * plus minCrawlDelay (several threads per queue) or crawlDelay (one
         * thread per queue).
         */
        private void setEndTime(long j, boolean z) {
            if (z) {
                this.nextFetchTime.set(j);
                return;
            }
            long delay = this.maxThreads > 1 ? this.minCrawlDelay : this.crawlDelay;
            this.nextFetchTime.set(j + delay);
        }
    }

    /* loaded from: input_file:com/digitalpebble/storm/crawler/bolt/FetcherBolt$FetchItemQueues.class */
    /**
     * Set of FetchItemQueue instances indexed by queue identifier, visited in
     * round-robin order when looking for the next item to fetch. All public
     * methods are synchronized on this instance.
     */
    private static class FetchItemQueues {
        Map<String, FetchItemQueue> queues = new LinkedHashMap<String, FetchItemQueue>();
        /** Endless round-robin iterator over the queue identifiers; rebuilt whenever a queue is added. */
        Iterator<String> it = Iterables.cycle(this.queues.keySet()).iterator();
        /** Number of items waiting in the queues (not counting those being fetched). */
        AtomicInteger inQueues = new AtomicInteger(0);
        final int defaultMaxThread;
        /** Default delay between fetches on a queue, in milliseconds. */
        final long crawlDelay;
        /** Delay used for queues served by more than one thread, in milliseconds. */
        final long minCrawlDelay;
        final Config conf;
        public static final String QUEUE_MODE_HOST = "byHost";
        public static final String QUEUE_MODE_DOMAIN = "byDomain";
        public static final String QUEUE_MODE_IP = "byIP";
        String queueMode;

        /**
         * Reads "fetcher.threads.per.queue", "fetcher.queue.mode",
         * "fetcher.server.delay" and "fetcher.server.min.delay" (both delays
         * expressed in seconds) from the configuration. An unknown queue mode
         * is replaced by "byHost".
         */
        public FetchItemQueues(Config config) {
            this.conf = config;
            this.defaultMaxThread = ConfUtils.getInt(config, "fetcher.threads.per.queue", 1);
            this.queueMode = ConfUtils.getString(config, "fetcher.queue.mode", QUEUE_MODE_HOST);
            boolean knownMode = this.queueMode.equals(QUEUE_MODE_IP)
                    || this.queueMode.equals(QUEUE_MODE_DOMAIN)
                    || this.queueMode.equals(QUEUE_MODE_HOST);
            if (!knownMode) {
                FetcherBolt.LOG.error("Unknown partition mode : {} - forcing to byHost", this.queueMode);
                this.queueMode = QUEUE_MODE_HOST;
            }
            FetcherBolt.LOG.info("Using queue mode : {}", this.queueMode);
            // seconds (float) -> milliseconds (long); the narrowing needs an explicit cast
            this.crawlDelay = (long) (ConfUtils.getFloat(config, "fetcher.server.delay", 1.0f) * 1000.0f);
            this.minCrawlDelay = (long) (ConfUtils.getFloat(config, "fetcher.server.min.delay", 0.0f) * 1000.0f);
        }

        /** Wraps the URL and tuple into a FetchItem and appends it to the queue matching its key. */
        public synchronized void addFetchItem(URL url, Tuple tuple) {
            FetchItem item = FetchItem.create(url, tuple, this.queueMode);
            getFetchItemQueue(item.queueID).addFetchItem(item);
            this.inQueues.incrementAndGet();
        }

        /**
         * Reports an item as done to its queue.
         *
         * @param fetchItem  the finished item
         * @param z          true to let the queue hand out its next item immediately
         */
        public synchronized void finishFetchItem(FetchItem fetchItem, boolean z) {
            FetchItemQueue owner = this.queues.get(fetchItem.queueID);
            if (owner == null) {
                FetcherBolt.LOG.warn("Attempting to finish item from unknown queue: {}", fetchItem.queueID);
                return;
            }
            owner.finishFetchItem(fetchItem, z);
        }

        /**
         * Returns the queue with the given identifier, creating it if needed.
         * The thread limit of a new queue comes from "fetcher.maxThreads.&lt;id&gt;"
         * and defaults to "fetcher.threads.per.queue".
         */
        public synchronized FetchItemQueue getFetchItemQueue(String str) {
            FetchItemQueue existing = this.queues.get(str);
            if (existing != null) {
                return existing;
            }
            int maxThreads = ConfUtils.getInt(this.conf, "fetcher.maxThreads." + str, this.defaultMaxThread);
            FetchItemQueue created = new FetchItemQueue(this.conf, maxThreads, this.crawlDelay, this.minCrawlDelay);
            this.queues.put(str, created);
            // the key set changed: the previous iterator must not be reused
            this.it = Iterables.cycle(this.queues.keySet()).iterator();
            return created;
        }

        /**
         * Visits the queues in round-robin order and returns the first item a
         * queue is willing to hand out. Queues with nothing waiting and nothing
         * in progress are dropped along the way.
         *
         * @return the next item to fetch, or null when no queue can provide one now
         */
        public synchronized FetchItem getFetchItem() {
            if (this.queues.isEmpty() || !this.it.hasNext()) {
                return null;
            }
            FetchItemQueue firstVisited = null;
            do {
                FetchItemQueue current = this.queues.get(this.it.next());
                if (current.getQueueSize() == 0 && current.getInProgressSize() == 0) {
                    this.it.remove();
                    continue;
                }
                if (firstVisited == null) {
                    firstVisited = current;
                } else if (firstVisited == current) {
                    // back at the starting queue: a full round produced nothing
                    return null;
                }
                FetchItem item = current.getFetchItem();
                if (item != null) {
                    this.inQueues.decrementAndGet();
                    return item;
                }
            } while (this.it.hasNext());
            return null;
        }
    }

    /* loaded from: input_file:com/digitalpebble/storm/crawler/bolt/FetcherBolt$FetcherThread.class */
    private class FetcherThread extends Thread {
        private final long maxCrawlDelay;

        public FetcherThread(Config config) {
            setDaemon(true);
            setName("FetcherThread");
            this.maxCrawlDelay = ConfUtils.getInt(config, "fetcher.max.crawl.delay", 30) * 1000;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                FetchItem fetchItem = FetcherBolt.this.fetchQueues.getFetchItem();
                if (fetchItem == null) {
                    FetcherBolt.LOG.debug("{} spin-waiting ...", getName());
                    FetcherBolt.this.spinWaiting.incrementAndGet();
                    try {
                        Thread.sleep(100L);
                    } catch (Exception e) {
                    }
                    FetcherBolt.this.spinWaiting.decrementAndGet();
                } else {
                    FetcherBolt.this.activeThreads.incrementAndGet();
                    FetcherBolt.LOG.info("[Fetcher #{}] {}  => activeThreads={}, spinWaiting={}, queueID={}", new Object[]{Integer.valueOf(FetcherBolt.this.taskIndex), getName(), FetcherBolt.this.activeThreads, FetcherBolt.this.spinWaiting, fetchItem.queueID});
                    Metadata metadata = null;
                    if (fetchItem.t.contains("metadata")) {
                        metadata = (Metadata) fetchItem.t.getValueByField("metadata");
                    }
                    if (metadata == null) {
                        metadata = Metadata.empty;
                    }
                    try {
                        try {
                            Protocol protocol = FetcherBolt.this.protocolFactory.getProtocol(new URL(fetchItem.url));
                            if (protocol == null) {
                                throw new RuntimeException("No protocol implementation found for " + fetchItem.url);
                                break;
                            }
                            BaseRobotRules robotRules = protocol.getRobotRules(fetchItem.url);
                            if (robotRules.isAllowed(fetchItem.u.toString())) {
                                if (robotRules.getCrawlDelay() > 0) {
                                    if (robotRules.getCrawlDelay() <= this.maxCrawlDelay || this.maxCrawlDelay < 0) {
                                        FetchItemQueue fetchItemQueue = FetcherBolt.this.fetchQueues.getFetchItemQueue(fetchItem.queueID);
                                        fetchItemQueue.crawlDelay = robotRules.getCrawlDelay();
                                        FetcherBolt.LOG.info("Crawl delay for queue: {}  is set to {} as per robots.txt. url: {}", new Object[]{fetchItem.queueID, Long.valueOf(fetchItemQueue.crawlDelay), fetchItem.url});
                                    } else {
                                        FetcherBolt.LOG.info("Crawl-Delay for {} too long ({}), skipping", fetchItem.url, Long.valueOf(robotRules.getCrawlDelay()));
                                        metadata.setValue("error.cause", "crawl_delay");
                                        FetcherBolt.this.emitQueue.add(new Object[]{Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, metadata, Status.ERROR})});
                                        FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, true);
                                        FetcherBolt.this.activeThreads.decrementAndGet();
                                        synchronized (FetcherBolt.this.ackQueue) {
                                            FetcherBolt.this.ackQueue.add(fetchItem.t);
                                        }
                                    }
                                }
                                long currentTimeMillis = System.currentTimeMillis();
                                ProtocolResponse protocolOutput = protocol.getProtocolOutput(fetchItem.url, metadata);
                                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                                FetcherBolt.this.averagedMetrics.scope("fetch_time").update(Long.valueOf(currentTimeMillis2));
                                FetcherBolt.this.averagedMetrics.scope("bytes_fetched").update(Integer.valueOf(protocolOutput.getContent().length));
                                FetcherBolt.this.perSecMetrics.scope("bytes_fetched_perSec").update(Integer.valueOf(protocolOutput.getContent().length));
                                FetcherBolt.this.perSecMetrics.scope("fetched_perSec").update(1);
                                FetcherBolt.this.eventCounter.scope("fetched").incrBy(1L);
                                FetcherBolt.LOG.info("[Fetcher #{}] Fetched {} with status {} in msec {}", new Object[]{Integer.valueOf(FetcherBolt.this.taskIndex), fetchItem.url, Integer.valueOf(protocolOutput.getStatusCode()), Long.valueOf(currentTimeMillis2)});
                                protocolOutput.getMetadata().setValue("fetch.statusCode", Integer.toString(protocolOutput.getStatusCode()));
                                protocolOutput.getMetadata().putAll(metadata);
                                Status fromHTTPCode = Status.fromHTTPCode(protocolOutput.getStatusCode());
                                if (fromHTTPCode.equals(Status.FETCHED)) {
                                    if (protocolOutput.getStatusCode() != 304) {
                                        FetcherBolt.this.emitQueue.add(new Object[]{"default", fetchItem.t, new Values(new Object[]{fetchItem.url, protocolOutput.getContent(), protocolOutput.getMetadata()})});
                                    }
                                } else if (fromHTTPCode.equals(Status.REDIRECTION)) {
                                    FetcherBolt.this.emitQueue.add(new Object[]{Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, protocolOutput.getMetadata(), fromHTTPCode})});
                                    String[] values = protocolOutput.getMetadata().getValues(HttpHeaders.LOCATION);
                                    if (FetcherBolt.this.allowRedirs && values != null && values.length != 0 && values[0] != null) {
                                        FetcherBolt.this.handleRedirect(fetchItem.t, fetchItem.url, values[0], protocolOutput.getMetadata());
                                    }
                                } else {
                                    FetcherBolt.this.emitQueue.add(new Object[]{Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, protocolOutput.getMetadata(), fromHTTPCode})});
                                }
                                FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, false);
                                FetcherBolt.this.activeThreads.decrementAndGet();
                                synchronized (FetcherBolt.this.ackQueue) {
                                    FetcherBolt.this.ackQueue.add(fetchItem.t);
                                }
                            } else {
                                FetcherBolt.LOG.info("Denied by robots.txt: {}", fetchItem.url);
                                metadata.setValue("error.cause", "robots.txt");
                                FetcherBolt.this.emitQueue.add(new Object[]{Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, metadata, Status.ERROR})});
                                FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, true);
                                FetcherBolt.this.activeThreads.decrementAndGet();
                                synchronized (FetcherBolt.this.ackQueue) {
                                    FetcherBolt.this.ackQueue.add(fetchItem.t);
                                }
                            }
                        } catch (Exception e2) {
                            String message = e2.getMessage();
                            if (message == null) {
                                message = "";
                            }
                            if (e2.getCause() instanceof TimeoutException) {
                                FetcherBolt.LOG.error("Socket timeout fetching {}", fetchItem.url);
                            } else if (message.contains(" timed out")) {
                                FetcherBolt.LOG.error("Socket timeout fetching {}", fetchItem.url);
                            } else {
                                FetcherBolt.LOG.error("Exception while fetching {}", fetchItem.url, e2);
                            }
                            if (metadata.size() == 0) {
                                metadata = new Metadata();
                            }
                            metadata.setValue("fetch.exception", message);
                            FetcherBolt.this.emitQueue.add(new Object[]{Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, metadata, Status.FETCH_ERROR})});
                            FetcherBolt.this.eventCounter.scope("exception").incrBy(1L);
                            FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, true);
                            FetcherBolt.this.activeThreads.decrementAndGet();
                            synchronized (FetcherBolt.this.ackQueue) {
                                FetcherBolt.this.ackQueue.add(fetchItem.t);
                            }
                        }
                    } catch (Throwable th) {
                        FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, true);
                        FetcherBolt.this.activeThreads.decrementAndGet();
                        synchronized (FetcherBolt.this.ackQueue) {
                            FetcherBolt.this.ackQueue.add(fetchItem.t);
                            throw th;
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves a redirect target against the source URL, runs it through the
     * URL filters (when configured) and, unless it was filtered out, queues it
     * for emission on the status stream as DISCOVERED.
     *
     * @param tuple     tuple the emission is anchored on
     * @param str       URL that returned the redirect
     * @param str2      redirect target, possibly relative
     * @param metadata  metadata of the source URL
     */
    public void handleRedirect(Tuple tuple, String str, String str2, Metadata metadata) {
        String target = str2;
        try {
            final URL source = new URL(str);
            target = URLUtil.resolveURL(source, target).toExternalForm();
            if (this.urlFilters != null) {
                target = this.urlFilters.filter(source, metadata, target);
            }
            if (target != null) {
                Metadata outlinkMetadata = this.metadataTransfer.getMetaForOutlink(target, str, metadata);
                Values values = new Values(new Object[]{target, outlinkMetadata, Status.DISCOVERED});
                this.emitQueue.add(new Object[]{Constants.StatusStreamName, tuple, values});
            }
        } catch (MalformedURLException e) {
            LOG.debug("MalformedURLException on {} or {}: {}", new Object[]{str, target, e});
        }
    }

    /**
     * Verifies that "http.agent.name" is set to a non-empty value.
     *
     * @throws IllegalArgumentException when the property is missing or only whitespace
     */
    private void checkConfiguration(Config config) {
        final String agentName = (String) config.get("http.agent.name");
        final boolean agentDefined = agentName != null && agentName.trim().length() > 0;
        if (agentDefined) {
            return;
        }
        final String problem = "Fetcher: No agents listed in 'http.agent.name' property.";
        LOG.error(problem);
        throw new IllegalArgumentException(problem);
    }

    /**
     * Initialises the bolt: validates the configuration, registers the
     * metrics, builds the protocol factory, fetch queues, URL filters and
     * metadata transfer, and finally starts the fetcher threads
     * ("fetcher.threads.number", default 10).
     *
     * @throws IllegalArgumentException when "http.agent.name" is not configured
     * @throws RuntimeException when the URL filters cannot be loaded
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this._collector = outputCollector;
        // assigned first so that every log line below carries the real index
        this.taskIndex = topologyContext.getThisTaskIndex();
        Config config = new Config();
        config.putAll(map);
        checkConfiguration(config);
        LOG.info("[Fetcher #{}] : starting at {}", Integer.valueOf(this.taskIndex), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH).format(Long.valueOf(System.currentTimeMillis())));

        this.eventCounter = topologyContext.registerMetric("fetcher_counter", new MultiCountMetric(), 10);
        topologyContext.registerMetric("activethreads", new IMetric() { // from class: com.digitalpebble.storm.crawler.bolt.FetcherBolt.1
            @Override
            public Object getValueAndReset() {
                return Integer.valueOf(FetcherBolt.this.activeThreads.get());
            }
        }, 10);
        topologyContext.registerMetric("in_queues", new IMetric() { // from class: com.digitalpebble.storm.crawler.bolt.FetcherBolt.2
            @Override
            public Object getValueAndReset() {
                return Integer.valueOf(FetcherBolt.this.fetchQueues.inQueues.get());
            }
        }, 10);
        topologyContext.registerMetric("num_queues", new IMetric() { // from class: com.digitalpebble.storm.crawler.bolt.FetcherBolt.3
            @Override
            public Object getValueAndReset() {
                return Integer.valueOf(FetcherBolt.this.fetchQueues.queues.size());
            }
        }, 10);
        this.averagedMetrics = topologyContext.registerMetric("fetcher_average_perdoc", new MultiReducedMetric(new MeanReducer()), 10);
        this.perSecMetrics = topologyContext.registerMetric("fetcher_average_persec", new MultiReducedMetric(new PerSecondReducer()), 10);

        this.protocolFactory = new ProtocolFactory(config);
        this.fetchQueues = new FetchItemQueues(config);

        String filtersFile = ConfUtils.getString(config, "urlfilters.config.file", "urlfilters.json");
        if (filtersFile != null) {
            try {
                this.urlFilters = new URLFilters(config, filtersFile);
            } catch (IOException e) {
                LOG.error("Exception caught while loading the URLFilters");
                throw new RuntimeException("Exception caught while loading the URLFilters", e);
            }
        }
        this.allowRedirs = ConfUtils.getBoolean(map, Constants.AllowRedirParamName, true);
        this.metadataTransfer = MetadataTransfer.getInstance(map);

        // Started last: the threads read the fields set above, and none is
        // left running if one of the previous steps throws.
        int threadCount = ConfUtils.getInt(config, "fetcher.threads.number", 10);
        for (int i = 0; i < threadCount; i++) {
            new FetcherThread(config).start();
        }
    }

    /**
     * Declares the default stream (url, content, metadata) and the status
     * stream (url, metadata, status).
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        final Fields contentFields = new Fields("url", "content", "metadata");
        final Fields statusFields = new Fields("url", "metadata", Constants.StatusStreamName);
        outputFieldsDeclarer.declare(contentFields);
        outputFieldsDeclarer.declareStream(Constants.StatusStreamName, statusFields);
    }

    /** @return true when the tuple comes from the "__system" component on the "__tick" stream */
    private boolean isTickTuple(Tuple tuple) {
        if (!tuple.getSourceComponent().equals("__system")) {
            return false;
        }
        return tuple.getSourceStreamId().equals("__tick");
    }

    /**
     * Sets "topology.tick.tuple.freq.secs" to 1 for this component; execute()
     * flushes the pending emissions and acks on every tuple it receives,
     * tick tuples included.
     */
    public Map<String, Object> getComponentConfiguration() {
        final Config componentConf = new Config();
        componentConf.put("topology.tick.tuple.freq.secs", Integer.valueOf(1));
        return componentConf;
    }

    /**
     * Emits everything the fetcher threads queued for emission, then acks
     * every tuple they queued for acking. Each queue is drained while holding
     * its own lock. Logs the totals when anything was processed.
     */
    private void flushQueues() {
        final int emitted;
        synchronized (this.emitQueue) {
            Iterator<Object[]> pending = this.emitQueue.iterator();
            while (pending.hasNext()) {
                Object[] entry = pending.next();
                String streamId = (String) entry[0];
                Tuple anchor = (Tuple) entry[1];
                Values payload = (Values) entry[2];
                if (anchor != null) {
                    this._collector.emit(streamId, Arrays.asList(anchor), payload);
                } else {
                    this._collector.emit(streamId, payload);
                }
            }
            emitted = this.emitQueue.size();
            this.emitQueue.clear();
        }

        final int acked;
        synchronized (this.ackQueue) {
            for (Tuple toAck : this.ackQueue) {
                this._collector.ack(toAck);
            }
            acked = this.ackQueue.size();
            this.ackQueue.clear();
        }

        if (acked + emitted > 0) {
            LOG.info("[Fetcher #{}] Acked : {}\tEmitted : {}", new Object[]{Integer.valueOf(this.taskIndex), Integer.valueOf(acked), Integer.valueOf(emitted)});
        }
    }

    /**
     * Flushes the pending emissions and acks, then queues the tuple's URL for
     * fetching. Tick tuples and tuples with a blank "url" are acked at once.
     * A malformed URL is reported on the status stream as ERROR and acked.
     */
    @Override
    public void execute(Tuple tuple) {
        flushQueues();
        LOG.info("[Fetcher #{}] Threads : {}\tqueues : {}\tin_queues : {}", new Object[]{Integer.valueOf(this.taskIndex), Integer.valueOf(this.activeThreads.get()), Integer.valueOf(this.fetchQueues.queues.size()), Integer.valueOf(this.fetchQueues.inQueues.get())});
        if (isTickTuple(tuple)) {
            this._collector.ack(tuple);
            return;
        }
        String urlString = tuple.getStringByField("url");
        if (StringUtils.isBlank(urlString)) {
            LOG.info("[Fetcher #{}] Missing value for field url in tuple {}", Integer.valueOf(this.taskIndex), tuple);
            this._collector.ack(tuple);
            return;
        }
        try {
            this.fetchQueues.addFetchItem(new URL(urlString), tuple);
        } catch (MalformedURLException e) {
            LOG.error("{} is a malformed URL", urlString);
            // "metadata" is optional: check for it first, as the fetcher
            // threads do, instead of failing on a tuple without that field
            Metadata metadata = null;
            if (tuple.contains("metadata")) {
                metadata = (Metadata) tuple.getValueByField("metadata");
            }
            if (metadata == null) {
                metadata = new Metadata();
            }
            metadata.setValue("error.cause", "malformed URL");
            this._collector.emit(Constants.StatusStreamName, tuple, new Values(new Object[]{urlString, metadata, Status.ERROR}));
            this._collector.ack(tuple);
        }
    }
}
