package com.digitalpebble.stormcrawler.persistence.urlbuffer;

import com.digitalpebble.stormcrawler.persistence.urlbuffer.AbstractURLBuffer;
import com.digitalpebble.stormcrawler.util.ConfUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.collect.EvictingQueue;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.storm.tuple.Values;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/digitalpebble/stormcrawler/persistence/urlbuffer/SchedulingURLBuffer.class */
public class SchedulingURLBuffer extends AbstractURLBuffer implements RemovalListener<String, Object[]> {
    static final Logger LOG = LoggerFactory.getLogger(SchedulingURLBuffer.class);
    public static final String MAXTIMEPARAM = "priority.buffer.max.time.msec";
    private int maxTimeMSec = 30000;
    private int historySize = 5;
    private Cache<String, Object[]> unacked;
    private Cache<String, Queue<Long>> timings;
    private Cache<String, Instant> lastReleased;

    @Override // com.digitalpebble.stormcrawler.persistence.urlbuffer.AbstractURLBuffer, com.digitalpebble.stormcrawler.persistence.urlbuffer.URLBuffer
    public void configure(Map<String, Object> map) {
        super.configure(map);
        this.maxTimeMSec = ConfUtils.getInt(map, MAXTIMEPARAM, this.maxTimeMSec);
        this.unacked = Caffeine.newBuilder().expireAfterWrite(this.maxTimeMSec, TimeUnit.MILLISECONDS).removalListener(this).build();
        this.timings = Caffeine.newBuilder().expireAfterAccess(10L, TimeUnit.MINUTES).build();
        this.lastReleased = Caffeine.newBuilder().expireAfterAccess(10L, TimeUnit.MINUTES).build();
    }

    @Override // com.digitalpebble.stormcrawler.persistence.urlbuffer.URLBuffer
    public synchronized Values next() {
        do {
            Iterator<Map.Entry<String, Queue<AbstractURLBuffer.URLMetadata>>> it = this.queues.entrySet().iterator();
            if (!it.hasNext()) {
                LOG.trace("Empty iterator");
                return null;
            }
            Map.Entry<String, Queue<AbstractURLBuffer.URLMetadata>> next = it.next();
            Queue<AbstractURLBuffer.URLMetadata> value = next.getValue();
            String key = next.getKey();
            it.remove();
            LOG.trace("Next queue {}", key);
            AbstractURLBuffer.URLMetadata uRLMetadata = null;
            if (canRelease(key)) {
                uRLMetadata = value.poll();
                LOG.trace("Item {}", uRLMetadata.url);
            } else {
                LOG.trace("Queue {} not ready to release yet", key);
            }
            if (!value.isEmpty()) {
                LOG.debug("Adding to the back of the queue {}", key);
                this.queues.put(key, value);
            } else if (this.listener != null) {
                this.listener.emptyQueue(key);
            }
            if (uRLMetadata != null) {
                this.lastReleased.put(key, Instant.now());
                this.unacked.put(uRLMetadata.url, new Object[]{Instant.now(), key});
                this.in_buffer.remove(uRLMetadata.url);
                return new Values(new Object[]{uRLMetadata.url, uRLMetadata.metadata});
            }
        } while (!this.queues.isEmpty());
        return null;
    }

    private boolean canRelease(String str) {
        Queue queue = (Queue) this.timings.getIfPresent(str);
        if (queue == null || queue.size() < this.historySize) {
            return true;
        }
        long j = 0;
        Iterator it = queue.iterator();
        while (it.hasNext()) {
            j += ((Long) it.next()).longValue();
        }
        long j2 = j / this.historySize;
        LOG.trace("Average for {}: {} msec", str, Long.valueOf(j2));
        Instant instant = (Instant) this.lastReleased.getIfPresent(str);
        if (instant == null) {
            return true;
        }
        return instant.plusMillis(j2).isBefore(Instant.now());
    }

    @Override // com.digitalpebble.stormcrawler.persistence.urlbuffer.URLBuffer
    public void acked(String str) {
        Object[] objArr = (Object[]) this.unacked.getIfPresent(str);
        if (objArr == null) {
            return;
        }
        Instant instant = (Instant) objArr[0];
        String str2 = (String) objArr[1];
        long epochMilli = Instant.now().toEpochMilli() - instant.toEpochMilli();
        LOG.trace("Adding new timing for {}: {} msec - {}", new Object[]{str2, Long.valueOf(epochMilli), str});
        addTiming(epochMilli, str2);
    }

    void addTiming(long j, String str) {
        ((Queue) this.timings.get(str, str2 -> {
            return EvictingQueue.create(this.historySize);
        })).add(Long.valueOf(j));
    }

    public void onRemoval(@Nullable String str, Object[] objArr, @NotNull RemovalCause removalCause) {
        addTiming(this.maxTimeMSec, str);
    }
}
