package com.digitalpebble.stormcrawler.bolt;

import com.digitalpebble.stormcrawler.Constants;
import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.protocol.HttpHeaders;
import com.digitalpebble.stormcrawler.protocol.Protocol;
import com.digitalpebble.stormcrawler.protocol.ProtocolFactory;
import com.digitalpebble.stormcrawler.protocol.ProtocolResponse;
import com.digitalpebble.stormcrawler.protocol.RobotRules;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.digitalpebble.stormcrawler.util.PerSecondReducer;
import crawlercommons.domains.PaidLevelDomain;
import crawlercommons.robots.BaseRobotRules;
import java.io.File;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.MultiReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/bolt/FetcherBolt.class */
public class FetcherBolt extends StatusEmitterBolt {
    private static final Logger LOG = LoggerFactory.getLogger(FetcherBolt.class);
    // Key read both from the configuration (in prepare) and from per-URL metadata
    // (in the fetcher threads) to decide whether sitemaps listed in robots.txt are emitted.
    private static final String SITEMAP_DISCOVERY_PARAM_KEY = "sitemap.discovery";
    // Configuration key for the maximum time a URL may wait in the queues; the fetcher
    // threads multiply the value by 1000 before comparing it with a millisecond duration,
    // and -1 disables the check.
    public static final String QUEUED_TIMEOUT_PARAM_KEY = "fetcher.timeout.queue";
    // Metadata key whose value is parsed as a long and used as a queue's minimum crawl delay.
    private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";
    // Per-key queues of URLs waiting to be fetched; created in prepare.
    private FetchItemQueues fetchQueues;
    // Counters registered under "fetcher_counter".
    private MultiCountMetric eventCounter;
    // Mean-reduced metrics registered under "fetcher_average_perdoc".
    private MultiReducedMetric averagedMetrics;
    // Source of the Protocol implementation used for each URL.
    private ProtocolFactory protocolFactory;
    // Per-second metrics registered under "fetcher_average_persec".
    private MultiReducedMetric perSecMetrics;
    // Optional file whose existence, checked on tick tuples, triggers a dump of the queues.
    private File debugfiletrigger;
    // One slot per fetcher thread holding the URL it is working on, or "" when idle.
    private String[] beingFetched;
    // Number of fetcher threads currently processing an item.
    private final AtomicInteger activeThreads = new AtomicInteger(0);
    // Number of fetcher threads currently sleeping because no item was available.
    private final AtomicInteger spinWaiting = new AtomicInteger(0);
    // Storm task id, set in prepare; -1 until then.
    private int taskID = -1;
    // Default for sitemap discovery when the URL metadata does not say "true" or "false".
    boolean sitemapsAutoDiscovery = false;
    // Cap on (active threads + queued URLs) enforced by execute; -1 means no cap.
    private int maxNumberURLsInQueues = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/digitalpebble/stormcrawler/bolt/FetcherBolt$FetchItem.class */
    public static class FetchItem {
        /** Identifier of the queue this item belongs to (always lower-cased). */
        String queueID;
        /** URL to fetch, as received in the tuple. */
        String url;
        /** Tuple the URL came from; used later for anchoring and acking. */
        Tuple t;
        /** Wall-clock time (ms) at which the item was created. */
        long creationTime = System.currentTimeMillis();

        private FetchItem(String urlString, Tuple tuple, String queueKey) {
            this.url = urlString;
            this.queueID = queueKey;
            this.t = tuple;
        }

        /**
         * Builds a fetch item and works out which queue it belongs to.
         *
         * <p>A non-blank {@code key} field in the tuple takes precedence; otherwise the key is
         * derived from the URL according to the queue mode. The key is lower-cased with
         * {@link Locale#ROOT}.
         *
         * @param url parsed form of the URL
         * @param str the URL string stored in the item
         * @param tuple the originating tuple
         * @param str2 queue mode ({@code byIP}, {@code byDomain} or anything else for host)
         * @return the new item
         */
        public static FetchItem create(URL url, String str, Tuple tuple, String str2) {
            String explicitKey = tuple.contains("key") ? tuple.getStringByField("key") : null;
            if (StringUtils.isNotBlank(explicitKey)) {
                return new FetchItem(str, tuple, explicitKey.toLowerCase(Locale.ROOT));
            }

            String derivedKey = deriveKey(url, str, str2);
            if (derivedKey == null) {
                FetcherBolt.LOG.warn("Unknown host for url: {}, using URL string as key", str);
                derivedKey = url.toExternalForm();
            }
            return new FetchItem(str, tuple, derivedKey.toLowerCase(Locale.ROOT));
        }

        /** Derives the queue key from the URL for the given mode; may return null. */
        private static String deriveKey(URL url, String urlString, String mode) {
            if ("byIP".equalsIgnoreCase(mode)) {
                try {
                    return InetAddress.getByName(url.getHost()).getHostAddress();
                } catch (UnknownHostException e) {
                    FetcherBolt.LOG.warn("Unable to resolve IP for {}, using hostname as key.", url.getHost());
                    return url.getHost();
                }
            }
            if ("byDomain".equalsIgnoreCase(mode)) {
                String domain = PaidLevelDomain.getPLD(url.getHost());
                if (domain != null) {
                    return domain;
                }
                FetcherBolt.LOG.warn("Unknown domain for url: {}, using hostname as key", urlString);
                return url.getHost();
            }
            return url.getHost();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/digitalpebble/stormcrawler/bolt/FetcherBolt$FetchItemQueue.class */
    public static class FetchItemQueue {
        /** Items waiting to be fetched for this queue. */
        final BlockingDeque<FetchItem> queue;
        /** Number of items handed out and not yet finished. */
        private final AtomicInteger inProgress = new AtomicInteger();
        /** Earliest time (ms since epoch) at which the next item may be handed out. */
        private final AtomicLong nextFetchTime = new AtomicLong();
        /** Delay (ms) applied after a fetch when more than one thread is allowed. */
        private long minCrawlDelay;
        /** Maximum number of items that may be in progress at once. */
        private final int maxThreads;
        /** Delay (ms) applied after a fetch when a single thread is allowed. */
        long crawlDelay;

        /**
         * @param maxThreads maximum number of concurrent fetches for this queue
         * @param crawlDelay delay in ms used when {@code maxThreads} is 1 or less
         * @param minCrawlDelay delay in ms used when {@code maxThreads} is greater than 1
         * @param maxQueueSize capacity of the underlying deque
         */
        public FetchItemQueue(int maxThreads, long crawlDelay, long minCrawlDelay, int maxQueueSize) {
            this.maxThreads = maxThreads;
            this.crawlDelay = crawlDelay;
            this.minCrawlDelay = minCrawlDelay;
            this.queue = new LinkedBlockingDeque<>(maxQueueSize);
            // a new queue may be fetched from straight away
            setNextFetchTime(System.currentTimeMillis(), true);
        }

        /** @return number of items waiting in the queue */
        public int getQueueSize() {
            return queue.size();
        }

        /** @return number of items currently being processed */
        public int getInProgressSize() {
            return inProgress.get();
        }

        /**
         * Marks an item as done and schedules the next fetch.
         *
         * @param fetchItem the finished item; nothing happens when null
         * @param z when true the next fetch may happen immediately, without any delay
         */
        public void finishFetchItem(FetchItem fetchItem, boolean z) {
            if (fetchItem == null) {
                return;
            }
            inProgress.decrementAndGet();
            setNextFetchTime(System.currentTimeMillis(), z);
        }

        /** @return false when the item could not be inserted into the deque */
        public boolean addFetchItem(FetchItem fetchItem) {
            return queue.offer(fetchItem);
        }

        /**
         * @return the next item, or null when the thread limit is reached, the delay has not
         *     elapsed yet, or the queue is empty
         */
        public FetchItem getFetchItem() {
            if (inProgress.get() >= maxThreads) {
                return null;
            }
            if (nextFetchTime.get() > System.currentTimeMillis()) {
                return null;
            }
            FetchItem head = queue.pollFirst();
            if (head == null) {
                return null;
            }
            inProgress.incrementAndGet();
            return head;
        }

        private void setNextFetchTime(long endTime, boolean asap) {
            long delay = 0L;
            if (!asap) {
                delay = maxThreads > 1 ? minCrawlDelay : crawlDelay;
            }
            nextFetchTime.set(endTime + delay);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/digitalpebble/stormcrawler/bolt/FetcherBolt$FetchItemQueues.class */
    public static class FetchItemQueues {
        final int defaultMaxThread;
        final long crawlDelay;
        final long minCrawlDelay;
        int maxQueueSize;
        final Config conf;
        public static final String QUEUE_MODE_HOST = "byHost";
        public static final String QUEUE_MODE_DOMAIN = "byDomain";
        public static final String QUEUE_MODE_IP = "byIP";
        String queueMode;
        final Map<String, FetchItemQueue> queues = Collections.synchronizedMap(new LinkedHashMap());
        AtomicInteger inQueues = new AtomicInteger(0);
        final Map<Pattern, Integer> customMaxThreads = new HashMap();

        public FetchItemQueues(Config config) {
            this.conf = config;
            this.defaultMaxThread = ConfUtils.getInt(config, "fetcher.threads.per.queue", 1);
            this.queueMode = ConfUtils.getString(config, "fetcher.queue.mode", "byHost");
            if (!this.queueMode.equals("byIP") && !this.queueMode.equals("byDomain") && !this.queueMode.equals("byHost")) {
                FetcherBolt.LOG.error("Unknown partition mode : {} - forcing to byHost", this.queueMode);
                this.queueMode = "byHost";
            }
            FetcherBolt.LOG.info("Using queue mode : {}", this.queueMode);
            this.crawlDelay = ConfUtils.getFloat(config, "fetcher.server.delay", 1.0f) * 1000.0f;
            this.minCrawlDelay = ConfUtils.getFloat(config, "fetcher.server.min.delay", 0.0f) * 1000.0f;
            this.maxQueueSize = ConfUtils.getInt(config, "fetcher.max.queue.size", -1);
            if (this.maxQueueSize == -1) {
                this.maxQueueSize = Integer.MAX_VALUE;
            }
            for (Map.Entry entry : config.entrySet()) {
                String str = (String) entry.getKey();
                if (str.startsWith("fetcher.maxThreads.")) {
                    this.customMaxThreads.put(Pattern.compile(str.substring("fetcher.maxThreads.".length())), Integer.valueOf(((Number) entry.getValue()).intValue()));
                }
            }
        }

        public synchronized boolean addFetchItem(URL url, String str, Tuple tuple) {
            FetchItem create = FetchItem.create(url, str, tuple, this.queueMode);
            boolean addFetchItem = getFetchItemQueue(create.queueID, (Metadata) tuple.getValueByField("metadata")).addFetchItem(create);
            if (addFetchItem) {
                this.inQueues.incrementAndGet();
            }
            FetcherBolt.LOG.debug("{} added to queue {}", str, create.queueID);
            return addFetchItem;
        }

        public synchronized void finishFetchItem(FetchItem fetchItem, boolean z) {
            FetchItemQueue fetchItemQueue = this.queues.get(fetchItem.queueID);
            if (fetchItemQueue == null) {
                FetcherBolt.LOG.warn("Attempting to finish item from unknown queue: {}", fetchItem.queueID);
            } else {
                fetchItemQueue.finishFetchItem(fetchItem, z);
            }
        }

        public synchronized FetchItemQueue getFetchItemQueue(String str, Metadata metadata) {
            FetchItemQueue fetchItemQueue = this.queues.get(str);
            long parseLong = (metadata == null || metadata.getFirstValue(FetcherBolt.CRAWL_DELAY_KEY_NAME) == null) ? this.minCrawlDelay : Long.parseLong(metadata.getFirstValue(FetcherBolt.CRAWL_DELAY_KEY_NAME));
            if (fetchItemQueue == null) {
                int i = this.defaultMaxThread;
                Iterator<Map.Entry<Pattern, Integer>> it = this.customMaxThreads.entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Map.Entry<Pattern, Integer> next = it.next();
                    if (next.getKey().matcher(str).matches()) {
                        i = next.getValue().intValue();
                        break;
                    }
                }
                fetchItemQueue = new FetchItemQueue(i, this.crawlDelay, parseLong, this.maxQueueSize);
                this.queues.put(str, fetchItemQueue);
            }
            if (fetchItemQueue.minCrawlDelay < parseLong) {
                fetchItemQueue.minCrawlDelay = parseLong;
            }
            return fetchItemQueue;
        }

        public synchronized FetchItem getFetchItem() {
            Map.Entry<String, FetchItemQueue> next;
            if (this.queues.isEmpty()) {
                return null;
            }
            FetchItemQueue fetchItemQueue = null;
            do {
                Iterator<Map.Entry<String, FetchItemQueue>> it = this.queues.entrySet().iterator();
                if (!it.hasNext() || (next = it.next()) == null) {
                    return null;
                }
                FetchItemQueue value = next.getValue();
                it.remove();
                if (value.getQueueSize() != 0 || value.getInProgressSize() != 0) {
                    this.queues.put(next.getKey(), next.getValue());
                    if (fetchItemQueue == null) {
                        fetchItemQueue = value;
                    } else if (value == fetchItemQueue) {
                        return null;
                    }
                    FetchItem fetchItem = value.getFetchItem();
                    if (fetchItem != null) {
                        this.inQueues.decrementAndGet();
                        return fetchItem;
                    }
                }
            } while (!this.queues.isEmpty());
            return null;
        }
    }

    /* loaded from: input_file:com/digitalpebble/stormcrawler/bolt/FetcherBolt$FetcherThread.class */
    private class FetcherThread extends Thread {
        private final long maxCrawlDelay;
        private final boolean maxCrawlDelayForce;
        private final boolean crawlDelayForce;
        private final int threadNum;
        private long timeoutInQueues;
        private String protocolMDprefix;

        public FetcherThread(Config config, int i) {
            this.timeoutInQueues = -1L;
            this.protocolMDprefix = "";
            setDaemon(true);
            setName("FetcherThread #" + i);
            this.maxCrawlDelay = ConfUtils.getInt(config, "fetcher.max.crawl.delay", 30) * 1000;
            this.maxCrawlDelayForce = ConfUtils.getBoolean(config, "fetcher.max.crawl.delay.force", false);
            this.crawlDelayForce = ConfUtils.getBoolean(config, "fetcher.server.delay.force", false);
            this.threadNum = i;
            this.timeoutInQueues = ConfUtils.getLong(config, FetcherBolt.QUEUED_TIMEOUT_PARAM_KEY, this.timeoutInQueues);
            this.protocolMDprefix = ConfUtils.getString(config, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, this.protocolMDprefix);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            String str;
            while (true) {
                FetchItem fetchItem = FetcherBolt.this.fetchQueues.getFetchItem();
                if (fetchItem == null) {
                    FetcherBolt.LOG.trace("{} spin-waiting ...", getName());
                    FetcherBolt.this.spinWaiting.incrementAndGet();
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        FetcherBolt.LOG.error("{} caught interrupted exception", getName());
                        Thread.currentThread().interrupt();
                    }
                    FetcherBolt.this.spinWaiting.decrementAndGet();
                } else {
                    FetcherBolt.this.activeThreads.incrementAndGet();
                    FetcherBolt.this.beingFetched[this.threadNum] = fetchItem.url;
                    FetcherBolt.LOG.debug("[Fetcher #{}] {}  => activeThreads={}, spinWaiting={}, queueID={}", new Object[]{Integer.valueOf(FetcherBolt.this.taskID), getName(), FetcherBolt.this.activeThreads, FetcherBolt.this.spinWaiting, fetchItem.queueID});
                    FetcherBolt.LOG.debug("[Fetcher #{}] {} : Fetching {}", new Object[]{Integer.valueOf(FetcherBolt.this.taskID), getName(), fetchItem.url});
                    Metadata metadata = fetchItem.t.contains("metadata") ? (Metadata) fetchItem.t.getValueByField("metadata") : null;
                    if (metadata == null) {
                        metadata = new Metadata();
                    }
                    metadata.remove("fetch.exception");
                    try {
                        try {
                            URL url = new URL(fetchItem.url);
                            Protocol protocol = FetcherBolt.this.protocolFactory.getProtocol(url);
                            if (protocol == null) {
                                throw new RuntimeException("No protocol implementation found for " + fetchItem.url);
                                break;
                            }
                            BaseRobotRules robotRules = protocol.getRobotRules(fetchItem.url);
                            boolean z = false;
                            if ((robotRules instanceof RobotRules) && ((RobotRules) robotRules).getContentLengthFetched().length == 0) {
                                z = true;
                                FetcherBolt.this.eventCounter.scope("robots.fromCache").incrBy(1L);
                            } else {
                                FetcherBolt.this.eventCounter.scope("robots.fetched").incrBy(1L);
                            }
                            String firstValue = metadata.getFirstValue(FetcherBolt.SITEMAP_DISCOVERY_PARAM_KEY);
                            boolean z2 = "true".equalsIgnoreCase(firstValue) ? true : "false".equalsIgnoreCase(firstValue) ? false : FetcherBolt.this.sitemapsAutoDiscovery;
                            if (!z && z2) {
                                for (String str2 : robotRules.getSitemaps()) {
                                    if (robotRules.isAllowed(str2)) {
                                        FetcherBolt.this.emitOutlink(fetchItem.t, url, str2, metadata, SiteMapParserBolt.isSitemapKey, "true");
                                    }
                                }
                            }
                            metadata.setValue(SiteMapParserBolt.foundSitemapKey, Boolean.toString(robotRules.getSitemaps().size() > 0));
                            if (robotRules.isAllowed(fetchItem.url)) {
                                FetchItemQueue fetchItemQueue = FetcherBolt.this.fetchQueues.getFetchItemQueue(fetchItem.queueID, metadata);
                                if (robotRules.getCrawlDelay() > 0 && robotRules.getCrawlDelay() != fetchItemQueue.crawlDelay) {
                                    if (robotRules.getCrawlDelay() > this.maxCrawlDelay && this.maxCrawlDelay >= 0) {
                                        boolean z3 = false;
                                        Object obj = "skipping";
                                        if (this.maxCrawlDelayForce) {
                                            z3 = true;
                                            obj = "using value of fetcher.max.crawl.delay instead";
                                        }
                                        FetcherBolt.LOG.info("Crawl-Delay for {} too long ({}), {}", new Object[]{fetchItem.url, Long.valueOf(robotRules.getCrawlDelay()), obj});
                                        if (z3) {
                                            fetchItemQueue.crawlDelay = this.maxCrawlDelay;
                                        } else {
                                            metadata.setValue(Constants.STATUS_ERROR_CAUSE, "crawl_delay");
                                            FetcherBolt.this.collector.emit(Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, metadata, Status.ERROR}));
                                            FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, true);
                                            FetcherBolt.this.activeThreads.decrementAndGet();
                                            FetcherBolt.this.collector.ack(fetchItem.t);
                                            FetcherBolt.this.beingFetched[this.threadNum] = "";
                                        }
                                    } else if (robotRules.getCrawlDelay() >= FetcherBolt.this.fetchQueues.crawlDelay || !this.crawlDelayForce) {
                                        fetchItemQueue.crawlDelay = robotRules.getCrawlDelay();
                                        FetcherBolt.LOG.info("Crawl delay for queue: {}  is set to {} as per robots.txt. url: {}", new Object[]{fetchItem.queueID, Long.valueOf(fetchItemQueue.crawlDelay), fetchItem.url});
                                    } else {
                                        fetchItemQueue.crawlDelay = FetcherBolt.this.fetchQueues.crawlDelay;
                                        FetcherBolt.LOG.info("Crawl delay for {} too short ({}), set to fetcher.server.delay", fetchItem.url, Long.valueOf(robotRules.getCrawlDelay()));
                                    }
                                }
                                long currentTimeMillis = System.currentTimeMillis();
                                long j = currentTimeMillis - fetchItem.creationTime;
                                if (this.timeoutInQueues == -1 || j <= this.timeoutInQueues * 1000) {
                                    ProtocolResponse protocolOutput = protocol.getProtocolOutput(fetchItem.url, metadata);
                                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                                    int length = protocolOutput.getContent().length;
                                    protocolOutput.getMetadata().keySet().stream().filter(str3 -> {
                                        return str3.startsWith("metrics.");
                                    }).forEach(str4 -> {
                                        FetcherBolt.this.averagedMetrics.scope(str4.substring(8)).update(Long.valueOf(Long.parseLong(protocolOutput.getMetadata().getFirstValue(str4))));
                                    });
                                    FetcherBolt.this.averagedMetrics.scope("fetch_time").update(Long.valueOf(currentTimeMillis2));
                                    FetcherBolt.this.averagedMetrics.scope("time_in_queues").update(Long.valueOf(j));
                                    FetcherBolt.this.averagedMetrics.scope("bytes_fetched").update(Integer.valueOf(length));
                                    FetcherBolt.this.perSecMetrics.scope("bytes_fetched_perSec").update(Integer.valueOf(length));
                                    FetcherBolt.this.perSecMetrics.scope("fetched_perSec").update(1);
                                    FetcherBolt.this.eventCounter.scope("fetched").incrBy(1L);
                                    FetcherBolt.this.eventCounter.scope("bytes_fetched").incrBy(length);
                                    FetcherBolt.LOG.info("[Fetcher #{}] Fetched {} with status {} in msec {}", new Object[]{Integer.valueOf(FetcherBolt.this.taskID), fetchItem.url, Integer.valueOf(protocolOutput.getStatusCode()), Long.valueOf(currentTimeMillis2)});
                                    Metadata metadata2 = new Metadata();
                                    metadata2.putAll(metadata);
                                    metadata2.putAll(protocolOutput.getMetadata(), this.protocolMDprefix);
                                    metadata2.setValue("fetch.statusCode", Integer.toString(protocolOutput.getStatusCode()));
                                    metadata2.setValue("fetch.byteLength", Integer.toString(length));
                                    metadata2.setValue("fetch.loadingTime", Long.toString(currentTimeMillis2));
                                    metadata2.setValue("fetch.timeInQueues", Long.toString(j));
                                    Status fromHTTPCode = Status.fromHTTPCode(protocolOutput.getStatusCode());
                                    FetcherBolt.this.eventCounter.scope("status_" + protocolOutput.getStatusCode()).incrBy(1L);
                                    Values values = new Values(new Object[]{fetchItem.url, metadata2, fromHTTPCode});
                                    if (fromHTTPCode.equals(Status.FETCHED)) {
                                        if (protocolOutput.getStatusCode() == 304) {
                                            FetcherBolt.this.collector.emit(Constants.StatusStreamName, fetchItem.t, values);
                                        } else {
                                            FetcherBolt.this.collector.emit("default", fetchItem.t, new Values(new Object[]{fetchItem.url, protocolOutput.getContent(), metadata2}));
                                        }
                                    } else if (fromHTTPCode.equals(Status.REDIRECTION)) {
                                        String firstValue2 = protocolOutput.getMetadata().getFirstValue(HttpHeaders.LOCATION);
                                        if (StringUtils.isNotBlank(firstValue2)) {
                                            metadata2.setValue("_redirTo", firstValue2);
                                        }
                                        if (FetcherBolt.this.allowRedirs() && StringUtils.isNotBlank(firstValue2)) {
                                            FetcherBolt.this.emitOutlink(fetchItem.t, url, firstValue2, metadata2, new String[0]);
                                        }
                                        FetcherBolt.this.collector.emit(Constants.StatusStreamName, fetchItem.t, values);
                                    } else {
                                        FetcherBolt.this.collector.emit(Constants.StatusStreamName, fetchItem.t, values);
                                    }
                                    FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, false);
                                    FetcherBolt.this.activeThreads.decrementAndGet();
                                    FetcherBolt.this.collector.ack(fetchItem.t);
                                    FetcherBolt.this.beingFetched[this.threadNum] = "";
                                } else {
                                    FetcherBolt.LOG.info("[Fetcher #{}] Waited in queue for too long - {}", Integer.valueOf(FetcherBolt.this.taskID), fetchItem.url);
                                    FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, true);
                                    FetcherBolt.this.activeThreads.decrementAndGet();
                                    FetcherBolt.this.collector.ack(fetchItem.t);
                                    FetcherBolt.this.beingFetched[this.threadNum] = "";
                                }
                            } else {
                                FetcherBolt.LOG.info("Denied by robots.txt: {}", fetchItem.url);
                                metadata.setValue(Constants.STATUS_ERROR_CAUSE, "robots.txt");
                                FetcherBolt.this.collector.emit(Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, metadata, Status.ERROR}));
                                FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, true);
                                FetcherBolt.this.activeThreads.decrementAndGet();
                                FetcherBolt.this.collector.ack(fetchItem.t);
                                FetcherBolt.this.beingFetched[this.threadNum] = "";
                            }
                        } catch (Exception e2) {
                            String message = e2.getMessage();
                            if (message == null) {
                                message = "";
                            }
                            if ((e2.getCause() instanceof TimeoutException) || message.contains(" timed out")) {
                                FetcherBolt.LOG.info("Socket timeout fetching {}", fetchItem.url);
                                str = "Socket timeout fetching";
                            } else if ((e2.getCause() instanceof UnknownHostException) || (e2 instanceof UnknownHostException)) {
                                FetcherBolt.LOG.info("Unknown host {}", fetchItem.url);
                                str = "Unknown host";
                            } else {
                                str = e2.getClass().getName();
                                if (FetcherBolt.LOG.isDebugEnabled()) {
                                    FetcherBolt.LOG.debug("Exception while fetching {}", fetchItem.url, e2);
                                } else {
                                    FetcherBolt.LOG.info("Exception while fetching {} -> {}", fetchItem.url, str);
                                }
                            }
                            if (metadata.size() == 0) {
                                metadata = new Metadata();
                            }
                            metadata.setValue("fetch.exception", str);
                            FetcherBolt.this.collector.emit(Constants.StatusStreamName, fetchItem.t, new Values(new Object[]{fetchItem.url, metadata, Status.FETCH_ERROR}));
                            FetcherBolt.this.eventCounter.scope("exception").incrBy(1L);
                            FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, false);
                            FetcherBolt.this.activeThreads.decrementAndGet();
                            FetcherBolt.this.collector.ack(fetchItem.t);
                            FetcherBolt.this.beingFetched[this.threadNum] = "";
                        }
                    } catch (Throwable th) {
                        FetcherBolt.this.fetchQueues.finishFetchItem(fetchItem, false);
                        FetcherBolt.this.activeThreads.decrementAndGet();
                        FetcherBolt.this.collector.ack(fetchItem.t);
                        FetcherBolt.this.beingFetched[this.threadNum] = "";
                        throw th;
                    }
                }
            }
        }
    }

    /**
     * Component-level configuration: asks for a tick tuple every 5 seconds.
     *
     * @return a configuration holding only the tick frequency
     */
    public Map<String, Object> getComponentConfiguration() {
        final int tickFrequencySecs = 5;
        Config componentConf = new Config();
        componentConf.put("topology.tick.tuple.freq.secs", tickFrequencySecs);
        return componentConf;
    }

    /**
     * Verifies that an agent name is configured.
     *
     * @param config topology configuration
     * @throws IllegalArgumentException when {@code http.agent.name} is missing or blank
     */
    private void checkConfiguration(Config config) {
        final String problem = "Fetcher: No agents listed in 'http.agent.name' property.";
        String agentName = (String) config.get("http.agent.name");
        boolean hasAgent = agentName != null && !agentName.trim().isEmpty();
        if (hasAgent) {
            return;
        }
        LOG.error(problem);
        throw new IllegalArgumentException(problem);
    }

    /**
     * Initialises the bolt: validates the configuration, registers the metrics, builds the
     * queues and finally starts the fetcher threads.
     *
     * @param map topology configuration
     * @param topologyContext context used for the task id, worker port and metric registration
     * @param outputCollector collector passed on to the parent class
     * @throws IllegalArgumentException when no agent name is configured
     */
    @Override // com.digitalpebble.stormcrawler.bolt.StatusEmitterBolt
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        Config config = new Config();
        config.putAll(map);
        checkConfiguration(config);

        // set the task id before it is logged, otherwise the message always shows -1
        this.taskID = topologyContext.getThisTaskId();
        LOG.info("[Fetcher #{}] : starting at {}", this.taskID, Instant.now());

        int metricsBucketSecs = ConfUtils.getInt(config, "fetcher.metrics.time.bucket.secs", 10);
        this.eventCounter = topologyContext.registerMetric("fetcher_counter", new MultiCountMetric(), metricsBucketSecs);
        topologyContext.registerMetric("activethreads", () -> {
            return Integer.valueOf(this.activeThreads.get());
        }, metricsBucketSecs);
        topologyContext.registerMetric("in_queues", () -> {
            return Integer.valueOf(this.fetchQueues.inQueues.get());
        }, metricsBucketSecs);
        topologyContext.registerMetric("num_queues", () -> {
            return Integer.valueOf(this.fetchQueues.queues.size());
        }, metricsBucketSecs);
        this.averagedMetrics = topologyContext.registerMetric("fetcher_average_perdoc", new MultiReducedMetric(new MeanReducer()), metricsBucketSecs);
        this.perSecMetrics = topologyContext.registerMetric("fetcher_average_persec", new MultiReducedMetric(new PerSecondReducer()), metricsBucketSecs);

        this.protocolFactory = ProtocolFactory.getInstance(config);
        this.fetchQueues = new FetchItemQueues(config);

        int threadCount = ConfUtils.getInt(config, "fetcher.threads.number", 10);
        // one slot per thread to keep track of the URL it is working on
        this.beingFetched = new String[threadCount];
        Arrays.fill(this.beingFetched, "");

        this.sitemapsAutoDiscovery = ConfUtils.getBoolean(map, SITEMAP_DISCOVERY_PARAM_KEY, false);
        this.maxNumberURLsInQueues = ConfUtils.getInt(config, "fetcher.max.urls.in.queues", -1);

        String debugFilePath = ConfUtils.getString(config, "fetcherbolt.queue.debug.filepath");
        if (StringUtils.isNotBlank(debugFilePath)) {
            String workerPort = Integer.toString(topologyContext.getThisWorkerPort().intValue());
            this.debugfiletrigger = new File(debugFilePath.replace("{port}", workerPort));
        }

        // start the threads last so that every field they read is already initialised
        for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
            new FetcherThread(config, threadIndex).start();
        }
    }

    /**
     * Declares the default output stream (url, content, metadata) on top of whatever the
     * parent class declares.
     */
    @Override // com.digitalpebble.stormcrawler.bolt.StatusEmitterBolt
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        super.declareOutputFields(outputFieldsDeclarer);
        Fields defaultStreamFields = new Fields("url", "content", "metadata");
        outputFieldsDeclarer.declare(defaultStreamFields);
    }

    /** Delegates to the protocol factory's own cleanup. */
    public void cleanup() {
        ProtocolFactory factory = this.protocolFactory;
        factory.cleanup();
    }

    /**
     * Handles an incoming tuple. Tick tuples only check for the debug trigger file; any other
     * tuple has its URL placed in the fetch queues, after waiting for room when a cap on the
     * number of URLs in flight is configured.
     *
     * @param tuple tick tuple, or a tuple with the fields url and metadata
     */
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            if (debugfiletrigger != null && debugfiletrigger.exists()) {
                LOG.info("Found trigger file {}", debugfiletrigger);
                logQueuesContent();
                debugfiletrigger.delete();
            }
            return;
        }

        if (maxNumberURLsInQueues != -1) {
            // block until the number of URLs queued or being fetched drops below the cap
            while (activeThreads.get() + fetchQueues.inQueues.get() >= maxNumberURLsInQueues) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    LOG.error("Interrupted exception caught in execute method");
                    Thread.currentThread().interrupt();
                }
                LOG.debug("[Fetcher #{}] Threads : {}\tqueues : {}\tin_queues : {}", taskID, activeThreads.get(), fetchQueues.queues.size(), fetchQueues.inQueues.get());
            }
        }

        String urlString = tuple.getStringByField("url");
        if (StringUtils.isBlank(urlString)) {
            LOG.info("[Fetcher #{}] Missing value for field url in tuple {}", taskID, tuple);
            this.collector.ack(tuple);
            return;
        }
        LOG.debug("Received in Fetcher {}", urlString);

        URL parsedUrl;
        try {
            parsedUrl = new URL(urlString);
        } catch (MalformedURLException e) {
            LOG.error("{} is a malformed URL", urlString);
            Metadata errorMetadata = (Metadata) tuple.getValueByField("metadata");
            if (errorMetadata == null) {
                errorMetadata = new Metadata();
            }
            errorMetadata.setValue(Constants.STATUS_ERROR_CAUSE, "malformed URL");
            this.collector.emit(Constants.StatusStreamName, tuple, new Values(urlString, errorMetadata, Status.ERROR));
            this.collector.ack(tuple);
            return;
        }

        // a refused item is failed here; a queued one is acked later by a fetcher thread
        boolean queued = fetchQueues.addFetchItem(parsedUrl, urlString, tuple);
        if (!queued) {
            this.collector.fail(tuple);
        }
    }

    /**
     * Writes two INFO log entries: the content of every queue, then the URL each fetcher
     * thread is working on. Runs while holding the lock of the queues map.
     */
    private void logQueuesContent() {
        StringBuilder queuesDump = new StringBuilder();
        synchronized (fetchQueues.queues) {
            queuesDump.append("\nNum queues : ").append(fetchQueues.queues.size());
            for (Map.Entry<String, FetchItemQueue> queueEntry : fetchQueues.queues.entrySet()) {
                FetchItemQueue itemQueue = queueEntry.getValue();
                queuesDump.append("\nQueue ID : ").append(queueEntry.getKey());
                queuesDump.append("\t size : ").append(itemQueue.getQueueSize());
                queuesDump.append("\t in progress : ").append(itemQueue.getInProgressSize());
                for (FetchItem waiting : itemQueue.queue) {
                    queuesDump.append("\n\t").append(waiting.url);
                }
            }
            LOG.info("Dumping queue content {}", queuesDump.toString());

            StringBuilder inFlight = new StringBuilder("\n");
            int slot = 0;
            for (String current : beingFetched) {
                // idle threads have an empty slot
                if (!current.isEmpty()) {
                    inFlight.append("\n\tThread #").append(slot).append(": ").append(current);
                }
                slot++;
            }
            LOG.info("URLs being fetched {}", inFlight.toString());
        }
    }
}
