package io.debezium.connector.mongodb;

import com.mongodb.client.MongoCursor;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.class */
public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSnapshotChangeEventSource.class);
    private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
    private final MongoDbConnectorConfig connectorConfig;
    private final MongoDbTaskContext taskContext;
    private final ConnectionContext connectionContext;
    private final ReplicaSets replicaSets;
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    protected final Clock clock;
    private final SnapshotProgressListener<MongoDbPartition> snapshotProgressListener;
    private final ErrorHandler errorHandler;
    private AtomicBoolean aborted;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource$MongoDbSnapshotContext.class */
    public static class MongoDbSnapshotContext extends AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> {
        public boolean lastCollection;
        public boolean lastRecordInCollection;

        MongoDbSnapshotContext(MongoDbPartition mongoDbPartition) {
            super(mongoDbPartition);
        }
    }

    /* loaded from: input_file:io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource$MongoDbSnapshottingTask.class */
    public static class MongoDbSnapshottingTask extends AbstractSnapshotChangeEventSource.SnapshottingTask {
        private final List<ReplicaSet> replicaSetsToSnapshot;

        public MongoDbSnapshottingTask(List<ReplicaSet> list) {
            super(false, !list.isEmpty());
            this.replicaSetsToSnapshot = list;
        }

        public List<ReplicaSet> getReplicaSetsToSnapshot() {
            return Collections.unmodifiableList(this.replicaSetsToSnapshot);
        }

        public boolean shouldSkipSnapshot() {
            return !snapshotData();
        }

        public String toString() {
            return "SnapshottingTask [replicaSetsToSnapshot=" + this.replicaSetsToSnapshot + "]";
        }
    }

    public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig mongoDbConnectorConfig, MongoDbTaskContext mongoDbTaskContext, ReplicaSets replicaSets, EventDispatcher<MongoDbPartition, CollectionId> eventDispatcher, Clock clock, SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, ErrorHandler errorHandler) {
        super(mongoDbConnectorConfig, snapshotProgressListener);
        this.aborted = new AtomicBoolean(false);
        this.connectorConfig = mongoDbConnectorConfig;
        this.taskContext = mongoDbTaskContext;
        this.connectionContext = mongoDbTaskContext.getConnectionContext();
        this.replicaSets = replicaSets;
        this.dispatcher = eventDispatcher;
        this.clock = clock;
        this.snapshotProgressListener = snapshotProgressListener;
        this.errorHandler = errorHandler;
    }

    protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbOffsetContext mongoDbOffsetContext, AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        MongoDbSnapshottingTask mongoDbSnapshottingTask = (MongoDbSnapshottingTask) snapshottingTask;
        MongoDbSnapshotContext mongoDbSnapshotContext = (MongoDbSnapshotContext) snapshotContext;
        LOGGER.info("Snapshot step 1 - Preparing");
        if (mongoDbOffsetContext != null && mongoDbOffsetContext.isSnapshotRunning()) {
            LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
        }
        LOGGER.info("Snapshot step 2 - Determining snapshot offsets");
        determineSnapshotOffsets(mongoDbSnapshotContext, this.replicaSets);
        List<ReplicaSet> replicaSetsToSnapshot = mongoDbSnapshottingTask.getReplicaSetsToSnapshot();
        int size = replicaSetsToSnapshot.size();
        ExecutorService newFixedThreadPool = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), "replicator-snapshot", size);
        CountDownLatch countDownLatch = new CountDownLatch(size);
        LOGGER.info("Ignoring unnamed replica sets: {}", this.replicaSets.unnamedReplicaSets());
        LOGGER.info("Starting {} thread(s) to snapshot replica sets: {}", Integer.valueOf(size), replicaSetsToSnapshot);
        LOGGER.info("Snapshot step 3 - Snapshotting data");
        replicaSetsToSnapshot.forEach(replicaSet -> {
            newFixedThreadPool.submit(() -> {
                try {
                    try {
                        this.taskContext.configureLoggingContext(replicaSet.replicaSetName());
                        try {
                            snapshotReplicaSet(changeEventSourceContext, mongoDbSnapshotContext, replicaSet);
                            countDownLatch.countDown();
                        } catch (Throwable th) {
                            throw th;
                        }
                    } catch (Throwable th2) {
                        LOGGER.error("Snapshot for replica set {} failed", replicaSet.replicaSetName(), th2);
                        this.errorHandler.setProducerThrowable(th2);
                        countDownLatch.countDown();
                    }
                } catch (Throwable th3) {
                    countDownLatch.countDown();
                    throw th3;
                }
            });
        });
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.aborted.set(true);
        }
        try {
            newFixedThreadPool.shutdown();
            LOGGER.info("Stopping mongodb connections");
            this.taskContext.getConnectionContext().shutdown();
            return this.aborted.get() ? SnapshotResult.aborted() : SnapshotResult.completed((MongoDbOffsetContext) snapshotContext.offset);
        } catch (Throwable th) {
            LOGGER.info("Stopping mongodb connections");
            this.taskContext.getConnectionContext().shutdown();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(MongoDbPartition mongoDbPartition, MongoDbOffsetContext mongoDbOffsetContext) {
        if (mongoDbOffsetContext == null) {
            LOGGER.info("No previous offset has been found");
            if (!this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) {
                return new MongoDbSnapshottingTask(this.replicaSets.all());
            }
            LOGGER.info("According to the connector configuration, no snapshot will occur.");
            return new MongoDbSnapshottingTask(Collections.emptyList());
        }
        if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) {
            LOGGER.info("According to the connector configuration, no snapshot will occur.");
            return new MongoDbSnapshottingTask(Collections.emptyList());
        }
        ArrayList arrayList = new ArrayList();
        try {
            this.replicaSets.onEachReplicaSet(replicaSet -> {
                ConnectionContext.MongoPrimary mongoPrimary = null;
                try {
                    mongoPrimary = establishConnectionToPrimary(mongoDbPartition, replicaSet);
                    ReplicaSetOffsetContext replicaSetOffsetContext = mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet);
                    if (mongoPrimary != null && isSnapshotExpected(mongoPrimary, replicaSetOffsetContext)) {
                        arrayList.add(replicaSet);
                    }
                    if (mongoPrimary != null) {
                        mongoPrimary.stop();
                    }
                } catch (Throwable th) {
                    if (mongoPrimary != null) {
                        mongoPrimary.stop();
                    }
                    throw th;
                }
            });
            this.taskContext.getConnectionContext().shutdown();
            return new MongoDbSnapshottingTask(arrayList);
        } catch (Throwable th) {
            this.taskContext.getConnectionContext().shutdown();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoDbPartition mongoDbPartition) throws Exception {
        return new MongoDbSnapshotContext(mongoDbPartition);
    }

    private void snapshotReplicaSet(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, ReplicaSet replicaSet) throws InterruptedException {
        ConnectionContext.MongoPrimary mongoPrimary = null;
        try {
            mongoPrimary = establishConnectionToPrimary((MongoDbPartition) mongoDbSnapshotContext.partition, replicaSet);
            if (mongoPrimary != null) {
                createDataEvents(changeEventSourceContext, mongoDbSnapshotContext, replicaSet, mongoPrimary);
            }
            if (mongoPrimary != null) {
                mongoPrimary.stop();
            }
        } catch (Throwable th) {
            if (mongoPrimary != null) {
                mongoPrimary.stop();
            }
            throw th;
        }
    }

    private ConnectionContext.MongoPrimary establishConnectionToPrimary(MongoDbPartition mongoDbPartition, ReplicaSet replicaSet) {
        return this.connectionContext.primaryFor(replicaSet, this.taskContext.filters(), (str, th) -> {
            if (th.getMessage() != null && th.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
                throw new ConnectException("Error while attempting to " + str, th);
            }
            this.dispatcher.dispatchConnectorEvent(mongoDbPartition, new DisconnectEvent());
            LOGGER.error("Error while attempting to {}: ", new Object[]{str, th.getMessage(), th});
            throw new ConnectException("Error while attempting to " + str, th);
        });
    }

    private boolean isSnapshotExpected(ConnectionContext.MongoPrimary mongoPrimary, ReplicaSetOffsetContext replicaSetOffsetContext) {
        boolean z;
        if (replicaSetOffsetContext.hasOffset()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Found existing offset for replica set '{}' at {}", replicaSetOffsetContext.getReplicaSetName(), replicaSetOffsetContext.getOffset());
            }
            z = false;
            if (replicaSetOffsetContext.isSnapshotOngoing()) {
                LOGGER.info("The previous snapshot was incomplete for '{}', so restarting the snapshot", replicaSetOffsetContext.getReplicaSetName());
                z = true;
            } else {
                BsonTimestamp lastOffsetTimestamp = replicaSetOffsetContext.lastOffsetTimestamp();
                BsonTimestamp bsonTimestamp = (BsonTimestamp) mongoPrimary.execute("get oplog position", mongoClient -> {
                    return SourceInfo.extractEventTimestamp((Document) mongoClient.getDatabase("local").getCollection("oplog.rs").find().sort(new Document("$natural", 1)).limit(1).first());
                });
                if (bsonTimestamp == null) {
                    LOGGER.info("The oplog contains no entries, so performing snapshot of replica set '{}'", replicaSetOffsetContext.getReplicaSetName());
                    z = true;
                } else if (lastOffsetTimestamp.compareTo(bsonTimestamp) < 0) {
                    LOGGER.info("Snapshot is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}", new Object[]{replicaSetOffsetContext.getReplicaSetName(), bsonTimestamp, lastOffsetTimestamp});
                    z = true;
                } else {
                    LOGGER.info("The oplog contains the last entry previously read for '{}', so no snapshot will be performed", replicaSetOffsetContext.getReplicaSetName());
                }
            }
        } else {
            LOGGER.info("No existing offset found for replica set '{}', starting snapshot", replicaSetOffsetContext.getReplicaSetName());
            z = true;
        }
        return z;
    }

    protected void determineSnapshotOffsets(MongoDbSnapshotContext mongoDbSnapshotContext, ReplicaSets replicaSets) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        replicaSets.onEachReplicaSet(replicaSet -> {
            LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
            ConnectionContext.MongoPrimary establishConnectionToPrimary = establishConnectionToPrimary((MongoDbPartition) mongoDbSnapshotContext.partition, replicaSet);
            if (establishConnectionToPrimary != null) {
                try {
                    establishConnectionToPrimary.execute("get oplog position", mongoClient -> {
                        linkedHashMap.put(replicaSet, (Document) mongoClient.getDatabase("local").getCollection("oplog.rs").find().sort(new Document("$natural", -1)).limit(1).first());
                    });
                    LOGGER.info("Stopping primary client");
                    establishConnectionToPrimary.stop();
                } catch (Throwable th) {
                    LOGGER.info("Stopping primary client");
                    establishConnectionToPrimary.stop();
                    throw th;
                }
            }
        });
        mongoDbSnapshotContext.offset = new MongoDbOffsetContext(new SourceInfo(this.connectorConfig), new TransactionContext(), new MongoDbIncrementalSnapshotContext(false), linkedHashMap);
    }

    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, ReplicaSet replicaSet, ConnectionContext.MongoPrimary mongoPrimary) throws InterruptedException {
        EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotChangeEventReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
        ((MongoDbOffsetContext) mongoDbSnapshotContext.offset).preSnapshotStart();
        createDataEventsForReplicaSet(changeEventSourceContext, mongoDbSnapshotContext, snapshotChangeEventReceiver, replicaSet, mongoPrimary);
        ((MongoDbOffsetContext) mongoDbSnapshotContext.offset).preSnapshotCompletion();
        snapshotChangeEventReceiver.completeSnapshot();
        ((MongoDbOffsetContext) mongoDbSnapshotContext.offset).postSnapshotCompletion();
    }

    private void createDataEventsForReplicaSet(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotReceiver, ReplicaSet replicaSet, ConnectionContext.MongoPrimary mongoPrimary) throws InterruptedException {
        String replicaSetName = replicaSet.replicaSetName();
        MongoDbOffsetContext mongoDbOffsetContext = (MongoDbOffsetContext) mongoDbSnapshotContext.offset;
        ReplicaSetOffsetContext replicaSetOffsetContext = mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet);
        mongoDbSnapshotContext.lastCollection = false;
        mongoDbOffsetContext.startReplicaSetSnapshot(replicaSet.replicaSetName());
        LOGGER.info("Beginning snapshot of '{}' at {}", replicaSetName, replicaSetOffsetContext.getOffset());
        List list = (List) determineDataCollectionsToBeSnapshotted(mongoPrimary.collections()).collect(Collectors.toList());
        this.snapshotProgressListener.monitoredDataCollectionsDetermined((MongoDbPartition) mongoDbSnapshotContext.partition, list);
        if (this.connectorConfig.getSnapshotMaxThreads() > 1) {
            int min = Math.min(list.size(), this.connectorConfig.getSnapshotMaxThreads());
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue(list);
            ExecutorService newFixedThreadPool = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), "snapshot-" + (replicaSet.hasReplicaSetName() ? replicaSet.replicaSetName() : "main"), this.connectorConfig.getSnapshotMaxThreads());
            CountDownLatch countDownLatch = new CountDownLatch(min);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            LOGGER.info("Preparing to use {} thread(s) to snapshot {} collection(s): {}", new Object[]{Integer.valueOf(min), Integer.valueOf(list.size()), Strings.join(", ", list)});
            for (int i = 0; i < min; i++) {
                newFixedThreadPool.submit(() -> {
                    CollectionId collectionId;
                    this.taskContext.configureLoggingContext(replicaSet.replicaSetName() + "-snapshot" + atomicInteger.incrementAndGet());
                    try {
                        while (!atomicBoolean.get() && (collectionId = (CollectionId) concurrentLinkedQueue.poll()) != null) {
                            try {
                                if (!changeEventSourceContext.isRunning()) {
                                    throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSet.replicaSetName());
                                }
                                if (concurrentLinkedQueue.isEmpty()) {
                                    mongoDbSnapshotContext.lastCollection = true;
                                }
                                createDataEventsForCollection(changeEventSourceContext, mongoDbSnapshotContext, snapshotReceiver, replicaSet, collectionId, mongoPrimary);
                            } catch (InterruptedException e) {
                                atomicBoolean.set(true);
                                countDownLatch.countDown();
                                return;
                            }
                        }
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                });
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                atomicBoolean.set(true);
            }
            newFixedThreadPool.shutdown();
        } else {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                CollectionId collectionId = (CollectionId) it.next();
                if (!changeEventSourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSet.replicaSetName());
                }
                if (!it.hasNext()) {
                    mongoDbSnapshotContext.lastCollection = true;
                }
                createDataEventsForCollection(changeEventSourceContext, mongoDbSnapshotContext, snapshotReceiver, replicaSet, collectionId, mongoPrimary);
            }
        }
        mongoDbOffsetContext.stopReplicaSetSnapshot(replicaSet.replicaSetName());
    }

    private void createDataEventsForCollection(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotReceiver, ReplicaSet replicaSet, CollectionId collectionId, ConnectionContext.MongoPrimary mongoPrimary) throws InterruptedException {
        long currentTimeInMillis = this.clock.currentTimeInMillis();
        LOGGER.info("\t Exporting data for collection '{}'", collectionId);
        mongoPrimary.executeBlocking("sync '" + collectionId + "'", mongoClient -> {
            long j = 0;
            MongoCursor it = mongoClient.getDatabase(collectionId.dbName()).getCollection(collectionId.name()).find(Document.parse(this.connectorConfig.getSnapshotFilterQueryForCollection(collectionId).orElseGet(() -> {
                return "{}";
            }))).batchSize(this.taskContext.getConnectorConfig().getSnapshotFetchSize()).iterator();
            try {
                mongoDbSnapshotContext.lastRecordInCollection = false;
                if (it.hasNext()) {
                    while (it.hasNext()) {
                        if (!changeEventSourceContext.isRunning()) {
                            throw new InterruptedException("Interrupted while snapshotting collection " + collectionId.name());
                        }
                        Document document = (Document) it.next();
                        j++;
                        mongoDbSnapshotContext.lastRecordInCollection = !it.hasNext();
                        if (mongoDbSnapshotContext.lastCollection && mongoDbSnapshotContext.lastRecordInCollection) {
                            ((MongoDbOffsetContext) mongoDbSnapshotContext.offset).markLastSnapshotRecord();
                        }
                        this.dispatcher.dispatchSnapshotEvent((MongoDbPartition) mongoDbSnapshotContext.partition, collectionId, getChangeRecordEmitter(mongoDbSnapshotContext, collectionId, document, replicaSet), snapshotReceiver);
                    }
                } else if (mongoDbSnapshotContext.lastCollection) {
                    ((MongoDbOffsetContext) mongoDbSnapshotContext.offset).markLastSnapshotRecord();
                }
                LOGGER.info("\t Finished snapshotting {} records for collection '{}'; total duration '{}'", new Object[]{Long.valueOf(j), collectionId, Strings.duration(this.clock.currentTimeInMillis() - currentTimeInMillis)});
                this.snapshotProgressListener.dataCollectionSnapshotCompleted((MongoDbPartition) mongoDbSnapshotContext.partition, collectionId, j);
                if (it != null) {
                    it.close();
                }
            } catch (Throwable th) {
                if (it != null) {
                    try {
                        it.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }

    protected ChangeRecordEmitter<MongoDbPartition> getChangeRecordEmitter(AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> snapshotContext, CollectionId collectionId, Document document, ReplicaSet replicaSet) {
        MongoDbOffsetContext mongoDbOffsetContext = (MongoDbOffsetContext) snapshotContext.offset;
        ReplicaSetPartition replicaSetPartition = mongoDbOffsetContext.getReplicaSetPartition(replicaSet);
        ReplicaSetOffsetContext replicaSetOffsetContext = mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet);
        replicaSetOffsetContext.readEvent(collectionId, getClock().currentTime());
        return new MongoDbChangeSnapshotOplogRecordEmitter(replicaSetPartition, replicaSetOffsetContext, getClock(), document, true);
    }

    protected Clock getClock() {
        return this.clock;
    }

    protected /* bridge */ /* synthetic */ SnapshotResult doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OffsetContext offsetContext, AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        return doExecute(changeEventSourceContext, (MongoDbOffsetContext) offsetContext, (AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext>) snapshotContext, snapshottingTask);
    }
}
