package com.snowplowanalytics.snowplow.sources.internal;

import cats.Monad;
import cats.effect.kernel.Async;
import cats.effect.kernel.Ref;
import cats.effect.kernel.Ref$;
import cats.effect.kernel.Ref$ApplyBuilders$;
import cats.effect.kernel.Ref$Make$;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Resource$ExitCase$Canceled$;
import cats.effect.kernel.Resource$ExitCase$Succeeded$;
import cats.effect.kernel.Sync;
import cats.effect.kernel.Unique;
import cats.effect.kernel.Unique$;
import cats.effect.package$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.implicits$;
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig;
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig$NoWindowing$;
import com.snowplowanalytics.snowplow.sources.SourceAndAck;
import com.snowplowanalytics.snowplow.sources.SourceAndAck$Disconnected$;
import com.snowplowanalytics.snowplow.sources.SourceAndAck$Healthy$;
import com.snowplowanalytics.snowplow.sources.TokenedEvents;
import com.snowplowanalytics.snowplow.sources.internal.EagerWindows;
import fs2.Chunk;
import fs2.Chunk$;
import fs2.Pull;
import fs2.Pull$;
import fs2.Pull$StreamPullOps$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$InvariantOps$;
import fs2.Stream$NestedStreamOps$;
import fs2.Stream$ToPull$;
import fs2.compat.NotGiven$;
import java.nio.ByteBuffer;
import java.time.Instant;
import org.typelevel.log4cats.Logger$;
import org.typelevel.log4cats.SelfAwareStructuredLogger;
import org.typelevel.log4cats.slf4j.Slf4jLogger$;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.immutable.Map;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

/* compiled from: LowLevelSource.scala */
/* loaded from: input_file:com/snowplowanalytics/snowplow/sources/internal/LowLevelSource$.class */
public final class LowLevelSource$ {
    public static final LowLevelSource$ MODULE$ = new LowLevelSource$();

    private <F> SelfAwareStructuredLogger<F> logger(Sync<F> sync) {
        return Slf4jLogger$.MODULE$.getLogger(sync, "com.snowplowanalytics.snowplow.sources.internal.LowLevelSource");
    }

    public <F, C> F toSourceAndAck(LowLevelSource<F, C> lowLevelSource, Async<F> async) {
        return (F) implicits$.MODULE$.toFlatMapOps(Ref$ApplyBuilders$.MODULE$.of$extension(Ref$.MODULE$.apply(Ref$Make$.MODULE$.concurrentInstance(async)), Option$.MODULE$.empty()), async).flatMap(ref -> {
            return implicits$.MODULE$.toFunctorOps(Ref$ApplyBuilders$.MODULE$.of$extension(Ref$.MODULE$.apply(Ref$Make$.MODULE$.concurrentInstance(async)), BoxesRunTime.boxToBoolean(false)), async).map(ref -> {
                return MODULE$.sourceAndAckImpl(lowLevelSource, ref, ref, async);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <F, C> SourceAndAck<F> sourceAndAckImpl(final LowLevelSource<F, C> lowLevelSource, final Ref<F, Option<FiniteDuration>> ref, final Ref<F, Object> ref2, final Async<F> async) {
        return new SourceAndAck<F>(async, lowLevelSource, ref2, ref) { // from class: com.snowplowanalytics.snowplow.sources.internal.LowLevelSource$$anon$1
            private final Async evidence$3$1;
            private final LowLevelSource source$2;
            private final Ref isConnectedRef$1;
            private final Ref latencyRef$2;

            @Override // com.snowplowanalytics.snowplow.sources.SourceAndAck
            public Stream<F, Nothing$> stream(EventProcessingConfig eventProcessingConfig, Function1<Stream<F, TokenedEvents>, Stream<F, Unique.Token>> function1) {
                return Stream$.MODULE$.bracket(Ref$ApplyBuilders$.MODULE$.of$extension(Ref$.MODULE$.apply(Ref$Make$.MODULE$.concurrentInstance(this.evidence$3$1)), Predef$.MODULE$.Map().empty()), ref3 -> {
                    return LowLevelSource$.MODULE$.com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$nackUnhandled(this.source$2.checkpointer(), ref3, this.evidence$3$1);
                }).flatMap(ref4 -> {
                    return this.source$2.stream().flatMap(stream -> {
                        return Stream$.MODULE$.bracket(this.isConnectedRef$1.set(BoxesRunTime.boxToBoolean(true)), boxedUnit -> {
                            return this.isConnectedRef$1.set(BoxesRunTime.boxToBoolean(false));
                        }).map(boxedUnit2 -> {
                            return Stream$NestedStreamOps$.MODULE$.parJoin$extension(Stream$.MODULE$.NestedStreamOps(stream.through(LowLevelSource$.MODULE$.com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$monitorLatency(this.latencyRef$2, this.evidence$3$1)).through(LowLevelSource$.MODULE$.com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$tokened(ref4, this.evidence$3$1)).through(LowLevelSource$.MODULE$.com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$windowed(eventProcessingConfig.windowing(), this.evidence$3$1)).zip(EagerWindows$.MODULE$.pipes(control -> {
                                return CleanCancellation$.MODULE$.apply(LowLevelSource$.MODULE$.com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$messageSink(function1, ref4, this.source$2.checkpointer(), control, this.evidence$3$1), this.evidence$3$1);
                            }, this.evidence$3$1)).map(tuple2 -> {
                                if (tuple2 == null) {
                                    throw new MatchError(tuple2);
                                }
                                return (Stream) ((Function1) tuple2._2()).apply((Stream) tuple2._1());
                            })), 2, this.evidence$3$1);
                        });
                    }, NotGiven$.MODULE$.default());
                }, NotGiven$.MODULE$.default()).flatten($less$colon$less$.MODULE$.refl());
            }

            @Override // com.snowplowanalytics.snowplow.sources.SourceAndAck
            public F isHealthy(FiniteDuration finiteDuration) {
                return (F) implicits$.MODULE$.catsSyntaxTuple3Semigroupal(new Tuple3(this.isConnectedRef$1.get(), this.latencyRef$2.get(), package$.MODULE$.Sync().apply(this.evidence$3$1).realTime())).mapN((obj, option, finiteDuration2) -> {
                    return $anonfun$isHealthy$1(finiteDuration, BoxesRunTime.unboxToBoolean(obj), option, finiteDuration2);
                }, this.evidence$3$1, this.evidence$3$1);
            }

            public static final /* synthetic */ SourceAndAck.HealthStatus $anonfun$isHealthy$1(FiniteDuration finiteDuration, boolean z, Option option, FiniteDuration finiteDuration2) {
                Tuple3 tuple3 = new Tuple3(BoxesRunTime.boxToBoolean(z), option, finiteDuration2);
                if (tuple3 != null && false == BoxesRunTime.unboxToBoolean(tuple3._1())) {
                    return SourceAndAck$Disconnected$.MODULE$;
                }
                if (tuple3 != null) {
                    Some some = (Option) tuple3._2();
                    FiniteDuration finiteDuration3 = (FiniteDuration) tuple3._3();
                    if (some instanceof Some) {
                        FiniteDuration finiteDuration4 = (FiniteDuration) some.value();
                        if (finiteDuration3.$minus(finiteDuration4).$greater(finiteDuration)) {
                            return new SourceAndAck.LaggingEventProcessor(finiteDuration3.$minus(finiteDuration4));
                        }
                    }
                }
                return SourceAndAck$Healthy$.MODULE$;
            }

            {
                this.evidence$3$1 = async;
                this.source$2 = lowLevelSource;
                this.isConnectedRef$1 = ref2;
                this.latencyRef$2 = ref;
            }
        };
    }

    public <F, C> F com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$nackUnhandled(Checkpointer<F, C> checkpointer, Ref<F, Map<Unique.Token, C>> ref, Monad<F> monad) {
        return (F) implicits$.MODULE$.toFlatMapOps(ref.get(), monad).flatMap(map -> {
            return checkpointer.nack(checkpointer.combineAll(map.values()));
        });
    }

    public <F, C> Function1<Stream<F, LowLevelEvents<C>>, Stream<F, TokenedEvents>> com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$tokened(Ref<F, Map<Unique.Token, C>> ref, Sync<F> sync) {
        return stream -> {
            return stream.evalMap(lowLevelEvents -> {
                if (lowLevelEvents == null) {
                    throw new MatchError(lowLevelEvents);
                }
                Chunk<ByteBuffer> events = lowLevelEvents.events();
                Object ack = lowLevelEvents.ack();
                Option<Instant> earliestSourceTstamp = lowLevelEvents.earliestSourceTstamp();
                return implicits$.MODULE$.toFlatMapOps(Unique$.MODULE$.apply(sync).unique(), sync).flatMap(token -> {
                    return implicits$.MODULE$.toFunctorOps(ref.update(map -> {
                        return map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(token), ack));
                    }), sync).map(boxedUnit -> {
                        return new TokenedEvents(events, token, earliestSourceTstamp);
                    });
                });
            });
        };
    }

    public <F, A> Function1<Stream<F, A>, Stream<F, A>> com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$monitorLatency(Ref<F, Option<FiniteDuration>> ref, Sync<F> sync) {
        return stream -> {
            return Pull$StreamPullOps$.MODULE$.stream$extension(Pull$.MODULE$.StreamPullOps(go$1(stream, sync, ref)));
        };
    }

    public <F, C> Function1<Stream<F, TokenedEvents>, Stream<F, Nothing$>> com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$messageSink(Function1<Stream<F, TokenedEvents>, Stream<F, Unique.Token>> function1, Ref<F, Map<Unique.Token, C>> ref, Checkpointer<F, C> checkpointer, EagerWindows.Control<F> control, Async<F> async) {
        return stream -> {
            return stream.append(() -> {
                return Stream$.MODULE$.eval(control.waitForPreviousWindow()).drain();
            }).evalTap(tokenedEvents -> {
                if (tokenedEvents == null) {
                    throw new MatchError(tokenedEvents);
                }
                Chunk<ByteBuffer> events = tokenedEvents.events();
                return Logger$.MODULE$.apply(MODULE$.logger(async)).debug(() -> {
                    return new StringBuilder(48).append("Batch of ").append(events.size()).append(" events received from the source stream").toString();
                });
            }).through(function1).chunks().evalTap(chunk -> {
                return control.waitForPreviousWindow();
            }).prefetch(async).evalMap(chunk2 -> {
                return implicits$.MODULE$.toFlatMapOps(chunk2.traverse(token -> {
                    return implicits$.MODULE$.toFlatMapOps(ref.modify(map -> {
                        return new Tuple2(map.$minus(token), map.get(token));
                    }), async).flatMap(option -> {
                        if (option instanceof Some) {
                            return package$.MODULE$.Async().apply(async).pure(((Some) option).value());
                        }
                        if (None$.MODULE$.equals(option)) {
                            return package$.MODULE$.Async().apply(async).raiseError(new IllegalStateException("Missing checkpoint for token"));
                        }
                        throw new MatchError(option);
                    });
                }, async), async).flatMap(chunk2 -> {
                    return checkpointer.ack(checkpointer.combineAll(implicits$.MODULE$.toFoldableOps(chunk2, Chunk$.MODULE$.instance()).toIterable()));
                });
            }).drain().onFinalizeCase(exitCase -> {
                if (Resource$ExitCase$Succeeded$.MODULE$.equals(exitCase)) {
                    return control.unblockNextWindow(EagerWindows$PreviousWindowSuccess$.MODULE$);
                }
                if (Resource$ExitCase$Canceled$.MODULE$.equals(exitCase) ? true : exitCase instanceof Resource.ExitCase.Errored) {
                    return control.unblockNextWindow(EagerWindows$PreviousWindowFailed$.MODULE$);
                }
                throw new MatchError(exitCase);
            }, async);
        };
    }

    public <F, A> Function1<Stream<F, A>, Stream<F, Stream<F, A>>> com$snowplowanalytics$snowplow$sources$internal$LowLevelSource$$windowed(EventProcessingConfig.Windowing windowing, Async<F> async) {
        if (EventProcessingConfig$NoWindowing$.MODULE$.equals(windowing)) {
            return stream -> {
                return Stream$.MODULE$.emit(stream);
            };
        }
        if (windowing instanceof EventProcessingConfig.TimedWindows) {
            return timedWindows((EventProcessingConfig.TimedWindows) windowing, async);
        }
        throw new MatchError(windowing);
    }

    private <F, A> Function1<Stream<F, A>, Stream<F, Stream<F, A>>> timedWindows(EventProcessingConfig.TimedWindows timedWindows, Async<F> async) {
        return stream -> {
            return Pull$StreamPullOps$.MODULE$.stream$extension(Pull$.MODULE$.StreamPullOps(Stream$ToPull$.MODULE$.timed$extension(Stream$InvariantOps$.MODULE$.pull$extension(Stream$.MODULE$.InvariantOps(stream)), timed -> {
                FiniteDuration timeoutForFirstWindow = MODULE$.timeoutForFirstWindow(timedWindows);
                return Pull$.MODULE$.eval(Logger$.MODULE$.apply(MODULE$.logger(async)).info(() -> {
                    return new StringBuilder(56).append("Opening first window with randomly adjusted duration of ").append(timeoutForFirstWindow).toString();
                })).flatMap(boxedUnit -> {
                    return timed.timeout(timeoutForFirstWindow).flatMap(boxedUnit -> {
                        return go$2(timed, None$.MODULE$, async, timedWindows);
                    });
                });
            }, async))).prefetch(async);
        };
    }

    private FiniteDuration timeoutForFirstWindow(EventProcessingConfig.TimedWindows timedWindows) {
        return new package.DurationLong(scala.concurrent.duration.package$.MODULE$.DurationLong((long) (timedWindows.duration().toMillis() * timedWindows.firstWindowScaling()))).milliseconds();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Pull go$1(Stream stream, Sync sync, Ref ref) {
        return Stream$ToPull$.MODULE$.uncons1$extension(Stream$InvariantOps$.MODULE$.pull$extension(Stream$.MODULE$.InvariantOps(stream))).flatMap(option -> {
            Tuple2 tuple2;
            if (None$.MODULE$.equals(option)) {
                return Pull$.MODULE$.done();
            }
            if (!(option instanceof Some) || (tuple2 = (Tuple2) ((Some) option).value()) == null) {
                throw new MatchError(option);
            }
            Object _1 = tuple2._1();
            Stream stream2 = (Stream) tuple2._2();
            return Pull$.MODULE$.eval(package$.MODULE$.Sync().apply(sync).realTime()).flatMap(finiteDuration -> {
                return Pull$.MODULE$.eval(ref.set(new Some(finiteDuration))).flatMap(boxedUnit -> {
                    return Pull$.MODULE$.output1(_1).flatMap(boxedUnit -> {
                        return Pull$.MODULE$.eval(ref.set(None$.MODULE$)).flatMap(boxedUnit -> {
                            return go$1(stream2, sync, ref);
                        });
                    });
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Pull go$2(Pull.Timed timed, Option option, Async async, EventProcessingConfig.TimedWindows timedWindows) {
        return timed.uncons().flatMap(option2 -> {
            Tuple2 tuple2;
            boolean z = false;
            Some some = null;
            if (None$.MODULE$.equals(option2)) {
                if (None$.MODULE$.equals(option)) {
                    return Pull$.MODULE$.done();
                }
                if (option instanceof Some) {
                    return Pull$.MODULE$.eval(((Queue) ((Some) option).value()).offer(None$.MODULE$)).$greater$greater(() -> {
                        return Pull$.MODULE$.done();
                    });
                }
                throw new MatchError(option);
            }
            if (option2 instanceof Some) {
                z = true;
                some = (Some) option2;
                Tuple2 tuple22 = (Tuple2) some.value();
                if (tuple22 != null) {
                    Either either = (Either) tuple22._1();
                    Pull.Timed timed2 = (Pull.Timed) tuple22._2();
                    if (either instanceof Left) {
                        Pull $greater$greater = Pull$.MODULE$.eval(Logger$.MODULE$.apply(MODULE$.logger(async)).info(() -> {
                            return new StringBuilder(33).append("Opening new window with duration ").append(timedWindows.duration()).toString();
                        })).$greater$greater(() -> {
                            return timed2.timeout(timedWindows.duration());
                        });
                        if (None$.MODULE$.equals(option)) {
                            return $greater$greater.$greater$greater(() -> {
                                return go$2(timed2, None$.MODULE$, async, timedWindows);
                            });
                        }
                        if (!(option instanceof Some)) {
                            throw new MatchError(option);
                        }
                        Queue queue = (Queue) ((Some) option).value();
                        return $greater$greater.$greater$greater(() -> {
                            return Pull$.MODULE$.eval(queue.offer(None$.MODULE$));
                        }).$greater$greater(() -> {
                            return go$2(timed2, None$.MODULE$, async, timedWindows);
                        });
                    }
                }
            }
            if (z && (tuple2 = (Tuple2) some.value()) != null) {
                Right right = (Either) tuple2._1();
                Pull.Timed timed3 = (Pull.Timed) tuple2._2();
                if (right instanceof Right) {
                    Chunk chunk = (Chunk) right.value();
                    if (None$.MODULE$.equals(option)) {
                        return Pull$.MODULE$.eval(Queue$.MODULE$.synchronous(async)).flatMap(queue2 -> {
                            return Pull$.MODULE$.output1(Stream$.MODULE$.fromQueueNoneTerminated(queue2, Stream$.MODULE$.fromQueueNoneTerminated$default$2(), async)).flatMap(boxedUnit -> {
                                return Pull$.MODULE$.eval(chunk.traverse(obj -> {
                                    return queue2.offer(new Some(obj));
                                }, async)).flatMap(chunk2 -> {
                                    return go$2(timed3, new Some(queue2), async, timedWindows);
                                });
                            });
                        });
                    }
                    if (!(option instanceof Some)) {
                        throw new MatchError(option);
                    }
                    Queue queue3 = (Queue) ((Some) option).value();
                    return Pull$.MODULE$.eval(chunk.traverse(obj -> {
                        return queue3.offer(new Some(obj));
                    }, async)).$greater$greater(() -> {
                        return go$2(timed3, new Some(queue3), async, timedWindows);
                    });
                }
            }
            throw new MatchError(option2);
        });
    }

    private LowLevelSource$() {
    }
}
