package cn.guruguru.flink.connector.mongo.source;

import cn.guruguru.flink.connector.mongo.internal.connection.DefaultMongoClientFactory;
import cn.guruguru.flink.connector.mongo.internal.conveter.MgDeserializationConverter;
import cn.guruguru.flink.connector.mongo.internal.schema.MongoTableSchema;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.bson.BsonDocument;

/* loaded from: input_file:cn/guruguru/flink/connector/mongo/source/MongoRowDataSourceFunction.class */
public class MongoRowDataSourceFunction<RowData> extends RichSourceFunction<RowData> implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private final MgDeserializationConverter<RowData> deserConverter;
    private final String uri;
    private final String databaseName;
    private final String collectionName;
    private final int fetchSize;
    private final boolean excludeId;
    private transient MongoClient mongoClient;
    private transient MongoCollection<BsonDocument> mongoCollection;

    public MongoRowDataSourceFunction(MgDeserializationConverter<RowData> mgDeserializationConverter, String str, String str2, String str3, int i, boolean z) {
        this.deserConverter = mgDeserializationConverter;
        this.uri = str;
        this.databaseName = str2;
        this.collectionName = str3;
        this.fetchSize = i;
        this.excludeId = z;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.mongoClient = new DefaultMongoClientFactory(this.uri).create();
        this.mongoCollection = this.mongoClient.getDatabase(this.databaseName).getCollection(this.collectionName, BsonDocument.class);
    }

    public void close() throws Exception {
        this.mongoClient.close();
    }

    public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        MongoCursor<BsonDocument> it;
        if (this.excludeId) {
            it = this.mongoCollection.find().projection(Projections.exclude(MongoTableSchema.ObjectIdInfo.DEFAULT_OID_NAME)).batchSize2(this.fetchSize).iterator();
            Throwable th = null;
            while (it.hasNext()) {
                try {
                    try {
                        sourceContext.collect(this.deserConverter.toInternal(it.next()));
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } finally {
                }
            }
            if (it != null) {
                if (0 == 0) {
                    it.close();
                    return;
                }
                try {
                    it.close();
                    return;
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                    return;
                }
            }
            return;
        }
        it = this.mongoCollection.find().batchSize2(this.fetchSize).iterator();
        Throwable th4 = null;
        while (it.hasNext()) {
            try {
                try {
                    sourceContext.collect(this.deserConverter.toInternal(it.next()));
                } catch (Throwable th5) {
                    th4 = th5;
                    throw th5;
                }
            } finally {
            }
        }
        if (it != null) {
            if (0 == 0) {
                it.close();
                return;
            }
            try {
                it.close();
            } catch (Throwable th6) {
                th4.addSuppressed(th6);
            }
        }
    }

    public void cancel() {
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }
}
