package eu.dicodeproject.twitterstream;

import eu.dicodeproject.twitterstream.sink.TweetSink;
import eu.dicodeproject.twitterstream.source.TweetSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import twitter4j.Status;

/* loaded from: input_file:eu/dicodeproject/twitterstream/StreamingBroker.class */
public class StreamingBroker {
    private final Iterator<Status> sourceIt;
    private final TweetSink sink;
    private final ExecutorService executor;
    private boolean routing;
    private List<Future<Boolean>> results;
    private TweetSource source;
    private int streamBrokerPoolSize;

    /* loaded from: input_file:eu/dicodeproject/twitterstream/StreamingBroker$Plane.class */
    private static final class Plane implements Callable<Boolean> {
        private final Iterator<Status> source;
        private final TweetSink sink;

        public Plane(Iterator<Status> it, TweetSink tweetSink) {
            this.source = it;
            this.sink = tweetSink;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws InterruptedException, IOException {
            while (this.source.hasNext()) {
                Status next = this.source.next();
                if (next != null) {
                    this.sink.store(next);
                }
                Thread.sleep(200L);
            }
            return Boolean.TRUE;
        }
    }

    public StreamingBroker(TweetSource tweetSource, TweetSink tweetSink, int i) {
        this.routing = false;
        this.results = new ArrayList();
        this.sink = tweetSink;
        this.source = tweetSource;
        this.sourceIt = tweetSource.iterator();
        this.executor = Executors.newFixedThreadPool(i);
        this.streamBrokerPoolSize = i;
    }

    public StreamingBroker(TweetSource tweetSource, TweetSink tweetSink) {
        this(tweetSource, tweetSink, 10);
    }

    public synchronized void start() {
        this.source.triggerHarvesting();
        if (this.routing) {
            return;
        }
        int i = this.streamBrokerPoolSize;
        for (int i2 = 0; i2 < i; i2++) {
            this.results.add(this.executor.submit(new Plane(this.sourceIt, this.sink)));
        }
        this.routing = true;
    }

    public synchronized void stop() {
        if (this.routing) {
            Iterator<Future<Boolean>> it = this.results.iterator();
            while (it.hasNext()) {
                it.next().cancel(true);
            }
        }
    }

    public boolean busy() {
        for (Future<Boolean> future : this.results) {
            if (!future.isDone() && !future.isCancelled()) {
                return true;
            }
        }
        return false;
    }
}
