package nl.vroste.zio.kinesis.client.zionative.fetcher;

import io.github.vigoo.zioaws.kinesis.model.package;
import io.github.vigoo.zioaws.kinesis.model.package$ShardIteratorType$AFTER_SEQUENCE_NUMBER$;
import io.github.vigoo.zioaws.kinesis.model.package$StartingPosition$;
import io.github.vigoo.zioaws.kinesis.package;
import nl.vroste.zio.kinesis.client.Util$;
import nl.vroste.zio.kinesis.client.zionative.Consumer$;
import nl.vroste.zio.kinesis.client.zionative.DiagnosticEvent;
import nl.vroste.zio.kinesis.client.zionative.FetchMode;
import nl.vroste.zio.kinesis.client.zionative.Fetcher;
import nl.vroste.zio.kinesis.client.zionative.Fetcher$;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.List;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;
import zio.CanFail$;
import zio.Has;
import zio.NeedsEnv$;
import zio.Ref$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZManaged;
import zio.clock.package;
import zio.duration.package$;
import zio.stream.ZStream$;

/* compiled from: EnhancedFanOutFetcher.scala */
/* loaded from: input_file:nl/vroste/zio/kinesis/client/zionative/fetcher/EnhancedFanOutFetcher$.class */
/**
 * Singleton holder (single {@link #MODULE$} instance, private constructor) that builds a
 * {@link Fetcher} backed by the Kinesis {@code subscribeToShard} call.
 *
 * <p>NOTE(review): this file appears to be decompiler output for a Scala object (see the
 * "compiled from" header). The decompiler reused local names such as {@code str2},
 * {@code str3}, {@code readOnly2} and {@code startingPosition2} across nested lambdas, so
 * identical names in different scopes do not necessarily refer to the same value — confirm
 * against the original Scala source before relying on any name.
 */
public final class EnhancedFanOutFetcher$ {
    /** The only instance of this class; all calls go through it. */
    public static final EnhancedFanOutFetcher$ MODULE$ = new EnhancedFanOutFetcher$();

    /**
     * Creates a managed {@link Fetcher} that reads shards through {@code subscribeToShard}.
     *
     * <p>Steps visible in this method:
     * <ol>
     *   <li>captures the current ZIO environment so it can be provided to the shard stream;</li>
     *   <li>obtains a consumer ARN via {@link #registerConsumerIfNotExists(String, String)};</li>
     *   <li>wraps the {@code subscribeToShard} call with {@code Util.throttledFunctionN}, using
     *       {@code enhancedFanOut.maxSubscriptionsPerSecond()} and a one-second duration;</li>
     *   <li>builds the fetcher whose per-shard stream re-subscribes while a stored starting
     *       position is present, and finally emits the individual records.</li>
     * </ol>
     *
     * @param readOnly       stream description; only {@code streamARNValue()} is read here
     * @param str            second argument of the register-consumer request
     *                       (presumably the consumer/application name — TODO confirm)
     * @param enhancedFanOut supplies {@code maxSubscriptionsPerSecond()} and {@code retrySchedule()}
     * @param function1      callback invoked with a {@code SubscribeToShardEvent} for every
     *                       event received from a subscription
     * @return a managed resource that yields the constructed {@link Fetcher}
     */
    public ZManaged<Has<package.Clock.Service>, Throwable, Fetcher> make(package.StreamDescription.ReadOnly readOnly, String str, FetchMode.EnhancedFanOut enhancedFanOut, Function1<DiagnosticEvent, ZIO<Object, Nothing$, BoxedUnit>> function1) {
        // Capture the environment once; it is handed to the shard stream via provide(...) below.
        return ZIO$.MODULE$.environment().toManaged_().flatMap(has -> {
            // str2 (outer) is the consumer ARN returned by registerConsumerIfNotExists.
            return MODULE$.registerConsumerIfNotExists(readOnly.streamARNValue(), str).toManaged_().flatMap(str2 -> {
                // NOTE(review): the lambda parameter below is also named str2 and shadows the outer
                // consumer ARN (decompiler naming); presumably it is the shard id — confirm.
                return Util$.MODULE$.throttledFunctionN(enhancedFanOut.maxSubscriptionsPerSecond(), package$.MODULE$.durationInt(1).second()).apply((startingPosition, str2) -> {
                    // The subscribe call's result is wrapped in ZIO.succeed; it is unwrapped into a
                    // stream later with ZStream.unwrap.
                    return ZIO$.MODULE$.succeed(() -> {
                        // NOTE(review): both ARN/shard arguments appear as str2 here because of the
                        // name collision above; presumably (consumerARN, shardId, position) — confirm.
                        return io.github.vigoo.zioaws.kinesis.package$.MODULE$.subscribeToShard(new package.SubscribeToShardRequest(str2, str2, startingPosition)).mapError(awsError -> {
                            return awsError.toThrowable();
                        });
                    });
                }).map(function2 -> {
                    // function2 is the throttled subscribe function produced above.
                    return Fetcher$.MODULE$.apply((str3, startingPosition2) -> {
                        // zRef holds the position for the next subscription, initially the requested one.
                        return ZStream$.MODULE$.unwrap(Ref$.MODULE$.make(new Some(startingPosition2)).map(zRef -> {
                            // Exact repeat semantics live in FetchUtil.repeatWhileNotNone (not visible here).
                            return FetchUtil$.MODULE$.repeatWhileNotNone(zRef, startingPosition2 -> {
                                return ZStream$.MODULE$.unwrap((ZIO) function2.apply(startingPosition2, str3)).tap(readOnly2 -> {
                                    // After each event, store the next position: AFTER_SEQUENCE_NUMBER at the
                                    // continuation sequence number, or None when Option.apply yields none.
                                    return zRef.set(Option$.MODULE$.apply(readOnly2.continuationSequenceNumberValue()).map(str3 -> {
                                        return new package.StartingPosition(package$ShardIteratorType$AFTER_SEQUENCE_NUMBER$.MODULE$, new Some(str3), package$StartingPosition$.MODULE$.apply$default$3());
                                    }));
                                }).tap(readOnly3 -> {
                                    // Report the record count and millis-behind-latest of this event to the
                                    // diagnostics callback.
                                    return (ZIO) function1.apply(new DiagnosticEvent.SubscribeToShardEvent(str3, readOnly3.recordsValue().size(), package$.MODULE$.durationLong(readOnly3.millisBehindLatestValue()).millis()));
                                    // catchSome delegates to a generated partial-function class whose body is not
                                    // visible here; remaining failures are retried per enhancedFanOut.retrySchedule().
                                }).catchSome(new EnhancedFanOutFetcher$$anonfun$$nestedInanonfun$make$9$1(str3)).retry(enhancedFanOut.retrySchedule());
                            }).mapError(th -> {
                                // Plain failures become Left; Right is used below for the end-of-shard signal.
                                return scala.package$.MODULE$.Left().apply(th);
                            }).flatMap(readOnly2 -> {
                                // When the event lists child shards, emit the event and then fail the stream with
                                // Right(EndOfShard(children)); otherwise just emit the event.
                                return ((IterableOnceOps) readOnly2.childShardsValue().toList().flatten(Predef$.MODULE$.$conforms())).nonEmpty() ? ZStream$.MODULE$.succeed(() -> {
                                    return readOnly2;
                                }).$plus$plus(() -> {
                                    return ZStream$.MODULE$.fail(() -> {
                                        return scala.package$.MODULE$.Right().apply(new Fetcher.EndOfShard(((List) readOnly2.childShardsValue().toList().flatten(Predef$.MODULE$.$conforms())).map(readOnly2 -> {
                                            return Consumer$.MODULE$.childShardToShard(readOnly2);
                                        })));
                                    });
                                }) : ZStream$.MODULE$.succeed(() -> {
                                    return readOnly2;
                                });
                            }).mapConcat(readOnly3 -> {
                                // Flatten each event into its individual records.
                                return readOnly3.recordsValue();
                            });
                            // Supply the environment captured at the top of make(...).
                        })).provide(has, NeedsEnv$.MODULE$.needsEnv());
                    });
                });
            });
        });
    }

    /**
     * Issues a {@code registerStreamConsumer} request and returns the consumer ARN from the response.
     *
     * <p>AWS errors are converted with {@code toThrowable()}. Some failures are then handled by a
     * generated partial-function class whose body is not visible in this file; going by the method
     * name it presumably recovers when the consumer already exists — TODO confirm.
     *
     * @param str  first argument of the request ({@code make} passes the stream ARN)
     * @param str2 second argument of the request (presumably the consumer name — TODO confirm)
     * @return an effect yielding {@code consumerValue().consumerARNValue()} of the response
     */
    private ZIO<Has<package.Kinesis.Service>, Throwable, String> registerConsumerIfNotExists(String str, String str2) {
        return io.github.vigoo.zioaws.kinesis.package$.MODULE$.registerStreamConsumer(new package.RegisterStreamConsumerRequest(str, str2)).mapError(awsError -> {
            return awsError.toThrowable();
        }, CanFail$.MODULE$.canFail()).map(readOnly -> {
            return readOnly.consumerValue().consumerARNValue();
        }).catchSome(new EnhancedFanOutFetcher$$anonfun$registerConsumerIfNotExists$3(str, str2), CanFail$.MODULE$.canFail());
    }

    /** Private so that {@link #MODULE$} stays the only instance. */
    private EnhancedFanOutFetcher$() {
    }
}
