package cn.guruguru.flink.connector.mongo.sink;

import cn.guruguru.flink.connector.mongo.internal.connection.DefaultMongoClientFactory;
import cn.guruguru.flink.connector.mongo.internal.conveter.MgSerializationConverter;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertManyOptions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/guruguru/flink/connector/mongo/sink/MongoRowDataSinkFunction.class */
public class MongoRowDataSinkFunction extends RichSinkFunction<RowData> implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MongoRowDataSinkFunction.class);
    private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions();
    private final MgSerializationConverter<RowData> mongoSerConverter;
    private final String uri;
    private final String databaseName;
    private final String collectionName;
    private final long maxRetries;
    private final int batchSize;
    private final long batchIntervalMs;
    private final boolean ordered;
    private List<BsonDocument> batch = new ArrayList();
    private int batchCount = 0;
    private transient MongoClient mgClient;
    private transient MongoCollection<BsonDocument> mgCollection;
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture scheduledFuture;

    public MongoRowDataSinkFunction(MgSerializationConverter<RowData> mgSerializationConverter, String str, String str2, String str3, long j, int i, long j2, boolean z) {
        this.mongoSerConverter = mgSerializationConverter;
        this.uri = str;
        this.databaseName = str2;
        this.collectionName = str3;
        this.maxRetries = j;
        this.batchSize = i;
        this.batchIntervalMs = j2;
        this.ordered = z;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        LOG.info("Starting MongoDB connection");
        this.mgClient = new DefaultMongoClientFactory(this.uri).create();
        this.mgCollection = this.mgClient.getDatabase(this.databaseName).getCollection(this.collectionName, BsonDocument.class);
        LOG.info("Started MongoDB connection");
    }

    public void invoke(RowData rowData, SinkFunction.Context context) {
        this.mgCollection.insertOne(this.mongoSerConverter.toExternal(rowData));
    }

    private void flush(BsonDocument bsonDocument) throws MongoSinkException {
        for (int i = 0; i <= this.maxRetries; i++) {
            try {
                addToBatch(bsonDocument);
                this.batchCount++;
                if (this.batchCount % this.batchSize == 0) {
                    executeBatch(this.batch);
                    this.batchCount = 0;
                    this.batch = new ArrayList();
                }
                return;
            } catch (Exception e) {
                LOG.error("MongoDB insertMany error, retry times = {}", Integer.valueOf(i), e);
                if (i >= this.maxRetries) {
                    throw new MongoSinkException("Mongo retry error");
                }
            }
        }
    }

    private void addToBatch(BsonDocument bsonDocument) {
        this.batch.add(bsonDocument);
    }

    private void executeBatch(List<BsonDocument> list) {
        LOG.debug("Bulk writing {} document(s) into collection [{}]", Integer.valueOf(this.batchSize), this.mgCollection);
        LOG.debug("MongoDB bulk write result: {}", this.mgCollection.insertMany(list, new InsertManyOptions().ordered(this.ordered)));
    }

    public void close() {
        if (this.mgClient != null) {
            try {
                this.mgClient.close();
            } catch (Exception e) {
                LOG.error("MongoDB client close exception.");
            }
            this.mgClient = null;
        }
    }

    public String toString() {
        return "MongoDB";
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }
}
