package org.springframework.integration.x.twitter;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.net.URI;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.social.support.URIBuilder;
import org.springframework.social.twitter.api.impl.TwitterTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RequestCallback;
import org.springframework.web.client.ResponseExtractor;
import org.springframework.web.client.RestTemplate;

/* loaded from: input_file:org/springframework/integration/x/twitter/TwitterStreamChannelAdapter.class */
public class TwitterStreamChannelAdapter extends MessageProducerSupport {
    private static final String API_URL_BASE = "https://stream.twitter.com/1.1/";
    private final TwitterTemplate twitter;
    private ScheduledFuture<?> task;
    private boolean delimited;
    private boolean stallWarnings;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger linearBackOff = new AtomicInteger(250);
    private final AtomicInteger httpErrorBackOff = new AtomicInteger(5000);
    private final AtomicInteger rateLimitBackOff = new AtomicInteger(60000);
    private String filterLevel = "none";
    private String language = "";
    private String track = "";
    private String follow = "";
    private String locations = "";
    private final Object monitor = new Object();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/x/twitter/TwitterStreamChannelAdapter$StreamReadingTask.class */
    public class StreamReadingTask implements Runnable {
        private StreamReadingTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (TwitterStreamChannelAdapter.this.running.get()) {
                try {
                    readStream(TwitterStreamChannelAdapter.this.twitter.getRestTemplate());
                } catch (Exception e) {
                    TwitterStreamChannelAdapter.this.logger.warn("Exception while reading stream; Add debug logging for exception trace.");
                    if (TwitterStreamChannelAdapter.this.logger.isDebugEnabled()) {
                        TwitterStreamChannelAdapter.this.logger.debug("Exception while reading stream.", e);
                    }
                    TwitterStreamChannelAdapter.this.waitLinearBackoff();
                } catch (HttpStatusCodeException e2) {
                    if (e2.getStatusCode() == HttpStatus.UNAUTHORIZED) {
                        TwitterStreamChannelAdapter.this.logger.error("Twitter authentication failed: " + e2.getMessage());
                        TwitterStreamChannelAdapter.this.running.set(false);
                    } else if (e2.getStatusCode() == HttpStatus.METHOD_FAILURE) {
                        TwitterStreamChannelAdapter.this.waitRateLimitBackoff();
                    } else {
                        TwitterStreamChannelAdapter.this.waitHttpErrorBackoff();
                    }
                }
            }
        }

        private void readStream(RestTemplate restTemplate) {
            restTemplate.execute(TwitterStreamChannelAdapter.this.buildUri(), HttpMethod.GET, new RequestCallback() { // from class: org.springframework.integration.x.twitter.TwitterStreamChannelAdapter.StreamReadingTask.1
                public void doWithRequest(ClientHttpRequest clientHttpRequest) throws IOException {
                }
            }, new ResponseExtractor<String>() { // from class: org.springframework.integration.x.twitter.TwitterStreamChannelAdapter.StreamReadingTask.2
                /* renamed from: extractData, reason: merged with bridge method [inline-methods] */
                public String m0extractData(ClientHttpResponse clientHttpResponse) throws IOException {
                    LineNumberReader lineNumberReader = null;
                    try {
                        lineNumberReader = new LineNumberReader(new InputStreamReader(clientHttpResponse.getBody()));
                        TwitterStreamChannelAdapter.this.resetBackOffs();
                        while (TwitterStreamChannelAdapter.this.running.get()) {
                            String readLine = lineNumberReader.readLine();
                            if (!StringUtils.hasText(readLine)) {
                                break;
                            }
                            TwitterStreamChannelAdapter.this.sendMessage(MessageBuilder.withPayload(readLine).build());
                        }
                        if (lineNumberReader == null) {
                            return null;
                        }
                        lineNumberReader.close();
                        return null;
                    } catch (Throwable th) {
                        if (lineNumberReader != null) {
                            lineNumberReader.close();
                        }
                        throw th;
                    }
                }
            });
        }
    }

    public TwitterStreamChannelAdapter(TwitterTemplate twitterTemplate) {
        this.twitter = twitterTemplate;
        twitterTemplate.getRestTemplate().setErrorHandler(new DefaultResponseErrorHandler());
        setPhase(Integer.MAX_VALUE);
    }

    public void setReadTimeout(int i) {
        ((SimpleClientHttpRequestFactory) new DirectFieldAccessor(this.twitter.getRestTemplate().getRequestFactory()).getPropertyValue("requestFactory")).setReadTimeout(i);
    }

    public void setConnectTimeout(int i) {
        ((SimpleClientHttpRequestFactory) new DirectFieldAccessor(this.twitter.getRestTemplate().getRequestFactory()).getPropertyValue("requestFactory")).setConnectTimeout(i);
    }

    public void setDelimited(boolean z) {
        this.delimited = z;
    }

    public void setStallWarnings(boolean z) {
        this.stallWarnings = z;
    }

    public void setFilterLevel(String str) {
        this.filterLevel = str;
    }

    public void setLanguage(String str) {
        this.language = str;
    }

    public void setTrack(String str) {
        this.track = str;
    }

    public void setFollow(String str) {
        this.follow = str;
    }

    public void setLocations(String str) {
        this.locations = str;
    }

    public String getComponentType() {
        return "twitter:gardenhose-channel-adapter";
    }

    protected void doStart() {
        synchronized (this.monitor) {
            if (this.running.get()) {
                return;
            }
            this.running.set(true);
            StreamReadingTask streamReadingTask = new StreamReadingTask();
            TaskScheduler taskScheduler = getTaskScheduler();
            if (taskScheduler != null) {
                this.task = taskScheduler.schedule(streamReadingTask, new Date());
            } else {
                Executors.newSingleThreadExecutor().execute(streamReadingTask);
            }
        }
    }

    protected void doStop() {
        if (this.task != null) {
            this.task.cancel(true);
            this.task = null;
        }
        this.running.set(false);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public URI buildUri() {
        URIBuilder fromUri = URIBuilder.fromUri(API_URL_BASE + ((StringUtils.hasText(this.track) || StringUtils.hasText(this.follow) || StringUtils.hasText(this.locations)) ? "statuses/filter.json" : "statuses/sample.json"));
        if (this.delimited) {
            fromUri.queryParam("delimited", "length");
        }
        if (this.stallWarnings) {
            fromUri.queryParam("stall_warnings", "true");
        }
        if (!"none".equals(this.filterLevel)) {
            fromUri.queryParam("filter_level", this.filterLevel);
        }
        if (StringUtils.hasText(this.language)) {
            fromUri.queryParam("language", this.language);
        }
        if (StringUtils.hasText(this.track)) {
            fromUri.queryParam("track", this.track);
        }
        if (StringUtils.hasText(this.follow)) {
            fromUri.queryParam("follow", this.follow);
        }
        if (StringUtils.hasText(this.locations)) {
            fromUri.queryParam("locations", this.locations);
        }
        return fromUri.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resetBackOffs() {
        this.linearBackOff.set(250);
        this.rateLimitBackOff.set(60000);
        this.httpErrorBackOff.set(5000);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitLinearBackoff() {
        int i = this.linearBackOff.get();
        this.logger.warn("Exception while reading stream, waiting for " + i + " ms before restarting");
        wait(i);
        if (i < 16000) {
            this.linearBackOff.set(i + 250);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitRateLimitBackoff() {
        int i = this.rateLimitBackOff.get();
        this.logger.warn("Rate limit error, waiting for " + (i / 1000) + " seconds before restarting");
        wait(i);
        this.rateLimitBackOff.set(i * 2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitHttpErrorBackoff() {
        int i = this.httpErrorBackOff.get();
        this.logger.warn("Http error, waiting for " + (i / 1000) + " seconds before restarting");
        wait(i);
        if (i < 320000) {
            this.httpErrorBackOff.set(i * 2);
        }
    }

    private void wait(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
        }
    }
}
