package com.digitalpebble.stormcrawler.bolt;

import com.digitalpebble.stormcrawler.Constants;
import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.protocol.HttpHeaders;
import com.digitalpebble.stormcrawler.protocol.Protocol;
import com.digitalpebble.stormcrawler.protocol.ProtocolFactory;
import com.digitalpebble.stormcrawler.protocol.ProtocolResponse;
import com.digitalpebble.stormcrawler.protocol.RobotRules;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.digitalpebble.stormcrawler.util.PerSecondReducer;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import crawlercommons.domains.PaidLevelDomain;
import crawlercommons.robots.BaseRobotRules;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.MultiReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/bolt/SimpleFetcherBolt.class */
/**
 * Fetcher bolt which fetches each incoming URL synchronously, inside {@link #execute(Tuple)}.
 *
 * <p>Before fetching, the robots.txt rules of the URL are checked and the bolt sleeps until the
 * in-memory throttler allows a new fetch for the URL's politeness key (host, paid-level domain or
 * IP address depending on {@code fetcher.queue.mode}). Sitemaps declared in robots.txt can be
 * emitted as outlinks when sitemap discovery is enabled.
 *
 * <p>Fetched content is emitted on the default stream as {@code (url, content, metadata)}; 304
 * responses, redirections, HTTP errors, fetch exceptions, robots denials and malformed URLs are
 * reported on the status stream as {@code (url, metadata, status)}.
 */
public class SimpleFetcherBolt extends StatusEmitterBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleFetcherBolt.class);

    /** Config key (overridable per URL through its metadata) enabling sitemap discovery. */
    private static final String SITEMAP_DISCOVERY_PARAM_KEY = "sitemap.discovery";

    public static final String QUEUE_MODE_HOST = "byHost";
    public static final String QUEUE_MODE_DOMAIN = "byDomain";
    public static final String QUEUE_MODE_IP = "byIP";

    private Config conf;
    private MultiCountMetric eventCounter;
    private MultiReducedMetric averagedMetrics;
    private MultiReducedMetric perSecMetrics;
    private ProtocolFactory protocolFactory;
    private String queueMode;
    private int taskID = -1;
    boolean sitemapsAutoDiscovery = false;

    /** Politeness key -> earliest time (epoch ms) at which the next fetch for that key may start. */
    private final Cache<String, Long> throttler =
            CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.SECONDS).build();

    /** Default delay in ms between two fetches for the same politeness key. */
    private long crawlDelay = 1000;

    /** Upper bound in ms applied to a Crawl-delay found in robots.txt. */
    private long maxCrawlDelay = 30000;

    /**
     * Fails fast when no agent name is configured.
     *
     * @throws IllegalArgumentException if {@code http.agent.name} is missing or blank
     */
    private void checkConfiguration() {
        String agentName = (String) getConf().get("http.agent.name");
        if (agentName == null || agentName.trim().length() == 0) {
            LOG.error("Fetcher: No agents listed in 'http.agent.name' property.");
            throw new IllegalArgumentException(
                    "Fetcher: No agents listed in 'http.agent.name' property.");
        }
    }

    private Config getConf() {
        return this.conf;
    }

    @Override // com.digitalpebble.stormcrawler.bolt.StatusEmitterBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        this.conf = new Config();
        this.conf.putAll(map);
        checkConfiguration();

        this.taskID = topologyContext.getThisTaskId();
        LOG.info(
                "[Fetcher #{}] : starting at {}",
                taskID,
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)
                        .format(Long.valueOf(System.currentTimeMillis())));

        int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);
        this.eventCounter =
                topologyContext.registerMetric(
                        "fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);
        this.averagedMetrics =
                topologyContext.registerMetric(
                        "fetcher_average",
                        new MultiReducedMetric(new MeanReducer()),
                        metricsTimeBucketSecs);
        this.perSecMetrics =
                topologyContext.registerMetric(
                        "fetcher_average_persec",
                        new MultiReducedMetric(new PerSecondReducer()),
                        metricsTimeBucketSecs);
        topologyContext.registerMetric(
                "throttler_size",
                new IMetric() {
                    @Override
                    public Object getValueAndReset() {
                        return Long.valueOf(throttler.size());
                    }
                },
                metricsTimeBucketSecs);

        this.protocolFactory = new ProtocolFactory(conf);
        this.sitemapsAutoDiscovery =
                ConfUtils.getBoolean(map, SITEMAP_DISCOVERY_PARAM_KEY, false);

        this.queueMode = ConfUtils.getString(conf, "fetcher.queue.mode", QUEUE_MODE_HOST);
        // constant-first comparisons so that a null mode is also forced to byHost
        if (!QUEUE_MODE_IP.equals(queueMode)
                && !QUEUE_MODE_DOMAIN.equals(queueMode)
                && !QUEUE_MODE_HOST.equals(queueMode)) {
            LOG.error("Unknown partition mode : {} - forcing to byHost", queueMode);
            this.queueMode = QUEUE_MODE_HOST;
        }
        LOG.info("Using queue mode : {}", queueMode);

        // configured in seconds, stored in milliseconds; explicit cast as float -> long narrows
        this.crawlDelay = (long) (ConfUtils.getFloat(conf, "fetcher.server.delay", 1.0f) * 1000.0f);
        this.maxCrawlDelay = ConfUtils.getInt(conf, "fetcher.max.crawl.delay", 30) * 1000L;
    }

    @Override // com.digitalpebble.stormcrawler.bolt.StatusEmitterBolt
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        super.declareOutputFields(outputFieldsDeclarer);
        outputFieldsDeclarer.declare(new Fields("url", "content", "metadata"));
    }

    @Override
    public void cleanup() {
        this.protocolFactory.cleanup();
    }

    /**
     * Fetches the URL of the tuple and emits the outcome. Every path acks the tuple exactly once.
     */
    @Override
    public void execute(Tuple tuple) {
        String urlString = tuple.getStringByField("url");
        if (StringUtils.isBlank(urlString)) {
            LOG.info("[Fetcher #{}] Missing value for field url in tuple {}", taskID, tuple);
            collector.ack(tuple);
            return;
        }

        Metadata metadata = null;
        if (tuple.contains("metadata")) {
            metadata = (Metadata) tuple.getValueByField("metadata");
        }
        if (metadata == null) {
            metadata = Metadata.empty;
        }

        URL url;
        try {
            url = new URL(urlString);
        } catch (MalformedURLException e) {
            LOG.error("{} is a malformed URL", urlString);
            // Metadata.empty is shared between tuples and must not be modified
            if (metadata == Metadata.empty) {
                metadata = new Metadata();
            }
            metadata.setValue(Constants.STATUS_ERROR_CAUSE, "malformed URL");
            collector.emit(
                    Constants.StatusStreamName,
                    tuple,
                    new Values(urlString, metadata, Status.ERROR));
            collector.ack(tuple);
            return;
        }

        // null when the host cannot be resolved in byIP mode: no throttling is applied then
        String politenessKey = getPolitenessKey(url);
        // delay before the next fetch for the same key; stays 0 if we fail before computing it
        long delay = 0;

        try {
            Protocol protocol = protocolFactory.getProtocol(url);
            BaseRobotRules robotRules = protocol.getRobotRules(urlString);

            // an empty fetched-content-length array marks rules served from the robots cache
            boolean robotsFromCache =
                    robotRules instanceof RobotRules
                            && ((RobotRules) robotRules).getContentLengthFetched().length == 0;
            eventCounter.scope(robotsFromCache ? "robots.fromCache" : "robots.fetched").incrBy(1L);

            // sitemaps are only sent when robots.txt was freshly fetched, so that they are not
            // re-emitted for every URL of the same host
            if (!robotsFromCache && isSitemapDiscoveryEnabled(metadata)) {
                for (Object sitemapUrl : robotRules.getSitemaps()) {
                    emitOutlink(
                            tuple,
                            url,
                            (String) sitemapUrl,
                            metadata,
                            SiteMapParserBolt.isSitemapKey,
                            "true");
                }
            }

            if (!robotRules.isAllowed(urlString)) {
                LOG.info("Denied by robots.txt: {}", urlString);
                // Metadata.empty is shared between tuples and must not be modified
                if (metadata == Metadata.empty) {
                    metadata = new Metadata();
                }
                metadata.setValue(Constants.STATUS_ERROR_CAUSE, "robots.txt");
                collector.emit(
                        Constants.StatusStreamName,
                        tuple,
                        new Values(urlString, metadata, Status.ERROR));
                collector.ack(tuple);
                return;
            }

            long timeWaiting = waitForPoliteness(politenessKey);
            delay = computeCrawlDelay(robotRules, url);

            LOG.debug("[Fetcher #{}] : Fetching {}", taskID, urlString);
            long start = System.currentTimeMillis();
            ProtocolResponse response = protocol.getProtocolOutput(urlString, metadata);
            long timeFetching = System.currentTimeMillis() - start;

            int byteLength = response.getContent().length;
            averagedMetrics.scope("wait_time").update(Long.valueOf(timeWaiting));
            averagedMetrics.scope("fetch_time").update(Long.valueOf(timeFetching));
            averagedMetrics.scope("bytes_fetched").update(Integer.valueOf(byteLength));
            eventCounter.scope("fetched").incrBy(1L);
            eventCounter.scope("bytes_fetched").incrBy(byteLength);
            perSecMetrics.scope("bytes_fetched_perSec").update(Integer.valueOf(byteLength));
            perSecMetrics.scope("fetched_perSec").update(Integer.valueOf(1));

            int statusCode = response.getStatusCode();
            LOG.info(
                    "[Fetcher #{}] Fetched {} with status {} in {} after waiting {}",
                    new Object[] {taskID, urlString, statusCode, timeFetching, timeWaiting});

            Metadata responseMetadata = response.getMetadata();
            responseMetadata.putAll(metadata);
            responseMetadata.setValue("fetch.statusCode", Integer.toString(statusCode));
            responseMetadata.setValue("fetch.loadingTime", Long.toString(timeFetching));

            Status status = Status.fromHTTPCode(statusCode);
            Values statusValues = new Values(urlString, responseMetadata, status);

            if (status.equals(Status.FETCHED) && statusCode != 304) {
                collector.emit(
                        "default",
                        tuple,
                        new Values(urlString, response.getContent(), responseMetadata));
            } else {
                // 304 (not modified), redirections and errors only go to the status stream
                if (status.equals(Status.REDIRECTION)) {
                    String redirection = responseMetadata.getFirstValue(HttpHeaders.LOCATION);
                    if (StringUtils.isNotBlank(redirection)) {
                        // raw target, kept mainly for debugging
                        responseMetadata.setValue("_redirTo", redirection);
                        if (allowRedirs()) {
                            emitOutlink(tuple, url, redirection, responseMetadata);
                        }
                    }
                }
                collector.emit(Constants.StatusStreamName, tuple, statusValues);
            }
        } catch (Exception e) {
            String cause = describeFetchException(e, urlString);
            eventCounter.scope("exception").incrBy(1L);
            // the incoming metadata may be the shared, empty instance
            if (metadata.size() == 0) {
                metadata = new Metadata();
            }
            metadata.setValue("fetch.exception", cause);
            collector.emit(
                    Constants.StatusStreamName,
                    tuple,
                    new Values(urlString, metadata, Status.FETCH_ERROR));
        }

        if (politenessKey != null) {
            throttler.put(politenessKey, Long.valueOf(System.currentTimeMillis() + delay));
        }
        collector.ack(tuple);
    }

    /**
     * Tells whether sitemaps found in robots.txt should be emitted for this URL. A value of
     * "true"/"false" (case-insensitive) in the URL metadata overrides the bolt-wide setting.
     */
    private boolean isSitemapDiscoveryEnabled(Metadata metadata) {
        String localValue = metadata.getFirstValue(SITEMAP_DISCOVERY_PARAM_KEY);
        if ("true".equalsIgnoreCase(localValue)) {
            return true;
        }
        if ("false".equalsIgnoreCase(localValue)) {
            return false;
        }
        return sitemapsAutoDiscovery;
    }

    /**
     * Sleeps until the throttler allows a new fetch for the given key.
     *
     * @param politenessKey key of the URL, may be null (no throttling applied)
     * @return the planned waiting time in ms, 0 when no wait was needed
     */
    private long waitForPoliteness(String politenessKey) {
        if (politenessKey == null) {
            return 0;
        }
        Long allowedAt = throttler.getIfPresent(politenessKey);
        if (allowedAt == null) {
            return 0;
        }
        long timeToWait = allowedAt.longValue() - System.currentTimeMillis();
        if (timeToWait <= 0) {
            return 0;
        }
        try {
            Thread.sleep(timeToWait);
        } catch (InterruptedException e) {
            LOG.error("[Fetcher #{}] caught InterruptedException caught while waiting", taskID);
            Thread.currentThread().interrupt();
        }
        return timeToWait;
    }

    /**
     * Returns the delay to apply after fetching from this key: the robots.txt Crawl-delay when
     * positive (capped at {@link #maxCrawlDelay}), otherwise the configured default.
     */
    private long computeCrawlDelay(BaseRobotRules robotRules, URL url) {
        long robotsDelay = robotRules.getCrawlDelay();
        if (robotsDelay <= 0) {
            return crawlDelay;
        }
        if (robotsDelay > maxCrawlDelay) {
            LOG.debug("Delay from robots capped at {} for {}", Long.valueOf(robotsDelay), url);
            return maxCrawlDelay;
        }
        return robotsDelay;
    }

    /**
     * Logs a fetch failure and returns the short cause stored under {@code fetch.exception}.
     */
    private static String describeFetchException(Exception e, String urlString) {
        String message = e.getMessage();
        if (message == null) {
            message = "";
        }
        if (e.getCause() instanceof TimeoutException || message.contains(" timed out")) {
            LOG.error("Socket timeout fetching {}", urlString);
            return "Socket timeout fetching";
        }
        if (e.getCause() instanceof UnknownHostException || e instanceof UnknownHostException) {
            LOG.error("Unknown host {}", urlString);
            return "Unknown host";
        }
        LOG.error("Exception while fetching {}", urlString, e);
        return e.getClass().getName();
    }

    /**
     * Computes the lower-cased throttling key of a URL according to the queue mode.
     *
     * @return the IP address, paid-level domain or host of the URL; null when the host cannot be
     *     resolved in byIP mode
     */
    private String getPolitenessKey(URL url) {
        String key;
        if (QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
            try {
                key = InetAddress.getByName(url.getHost()).getHostAddress();
            } catch (UnknownHostException e) {
                LOG.warn("Unable to resolve: {}, skipping.", url.getHost());
                return null;
            }
        } else if (QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
            key = PaidLevelDomain.getPLD(url.getHost());
            if (key == null) {
                LOG.warn(
                        "Unknown domain for url: {}, using hostname as key",
                        url.toExternalForm());
                key = url.getHost();
            }
        } else {
            key = url.getHost();
            if (key == null) {
                LOG.warn("Unknown host for url: {}, using URL string as key", url.toExternalForm());
                key = url.toExternalForm();
            }
        }
        return key.toLowerCase(Locale.ROOT);
    }
}
