package com.danielasfregola.twitter4s.http.clients;

import akka.Done;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source$;
import com.danielasfregola.twitter4s.entities.streaming.StreamingMessage;
import org.json4s.native.Serialization$;
import scala.MatchError;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.StringContext;
import scala.concurrent.Future;
import scala.reflect.Manifest;
import scala.reflect.ManifestFactory$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: StreamingClient.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005baB\u0001\u0003!\u0003\r\t!\u0004\u0002\u0010'R\u0014X-Y7j]\u001e\u001cE.[3oi*\u00111\u0001B\u0001\bG2LWM\u001c;t\u0015\t)a!\u0001\u0003iiR\u0004(BA\u0004\t\u0003%!x/\u001b;uKJ$4O\u0003\u0002\n\u0015\u0005yA-\u00198jK2\f7O\u001a:fO>d\u0017MC\u0001\f\u0003\r\u0019w.\\\u0002\u0001'\r\u0001a\u0002\u0006\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0002#\u0005)1oY1mC&\u00111\u0003\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005U1R\"\u0001\u0002\n\u0005]\u0011!aC(BkRD7\t\\5f]RDQ!\u0007\u0001\u0005\u0002i\ta\u0001J5oSR$C#A\u000e\u0011\u0005=a\u0012BA\u000f\u0011\u0005\u0011)f.\u001b;\t\u000f}\u0001!\u0019!C\u0001A\u0005qq/\u001b;i\u0019><'+Z9vKN$X#A\u0011\u0011\u0005=\u0011\u0013BA\u0012\u0011\u0005\u001d\u0011un\u001c7fC:Dq!\n\u0001C\u0002\u0013\u0005\u0001%\u0001\fxSRDGj\\4SKF,Xm\u001d;SKN\u0004xN\\:f\r\u00159\u0003!\u0001\u0004)\u0005a\u0011\u0016n\u00195TiJ,\u0017-\\5oO\"#H\u000f\u001d*fcV,7\u000f^\n\u0003M9A\u0001B\u000b\u0014\u0003\u0006\u0004%\taK\u0001\be\u0016\fX/Z:u+\u0005a\u0003CA\u00176\u001b\u0005q#BA\u00181\u0003\u0015iw\u000eZ3m\u0015\t\t$'\u0001\u0005tG\u0006d\u0017\rZ:m\u0015\t)1GC\u00015\u0003\u0011\t7n[1\n\u0005Yr#a\u0003%uiB\u0014V-];fgRD\u0001\u0002\u000f\u0014\u0003\u0002\u0003\u0006I\u0001L\u0001\te\u0016\fX/Z:uA!)!H\nC\u0001w\u00051A(\u001b8jiz\"\"\u0001\u0010 \u0011\u0005u2S\"\u0001\u0001\t\u000b)J\u0004\u0019\u0001\u0017\t\u000b\u00013C\u0011A!\u0002\u001bA\u0014xnY3tgN#(/Z1n+\t\u0011\u0015\f\u0006\u0002DOR\u0011AI\u0013\t\u0004\u000b\"[R\"\u0001$\u000b\u0005\u001d\u0003\u0012AC2p]\u000e,(O]3oi&\u0011\u0011J\u0012\u0002\u0007\rV$XO]3\t\u000f-{\u0014\u0011!a\u0002\u0019\u0006QQM^5eK:\u001cW\rJ\u0019\u0011\u00075#vK\u0004\u0002O%B\u0011q\nE\u0007\u0002!*\u0011\u0011\u000bD\u0001\u0007yI|w\u000e\u001e 
\n\u0005M\u0003\u0012A\u0002)sK\u0012,g-\u0003\u0002V-\nAQ*\u00198jM\u0016\u001cHO\u0003\u0002T!A\u0011\u0001,\u0017\u0007\u0001\t\u0015QvH1\u0001\\\u0005\u0005!\u0016C\u0001/`!\tyQ,\u0003\u0002_!\t9aj\u001c;iS:<\u0007C\u00011f\u001b\u0005\t'B\u00012d\u0003%\u0019HO]3b[&twM\u0003\u0002e\r\u0005AQM\u001c;ji&,7/\u0003\u0002gC\n\u00012\u000b\u001e:fC6LgnZ'fgN\fw-\u001a\u0005\u0006Q~\u0002\r![\u0001\u0002MB!qB[,\u001c\u0013\tY\u0007CA\bQCJ$\u0018.\u00197Gk:\u001cG/[8o\u0011!i\u0007!!A\u0005\u0004\u0019q\u0017\u0001\u0007*jG\"\u001cFO]3b[&tw\r\u0013;uaJ+\u0017/^3tiR\u0011Ah\u001c\u0005\u0006U1\u0004\r\u0001\f\u0005\u0006c\u0002!\tB]\u0001\u0015aJ|7-Z:t'R\u0014X-Y7SKF,Xm\u001d;\u0016\u0005MTHC\u0001;~)\t)8\u0010\u0006\u0002Em\"9q\u000f]A\u0001\u0002\bA\u0018AC3wS\u0012,gnY3%eA\u0019Q\nV=\u0011\u0005aSH!\u0002.q\u0005\u0004Y\u0006\"\u00025q\u0001\u0004a\b\u0003B\bksnAQA\u000b9A\u00021Baa \u0001\u0005\u0012\u0005\u0005\u0011aD;o[\u0006\u00148\u000f[1m'R\u0014X-Y7\u0016\t\u0005\r\u0011q\u0002\u000b\u0007\u0003\u000b\t\u0019\"!\b\u0015\u000b\u0011\u000b9!!\u0005\t\u0013\u0005%a0!AA\u0004\u0005-\u0011AC3wS\u0012,gnY3%gA!Q\nVA\u0007!\rA\u0016q\u0002\u0003\u00065z\u0014\ra\u0017\u0005\u0006Uy\u0004\u001d\u0001\f\u0005\b\u0003+q\b\u0019AA\f\u0003!\u0011Xm\u001d9p]N,\u0007cA\u0017\u0002\u001a%\u0019\u00111\u0004\u0018\u0003\u0019!#H\u000f\u001d*fgB|gn]3\t\r!t\b\u0019AA\u0010!\u0015y!.!\u0004\u001c\u0001")
/* loaded from: input_file:com/danielasfregola/twitter4s/http/clients/StreamingClient.class */
public interface StreamingClient extends OAuthClient {

    /* compiled from: StreamingClient.scala */
    /* loaded from: input_file:com/danielasfregola/twitter4s/http/clients/StreamingClient$RichStreamingHttpRequest.class */
    /**
     * Wraps an {@link HttpRequest} together with the enclosing {@link StreamingClient} and
     * exposes {@link #processStream} for it.
     */
    public class RichStreamingHttpRequest {
        public final /* synthetic */ StreamingClient $outer;
        private final HttpRequest request;

        /**
         * @param streamingClient the enclosing client; must not be {@code null}
         * @param httpRequest     the request being wrapped
         * @throws NullPointerException if {@code streamingClient} is {@code null}
         */
        public RichStreamingHttpRequest(StreamingClient streamingClient, HttpRequest httpRequest) {
            if (streamingClient == null) {
                throw new NullPointerException();
            }
            this.request = httpRequest;
            this.$outer = streamingClient;
        }

        /** @return the wrapped request. */
        public HttpRequest request() {
            return this.request;
        }

        /** @return the enclosing client this wrapper was created for. */
        public /* synthetic */ StreamingClient com$danielasfregola$twitter4s$http$clients$StreamingClient$RichStreamingHttpRequest$$$outer() {
            return this.$outer;
        }

        /**
         * Passes the wrapped request through the function returned by the enclosing client's
         * {@code withOAuthHeader()} and hands the resulting request to
         * {@code processStreamRequest}, discarding the final value.
         *
         * @param partialFunction handler forwarded to {@code processStreamRequest}
         * @param manifest        runtime type evidence for {@code T}, forwarded unchanged
         * @return a future yielding {@link BoxedUnit#UNIT} once the chained futures succeed
         */
        public <T extends StreamingMessage> Future<BoxedUnit> processStream(PartialFunction<T, BoxedUnit> partialFunction, Manifest<T> manifest) {
            final StreamingClient client = com$danielasfregola$twitter4s$http$clients$StreamingClient$RichStreamingHttpRequest$$$outer();
            return ((Future) client.withOAuthHeader().apply(request())).flatMap(
                preparedRequest -> client.processStreamRequest(preparedRequest, partialFunction, manifest).map(ignored -> {
                    $anonfun$processStream$2(ignored);
                    return BoxedUnit.UNIT;
                }, client.executionContext()),
                client.executionContext());
        }

        /** Synthetic no-op invoked for the result of {@code processStreamRequest}. */
        public static final /* synthetic */ void $anonfun$processStream$2(BoxedUnit boxedUnit) {
        }
    }

    /**
     * Synthetic setter for the {@code withLogRequest} value; {@code $init$} calls it with
     * {@code true}.
     *
     * @param z the value to store
     */
    void com$danielasfregola$twitter4s$http$clients$StreamingClient$_setter_$withLogRequest_$eq(boolean z);

    /**
     * Synthetic setter for the {@code withLogRequestResponse} value; {@code $init$} calls it
     * with {@code false}.
     *
     * @param z the value to store
     */
    void com$danielasfregola$twitter4s$http$clients$StreamingClient$_setter_$withLogRequestResponse_$eq(boolean z);

    /**
     * Flag consulted by {@code processStreamRequest}: when {@code true} the outgoing request is
     * passed to {@code logRequest} before being sent.
     */
    @Override // com.danielasfregola.twitter4s.http.clients.CommonClient
    boolean withLogRequest();

    /**
     * Flag inherited from {@code CommonClient}; it is not read anywhere in this interface.
     * NOTE(review): presumably controls request/response logging in the parent client - confirm.
     */
    @Override // com.danielasfregola.twitter4s.http.clients.CommonClient
    boolean withLogRequestResponse();

    /**
     * Creates a {@link RichStreamingHttpRequest} wrapper for the given request, bound to this
     * client.
     *
     * @param httpRequest the request to wrap
     * @return a new wrapper whose outer client is {@code this}
     */
    default RichStreamingHttpRequest RichStreamingHttpRequest(HttpRequest httpRequest) {
        final RichStreamingHttpRequest wrapped = new RichStreamingHttpRequest(this, httpRequest);
        return wrapped;
    }

    /**
     * Sends {@code httpRequest} over {@code connection(httpRequest)} and processes the response
     * as a stream of messages.
     *
     * <p>The request is first logged via {@code logRequest} when {@link #withLogRequest()} is
     * {@code true}. The response is then handed to {@code unmarshal}, together with the
     * timestamp taken at the start of this call and a callback that delegates to
     * {@link #unmarshalStream}.
     *
     * @param httpRequest     the request to send
     * @param partialFunction handler forwarded to {@code unmarshalStream}
     * @param manifest        runtime type evidence for {@code T}, forwarded unchanged
     * @return the future produced by running the single-element source into {@code Sink.head}
     */
    default <T extends StreamingMessage> Future<BoxedUnit> processStreamRequest(HttpRequest httpRequest, PartialFunction<T, BoxedUnit> partialFunction, Manifest<T> manifest) {
        final long requestStartTime = System.currentTimeMillis();
        if (withLogRequest()) {
            logRequest(httpRequest);
        }
        return (Future) Source$.MODULE$.single(httpRequest).via(connection(httpRequest)).mapAsync(1, httpResponse -> {
            // The callback parameter needs its own name: Java forbids a lambda parameter from
            // redeclaring a parameter of the enclosing lambda.
            return this.unmarshal(requestStartTime, streamingResponse -> {
                return this.unmarshalStream(streamingResponse, partialFunction, manifest, httpRequest);
            }, httpRequest, httpResponse);
        }).runWith(Sink$.MODULE$.head(), materializer());
    }

    /**
     * Consumes the response body chunk by chunk, parses each buffered text containing
     * {@code "\r\n"} as a {@link StreamingMessage}, and passes every parse attempt to
     * {@code $anonfun$unmarshalStream$5}.
     *
     * <p>Buffering rule: chunks are appended to the accumulated text until it contains
     * {@code "\r\n"}; the next chunk then starts a fresh buffer. Only buffers containing
     * {@code "\r\n"} are parsed.
     *
     * @param httpResponse    the response whose entity bytes are read
     * @param partialFunction handler forwarded to the per-message callback
     * @param manifest        runtime type evidence for {@code T}, forwarded to the callback
     * @param httpRequest     the originating request, forwarded to the callback
     * @return a future yielding {@link BoxedUnit#UNIT} when the stream run completes
     */
    default <T extends StreamingMessage> Future<BoxedUnit> unmarshalStream(HttpResponse httpResponse, PartialFunction<T, BoxedUnit> partialFunction, Manifest<T> manifest, HttpRequest httpRequest) {
        return httpResponse.entity().dataBytes()
            .scan("", (buffered, chunk) -> {
                // A buffer that already holds a delimiter has been emitted; start over.
                if (buffered.contains("\r\n")) {
                    return chunk.utf8String();
                }
                return buffered + chunk.utf8String();
            })
            .filter(candidate -> BoxesRunTime.boxToBoolean(candidate.contains("\r\n")))
            .map(json -> Try$.MODULE$.apply(
                () -> (StreamingMessage) Serialization$.MODULE$.read(json, this.json4sFormats(), ManifestFactory$.MODULE$.classType(StreamingMessage.class))))
            .runForeach(parsed -> {
                $anonfun$unmarshalStream$5(this, partialFunction, manifest, httpRequest, parsed);
                return BoxedUnit.UNIT;
            }, materializer())
            .map(completed -> {
                $anonfun$unmarshalStream$6(completed);
                return BoxedUnit.UNIT;
            }, executionContext());
    }

    /**
     * Handles one parse attempt from the stream.
     *
     * <ul>
     *   <li>{@link Failure}: logs the exception at error level together with the request URI.</li>
     *   <li>{@link Success}: if the message matches {@code manifest} and
     *       {@code partialFunction} is defined for it, logs it and applies
     *       {@code partialFunction}; otherwise logs that the message is ignored.</li>
     * </ul>
     *
     * @param streamingClient the client whose logger is used
     * @param partialFunction the message handler
     * @param manifest        runtime type evidence used to test the message type
     * @param httpRequest     the originating request; only its URI is used, for the error log
     * @param r14             the parse result
     * @throws MatchError if {@code r14} is neither a {@link Success} nor a {@link Failure}
     */
    static /* synthetic */ void $anonfun$unmarshalStream$5(StreamingClient streamingClient, PartialFunction partialFunction, Manifest manifest, HttpRequest httpRequest, Try r14) {
        if (r14 instanceof Failure) {
            streamingClient.log().error(((Failure) r14).exception(), "While processing stream " + httpRequest.uri());
            return;
        }
        if (!(r14 instanceof Success)) {
            throw new MatchError(r14);
        }

        StreamingMessage streamingMessage = (StreamingMessage) ((Success) r14).value();
        Option unapply = manifest.unapply(streamingMessage);
        boolean matchesRequestedType = !unapply.isEmpty() && unapply.get() != null;

        if (matchesRequestedType && partialFunction.isDefinedAt(streamingMessage)) {
            streamingClient.log().debug("Processing message of type {}: {}", streamingMessage.getClass().getSimpleName(), streamingMessage);
            // Actually run the handler; previously the message was logged as "processing"
            // but the partial function was never invoked.
            partialFunction.apply(streamingMessage);
        } else {
            streamingClient.log().debug("Ignoring message of type {}", streamingMessage.getClass().getSimpleName());
        }
    }

    /**
     * Synthetic no-op called by {@code unmarshalStream} with the {@link Done} value of the
     * finished stream run; the value is discarded.
     *
     * @param done the completion marker (unused)
     */
    static /* synthetic */ void $anonfun$unmarshalStream$6(Done done) {
        // Intentionally empty: the caller returns BoxedUnit.UNIT after this call.
    }

    /**
     * Trait initializer: sets {@code withLogRequest} to {@code true} and
     * {@code withLogRequestResponse} to {@code false} on the given client through its
     * synthetic setters.
     *
     * @param streamingClient the instance being initialized
     */
    static void $init$(StreamingClient streamingClient) {
        final boolean logRequest = true;
        final boolean logRequestResponse = false;
        streamingClient.com$danielasfregola$twitter4s$http$clients$StreamingClient$_setter_$withLogRequest_$eq(logRequest);
        streamingClient.com$danielasfregola$twitter4s$http$clients$StreamingClient$_setter_$withLogRequestResponse_$eq(logRequestResponse);
    }
}
