package com.digitalpebble.stormcrawler.persistence;

import com.digitalpebble.stormcrawler.persistence.urlbuffer.URLBuffer;
import com.digitalpebble.stormcrawler.util.CollectionMetric;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/persistence/AbstractQueryingSpout.class */
public abstract class AbstractQueryingSpout extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractQueryingSpout.class);
    protected static final String StatusTTLPurgatory = "spout.ttl.purgatory";
    protected static final String StatusMinDelayParamName = "spout.min.delay.queries";
    protected static final String StatusMaxDelayParamName = "spout.max.delay.queries";
    protected static final String resetFetchDateParamName = "spout.reset.fetchdate.after";
    protected Instant lastTimeResetToNOW;
    protected MultiCountMetric eventCounter;
    protected URLBuffer buffer;
    protected SpoutOutputCollector _collector;
    protected CollectionMetric queryTimes;
    protected InProcessMap<String, Object> beingProcessed;
    private boolean active;
    protected long minDelayBetweenQueries = 2000;
    protected long maxDelayBetweenQueries = 20000;
    protected int resetFetchDateAfterNSecs = 120;
    private long timeLastQuerySent = 0;
    private long timeLastQueryReceived = 0;
    private long timestampEmptyBuffer = -1;
    protected AtomicBoolean isInQuery = new AtomicBoolean(false);

    /* loaded from: input_file:com/digitalpebble/stormcrawler/persistence/AbstractQueryingSpout$InProcessMap.class */
    public class InProcessMap<K, V> extends HashMap<K, V> {
        private final Cache<K, Optional<V>> deletionCache;

        public InProcessMap(long j, TimeUnit timeUnit) {
            this.deletionCache = Caffeine.newBuilder().expireAfterWrite(j, timeUnit).build();
        }

        @Override // java.util.HashMap, java.util.AbstractMap, java.util.Map
        public boolean containsKey(Object obj) {
            boolean containsKey = super.containsKey(obj);
            if (!containsKey) {
                containsKey = this.deletionCache.getIfPresent(obj) != null;
            }
            return containsKey;
        }

        @Override // java.util.HashMap, java.util.AbstractMap, java.util.Map
        public V remove(Object obj) {
            this.deletionCache.put(obj, Optional.empty());
            return (V) super.remove(obj);
        }

        public long inCache() {
            return this.deletionCache.estimatedSize();
        }
    }

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        int i = ConfUtils.getInt(map, StatusTTLPurgatory, 30);
        this.minDelayBetweenQueries = ConfUtils.getLong(map, StatusMinDelayParamName, 2000L);
        this.maxDelayBetweenQueries = ConfUtils.getLong(map, StatusMaxDelayParamName, this.maxDelayBetweenQueries);
        this.beingProcessed = new InProcessMap<>(i, TimeUnit.SECONDS);
        this.eventCounter = topologyContext.registerMetric("counters", new MultiCountMetric(), 10);
        this.buffer = URLBuffer.createInstance(map);
        topologyContext.registerMetric("buffer_size", () -> {
            return Integer.valueOf(this.buffer.size());
        }, 10);
        topologyContext.registerMetric("numQueues", () -> {
            return Integer.valueOf(this.buffer.numQueues());
        }, 10);
        topologyContext.registerMetric("beingProcessed", () -> {
            return Integer.valueOf(this.beingProcessed.size());
        }, 10);
        topologyContext.registerMetric("inPurgatory", () -> {
            return Long.valueOf(this.beingProcessed.inCache());
        }, 10);
        this.queryTimes = new CollectionMetric();
        topologyContext.registerMetric("spout_query_time_msec", this.queryTimes, 10);
        this.resetFetchDateAfterNSecs = ConfUtils.getInt(map, resetFetchDateParamName, this.resetFetchDateAfterNSecs);
        this._collector = spoutOutputCollector;
    }

    protected abstract void populateBuffer();

    public void nextTuple() {
        if (this.active) {
            if (!this.isInQuery.get() && triggerQueries()) {
                populateBuffer();
                this.timeLastQuerySent = System.currentTimeMillis();
            }
            if (this.buffer.hasNext()) {
                if (this.timestampEmptyBuffer != -1) {
                    this.eventCounter.scope("empty.buffer").incrBy(System.currentTimeMillis() - this.timestampEmptyBuffer);
                    this.timestampEmptyBuffer = -1L;
                }
                Values next = this.buffer.next();
                String obj = next.get(0).toString();
                this._collector.emit(next, obj);
                this.beingProcessed.put(obj, null);
                this.eventCounter.scope("emitted").incrBy(1L);
                return;
            }
            if (this.timestampEmptyBuffer == -1) {
                this.timestampEmptyBuffer = System.currentTimeMillis();
            }
            if (this.isInQuery.get() || throttleQueries() > 0) {
                LOG.trace("isInQuery {}", this.isInQuery);
                Utils.sleep(10L);
            } else {
                populateBuffer();
                this.timeLastQuerySent = System.currentTimeMillis();
            }
        }
    }

    private long throttleQueries() {
        if (this.timeLastQuerySent == 0) {
            return -1L;
        }
        long currentTimeMillis = System.currentTimeMillis() - this.timeLastQuerySent;
        if (currentTimeMillis < this.minDelayBetweenQueries) {
            return this.minDelayBetweenQueries - currentTimeMillis;
        }
        return -1L;
    }

    private boolean triggerQueries() {
        return this.timeLastQueryReceived != 0 && this.maxDelayBetweenQueries > 0 && System.currentTimeMillis() - this.timeLastQueryReceived > this.maxDelayBetweenQueries;
    }

    protected long getTimeLastQuerySent() {
        return this.timeLastQuerySent;
    }

    protected void markQueryReceivedNow() {
        this.isInQuery.set(false);
        LOG.trace("{} isInquery set to false");
        this.timeLastQueryReceived = System.currentTimeMillis();
    }

    public void activate() {
        this.active = true;
    }

    public void deactivate() {
        this.active = false;
    }

    public void ack(Object obj) {
        this.beingProcessed.remove(obj);
        this.eventCounter.scope("acked").incrBy(1L);
        this.buffer.acked(obj.toString());
    }

    public void fail(Object obj) {
        this.beingProcessed.remove(obj);
        this.eventCounter.scope("failed").incrBy(1L);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{"url", "metadata"}));
    }
}
