package spinoco.fs2.kafka;

import fs2.Scheduler;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$StreamInvariantOps$;
import fs2.util.Async;
import fs2.util.Async$;
import fs2.util.Catchable;
import fs2.util.Lub1$;
import fs2.util.syntax$;
import fs2.util.syntax$FunctorOps$;
import fs2.util.syntax$MonadOps$;
import scala.Enumeration;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.VolatileObjectRef;
import shapeless.Typeable;
import shapeless.tag$;
import spinoco.fs2.kafka.Cpackage;
import spinoco.fs2.kafka.failure.NoBrokerAvailable$;
import spinoco.fs2.kafka.network.BrokerAddress;
import spinoco.protocol.kafka.Message;
import spinoco.protocol.kafka.ProtocolVersion$;
import spinoco.protocol.kafka.Request;
import spinoco.protocol.kafka.RequestMessage;
import spinoco.protocol.kafka.Response;
import spinoco.protocol.kafka.ResponseMessage;

/* JADX WARN: Incorrect field signature: Ljava/lang/Object; */
/* compiled from: KafkaClient.scala */
/* loaded from: input_file:spinoco/fs2/kafka/KafkaClient$impl$.class */
public class KafkaClient$impl$ {
    /**
     * Singleton instance slot. It is declared as {@code null} here and assigned in the
     * constructor ({@code MODULE$ = this}), which the static initializer below invokes.
     * NOTE(review): assigning a {@code static final} field from a constructor is not legal
     * Java source; this looks like a decompiler rendering of a Scala object - confirm.
     */
    public static final KafkaClient$impl$ MODULE$ = null;
    /** Value exposed by {@code consumerBrokerId()}; written once, in the constructor. */
    private final int consumerBrokerId;

    static {
        // The instance is not stored here: the constructor publishes itself into MODULE$.
        new KafkaClient$impl$();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private KafkaClient$impl$PublisherState$4$ PublisherState$2$lzycompute(VolatileObjectRef volatileObjectRef) {
        ?? r0 = this;
        synchronized (r0) {
            if (volatileObjectRef.elem == null) {
                volatileObjectRef.elem = new KafkaClient$impl$PublisherState$4$();
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return (KafkaClient$impl$PublisherState$4$) volatileObjectRef.elem;
        }
    }

    /**
     * Builds the client inside the effect {@code F}.
     *
     * <p>The effect returned by {@code mkPublishers(function2, async)} is mapped through
     * {@code KafkaClient$impl$$anonfun$mkClient$1}, which captures all remaining arguments.
     *
     * @return the mapped effect
     */
    public <F> F mkClient(Set<BrokerAddress> set, Function2<String, Object, F> function2, Function2<BrokerAddress, Request.MetadataRequest, F> function22, Function1<BrokerAddress, Function1<Stream<F, Request.FetchRequest>, Stream<F, Tuple2<Request.FetchRequest, Response.FetchResponse>>>> function1, Function1<BrokerAddress, Function1<Stream<F, Request.OffsetsRequest>, Stream<F, Response.OffsetResponse>>> function12, Function1<BrokerAddress, Function1<Stream<F, Request.MetadataRequest>, Stream<F, Response.MetadataResponse>>> function13, FiniteDuration finiteDuration, Enumeration.Value value, Async<F> async, Logger<F> logger, Scheduler scheduler) {
        F publishers = mkPublishers(function2, async);
        KafkaClient$impl$$anonfun$mkClient$1 assemble = new KafkaClient$impl$$anonfun$mkClient$1(set, function22, function1, function12, function13, finiteDuration, value, async, logger, scheduler);
        return (F) syntax$FunctorOps$.MODULE$.map$extension(syntax$.MODULE$.FunctorOps(publishers), assemble, async);
    }

    /* JADX WARN: Incorrect types in method signature: <F:Ljava/lang/Object;>(Lscala/Function2<Lspinoco/fs2/kafka/network/BrokerAddress;Lspinoco/protocol/kafka/Request$MetadataRequest;TF;>;Lscala/collection/Seq<Lspinoco/fs2/kafka/network/BrokerAddress;>;Ljava/lang/String;Ljava/lang/Object;Lfs2/util/Catchable<TF;>;)TF; */
    /**
     * Runs a stream over the given broker addresses and returns the effect yielding its last element.
     *
     * <p>Pipeline, in order: emit every element of {@code seq}; evaluate
     * {@code anonfun$leaderFor$3} (built from {@code function2}, {@code str}, {@code catchable})
     * for each; {@code collect} with {@code anonfun$leaderFor$1}; {@code map} with
     * {@code anonfun$leaderFor$4} (built from {@code str} and {@code i}); {@code collectFirst}
     * with {@code anonfun$leaderFor$2}; finally {@code runLast}.
     *
     * @return the result of {@code runLast$extension} on that stream
     */
    public Object leaderFor(Function2 function2, Seq seq, String str, int i, Catchable catchable) {
        Stream evaluated = Stream$.MODULE$.emits(seq).evalMap(new KafkaClient$impl$$anonfun$leaderFor$3(function2, str, catchable), Lub1$.MODULE$.id());
        Stream mapped = evaluated.collect(new KafkaClient$impl$$anonfun$leaderFor$1()).map(new KafkaClient$impl$$anonfun$leaderFor$4(str, i));
        Stream firstMatch = mapped.collectFirst(new KafkaClient$impl$$anonfun$leaderFor$2());
        return Stream$StreamInvariantOps$.MODULE$.runLast$extension(Stream$.MODULE$.StreamInvariantOps(firstMatch), catchable);
    }

    /* JADX WARN: Incorrect return type in method signature: ()Ljava/lang/Object; */
    /**
     * Returns the broker id stored in the {@code consumerBrokerId} field.
     *
     * @return the value assigned to the field by the constructor
     */
    public int consumerBrokerId() {
        return consumerBrokerId;
    }

    /**
     * Creates the function that turns a stream of fetch requests into a stream of
     * (request, response) pairs.
     *
     * <p>This method only constructs {@code KafkaClient$impl$$anonfun$fetchBrokerConnection$1}
     * from its five arguments; all behaviour lives in that class.
     *
     * @return the newly constructed function object
     */
    public <F> Function1<Stream<F, Request.FetchRequest>, Stream<F, Tuple2<Request.FetchRequest, Response.FetchResponse>>> fetchBrokerConnection(Function1<BrokerAddress, Function1<Stream<F, RequestMessage>, Stream<F, ResponseMessage>>> function1, Enumeration.Value value, String str, BrokerAddress brokerAddress, Async<F> async) {
        KafkaClient$impl$$anonfun$fetchBrokerConnection$1 pipe = new KafkaClient$impl$$anonfun$fetchBrokerConnection$1(function1, value, str, brokerAddress, async);
        return pipe;
    }

    /**
     * Creates the function that turns a stream of offsets requests into a stream of
     * offset responses.
     *
     * <p>This method only constructs {@code KafkaClient$impl$$anonfun$offsetConnection$1}.
     * The {@code async} argument is accepted but not passed on to it.
     *
     * @return the newly constructed function object
     */
    public <F> Function1<Stream<F, Request.OffsetsRequest>, Stream<F, Response.OffsetResponse>> offsetConnection(Function1<BrokerAddress, Function1<Stream<F, RequestMessage>, Stream<F, ResponseMessage>>> function1, Enumeration.Value value, String str, BrokerAddress brokerAddress, Async<F> async) {
        KafkaClient$impl$$anonfun$offsetConnection$1 pipe = new KafkaClient$impl$$anonfun$offsetConnection$1(function1, value, str, brokerAddress);
        return pipe;
    }

    /**
     * Creates the function that turns a stream of metadata requests into a stream of
     * metadata responses.
     *
     * <p>This method only constructs {@code KafkaClient$impl$$anonfun$metadataConnection$1}
     * from its five arguments; all behaviour lives in that class.
     *
     * @return the newly constructed function object
     */
    public <F> Function1<Stream<F, Request.MetadataRequest>, Stream<F, Response.MetadataResponse>> metadataConnection(Function1<BrokerAddress, Function1<Stream<F, RequestMessage>, Stream<F, ResponseMessage>>> function1, Enumeration.Value value, String str, BrokerAddress brokerAddress, Async<F> async) {
        KafkaClient$impl$$anonfun$metadataConnection$1 pipe = new KafkaClient$impl$$anonfun$metadataConnection$1(function1, value, str, brokerAddress, async);
        return pipe;
    }

    /* JADX WARN: Incorrect types in method signature: <F:Ljava/lang/Object;>(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;ZIILscala/concurrent/duration/FiniteDuration;Lscala/Enumeration$Value;Lscala/Function1<Lspinoco/fs2/kafka/network/BrokerAddress;Lscala/Function1<Lfs2/Stream<TF;Lspinoco/protocol/kafka/Request$FetchRequest;>;Lfs2/Stream<TF;Lscala/Tuple2<Lspinoco/protocol/kafka/Request$FetchRequest;Lspinoco/protocol/kafka/Response$FetchResponse;>;>;>;>;Lscala/Function2<Ljava/lang/String;Ljava/lang/Object;TF;>;Lscala/Function2<Ljava/lang/String;Ljava/lang/Object;TF;>;Lscala/concurrent/duration/FiniteDuration;ILfs2/util/Async<TF;>;Lfs2/Scheduler;Lspinoco/fs2/kafka/Logger<TF;>;)Lfs2/Stream<TF;Lspinoco/fs2/kafka/package$TopicMessage;>; */
    /**
     * Builds the subscription stream for one topic partition.
     *
     * <p>A ref is created through {@code async.refOf} holding the pair {@code (j, 0)}, and the
     * stream evaluating that ref is flat-mapped through
     * {@code KafkaClient$impl$$anonfun$subscribePartition$1}. That function receives every
     * argument except {@code j}, which reaches it only via the ref.
     *
     * @return the flat-mapped stream
     */
    public Stream subscribePartition(String str, int i, long j, boolean z, int i2, int i3, FiniteDuration finiteDuration, Enumeration.Value value, Function1 function1, Function2 function2, Function2 function22, FiniteDuration finiteDuration2, int i4, Async async, Scheduler scheduler, Logger logger) {
        Tuple2 initialState = new Tuple2(BoxesRunTime.boxToLong(j), BoxesRunTime.boxToInteger(0));
        Stream seeded = Stream$.MODULE$.eval(async.refOf(initialState));
        return seeded.flatMap(new KafkaClient$impl$$anonfun$subscribePartition$1(str, i, z, i2, i3, finiteDuration, value, function1, function2, function22, finiteDuration2, i4, async, scheduler, logger), Lub1$.MODULE$.id());
    }

    /**
     * Converts the messages of a partition fetch result into topic messages.
     *
     * <p>Every element of {@code partitionFetchResult.messages()} is flat-mapped through
     * {@code KafkaClient$impl$$anonfun$messagesFromResult$1}, which captures {@code value}
     * and the fetch result; the results are gathered into a {@code Vector}.
     *
     * @return the flattened vector
     */
    public Vector<Cpackage.TopicMessage> messagesFromResult(Enumeration.Value value, Response.PartitionFetchResult partitionFetchResult) {
        KafkaClient$impl$$anonfun$messagesFromResult$1 expand = new KafkaClient$impl$$anonfun$messagesFromResult$1(value, partitionFetchResult);
        Vector expanded = (Vector) partitionFetchResult.messages().flatMap(expand, Vector$.MODULE$.canBuildFrom());
        return expanded;
    }

    /* JADX WARN: Incorrect types in method signature: <F:Ljava/lang/Object;>(Lscala/Function2<Ljava/lang/String;Ljava/lang/Object;TF;>;Lscala/Function1<Lspinoco/fs2/kafka/network/BrokerAddress;Lscala/Function1<Lfs2/Stream<TF;Lspinoco/protocol/kafka/Request$OffsetsRequest;>;Lfs2/Stream<TF;Lspinoco/protocol/kafka/Response$OffsetResponse;>;>;>;Lscala/concurrent/duration/FiniteDuration;Ljava/lang/String;Ljava/lang/Object;Lfs2/util/Async<TF;>;Lfs2/Scheduler;)TF; */
    /**
     * Queries the offset range for a topic partition.
     *
     * <p>{@code function2} is applied to {@code (str, i)} and the resulting effect is
     * flat-mapped through {@code KafkaClient$impl$$anonfun$queryOffsetRange$1}, which captures
     * the remaining arguments.
     *
     * @return the flat-mapped effect
     */
    public Object queryOffsetRange(Function2 function2, Function1 function1, FiniteDuration finiteDuration, String str, int i, Async async, Scheduler scheduler) {
        Object lookup = function2.apply(str, BoxesRunTime.boxToInteger(i));
        return syntax$MonadOps$.MODULE$.flatMap$extension(syntax$.MODULE$.MonadOps(lookup), new KafkaClient$impl$$anonfun$queryOffsetRange$1(function1, finiteDuration, str, i, async, scheduler), async);
    }

    /**
     * Performs a single request/reply exchange with a broker inside the effect {@code F}.
     *
     * <p>A fresh ref is obtained from {@code async.ref()} and flat-mapped through
     * {@code KafkaClient$impl$$anonfun$requestReplyBroker$1}, which captures all arguments.
     *
     * @return the flat-mapped effect
     */
    public <F, I extends Request, O extends Response> F requestReplyBroker(Function1<BrokerAddress, Function1<Stream<F, RequestMessage>, Stream<F, ResponseMessage>>> function1, Enumeration.Value value, String str, BrokerAddress brokerAddress, I i, Async<F> async, Typeable<O> typeable) {
        F freshRef = async.ref();
        KafkaClient$impl$$anonfun$requestReplyBroker$1 exchange = new KafkaClient$impl$$anonfun$requestReplyBroker$1(function1, value, str, brokerAddress, i, async, typeable);
        return (F) syntax$MonadOps$.MODULE$.flatMap$extension(syntax$.MODULE$.MonadOps(freshRef), exchange, async);
    }

    /* JADX WARN: Incorrect types in method signature: <F:Ljava/lang/Object;>(Lscala/Function1<Lspinoco/fs2/kafka/network/BrokerAddress;Lscala/Function1<Lfs2/Stream<TF;Lspinoco/protocol/kafka/RequestMessage;>;Lfs2/Stream<TF;Lspinoco/protocol/kafka/ResponseMessage;>;>;>;Lscala/Enumeration$Value;Ljava/lang/String;Lscala/Function2<Ljava/lang/String;Ljava/lang/Object;TF;>;Lscala/concurrent/duration/FiniteDuration;Ljava/lang/String;Ljava/lang/Object;Lfs2/util/Async<TF;>;Lfs2/Scheduler;Lspinoco/fs2/kafka/Logger<TF;>;)TF; */
    /**
     * Builds the publish connection to a partition leader.
     *
     * <p>A signal initialised to {@code false} is created through
     * {@code fs2.async.package$.signalOf}, and the resulting effect is flat-mapped through
     * {@code KafkaClient$impl$$anonfun$publishLeaderConnection$1}, which captures all arguments.
     *
     * @return the flat-mapped effect
     */
    public Object publishLeaderConnection(Function1 function1, Enumeration.Value value, String str, Function2 function2, FiniteDuration finiteDuration, String str2, int i, Async async, Scheduler scheduler, Logger logger) {
        Object initialSignal = fs2.async.package$.MODULE$.signalOf(BoxesRunTime.boxToBoolean(false), async);
        return syntax$MonadOps$.MODULE$.flatMap$extension(syntax$.MODULE$.MonadOps(initialSignal), new KafkaClient$impl$$anonfun$publishLeaderConnection$1(function1, value, str, function2, finiteDuration, str2, i, async, scheduler, logger), async);
    }

    /**
     * Creates the publishers inside the effect {@code F}.
     *
     * <p>A ref is created holding a publisher state built from {@code false} and an empty
     * map, and the resulting effect is mapped through
     * {@code KafkaClient$impl$$anonfun$mkPublishers$1}, which captures {@code function2}
     * and {@code async}.
     *
     * @return the mapped effect
     */
    public <F> F mkPublishers(Function2<String, Object, F> function2, Async<F> async) {
        // Holder for the lazily created PublisherState companion; starts out empty.
        VolatileObjectRef stateModuleHolder = VolatileObjectRef.zero();
        Map emptyMap = (Map) Predef$.MODULE$.Map().empty();
        return (F) syntax$FunctorOps$.MODULE$.map$extension(syntax$.MODULE$.FunctorOps(Async$.MODULE$.refOf(PublisherState$2(stateModuleHolder).apply(false, emptyMap), async)), new KafkaClient$impl$$anonfun$mkPublishers$1(function2, async), async);
    }

    /**
     * Produces the stream of leader maps for the topics in {@code vector}.
     *
     * <p>Delegates to {@code go$1}, passing {@code seq} both as the list still to be tried and
     * as the full list, an initial flag of {@code false}, and a {@code MetadataRequest} built
     * from {@code vector}.
     *
     * @return the stream returned by {@code go$1}
     */
    public <F> Stream<F, Map<Tuple2<String, Object>, BrokerAddress>> leadersDiscrete(Function1<BrokerAddress, Function1<Stream<F, Request.MetadataRequest>, Stream<F, Response.MetadataResponse>>> function1, Seq<BrokerAddress> seq, FiniteDuration finiteDuration, Vector<String> vector, Async<F> async, Scheduler scheduler, Logger<F> logger) {
        Request.MetadataRequest metadataRequest = new Request.MetadataRequest(vector);
        return spinoco$fs2$kafka$KafkaClient$impl$$go$1(seq, false, function1, seq, finiteDuration, async, scheduler, logger, metadataRequest);
    }

    /**
     * Selects elements of {@code vector} with a protocol-version specific partial function.
     *
     * <ul>
     *   <li>{@code Kafka_0_8} or {@code Kafka_0_9}: collects with
     *       {@code ...extractCompressed$1$1}, which takes no arguments.</li>
     *   <li>{@code Kafka_0_10}, {@code Kafka_0_10_1} or {@code Kafka_0_10_2}: collects with
     *       {@code ...extractCompressed$1$2}, constructed with {@code (j - vector.size()) + 1}.</li>
     * </ul>
     *
     * @param vector elements to collect from
     * @param j      value the 0.10.x branch combines with the vector size
     *               (presumably the offset of the enclosing compressed message - confirm)
     * @param value  protocol version to dispatch on
     * @return the collected vector
     * @throws MatchError if {@code value} is none of the five versions listed above
     */
    public final Vector spinoco$fs2$kafka$KafkaClient$impl$$extractCompressed$1(Vector vector, long j, Enumeration.Value value) {
        final ProtocolVersion$ versions = ProtocolVersion$.MODULE$;

        // Null-safe equality against 0.8, then 0.9; the second is only checked if the first fails.
        Enumeration.Value v08 = versions.Kafka_0_8();
        boolean isPre010 = v08 == null ? value == null : v08.equals(value);
        if (!isPre010) {
            Enumeration.Value v09 = versions.Kafka_0_9();
            isPre010 = v09 == null ? value == null : v09.equals(value);
        }
        if (isPre010) {
            return (Vector) vector.collect(new KafkaClient$impl$$anonfun$spinoco$fs2$kafka$KafkaClient$impl$$extractCompressed$1$1(), Vector$.MODULE$.canBuildFrom());
        }

        // Same null-safe test against the 0.10.x versions, stopping at the first match.
        Enumeration.Value v010 = versions.Kafka_0_10();
        boolean is010Family = v010 == null ? value == null : v010.equals(value);
        if (!is010Family) {
            Enumeration.Value v0101 = versions.Kafka_0_10_1();
            is010Family = v0101 == null ? value == null : v0101.equals(value);
        }
        if (!is010Family) {
            Enumeration.Value v0102 = versions.Kafka_0_10_2();
            is010Family = v0102 == null ? value == null : v0102.equals(value);
        }
        if (!is010Family) {
            throw new MatchError(value);
        }

        long base = (j - vector.size()) + 1;
        return (Vector) vector.collect(new KafkaClient$impl$$anonfun$spinoco$fs2$kafka$KafkaClient$impl$$extractCompressed$1$2(base), Vector$.MODULE$.canBuildFrom());
    }

    /**
     * Wraps a single protocol message into a {@code TopicMessage}.
     *
     * <p>The message offset is passed through {@code package$.offset}; key and value are
     * taken from the message, and the last constructor argument is the fetch result's
     * {@code highWMOffset()}.
     *
     * @return the newly constructed topic message
     */
    public final Cpackage.TopicMessage spinoco$fs2$kafka$KafkaClient$impl$$toTopicMessage$1(Message.SingleMessage singleMessage, Response.PartitionFetchResult partitionFetchResult) {
        long rawOffset = singleMessage.offset();
        return new Cpackage.TopicMessage(
                package$.MODULE$.offset(rawOffset),
                singleMessage.key(),
                singleMessage.value(),
                partitionFetchResult.highWMOffset());
    }

    /**
     * Lazy accessor for the {@code KafkaClient$impl$PublisherState$4$} instance kept in the
     * given holder.
     *
     * @param volatileObjectRef holder; {@code elem == null} means the instance has not been created
     * @return the held instance, created through {@code PublisherState$2$lzycompute} when absent
     */
    private final KafkaClient$impl$PublisherState$4$ PublisherState$2(VolatileObjectRef volatileObjectRef) {
        if (volatileObjectRef.elem != null) {
            // Already initialised: no need to go through the synchronized path.
            return (KafkaClient$impl$PublisherState$4$) volatileObjectRef.elem;
        }
        return PublisherState$2$lzycompute(volatileObjectRef);
    }

    /**
     * Builds a map from a metadata response.
     *
     * <p>First, {@code metadataResponse.brokers()} is mapped through {@code anonfun$21} and
     * turned into a map. That map is handed to {@code ...buildMap$1$1}, which is used to
     * flat-map {@code metadataResponse.topics()}; the flattened entries are turned into the
     * resulting map.
     *
     * @return the map built from the flattened topic entries
     */
    public final Map spinoco$fs2$kafka$KafkaClient$impl$$buildMap$1(Response.MetadataResponse metadataResponse) {
        Map brokerLookup = ((TraversableOnce) metadataResponse.brokers().map(new KafkaClient$impl$$anonfun$21(), Vector$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        TraversableOnce entries = (TraversableOnce) metadataResponse.topics().flatMap(new KafkaClient$impl$$anonfun$spinoco$fs2$kafka$KafkaClient$impl$$buildMap$1$1(brokerLookup), Vector$.MODULE$.canBuildFrom());
        return entries.toMap(Predef$.MODULE$.$conforms());
    }

    /**
     * Picks the next broker to try and builds the stream that uses it.
     *
     * <ul>
     *   <li>If {@code seq} has a head, returns a stream that creates a ref holding {@code z}
     *       and flat-maps it through {@code ...go$1$1}, which captures the remaining
     *       arguments, the current {@code seq} and the head broker.</li>
     *   <li>If {@code seq} is empty and {@code z} is {@code true}, restarts from the full
     *       list {@code seq2} with {@code z} reset to {@code false}.</li>
     *   <li>If {@code seq} is empty and {@code z} is {@code false}, returns a stream failing
     *       with {@code NoBrokerAvailable$}.</li>
     * </ul>
     *
     * <p>Each terminal case returns immediately. Only the restart case loops, and it runs at
     * most once per call because it clears {@code z}.
     *
     * @param seq  brokers still to be tried
     * @param z    flag that permits one restart from {@code seq2} when {@code seq} is exhausted
     * @param seq2 full broker list used for the restart
     * @return the stream for the selected broker, or a failed stream when none is available
     * @throws MatchError if {@code headOption()} yields something that is neither {@code None} nor {@code Some}
     */
    public final Stream spinoco$fs2$kafka$KafkaClient$impl$$go$1(Seq seq, boolean z, Function1 function1, Seq seq2, FiniteDuration finiteDuration, Async async, Scheduler scheduler, Logger logger, Request.MetadataRequest metadataRequest) {
        while (true) {
            scala.Option headOption = seq.headOption();
            if (None$.MODULE$.equals(headOption)) {
                if (!z) {
                    return Stream$.MODULE$.fail(NoBrokerAvailable$.MODULE$);
                }
                // One restart from the full list; clearing z prevents a second one.
                z = false;
                seq = seq2;
                continue;
            }
            if (!(headOption instanceof Some)) {
                throw new MatchError(headOption);
            }
            BrokerAddress broker = (BrokerAddress) ((Some) headOption).x();
            return Stream$.MODULE$.eval(Async$.MODULE$.refOf(BoxesRunTime.boxToBoolean(z), async)).flatMap(new KafkaClient$impl$$anonfun$spinoco$fs2$kafka$KafkaClient$impl$$go$1$1(function1, seq2, finiteDuration, async, scheduler, logger, metadataRequest, seq, broker), Lub1$.MODULE$.id());
        }
    }

    public KafkaClient$impl$() {
        MODULE$ = this;
        this.consumerBrokerId = BoxesRunTime.unboxToInt(tag$.MODULE$.apply().apply(BoxesRunTime.boxToInteger(-1)));
    }
}
