package nl.vroste.zio.kinesis.client.zionative.fetcher;

import io.github.vigoo.zioaws.kinesis.model.package;
import io.github.vigoo.zioaws.kinesis.model.package$GetShardIteratorRequest$;
import java.time.Duration;
import nl.vroste.zio.kinesis.client.Util$;
import nl.vroste.zio.kinesis.client.zionative.Consumer$;
import nl.vroste.zio.kinesis.client.zionative.DiagnosticEvent;
import nl.vroste.zio.kinesis.client.zionative.FetchMode;
import nl.vroste.zio.kinesis.client.zionative.Fetcher;
import nl.vroste.zio.kinesis.client.zionative.Fetcher$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.UninitializedFieldError;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.List;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import zio.CanFail$;
import zio.Has;
import zio.NeedsEnv$;
import zio.Ref$;
import zio.Schedule$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZManaged;
import zio.clock.package;
import zio.duration.package$;
import zio.logging.log$;
import zio.stream.ZStream;
import zio.stream.ZStream$;

/* compiled from: PollingFetcher.scala */
/* loaded from: input_file:nl/vroste/zio/kinesis/client/zionative/fetcher/PollingFetcher$.class */
public final class PollingFetcher$ {
    /** Singleton instance of this class; the constructor is private. */
    public static final PollingFetcher$ MODULE$ = new PollingFetcher$();

    /** Limit passed to Util.throttledFunction for the getShardIterator call in make(). */
    private static final int getShardIteratorRateLimit = 5;

    /** Limit passed to Util.throttledFunction for the getRecords call in make(). */
    private static final int getRecordsRateLimit = 5;

    /**
     * Initialisation flags checked by the private accessors below:
     * bit 0 guards getShardIteratorRateLimit, bit 1 guards getRecordsRateLimit.
     */
    private static volatile byte bitmap$init$0;

    static {
        // Mark both constants as initialised, one flag bit at a time.
        // The compound assignment narrows the int result back to byte implicitly.
        bitmap$init$0 |= 1;
        bitmap$init$0 |= 2;
    }

    /**
     * Builds a managed {@link Fetcher} that reads shard records by repeatedly calling GetRecords.
     *
     * <p>Outline of what the code below does:
     * <ol>
     *   <li>Captures the current ZIO environment so it can be provided to every stream created later.</li>
     *   <li>Wraps the Kinesis getShardIterator call with {@code Util.throttledFunction}, using
     *       {@code getShardIteratorRateLimit()} and a one-second duration.</li>
     *   <li>Returns a Fetcher whose function, given a shard identifier and a starting position,
     *       obtains a shard iterator, stores it in a Ref, and repeats a poll effect on
     *       {@code polling.pollSchedule()}.</li>
     *   <li>Each poll calls GetRecords (wrapped with {@code getRecordsRateLimit()}), stores the next
     *       shard iterator in the Ref, and reports a {@code DiagnosticEvent.PollComplete} through
     *       {@code function1}.</li>
     *   <li>When a response has child shards and no next shard iterator, the stream emits that
     *       response and then fails with {@code Right(Fetcher.EndOfShard(children))}.</li>
     * </ol>
     *
     * <p>NOTE(review): this is decompiler output. Several lambda parameters reuse names from
     * enclosing scopes ({@code str2}, {@code function12}, {@code tuple2}, {@code tuple22},
     * {@code boxedUnit}, {@code readOnly2}), so which binding a given reference resolves to has to
     * be confirmed against the original PollingFetcher.scala.
     *
     * @param str       first argument of every GetShardIteratorRequest built here; presumably the
     *                  stream name (confirm against the request type)
     * @param polling   polling settings; this method reads throttlingBackoff(), batchSize(),
     *                  pollSchedule() and bufferNrBatches()
     * @param function1 callback applied to each DiagnosticEvent.PollComplete
     * @return a managed Fetcher
     */
    public ZManaged<Has<package.Clock.Service>, Throwable, Fetcher> make(String str, FetchMode.Polling polling, Function1<DiagnosticEvent, ZIO<Object, Nothing$, BoxedUnit>> function1) {
        return ZIO$.MODULE$.environment().toManaged_().flatMap(has -> {
            // Throttled wrapper around getShardIterator (limit, 1 second); the exact throttling
            // semantics live in Util.throttledFunction, which is not shown here.
            return Util$.MODULE$.throttledFunction(MODULE$.getShardIteratorRateLimit(), package$.MODULE$.durationInt(1).second(), getShardIteratorRequest -> {
                return io.github.vigoo.zioaws.kinesis.package$.MODULE$.getShardIterator(getShardIteratorRequest);
            }).map(function12 -> {
                // The Fetcher function: (shard identifier as used in the log messages, starting position) -> record stream.
                return Fetcher$.MODULE$.apply((str2, startingPosition) -> {
                    return ZStream$.MODULE$.unwrapManaged(log$.MODULE$.info(() -> {
                        return new StringBuilder(58).append("Creating PollingFetcher for shard ").append(str2).append(" with starting position ").append(startingPosition).toString();
                    }).toManaged_().flatMap(boxedUnit -> {
                        // Request the initial shard iterator through the throttled function and take its value.
                        return ((ZIO) function12.apply(new package.GetShardIteratorRequest(str, str2, startingPosition.type(), startingPosition.sequenceNumber(), package$GetShardIteratorRequest$.MODULE$.apply$default$5()))).map(readOnly -> {
                            return (String) readOnly.shardIteratorValue().get();
                        }).mapError(awsError -> {
                            return awsError.toThrowable();
                        // Retry using the schedule built from polling.throttlingBackoff(); remaining failures are wrapped in Left.
                        }, CanFail$.MODULE$.canFail()).retry(Consumer$.MODULE$.retryOnThrottledWithSchedule(polling.throttlingBackoff()), CanFail$.MODULE$.canFail()).mapError(th -> {
                            return scala.package$.MODULE$.Left().apply(th);
                        }, CanFail$.MODULE$.canFail()).toManaged_().flatMap(str2 -> {
                            // Throttled wrapper around getRecords (limit, 1 second).
                            return Util$.MODULE$.throttledFunction(MODULE$.getRecordsRateLimit(), package$.MODULE$.durationInt(1).second(), getRecordsRequest -> {
                                return io.github.vigoo.zioaws.kinesis.package$.MODULE$.getRecords(getRecordsRequest);
                            }).flatMap(function12 -> {
                                // The Ref holds the current shard iterator as an Option, starting at Some(initial iterator).
                                return Ref$.MODULE$.make(new Some(str2)).toManaged_().map(zRef -> {
                                    // A single poll: read the iterator, fetch a batch, store the next iterator, emit a diagnostic.
                                    ZIO flatMap = zRef.get().flatMap(option -> {
                                        // An empty Option in the Ref fails this step; the catchAll below turns a None failure into an empty stream.
                                        return ZIO$.MODULE$.fromOption(() -> {
                                            return option;
                                        }).flatMap(str2 -> {
                                            // GetRecords with polling.batchSize() as the limit. Errors are logged, retried with the
                                            // throttling schedule, then retried on a fixed 100 ms schedule combined with recurs(3).
                                            // The error is wrapped in Some and the whole call is timed.
                                            return ((ZIO) function12.apply(new package.GetRecordsRequest(str2, new Some(BoxesRunTime.boxToInteger(polling.batchSize()))))).mapError(awsError2 -> {
                                                return awsError2.toThrowable();
                                            }, CanFail$.MODULE$.canFail()).tapError(th2 -> {
                                                return log$.MODULE$.warn(() -> {
                                                    return new StringBuilder(29).append("Error GetRecords for shard ").append(str2).append(": ").append(th2).toString();
                                                });
                                            }, CanFail$.MODULE$.canFail()).retry(Consumer$.MODULE$.retryOnThrottledWithSchedule(polling.throttlingBackoff()), CanFail$.MODULE$.canFail()).retry(Schedule$.MODULE$.fixed(package$.MODULE$.durationInt(100).millis()).$amp$amp(Schedule$.MODULE$.recurs(3)), CanFail$.MODULE$.canFail()).asSomeError().timed().map(tuple2 -> {
                                                // Decompiled pattern match on the (duration, response) pair produced by timed();
                                                // the redeclared `tuple2` below is a decompiler naming artifact.
                                                if (tuple2 == null) {
                                                    throw new MatchError(tuple2);
                                                }
                                                Tuple3 tuple3 = new Tuple3(tuple2, (Duration) tuple2._1(), (package.GetRecordsResponse.ReadOnly) tuple2._2());
                                                Tuple2 tuple2 = (Tuple2) tuple3._1();
                                                return new Tuple2(tuple2, tuple2);
                                            }).flatMap(tuple22 -> {
                                                Tuple2 tuple22;
                                                if (tuple22 == null || (tuple22 = (Tuple2) tuple22._2()) == null) {
                                                    throw new MatchError(tuple22);
                                                }
                                                Duration duration = (Duration) tuple22._1();
                                                package.GetRecordsResponse.ReadOnly readOnly2 = (package.GetRecordsResponse.ReadOnly) tuple22._2();
                                                // Store the next iterator for the following poll, then read millisBehindLatest
                                                // and hand off to $anonfun$make$24, which emits PollComplete and yields the response.
                                                return zRef.set(readOnly2.nextShardIteratorValue()).flatMap(boxedUnit -> {
                                                    return readOnly2.millisBehindLatest().mapError(awsError3 -> {
                                                        return new Some(awsError3.toThrowable());
                                                    }, CanFail$.MODULE$.canFail()).flatMap(obj -> {
                                                        return $anonfun$make$24(function1, str2, readOnly2, duration, BoxesRunTime.unboxToLong(obj));
                                                    });
                                                });
                                            });
                                        });
                                    });
                                    // Repeat the poll on polling.pollSchedule(). Only the third tuple element (the stream) is used afterwards.
                                    return new Tuple3(zRef, flatMap, ZStream$.MODULE$.repeatEffectWith(flatMap, polling.pollSchedule()).catchAll(option2 -> {
                                        ZStream $times$greater;
                                        // A None failure means there was no iterator left: end the stream without an error.
                                        if (None$.MODULE$.equals(option2)) {
                                            $times$greater = ZStream$.MODULE$.empty();
                                        } else {
                                            if (!(option2 instanceof Some)) {
                                                throw new MatchError(option2);
                                            }
                                            // Some(throwable): log a warning, then fail the stream with that throwable.
                                            Throwable th2 = (Throwable) ((Some) option2).value();
                                            $times$greater = ZStream$.MODULE$.fromEffect(log$.MODULE$.warn(() -> {
                                                return new StringBuilder(36).append("Error in PollingFetcher for shard ").append(str2).append(": ").append(th2).toString();
                                            })).$times$greater(ZStream$.MODULE$.fail(() -> {
                                                return th2;
                                            }));
                                        }
                                        return $times$greater;
                                    // Retry the stream on polling.throttlingBackoff(), buffer polling.bufferNrBatches() responses, wrap errors in Left.
                                    }, CanFail$.MODULE$.canFail()).retry(polling.throttlingBackoff()).buffer(polling.bufferNrBatches()).mapError(th2 -> {
                                        return scala.package$.MODULE$.Left().apply(th2);
                                    }).flatMap(readOnly2 -> {
                                        // Child shards present and no next iterator: emit this response, log, then fail with
                                        // Right(EndOfShard(child shards)). Otherwise just emit the response.
                                        return (((IterableOnceOps) readOnly2.childShardsValue().toList().flatten(Predef$.MODULE$.$conforms())).nonEmpty() && readOnly2.nextShardIteratorValue().isEmpty()) ? ZStream$.MODULE$.succeed(() -> {
                                            return readOnly2;
                                        }).$plus$plus(() -> {
                                            return ZStream$.MODULE$.fromEffect(log$.MODULE$.debug(() -> {
                                                return new StringBuilder(38).append("PollingFetcher found end of shard for ").append(str2).toString();
                                            })).$times$greater(ZStream$.MODULE$.fail(() -> {
                                                return scala.package$.MODULE$.Right().apply(new Fetcher.EndOfShard(((List) readOnly2.childShardsValue().toList().flatten(Predef$.MODULE$.$conforms())).map(readOnly2 -> {
                                                    return Consumer$.MODULE$.childShardToShard(readOnly2);
                                                })));
                                            }));
                                        }) : ZStream$.MODULE$.succeed(() -> {
                                            return readOnly2;
                                        });
                                    }).mapConcat(readOnly3 -> {
                                        // Flatten each response into its records.
                                        return readOnly3.recordsValue();
                                    }));
                                }).map(tuple3 -> {
                                    if (tuple3 != null) {
                                        return (ZStream) tuple3._3();
                                    }
                                    throw new MatchError(tuple3);
                                });
                            });
                        });
                    })).ensuring(log$.MODULE$.debug(() -> {
                        // Finalizer message attached to the shard stream.
                        return new StringBuilder(32).append("PollingFetcher for shard ").append(str2).append(" closed").toString();
                    })).provide(has, NeedsEnv$.MODULE$.needsEnv());
                });
            });
        });
    }

    /**
     * Returns the limit that make() passes to Util.throttledFunction for getShardIterator calls.
     *
     * @return the value of the getShardIteratorRateLimit constant
     * @throws UninitializedFieldError if bit 0 of bitmap$init$0 has not been set yet
     */
    private int getShardIteratorRateLimit() {
        // Bit 0 of bitmap$init$0 is the "initialised" flag for this constant.
        if ((bitmap$init$0 & 1) == 0) {
            throw new UninitializedFieldError("Uninitialized field: /root/project/core/src/main/scala/nl/vroste/zio/kinesis/client/zionative/fetcher/PollingFetcher.scala: 126");
        }
        return getShardIteratorRateLimit;
    }

    /**
     * Returns the limit that make() passes to Util.throttledFunction for getRecords calls.
     *
     * @return the value of the getRecordsRateLimit constant
     * @throws UninitializedFieldError if bit 1 of bitmap$init$0 has not been set yet
     */
    private int getRecordsRateLimit() {
        // Bit 1 (mask 2) of bitmap$init$0 is the "initialised" flag for this constant.
        if ((bitmap$init$0 & 2) == 0) {
            throw new UninitializedFieldError("Uninitialized field: /root/project/core/src/main/scala/nl/vroste/zio/kinesis/client/zionative/fetcher/PollingFetcher.scala: 127");
        }
        return getRecordsRateLimit;
    }

    /**
     * Lambda body lifted out of make(): reports a completed poll and yields the response.
     *
     * <p>Builds a DiagnosticEvent.PollComplete from the given identifier, the number of records in
     * the response, {@code j} converted to a millisecond duration, and the measured call duration.
     * It passes that event to {@code function1} and maps the resulting effect to {@code readOnly}.
     *
     * @param function1 callback that receives the PollComplete event
     * @param str       identifier passed as the first PollComplete argument
     * @param readOnly  the GetRecords response; its record count is reported and it is the result value
     * @param duration  measured duration passed as the last PollComplete argument
     * @param j         millisecond count wrapped via durationLong(j).millis()
     * @return the effect returned by function1, mapped to produce readOnly
     */
    public static final /* synthetic */ ZIO $anonfun$make$24(Function1 function1, String str, package.GetRecordsResponse.ReadOnly readOnly, Duration duration, long j) {
        DiagnosticEvent.PollComplete pollComplete = new DiagnosticEvent.PollComplete(str, readOnly.recordsValue().size(), package$.MODULE$.durationLong(j).millis(), duration);
        ZIO emitted = (ZIO) function1.apply(pollComplete);
        // The callback's own result is discarded; downstream code only needs the response.
        return emitted.map(ignored -> readOnly);
    }

    // Private no-op constructor: within this class it is invoked only by the MODULE$ field initializer.
    private PollingFetcher$() {
    }
}
