package net.glorat.ledger;

import cakesolutions.kafka.KafkaProducer;
import cakesolutions.kafka.KafkaProducer$;
import cakesolutions.kafka.KafkaProducer$Conf$;
import cakesolutions.kafka.KafkaProducerRecord$;
import cakesolutions.kafka.KafkaProducerRecord$Destination$;
import cakesolutions.kafka.KafkaSerializer$;
import java.util.UUID;
import net.glorat.cqrs.AggregateRoot;
import net.glorat.cqrs.CommittedEvent;
import net.glorat.cqrs.DomainEvent;
import net.glorat.cqrs.Repository;
import org.slf4j.Logger;
import scala.Function1;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.Seq$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;

/* compiled from: KafkaLedger.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]d\u0001B\t\u0013\u0001eA\u0001B\u000b\u0001\u0003\u0002\u0003\u0006Ia\u000b\u0005\tm\u0001\u0011\t\u0011)A\u0005W!Aq\u0007\u0001B\u0001B\u0003%\u0001\b\u0003\u0005O\u0001\t\u0005\t\u0015!\u0003P\u0011!1\u0006A!A!\u0002\u00179\u0006\"B/\u0001\t\u0003q\u0006b\u00024\u0001\u0005\u0004%\ta\u001a\u0005\u0007W\u0002\u0001\u000b\u0011\u00025\t\u000f1\u0004!\u0019!C\u0001[\"1\u0011\u000f\u0001Q\u0001\n9DQA\u001d\u0001\u0005BMDQA \u0001\u0005B}D\u0011\"!\f\u0001\u0005\u0004%I!a\f\t\u0011\u0005}\u0002\u0001)A\u0005\u0003cA!\"!\u0011\u0001\u0011\u000b\u0007I\u0011AA\"\u0011\u001d\t)\u0007\u0001C\u0001\u0003O\u00121bS1gW\u0006dU\rZ4fe*\u00111\u0003F\u0001\u0007Y\u0016$w-\u001a:\u000b\u0005U1\u0012AB4m_J\fGOC\u0001\u0018\u0003\rqW\r^\u0002\u0001'\u0011\u0001!\u0004\t\u0014\u0011\u0005mqR\"\u0001\u000f\u000b\u0003u\tQa]2bY\u0006L!a\b\u000f\u0003\r\u0005s\u0017PU3g!\t\tC%D\u0001#\u0015\t\u0019C#\u0001\u0003dcJ\u001c\u0018BA\u0013#\u0005)\u0011V\r]8tSR|'/\u001f\t\u0003O!j\u0011AE\u0005\u0003SI\u0011q\u0001T8hO&tw-A\u0004tKJ4XM]:\u0011\u00051\u001adBA\u00172!\tqC$D\u00010\u0015\t\u0001\u0004$\u0001\u0004=e>|GOP\u0005\u0003eq\ta\u0001\u0015:fI\u00164\u0017B\u0001\u001b6\u0005\u0019\u0019FO]5oO*\u0011!\u0007H\u0001\u0006i>\u0004\u0018nY\u0001\u0011gR\u0014X-Y7U_J+g/[:j_:\u00042aG\u001d<\u0013\tQDD\u0001\u0004PaRLwN\u001c\t\u00057qr4*\u0003\u0002>9\tIa)\u001e8di&|g.\r\t\u0003\u007f!s!\u0001\u0011$\u000f\u0005\u0005+eB\u0001\"E\u001d\tq3)C\u0001\u0018\u0013\t)b#\u0003\u0002$)%\u0011qII\u0001\ba\u0006\u001c7.Y4f\u0013\tI%J\u0001\u0003H+&#%BA$#!\tYB*\u0003\u0002N9\t\u0019\u0011J\u001c;\u0002\u0011I,w-[:uef\u0004Ba\u0007\u001fQ'B\u0011\u0011%U\u0005\u0003%\n\u00121\u0002R8nC&tWI^3oiB\u0011\u0011\u0005V\u0005\u0003+\n\u0012Q\"Q4he\u0016<\u0017\r^3S_>$\u0018AA3d!\tA6,D\u0001Z\u0015\tQF$\u0001\u0006d_:\u001cWO\u001d:f]RL!\u0001X-\u0003!\u0015CXmY;uS>t7i\u001c8uKb$\u0018A\u0002\u001fj]&$h\bF\u0003`E\u000e$W\r\u0006\u0002aCB\u0011q\u0005\u0001\u0005\u0006-\u001a\u0001\u001da\u0016\u0005\u0006U\u0019\u0001\ra\u000b\u0005\u0006m\u0019\u0001\ra\u000b\u0005\u0006o\u0019\u0001\r\u0001\u000f\u0005\u0006\u001d\u001a\u0001\raT\u0001\u000bK:$\u0018\u000e^=WS\u0016<X#\u00015\u0011\u0005\u001dJ\u0017B\u00016\u0013\u0005))e\u000e^5usZKWm^\u0001\fK:$\u0018\u000e^=WS\u0016<\b%\u0001\u0005eSN\u0004\u0018\r^2i+\u0005q\u0007CA\u0014p\u0013\t\u0001(C\u0001\u000bLC\u001a\\\u0017-\u0012<f]R$\u0015n\u001d9bi\u000eDWM]\u0001\nI&\u001c\b/\u0019;dQ\u0002\nAa]1wKR\u0019AO\u001f?\u0011\u0007a+x/\u0003\u0002w3\n1a)\u001e;ve\u0016\u0004\"a\u0007=\n\u0005ed\"\u0001B+oSRDQa_\u0006A\u0002M\u000b\u0011\"Y4he\u0016<\u0017\r^3\t\u000bu\\\u0001\u0019A&\u0002\u001f\u0015D\b/Z2uK\u00124VM]:j_:\fqaZ3u\u0005fLE-\u0006\u0003\u0002\u0002\u0005%ACBA\u0002\u0003K\tI\u0003\u0006\u0003\u0002\u0006\u0005U\u0001\u0003BA\u0004\u0003\u0013a\u0001\u0001B\u0004\u0002\f1\u0011\r!!\u0004\u0003\u0003Q\u000b2!a\u0004T!\rY\u0012\u0011C\u0005\u0004\u0003'a\"a\u0002(pi\"Lgn\u001a\u0005\n\u0003/a\u0011\u0011!a\u0002\u00033\t!\"\u001a<jI\u0016t7-\u001a\u00132!\u0019\tY\"!\t\u0002\u00065\u0011\u0011Q\u0004\u0006\u0004\u0003?a\u0012a\u0002:fM2,7\r^\u0005\u0005\u0003G\tiB\u0001\u0005DY\u0006\u001c8\u000fV1h\u0011\u0019\t9\u0003\u0004a\u0001}\u0005\u0011\u0011\u000e\u001a\u0005\b\u0003Wa\u0001\u0019AA\u0003\u0003\u0011!X\u000e\u001d7\u0002!M$(/\u001b8h'\u0016\u0014\u0018.\u00197ju\u0016\u0014XCAA\u0019!\u0015YBhKA\u001a!\u0015Y\u0012QGA\u001d\u0013\r\t9\u0004\b\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u00047\u0005m\u0012bAA\u001f9\t!!)\u001f;f\u0003E\u0019HO]5oON+'/[1mSj,'\u000fI\u0001\f_:,\u0007K]8ek\u000e,'/\u0006\u0002\u0002FA9\u0011qIA)W\u0005USBAA%\u0015\u0011\tY%!\u0014\u0002\u000b-\fgm[1\u000b\u0005\u0005=\u0013!D2bW\u0016\u001cx\u000e\\;uS>t7/\u0003\u0003\u0002T\u0005%#!D&bM.\f\u0007K]8ek\u000e,'\u000f\u0005\u0003\u0002X\u0005\u0005TBAA-\u0015\u0011\tY&!\u0018\u0002\t1\fgn\u001a\u0006\u0003\u0003?\nAA[1wC&!\u00111MA-\u0005\u0019y%M[3di\u0006q\u0001O]8ek\u000e,'oQ8oM&<WCAA5!\u001d\tY'!\u001d,\u0003+rA!a\u0012\u0002n%!\u0011qNA%\u00035Y\u0015MZ6b!J|G-^2fe&!\u00111OA;\u0005\u0011\u0019uN\u001c4\u000b\t\u0005=\u0014\u0011\n")
/* loaded from: input_file:net/glorat/ledger/KafkaLedger.class */
public class KafkaLedger implements Repository, Logging {
    private KafkaProducer<String, Object> oneProducer;
    private final String servers;
    private final String topic;
    private final Option<Function1<UUID, Object>> streamToRevision;
    private final ExecutionContext ec;
    private final EntityView entityView;
    private final KafkaEventDispatcher dispatch;
    private final Function1<String, byte[]> stringSerializer;
    private Logger log;
    private volatile byte bitmap$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [net.glorat.ledger.KafkaLedger] */
    private Logger log$lzycompute() {
        Logger log;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                log = log();
                this.log = log;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.log;
    }

    @Override // net.glorat.ledger.Logging
    public Logger log() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? log$lzycompute() : this.log;
    }

    public EntityView entityView() {
        return this.entityView;
    }

    public KafkaEventDispatcher dispatch() {
        return this.dispatch;
    }

    @Override // net.glorat.cqrs.Repository
    public Future<BoxedUnit> save(AggregateRoot aggregateRoot, int i) {
        int unboxToInt;
        if (this.streamToRevision.isDefined() && i < (unboxToInt = BoxesRunTime.unboxToInt(((Function1) this.streamToRevision.get()).apply(aggregateRoot.id())))) {
            throw new ConcurrencyException(new StringBuilder(58).append("Trying to save aggregate from version ").append(i).append(" when ").append(unboxToInt).append(" already in DB").toString());
        }
        Iterable<DomainEvent> uncommittedChanges = aggregateRoot.getUncommittedChanges();
        IntRef create = IntRef.create(i);
        Iterable iterable = (Iterable) uncommittedChanges.map(domainEvent -> {
            create.elem++;
            return new CommittedEvent(domainEvent, aggregateRoot.id(), create.elem);
        }, Iterable$.MODULE$.canBuildFrom());
        log().debug(new StringBuilder(30).append("Persisting ").append(iterable.size()).append(" events to Kafka...").toString());
        Future$.MODULE$.sequence((Iterable) iterable.map(committedEvent -> {
            return this.oneProducer().send(KafkaProducerRecord$.MODULE$.apply(KafkaProducerRecord$Destination$.MODULE$.apply(this.topic, 0), new Some("key"), committedEvent, KafkaProducerRecord$.MODULE$.apply$default$4()));
        }, Iterable$.MODULE$.canBuildFrom()), Iterable$.MODULE$.canBuildFrom(), this.ec).map(iterable2 -> {
            $anonfun$save$3(iterable2);
            return BoxedUnit.UNIT;
        }, this.ec);
        oneProducer().flush();
        return dispatch().pollEventStream();
    }

    @Override // net.glorat.cqrs.Repository
    public <T extends AggregateRoot> T getById(UUID uuid, T t, ClassTag<T> classTag) {
        return (T) entityView().getById(uuid, t, classTag);
    }

    private Function1<String, byte[]> stringSerializer() {
        return this.stringSerializer;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [net.glorat.ledger.KafkaLedger] */
    private KafkaProducer<String, Object> oneProducer$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.oneProducer = KafkaProducer$.MODULE$.apply(producerConfig());
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.oneProducer;
    }

    public KafkaProducer<String, Object> oneProducer() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? oneProducer$lzycompute() : this.oneProducer;
    }

    public KafkaProducer.Conf<String, Object> producerConfig() {
        return KafkaProducer$Conf$.MODULE$.apply(KafkaSerializer$.MODULE$.apply(stringSerializer()), KafkaSerializer$.MODULE$.apply(BinarySerializer$.MODULE$.serializer()), this.servers, KafkaProducer$Conf$.MODULE$.apply$default$4(), KafkaProducer$Conf$.MODULE$.apply$default$5(), KafkaProducer$Conf$.MODULE$.apply$default$6(), KafkaProducer$Conf$.MODULE$.apply$default$7(), KafkaProducer$Conf$.MODULE$.apply$default$8(), KafkaProducer$Conf$.MODULE$.apply$default$9(), KafkaProducer$Conf$.MODULE$.apply$default$10());
    }

    public static final /* synthetic */ void $anonfun$save$3(Iterable iterable) {
    }

    public KafkaLedger(String str, String str2, Option<Function1<UUID, Object>> option, Function1<DomainEvent, AggregateRoot> function1, ExecutionContext executionContext) {
        this.servers = str;
        this.topic = str2;
        this.streamToRevision = option;
        this.ec = executionContext;
        Logging.$init$(this);
        this.entityView = new EntityView(function1);
        this.dispatch = new KafkaEventDispatcher(str, str2, Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new EntityView[]{entityView()})), executionContext);
        this.stringSerializer = str3 -> {
            return str3.getBytes();
        };
    }
}
