package spinoco.fs2.kafka.network;

import cats.Applicative;
import cats.Applicative$;
import cats.Monad;
import cats.effect.concurrent.Ref;
import cats.syntax.package$all$;
import fs2.Chunk;
import fs2.Chunk$;
import fs2.Pull;
import fs2.Pull$;
import fs2.Segment;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$InvariantOps$;
import fs2.Stream$ToPull$;
import fs2.internal.FreeC;
import scala.Enumeration;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$$eq$colon$eq$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.immutable.Map;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scodec.DecodeResult;
import scodec.Err;
import scodec.bits.BitVector;
import scodec.bits.ByteVector;
import scodec.bits.ByteVector$;
import spinoco.protocol.kafka.ApiKey$;
import spinoco.protocol.kafka.Request;
import spinoco.protocol.kafka.Request$RequiredAcks$;
import spinoco.protocol.kafka.RequestMessage;
import spinoco.protocol.kafka.Response;
import spinoco.protocol.kafka.ResponseMessage;
import spinoco.protocol.kafka.codec.MessageCodec$;

/* compiled from: BrokerConnection.scala */
/* loaded from: input_file:spinoco/fs2/kafka/network/BrokerConnection$impl$.class */
public class BrokerConnection$impl$ {
    public static BrokerConnection$impl$ MODULE$;

    static {
        new BrokerConnection$impl$();
    }

    public <F> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> sendMessages(Ref<F, Map<Object, RequestMessage>> ref, Function1<Chunk<Object>, F> function1, Applicative<F> applicative, Monad<F> monad) {
        return obj -> {
            return new Stream($anonfun$sendMessages$1(ref, function1, monad, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    public <F> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> receiveMessages(Ref<F, Map<Object, RequestMessage>> ref) {
        return obj -> {
            return new Stream($anonfun$receiveMessages$1(ref, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    public <F> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> receiveChunks() {
        return obj -> {
            return new Stream($anonfun$receiveChunks$3(((Stream) obj).fs2$Stream$$free()));
        };
    }

    public Tuple3<ByteVector, Option<Object>, Segment<ByteVector, BoxedUnit>> collectChunks(ByteVector byteVector, Option<Object> option) {
        return go$2(byteVector, option, package$.MODULE$.Vector().empty());
    }

    public <F> Function1<FreeC<?, BoxedUnit>, FreeC<?, BoxedUnit>> decodeReceived(Ref<F, Map<Object, RequestMessage>> ref) {
        return obj -> {
            return new Stream($anonfun$decodeReceived$1(ref, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    public static final /* synthetic */ FreeC $anonfun$sendMessages$5(RequestMessage requestMessage, Err err) {
        return Stream$.MODULE$.raiseError(new Throwable(new StringBuilder(32).append("Failed to serialize message: ").append(err).append(" : ").append(requestMessage).toString()));
    }

    public static final /* synthetic */ FreeC $anonfun$sendMessages$6(Function1 function1, BitVector bitVector) {
        return Stream$.MODULE$.eval(function1.apply(Chunk$.MODULE$.bytes(bitVector.toByteArray())));
    }

    public static final /* synthetic */ FreeC $anonfun$sendMessages$4(Function1 function1, RequestMessage requestMessage) {
        return ((Stream) MessageCodec$.MODULE$.requestCodec().encode(requestMessage).fold(err -> {
            return new Stream($anonfun$sendMessages$5(requestMessage, err));
        }, bitVector -> {
            return new Stream($anonfun$sendMessages$6(function1, bitVector));
        })).fs2$Stream$$free();
    }

    public static final /* synthetic */ FreeC $anonfun$sendMessages$1(Ref ref, Function1 function1, Monad monad, FreeC freeC) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.evalMap$extension(freeC, requestMessage -> {
            Object as;
            Request.ProduceRequest request = requestMessage.request();
            if (request instanceof Request.ProduceRequest) {
                Enumeration.Value requiredAcks = request.requiredAcks();
                Enumeration.Value NoResponse = Request$RequiredAcks$.MODULE$.NoResponse();
                if (requiredAcks != null ? requiredAcks.equals(NoResponse) : NoResponse == null) {
                    as = Applicative$.MODULE$.apply(monad).pure(requestMessage);
                    return as;
                }
            }
            as = package$all$.MODULE$.toFunctorOps(ref.update(map -> {
                return map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(requestMessage.correlationId())), requestMessage));
            }), monad).as(requestMessage);
            return as;
        }), requestMessage2 -> {
            return new Stream($anonfun$sendMessages$4(function1, requestMessage2));
        });
    }

    public static final /* synthetic */ FreeC $anonfun$receiveMessages$1(Ref ref, FreeC freeC) {
        return Stream$.MODULE$.through$extension(Stream$.MODULE$.through$extension(freeC, MODULE$.receiveChunks()), MODULE$.decodeReceived(ref));
    }

    public static final /* synthetic */ FreeC $anonfun$receiveChunks$2(FreeC freeC, ByteVector byteVector, Option option) {
        return go$1(byteVector, option, freeC);
    }

    public static final /* synthetic */ FreeC $anonfun$receiveChunks$1(ByteVector byteVector, Option option, Option option2) {
        FreeC raiseError;
        Tuple2 tuple2;
        if ((option2 instanceof Some) && (tuple2 = (Tuple2) ((Some) option2).value()) != null) {
            Chunk chunk = (Chunk) tuple2._1();
            FreeC fs2$Stream$$free = ((Stream) tuple2._2()).fs2$Stream$$free();
            Chunk.Bytes bytes = chunk.toBytes(Predef$$eq$colon$eq$.MODULE$.tpEquals());
            Tuple3<ByteVector, Option<Object>, Segment<ByteVector, BoxedUnit>> collectChunks = MODULE$.collectChunks(byteVector.$plus$plus(ByteVector$.MODULE$.view(bytes.values(), bytes.offset(), bytes.size())), option);
            if (collectChunks == null) {
                throw new MatchError(collectChunks);
            }
            Tuple3 tuple3 = new Tuple3((ByteVector) collectChunks._1(), (Option) collectChunks._2(), (Segment) collectChunks._3());
            ByteVector byteVector2 = (ByteVector) tuple3._1();
            Option option3 = (Option) tuple3._2();
            raiseError = Pull$.MODULE$.$greater$greater$extension(Pull$.MODULE$.segment((Segment) tuple3._3(), Pull$.MODULE$.segment$default$2(), Pull$.MODULE$.segment$default$3()), () -> {
                return new Pull($anonfun$receiveChunks$2(fs2$Stream$$free, byteVector2, option3));
            });
        } else {
            if (!None$.MODULE$.equals(option2)) {
                throw new MatchError(option2);
            }
            raiseError = byteVector.nonEmpty() ? Pull$.MODULE$.raiseError(new Throwable(new StringBuilder(54).append("Input terminated before all data were consumed. Buff: ").append(byteVector).toString())) : Pull$.MODULE$.done();
        }
        return raiseError;
    }

    private static final FreeC go$1(ByteVector byteVector, Option option, FreeC freeC) {
        return Pull$.MODULE$.flatMap$extension(Stream$ToPull$.MODULE$.unconsChunk$extension(Stream$InvariantOps$.MODULE$.pull$extension(Stream$.MODULE$.InvariantOps(freeC))), option2 -> {
            return new Pull($anonfun$receiveChunks$1(byteVector, option, option2));
        });
    }

    public static final /* synthetic */ FreeC $anonfun$receiveChunks$3(FreeC freeC) {
        return Pull$.MODULE$.stream$extension(go$1(ByteVector$.MODULE$.empty(), None$.MODULE$, freeC));
    }

    /* JADX WARN: Code restructure failed: missing block: B:22:0x0176, code lost:
    
        return r12;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private final scala.Tuple3 go$2(scodec.bits.ByteVector r8, scala.Option r9, scala.collection.immutable.Vector r10) {
        /*
            Method dump skipped, instructions count: 375
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: spinoco.fs2.kafka.network.BrokerConnection$impl$.go$2(scodec.bits.ByteVector, scala.Option, scala.collection.immutable.Vector):scala.Tuple3");
    }

    public static final /* synthetic */ FreeC $anonfun$decodeReceived$5(RequestMessage requestMessage, ByteVector byteVector, Err err) {
        return Stream$.MODULE$.raiseError(new Throwable(new StringBuilder(44).append("Failed to decode response to request: ").append(err).append(" : ").append(requestMessage).append(" : ").append(byteVector).toString()));
    }

    public static final /* synthetic */ FreeC $anonfun$decodeReceived$6(int i, DecodeResult decodeResult) {
        return Stream$.MODULE$.emit(new ResponseMessage(i, (Response) decodeResult.value()));
    }

    public static final /* synthetic */ FreeC $anonfun$decodeReceived$4(int i, ByteVector byteVector, Map map) {
        FreeC fs2$Stream$$free;
        Some some = map.get(BoxesRunTime.boxToInteger(i));
        if (None$.MODULE$.equals(some)) {
            fs2$Stream$$free = Stream$.MODULE$.raiseError(new Throwable(new StringBuilder(71).append("Received message correlationId for message that does not exists: ").append(i).append(" : ").append(byteVector).append(" : ").append(map).toString()));
        } else {
            if (!(some instanceof Some)) {
                throw new MatchError(some);
            }
            RequestMessage requestMessage = (RequestMessage) some.value();
            fs2$Stream$$free = ((Stream) MessageCodec$.MODULE$.responseCodecFor(requestMessage.version(), ApiKey$.MODULE$.forRequest(requestMessage.request())).decode(byteVector.drop(4L).bits()).fold(err -> {
                return new Stream($anonfun$decodeReceived$5(requestMessage, byteVector, err));
            }, decodeResult -> {
                return new Stream($anonfun$decodeReceived$6(i, decodeResult));
            })).fs2$Stream$$free();
        }
        return fs2$Stream$$free;
    }

    public static final /* synthetic */ FreeC $anonfun$decodeReceived$2(Ref ref, ByteVector byteVector) {
        if (byteVector.size() < 4) {
            return Stream$.MODULE$.raiseError(new Throwable(new StringBuilder(53).append("Message chunk does not have correlation id included: ").append(byteVector).toString()));
        }
        ByteVector take = byteVector.take(4L);
        int i = take.toInt(take.toInt$default$1(), take.toInt$default$2());
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(ref.modify(map -> {
            return new Tuple2(map.$minus(BoxesRunTime.boxToInteger(i)), map);
        })), map2 -> {
            return new Stream($anonfun$decodeReceived$4(i, byteVector, map2));
        });
    }

    public static final /* synthetic */ FreeC $anonfun$decodeReceived$1(Ref ref, FreeC freeC) {
        return Stream$.MODULE$.flatMap$extension(freeC, byteVector -> {
            return new Stream($anonfun$decodeReceived$2(ref, byteVector));
        });
    }

    public BrokerConnection$impl$() {
        MODULE$ = this;
    }
}
