package com.digitalpebble.stormcrawler.persistence;

import com.digitalpebble.stormcrawler.Constants;
import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.digitalpebble.stormcrawler.util.MetadataTransfer;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang.time.DateUtils;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/persistence/AbstractStatusUpdaterBolt.class */
public abstract class AbstractStatusUpdaterBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStatusUpdaterBolt.class);
    public static String useCacheParamName = "status.updater.use.cache";
    public static String maxFetchErrorsParamName = "max.fetch.errors";
    public static String cacheConfigParamName = "status.updater.cache.spec";
    public static String roundDateParamName = "status.updater.unit.round.date";
    public static final String AS_IS_NEXTFETCHDATE_METADATA = "status.store.as.is.with.nextfetchdate";
    protected OutputCollector _collector;
    private Scheduler scheduler;
    private MetadataTransfer mdTransfer;
    private Cache<Object, Object> cache;
    private boolean useCache = true;
    private int maxFetchErrors = 3;
    private long cacheHits = 0;
    private long cacheMisses = 0;
    private int roundDateUnit = 13;

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this._collector = outputCollector;
        this.scheduler = Scheduler.getInstance(map);
        this.mdTransfer = MetadataTransfer.getInstance(map);
        this.useCache = ConfUtils.getBoolean(map, useCacheParamName, true);
        if (this.useCache) {
            this.cache = Caffeine.from(ConfUtils.getString(map, cacheConfigParamName)).build();
            topologyContext.registerMetric("cache", new IMetric() { // from class: com.digitalpebble.stormcrawler.persistence.AbstractStatusUpdaterBolt.1
                public Object getValueAndReset() {
                    HashMap hashMap = new HashMap();
                    hashMap.put("hits", Long.valueOf(AbstractStatusUpdaterBolt.this.cacheHits));
                    hashMap.put("misses", Long.valueOf(AbstractStatusUpdaterBolt.this.cacheMisses));
                    hashMap.put("size", Long.valueOf(AbstractStatusUpdaterBolt.this.cache.estimatedSize()));
                    AbstractStatusUpdaterBolt.this.cacheHits = 0L;
                    AbstractStatusUpdaterBolt.this.cacheMisses = 0L;
                    return hashMap;
                }
            }, 30);
        }
        this.maxFetchErrors = ConfUtils.getInt(map, maxFetchErrorsParamName, 3);
        String string = ConfUtils.getString(map, roundDateParamName, "SECOND");
        if (string.equalsIgnoreCase("MINUTE")) {
            this.roundDateUnit = 12;
        } else if (string.equalsIgnoreCase("HOUR")) {
            this.roundDateUnit = 10;
        }
    }

    public void execute(Tuple tuple) {
        String stringByField = tuple.getStringByField("url");
        Status status = (Status) tuple.getValueByField(Constants.StatusStreamName);
        if (status.equals(Status.DISCOVERED) && this.useCache) {
            if (this.cache.getIfPresent(stringByField) != null) {
                LOG.debug("URL {} already in cache", stringByField);
                this.cacheHits++;
                this._collector.ack(tuple);
                return;
            }
            LOG.debug("URL {} not in cache", stringByField);
            this.cacheMisses++;
        }
        Metadata metadata = (Metadata) tuple.getValueByField("metadata");
        String firstValue = metadata.getFirstValue(AS_IS_NEXTFETCHDATE_METADATA);
        if (firstValue != null) {
            try {
                store(stringByField, status, this.mdTransfer.filter(metadata), Optional.of(Date.from(Instant.parse(firstValue))), tuple);
                return;
            } catch (Exception e) {
                LOG.error("Exception caught when storing", e);
                this._collector.fail(tuple);
                return;
            }
        }
        String instant = Instant.now().toString();
        if (status.equals(Status.DISCOVERED)) {
            metadata.setValue("discoveryDate", instant);
        } else {
            metadata.setValue("lastProcessedDate", instant);
        }
        if (status.equals(Status.FETCH_ERROR)) {
            int i = 0;
            try {
                i = Integer.parseInt(metadata.getFirstValue(Constants.fetchErrorCountParamName));
            } catch (NumberFormatException e2) {
            }
            int i2 = i + 1;
            if (i2 >= this.maxFetchErrors) {
                status = Status.ERROR;
                metadata.setValue(Constants.STATUS_ERROR_CAUSE, "maxFetchErrors");
            } else {
                metadata.setValue(Constants.fetchErrorCountParamName, Integer.toString(i2));
            }
        }
        if (!status.equals(Status.FETCH_ERROR)) {
            metadata.remove(Constants.fetchErrorCountParamName);
        }
        if (status.equals(Status.FETCHED) || status.equals(Status.REDIRECTION)) {
            metadata.remove(Constants.STATUS_ERROR_CAUSE);
            metadata.remove(Constants.STATUS_ERROR_MESSAGE);
            metadata.remove(Constants.STATUS_ERROR_SOURCE);
        } else if (status == Status.ERROR) {
            this._collector.emit(Constants.DELETION_STREAM_NAME, new Values(new Object[]{stringByField, metadata}));
        }
        Optional<Date> schedule = this.scheduler.schedule(status, metadata);
        Metadata filter = this.mdTransfer.filter(metadata);
        if (schedule.isPresent()) {
            schedule = Optional.of(DateUtils.round(schedule.get(), this.roundDateUnit));
        }
        try {
            store(stringByField, status, filter, schedule, tuple);
        } catch (Exception e3) {
            LOG.error("Exception caught when storing", e3);
            this._collector.fail(tuple);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void ack(Tuple tuple, String str) {
        if (this.useCache) {
            this.cache.put(str, "");
        }
        this._collector.ack(tuple);
    }

    protected abstract void store(String str, Status status, Metadata metadata, Optional<Date> optional, Tuple tuple) throws Exception;

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(Constants.DELETION_STREAM_NAME, new Fields(new String[]{"url", "metadata"}));
    }
}
