package zio.kafka.consumer.internal;

import org.apache.kafka.common.TopicPartition;
import scala.$less$colon$less$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.ScalaRunTime$;
import zio.LogAnnotation;
import zio.Promise$;
import zio.Queue;
import zio.Queue$;
import zio.Ref$;
import zio.ZIO;
import zio.ZIO$;
import zio.kafka.consumer.diagnostics.DiagnosticEvent;
import zio.kafka.consumer.diagnostics.Diagnostics;
import zio.kafka.consumer.internal.RunloopCommand;
import zio.stream.ZStream$;

/* compiled from: PartitionStreamControl.scala */
/* loaded from: input_file:zio/kafka/consumer/internal/PartitionStreamControl$.class */
public final class PartitionStreamControl$ {
    public static final PartitionStreamControl$ MODULE$ = new PartitionStreamControl$();

    public ZIO<Object, Nothing$, PartitionStreamControl> newPartitionStream(TopicPartition topicPartition, Queue<RunloopCommand> queue, Diagnostics diagnostics) {
        return ZIO$.MODULE$.logDebug(() -> {
            return new StringBuilder(26).append("Creating partition stream ").append(topicPartition.toString()).toString();
        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:60)").flatMap(boxedUnit -> {
            return Promise$.MODULE$.make("zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:61)").flatMap(promise -> {
                return Promise$.MODULE$.make("zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:62)").flatMap(promise -> {
                    return Queue$.MODULE$.unbounded("zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:63)").flatMap(queue2 -> {
                        return Ref$.MODULE$.make(() -> {
                            return 0;
                        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:64)").map(ref -> {
                            ZIO flatMap = queue.offer(new RunloopCommand.Request(topicPartition), "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.requestAndAwaitData(PartitionStreamControl.scala:67)").flatMap(obj -> {
                                return $anonfun$newPartitionStream$8(diagnostics, topicPartition, queue2, BoxesRunTime.unboxToBoolean(obj));
                            }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.requestAndAwaitData(PartitionStreamControl.scala:67)");
                            return new PartitionStreamControl(topicPartition, ZStream$.MODULE$.logAnnotate(() -> {
                                return new LogAnnotation("topic", topicPartition.topic());
                            }, ScalaRunTime$.MODULE$.wrapRefArray(new LogAnnotation[]{new LogAnnotation("partition", Integer.toString(topicPartition.partition()))}), "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:72)").$times$greater(() -> {
                                return ZStream$.MODULE$.finalizer(() -> {
                                    return promise.succeed(BoxedUnit.UNIT, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:77)").$less$times(() -> {
                                        return ZIO$.MODULE$.logDebug(() -> {
                                            return new StringBuilder(27).append("Partition stream ").append(topicPartition.toString()).append(" has ended").toString();
                                        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:78)");
                                    }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:77)");
                                }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:76)");
                            }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:75)").$times$greater(() -> {
                                return ZStream$.MODULE$.repeatZIOChunk(() -> {
                                    return queue2.takeAll("zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:83)").flatMap(chunk -> {
                                        return chunk.isEmpty() ? flatMap : ZIO$.MODULE$.succeed(() -> {
                                            return chunk;
                                        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:83)");
                                    }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:83)");
                                }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:80)").flattenTake($less$colon$less$.MODULE$.refl(), "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:84)").chunksWith(zStream -> {
                                    return zStream.tap(chunk -> {
                                        return ref.update(i -> {
                                            return i - chunk.size();
                                        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:85)");
                                    }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:85)");
                                }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:85)").interruptWhen(promise, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:86)");
                            }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.stream(PartitionStreamControl.scala:79)"), queue2, promise, promise, ref);
                        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:64)");
                    }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:63)");
                }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:62)");
            }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:61)");
        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream(PartitionStreamControl.scala:60)");
    }

    public static final /* synthetic */ ZIO $anonfun$newPartitionStream$8(Diagnostics diagnostics, TopicPartition topicPartition, Queue queue, boolean z) {
        return diagnostics.emit(() -> {
            return new DiagnosticEvent.Request(topicPartition);
        }).flatMap(boxedUnit -> {
            return queue.takeBetween(1, Integer.MAX_VALUE, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.requestAndAwaitData(PartitionStreamControl.scala:69)");
        }, "zio.kafka.consumer.internal.PartitionStreamControl.newPartitionStream.requestAndAwaitData(PartitionStreamControl.scala:68)");
    }

    private PartitionStreamControl$() {
    }
}
