package eu.dicodeproject.twitterstream.source;

import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

/* loaded from: input_file:eu/dicodeproject/twitterstream/source/TweetSamplingStream.class */
/**
 * {@link TweetSource} that registers itself as a twitter4j {@link StatusListener},
 * starts the {@code sample()} stream and hands received tweets to consumers
 * through a bounded blocking queue.
 *
 * <p>Usage: set the four OAuth credentials (and optionally the tuning values) through
 * the setters, call {@link #triggerHarvesting()}, then pull tweets from
 * {@link #iterator()}. The queue is created by {@link #triggerHarvesting()}; calling
 * {@code next()} on the iterator before that fails with a {@link NullPointerException}.
 *
 * <p>Errors reported by the stream and interruptions while waiting on the queue are
 * counted; once the count reaches the configured error threshold the iterator
 * reports that no further elements are available.
 */
public class TweetSamplingStream implements TweetSource, StatusListener {
    private static final Logger LOG = LoggerFactory.getLogger(TweetSamplingStream.class);

    /** Buffer between the listener callbacks (producer) and the iterator (consumer). */
    private BlockingQueue<Status> tweetQueue;

    private String streamConsumerkey;
    private String streamConsumersecret;
    private String streamAccesstoken;
    private String streamAccesstokensecret;

    /** The single iterator instance returned by every call to {@link #iterator()}. */
    private final TweetSamplingStreamIterator iterator = new TweetSamplingStreamIterator();

    /** Number of failures observed so far (stream exceptions and interruptions). */
    private final AtomicInteger failures = new AtomicInteger(0);

    /** Seconds {@link #onStatus(Status)} waits for free queue space before dropping a tweet. */
    private int streamTweetReceiveTimeout = 100;

    /** Seconds the iterator's {@code next()} waits for a tweet before returning {@code null}. */
    private int streamTweetTakeTimeout = 10;

    /** Capacity of the tweet queue created by {@link #initQueue()}. */
    private int streamTweetQueueSize = 200;

    /** Failure count at which the iterator stops reporting further elements. */
    private int streamTweetErrorThreshold = 300;

    /**
     * Iterator over the tweets buffered in the enclosing stream's queue. It is
     * effectively endless: it only ends once the failure threshold is reached.
     */
    private class TweetSamplingStreamIterator implements Iterator<Status> {
        private TweetSamplingStreamIterator() {
        }

        /**
         * Reports whether the stream may still deliver tweets.
         *
         * @return {@code true} while the failure count is below the error threshold;
         *         {@code false} (after logging an error) once it has been reached
         */
        @Override
        public boolean hasNext() {
            // Read the counter once so the log decision and the result cannot disagree
            // when the counter is incremented concurrently.
            int failureCount = failures.get();
            if (failureCount >= streamTweetErrorThreshold) {
                LOG.error("Stopping twitter streaming. Too many errors: {}", failureCount);
                return false;
            }
            return true;
        }

        /**
         * Waits up to the configured take timeout (in seconds) for the next tweet.
         *
         * @return the next tweet, or {@code null} if none arrived within the timeout
         *         or the waiting thread was interrupted
         */
        @Override
        public Status next() {
            try {
                return tweetQueue.poll(streamTweetTakeTimeout, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can react to it.
                Thread.currentThread().interrupt();
                failures.incrementAndGet();
                LOG.info("Interrupted while waiting up to {} seconds for a tweet. Increasing failure counter by one.",
                        streamTweetTakeTimeout);
                return null;
            }
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Remove on twitter streams not possible.");
        }
    }

    /**
     * Creates the tweet queue, builds a twitter4j stream from the configured OAuth
     * credentials, registers this object as listener and starts sampling.
     *
     * @throws NullPointerException if any of the four OAuth credentials has not been set
     */
    @Override
    public void triggerHarvesting() {
        Preconditions.checkNotNull(this.streamConsumerkey, "streamConsumerkey missing!");
        Preconditions.checkNotNull(this.streamConsumersecret, "streamConsumersecret missing!");
        Preconditions.checkNotNull(this.streamAccesstoken, "streamAccesstoken missing!");
        Preconditions.checkNotNull(this.streamAccesstokensecret, "streamAccesstokensecret missing!");

        // The queue must exist before the listener is registered, since onStatus writes to it.
        initQueue();

        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.setOAuthAccessToken(this.streamAccesstoken);
        configurationBuilder.setOAuthAccessTokenSecret(this.streamAccesstokensecret);
        configurationBuilder.setOAuthConsumerKey(this.streamConsumerkey);
        configurationBuilder.setOAuthConsumerSecret(this.streamConsumersecret);

        TwitterStream twitterStream = new TwitterStreamFactory(configurationBuilder.build()).getInstance();
        twitterStream.addListener(this);
        twitterStream.sample();
    }

    /**
     * Creates a fresh bounded queue with the currently configured capacity,
     * replacing any previous queue.
     */
    void initQueue() {
        this.tweetQueue = new LinkedBlockingQueue<Status>(this.streamTweetQueueSize);
    }

    /**
     * Returns the iterator over received tweets. The same instance is returned on every call.
     *
     * @return the shared tweet iterator
     */
    @Override
    public Iterator<Status> iterator() {
        return this.iterator;
    }

    /**
     * Listener callback for a received tweet: enqueues it, waiting up to the configured
     * receive timeout (in seconds) for free space. If the queue stays full the tweet is
     * dropped with a warning; an interruption while waiting counts as a failure.
     *
     * @param status the received tweet
     */
    public void onStatus(Status status) {
        try {
            if (!this.tweetQueue.offer(status, this.streamTweetReceiveTimeout, TimeUnit.SECONDS)) {
                LOG.warn("Failed to add current tweet content to queue: Dropping as queue seems full.");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the calling (stream) thread.
            Thread.currentThread().interrupt();
            this.failures.incrementAndGet();
            LOG.warn("Could not process current tweet due to exception. Increasing failure count by one and continuing to process.", e);
        }
    }

    /** Listener callback for deletion notices; intentionally ignored. */
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
    }

    /** Listener callback for track limitation notices; intentionally ignored. */
    public void onTrackLimitationNotice(int i) {
    }

    /**
     * Listener callback for an error reported by the stream: counts it as a failure
     * and logs it.
     *
     * @param exc the reported exception
     */
    public void onException(Exception exc) {
        this.failures.incrementAndGet();
        LOG.warn("Twitter stream reported an exception. Increasing failure count by one.", exc);
    }

    /** Listener callback for geo scrubbing notices; intentionally ignored. */
    public void onScrubGeo(long j, long j2) {
    }

    /** Sets the OAuth consumer key; required before {@link #triggerHarvesting()}. */
    public void setStreamConsumerkey(String str) {
        this.streamConsumerkey = str;
    }

    /** Sets the OAuth consumer secret; required before {@link #triggerHarvesting()}. */
    public void setStreamConsumersecret(String str) {
        this.streamConsumersecret = str;
    }

    /** Sets the OAuth access token; required before {@link #triggerHarvesting()}. */
    public void setStreamAccesstoken(String str) {
        this.streamAccesstoken = str;
    }

    /** Sets the OAuth access token secret; required before {@link #triggerHarvesting()}. */
    public void setStreamAccesstokensecret(String str) {
        this.streamAccesstokensecret = str;
    }

    /** Sets how many seconds {@link #onStatus(Status)} waits for free queue space. */
    public void setStreamTweetReceiveTimeout(int i) {
        this.streamTweetReceiveTimeout = i;
    }

    /** Sets how many seconds the iterator waits for a tweet before returning {@code null}. */
    public void setStreamTweetTakeTimeout(int i) {
        this.streamTweetTakeTimeout = i;
    }

    /** Sets the queue capacity; takes effect the next time the queue is created. */
    public void setStreamTweetQueueSize(int i) {
        this.streamTweetQueueSize = i;
    }

    /** Sets the failure count at which the iterator stops reporting further elements. */
    public void setStreamTweetErrorThreshold(int i) {
        this.streamTweetErrorThreshold = i;
    }
}
