package pl.allegro.tech.mongomigrationstream.core.synchronization;

import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.SetsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import mu.KLogger;
import org.bson.BsonDocument;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import pl.allegro.tech.mongomigrationstream.core.concurrency.MigrationExecutors;
import pl.allegro.tech.mongomigrationstream.core.mongo.DbCollection;
import pl.allegro.tech.mongomigrationstream.core.mongo.SourceToDestination;
import pl.allegro.tech.mongomigrationstream.core.performer.SynchronizationResult;
import pl.allegro.tech.mongomigrationstream.core.performer.SynchronizationSuccess;
import pl.allegro.tech.mongomigrationstream.core.performer.Synchronizer;
import pl.allegro.tech.mongomigrationstream.core.queue.EventQueue;
import pl.allegro.tech.mongomigrationstream.core.state.StateEvent;
import pl.allegro.tech.mongomigrationstream.core.state.StateInfo;

/* compiled from: SourceToLocalSynchronizer.kt */
@Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��~\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\"\n\u0002\u0010\u000e\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b��\u0018��2\u00020\u0001B;\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\f\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\t\u0012\u0006\u0010\u000b\u001a\u00020\f\u0012\u0006\u0010\r\u001a\u00020\u000e¢\u0006\u0002\u0010\u000fJ\u0018\u0010\u001e\u001a\u00020\u001f2\u0006\u0010 \u001a\u00020!2\u0006\u0010\"\u001a\u00020#H\u0002J\u0018\u0010$\u001a\u00020\u001f2\u0006\u0010\u0002\u001a\u00020\u00032\u0006\u0010\"\u001a\u00020#H\u0002J\b\u0010%\u001a\u00020&H\u0016J\b\u0010'\u001a\u00020\u001fH\u0016R 
\u0010\u0010\u001a\u0014\u0012\u0010\u0012\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00140\u00130\u00120\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0015\u001a\u00020\u0016X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\tX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\u00180\u0011X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0019\u001a\u00020\u001aX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000b\u001a\u00020\fX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u001b\u001a\b\u0012\u0004\u0012\u00020\u001d0\u001cX\u0082\u0004¢\u0006\u0002\n��¨\u0006("}, d2 = {"Lpl/allegro/tech/mongomigrationstream/core/synchronization/SourceToLocalSynchronizer;", "Lpl/allegro/tech/mongomigrationstream/core/performer/Synchronizer;", "sourceToDestination", "Lpl/allegro/tech/mongomigrationstream/core/mongo/SourceToDestination;", "sourceDb", "Lcom/mongodb/client/MongoDatabase;", "reactiveSourceDb", "Lcom/mongodb/reactivestreams/client/MongoDatabase;", "queue", "Lpl/allegro/tech/mongomigrationstream/core/queue/EventQueue;", "Lpl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent;", "stateInfo", "Lpl/allegro/tech/mongomigrationstream/core/state/StateInfo;", "meterRegistry", "Lio/micrometer/core/instrument/MeterRegistry;", "(Lpl/allegro/tech/mongomigrationstream/core/mongo/SourceToDestination;Lcom/mongodb/client/MongoDatabase;Lcom/mongodb/reactivestreams/client/MongoDatabase;Lpl/allegro/tech/mongomigrationstream/core/queue/EventQueue;Lpl/allegro/tech/mongomigrationstream/core/state/StateInfo;Lio/micrometer/core/instrument/MeterRegistry;)V", "cursors", "", "Lcom/mongodb/client/MongoCursor;", 
"Lcom/mongodb/client/model/changestream/ChangeStreamDocument;", "Lorg/bson/BsonDocument;", "executor", "Ljava/util/concurrent/ExecutorService;", "reactiveSubscribers", "Lpl/allegro/tech/mongomigrationstream/core/synchronization/ChangeStreamDocumentSubscriber;", "shouldSynchronize", "Ljava/util/concurrent/atomic/AtomicBoolean;", "synchronizedOperations", "", "", "performCollectionSynchronization", "", "dbCollection", "Lpl/allegro/tech/mongomigrationstream/core/mongo/DbCollection;", "eventConsumer", "Lpl/allegro/tech/mongomigrationstream/core/synchronization/EventConsumer;", "performReactiveCollectionSynchronization", "startSynchronization", "Lpl/allegro/tech/mongomigrationstream/core/performer/SynchronizationResult;", "stop", "mongo-migration-stream-core"})
@SourceDebugExtension({"SMAP\nSourceToLocalSynchronizer.kt\nKotlin\n*S Kotlin\n*F\n+ 1 SourceToLocalSynchronizer.kt\npl/allegro/tech/mongomigrationstream/core/synchronization/SourceToLocalSynchronizer\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 fake.kt\nkotlin/jvm/internal/FakeKt\n*L\n1#1,101:1\n1855#2,2:102\n1855#2,2:104\n1#3:106\n*S KotlinDebug\n*F\n+ 1 SourceToLocalSynchronizer.kt\npl/allegro/tech/mongomigrationstream/core/synchronization/SourceToLocalSynchronizer\n*L\n91#1:102,2\n92#1:104,2\n*E\n"})
/* loaded from: input_file:pl/allegro/tech/mongomigrationstream/core/synchronization/SourceToLocalSynchronizer.class */
/**
 * {@link Synchronizer} that watches the change stream of the source collection and hands every
 * matching change event to an {@link EventConsumer} backed by the local {@link EventQueue}.
 *
 * <p>Only the operation types listed in {@code synchronizedOperations}
 * ({@code insert}, {@code update}, {@code replace}, {@code delete}) are requested from the
 * change stream.
 *
 * <p>{@link #startSynchronization()} uses the reactive driver; a blocking, cursor-based variant
 * ({@code performCollectionSynchronization}) is kept as a private alternative but is not invoked
 * from within this class.
 */
public final class SourceToLocalSynchronizer implements Synchronizer {

    @NotNull
    private final SourceToDestination sourceToDestination;

    @NotNull
    private final MongoDatabase sourceDb;

    @NotNull
    private final com.mongodb.reactivestreams.client.MongoDatabase reactiveSourceDb;

    @NotNull
    private final EventQueue<ChangeEvent> queue;

    @NotNull
    private final StateInfo stateInfo;

    @NotNull
    private final MeterRegistry meterRegistry;

    /** Executor running the blocking cursor loops started by {@code performCollectionSynchronization}. */
    @NotNull
    private final ExecutorService executor;

    /** Change stream {@code operationType} values that are synchronized. */
    @NotNull
    private final Set<String> synchronizedOperations;

    /** Loop condition of the blocking cursor workers; cleared by {@link #stop()}. */
    @NotNull
    private final AtomicBoolean shouldSynchronize;

    /** Cursors opened by the blocking variant, remembered so that {@link #stop()} can close them. */
    @NotNull
    private final List<MongoCursor<ChangeStreamDocument<BsonDocument>>> cursors;

    /** Subscribers registered by the reactive variant, remembered so that {@link #stop()} can complete them. */
    @NotNull
    private final List<ChangeStreamDocumentSubscriber> reactiveSubscribers;

    /**
     * Creates a synchronizer for a single source/destination pair.
     *
     * @param sourceToDestination pair whose source collection is watched
     * @param mongoDatabase       blocking-driver handle to the source database
     * @param mongoDatabase2      reactive-driver handle to the source database
     * @param eventQueue          local queue receiving the change events
     * @param stateInfo           receiver of state change notifications
     * @param meterRegistry       registry passed on to the event consumer and subscriber
     * @throws NullPointerException if any argument is {@code null}
     */
    public SourceToLocalSynchronizer(@NotNull SourceToDestination sourceToDestination, @NotNull MongoDatabase mongoDatabase, @NotNull com.mongodb.reactivestreams.client.MongoDatabase mongoDatabase2, @NotNull EventQueue<ChangeEvent> eventQueue, @NotNull StateInfo stateInfo, @NotNull MeterRegistry meterRegistry) {
        Intrinsics.checkNotNullParameter(sourceToDestination, "sourceToDestination");
        Intrinsics.checkNotNullParameter(mongoDatabase, "sourceDb");
        Intrinsics.checkNotNullParameter(mongoDatabase2, "reactiveSourceDb");
        Intrinsics.checkNotNullParameter(eventQueue, "queue");
        Intrinsics.checkNotNullParameter(stateInfo, "stateInfo");
        Intrinsics.checkNotNullParameter(meterRegistry, "meterRegistry");
        this.sourceToDestination = sourceToDestination;
        this.sourceDb = mongoDatabase;
        this.reactiveSourceDb = mongoDatabase2;
        this.queue = eventQueue;
        this.stateInfo = stateInfo;
        this.meterRegistry = meterRegistry;
        this.executor = MigrationExecutors.INSTANCE.createSourceToLocalExecutor(this.sourceToDestination.getSource());
        this.synchronizedOperations = SetsKt.setOf(new String[]{"insert", "update", "replace", "delete"});
        this.shouldSynchronize = new AtomicBoolean(true);
        this.cursors = new ArrayList<>();
        this.reactiveSubscribers = new ArrayList<>();
    }

    /**
     * Starts the reactive change stream subscription for the source collection.
     *
     * <p>Emits a {@code StartEvent} before subscribing and a {@code SourceToLocalStartEvent}
     * after the subscriber has been registered.
     *
     * @return {@link SynchronizationSuccess#INSTANCE}
     */
    @Override // pl.allegro.tech.mongomigrationstream.core.performer.Synchronizer
    @NotNull
    public SynchronizationResult startSynchronization() {
        SourceToLocalSynchronizerKt.logger.info(
            () -> "Starting SourceToLocal synchronization for collection: [" + this.sourceToDestination.getSource() + "]");
        this.stateInfo.notifyStateChange$mongo_migration_stream_core(new StateEvent.StartEvent(this.sourceToDestination));
        performReactiveCollectionSynchronization(this.sourceToDestination, new EventConsumer(this.queue, this.meterRegistry));
        this.stateInfo.notifyStateChange$mongo_migration_stream_core(new StateEvent.SourceToLocalStartEvent(this.sourceToDestination));
        return SynchronizationSuccess.INSTANCE;
    }

    /**
     * Subscribes a new {@link ChangeStreamDocumentSubscriber} to the change stream of the source
     * collection (reactive driver) and records it for later shutdown.
     *
     * @param sourceToDestination pair whose source collection name is watched
     * @param eventConsumer       consumer handed to the subscriber
     */
    private void performReactiveCollectionSynchronization(SourceToDestination sourceToDestination, EventConsumer eventConsumer) {
        ChangeStreamPublisher<BsonDocument> publisher = this.reactiveSourceDb
            .getCollection(sourceToDestination.getSource().getCollectionName(), BsonDocument.class)
            .watch(CollectionsKt.listOf(Aggregates.match(Filters.in("operationType", this.synchronizedOperations))), BsonDocument.class)
            .fullDocument(FullDocument.DEFAULT);
        Intrinsics.checkNotNullExpressionValue(publisher, "reactiveSourceDb\n       …ent(FullDocument.DEFAULT)");
        ChangeStreamDocumentSubscriber subscriber = new ChangeStreamDocumentSubscriber(sourceToDestination, this.stateInfo, eventConsumer, this.meterRegistry);
        // Register before subscribing so stop() can always see a subscriber that may already be receiving events.
        this.reactiveSubscribers.add(subscriber);
        publisher.subscribe(subscriber);
    }

    /**
     * Blocking variant: opens a change stream cursor on the given collection, records it for later
     * shutdown and submits a worker to {@code executor} that drains the cursor into
     * {@code eventConsumer}.
     *
     * @param dbCollection  collection whose change stream is watched
     * @param eventConsumer consumer receiving every change event read from the cursor
     */
    private void performCollectionSynchronization(DbCollection dbCollection, EventConsumer eventConsumer) {
        final MongoCursor<ChangeStreamDocument<BsonDocument>> cursor = this.sourceDb
            .getCollection(dbCollection.getCollectionName(), BsonDocument.class)
            .watch(CollectionsKt.listOf(Aggregates.match(Filters.in("operationType", this.synchronizedOperations))))
            .fullDocument(FullDocument.DEFAULT)
            .iterator();
        Intrinsics.checkNotNullExpressionValue(cursor, "sourceDb.getCollection(d…              .iterator()");
        this.cursors.add(cursor);
        this.executor.execute(() -> consumeChangeEvents(cursor, eventConsumer));
    }

    /**
     * Stops synchronization: clears the worker loop flag, closes every recorded cursor, completes
     * every recorded reactive subscriber and shuts the executor down.
     *
     * <p>Shutdown is best-effort. Each step is attempted independently, so a failure while
     * releasing one resource is logged as a warning and does not prevent the remaining cursors,
     * subscribers and the executor from being released.
     */
    @Override // pl.allegro.tech.mongomigrationstream.core.performer.Synchronizer
    public void stop() {
        SourceToLocalSynchronizerKt.logger.info(() -> "Trying to shut down SourceToLocalSynchronizer gracefully...");
        try {
            this.shouldSynchronize.set(false);
            for (MongoCursor<ChangeStreamDocument<BsonDocument>> cursor : this.cursors) {
                runShutdownStep(cursor::close);
            }
            for (ChangeStreamDocumentSubscriber subscriber : this.reactiveSubscribers) {
                runShutdownStep(subscriber::onComplete);
            }
            runShutdownStep(this.executor::shutdown);
        } finally {
            SourceToLocalSynchronizerKt.logger.info(() -> "Shut down SourceToLocalSynchronizer");
        }
    }

    /**
     * Runs one shutdown step, logging (instead of propagating) anything it throws.
     *
     * <p>{@link Throwable} is caught deliberately: the original shutdown sequence swallowed every
     * throwable, and callers of {@link #stop()} rely on it not failing.
     *
     * @param step the release action to attempt
     */
    private static void runShutdownStep(Runnable step) {
        try {
            step.run();
        } catch (Throwable th) {
            SourceToLocalSynchronizerKt.logger.warn(th, () -> "Exception while shutting down SourceToLocalSynchronizer");
        }
    }

    /**
     * Worker loop of the blocking variant: while {@code shouldSynchronize} is set, reads the next
     * change stream document from {@code cursor}, converts it to a {@link ChangeEvent} and saves
     * it through {@code eventConsumer}. Any failure in one iteration is logged and the loop
     * continues with the next iteration.
     *
     * @param cursor        change stream cursor to drain
     * @param eventConsumer consumer that stores the converted events
     */
    private void consumeChangeEvents(MongoCursor<ChangeStreamDocument<BsonDocument>> cursor, EventConsumer eventConsumer) {
        Intrinsics.checkNotNullParameter(cursor, "$collectionCursor");
        Intrinsics.checkNotNullParameter(eventConsumer, "$eventConsumer");
        while (this.shouldSynchronize.get()) {
            try {
                ChangeStreamDocument<BsonDocument> document = cursor.next();
                Intrinsics.checkNotNullExpressionValue(document, "collectionCursor.next()");
                eventConsumer.saveEventToLocalQueue$mongo_migration_stream_core(ChangeEvent.Companion.fromMongoChangeStreamDocument(document));
            } catch (Throwable th) {
                // NOTE(review): a cursor that fails permanently makes this loop log and retry
                // without pause until stop() clears the flag — confirm whether that is intended.
                SourceToLocalSynchronizerKt.logger.error(th, () -> "Error during source to local synchronization");
            }
        }
    }
}
