package cn.guruguru.flink.connector.mongo.table;

import cn.guruguru.flink.connector.mongo.internal.options.MongoLookupOptions;
import cn.guruguru.flink.connector.mongo.internal.options.MongoOptions;
import cn.guruguru.flink.connector.mongo.internal.options.MongoReadOptions;
import cn.guruguru.flink.connector.mongo.internal.options.MongoWriteOptions;
import com.mongodb.internal.connection.ByteBufferBsonOutput;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:cn/guruguru/flink/connector/mongo/table/MongoDynamicTableFactory.class */
public class MongoDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "mongodb";
    public static final ConfigOption<String> URI = ConfigOptions.key("uri").stringType().defaultValue(MongoOptions.DEFAULT_MONGO_URI).withDescription("MongoDB Connection URI");
    public static final ConfigOption<String> DATABASE = ConfigOptions.key("database").stringType().noDefaultValue().withDescription("Mongodb database name");
    public static final ConfigOption<String> COLLECTION = ConfigOptions.key("collection").stringType().noDefaultValue().withDescription("Mongodb collection name");
    public static final ConfigOption<Integer> SCAN_FETCH_SIZE = ConfigOptions.key("scan.fetch-size").intType().defaultValue(0).withDescription("The fetch size");
    public static final ConfigOption<Boolean> SCAN_EXCLUDE_ID = ConfigOptions.key("scan.exclude-id").booleanType().defaultValue(true).withDescription("Whether to exclude the ObjectId field when scanning");
    public static final ConfigOption<Integer> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows").intType().defaultValue(10000).withDescription("The cache max batch size");
    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl").durationType().defaultValue(Duration.ofSeconds(10)).withDescription("The cache time to live.");
    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.cache.max-retries").intType().defaultValue(3).withDescription("The Max retry times if lookup database failed.");
    public static final ConfigOption<Boolean> LOOKUP_EXCLUDE_ID = ConfigOptions.key("lookup.exclude-id").booleanType().defaultValue(true).withDescription("Whether to exclude the ObjectId field when looking up.");
    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions.key("sink.buffer-flush.max-rows").intType().defaultValue(Integer.valueOf(ByteBufferBsonOutput.INITIAL_BUFFER_SIZE)).withDescription("The flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100.");
    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(1)).withDescription("The flush interval mills, over this time, asynchronous threads will flush data. The default value is 1s.");
    public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("The max retry times if writing records to database failed.");
    public static final ConfigOption<Boolean> SINK_ORDERED = ConfigOptions.key("sink.ordered").booleanType().defaultValue(true).withDescription("ordered");

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        createTableFactoryHelper.validate();
        validateConfigOptions(options);
        return new MongoDynamicTableSource(TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()), getMongoOptions(options), getMongoReadOptions(options), getMongoLookupOptions(options));
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        ReadableConfig options = createTableFactoryHelper.getOptions();
        createTableFactoryHelper.validate();
        validateConfigOptions(options);
        return new MongoDynamicTableSink(TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()), getMongoOptions(options), getMongoWriteOptions(options));
    }

    private MongoOptions getMongoOptions(ReadableConfig readableConfig) {
        MongoOptions.Builder builder = MongoOptions.builder();
        builder.setUri((String) readableConfig.get(URI));
        Optional optional = readableConfig.getOptional(DATABASE);
        builder.getClass();
        optional.ifPresent(builder::setDatabaseName);
        Optional optional2 = readableConfig.getOptional(COLLECTION);
        builder.getClass();
        optional2.ifPresent(builder::setCollectionName);
        return builder.build();
    }

    private MongoWriteOptions getMongoWriteOptions(ReadableConfig readableConfig) {
        return MongoWriteOptions.builder().setMaxRetries(((Integer) readableConfig.get(SINK_MAX_RETRIES)).intValue()).setBatchSize(((Integer) readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)).intValue()).setBatchIntervalMs(((Duration) readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL)).toMillis()).setOrdered(((Boolean) readableConfig.get(SINK_ORDERED)).booleanValue()).build();
    }

    private MongoReadOptions getMongoReadOptions(ReadableConfig readableConfig) {
        return MongoReadOptions.builder().setFetchSize(((Integer) readableConfig.get(SCAN_FETCH_SIZE)).intValue()).build();
    }

    private MongoLookupOptions getMongoLookupOptions(ReadableConfig readableConfig) {
        return MongoLookupOptions.builder().setCacheMaxRows(((Integer) readableConfig.get(LOOKUP_CACHE_MAX_ROWS)).intValue()).setCacheTtl((Duration) readableConfig.get(LOOKUP_CACHE_TTL)).setCacheMaxRows(((Integer) readableConfig.get(LOOKUP_CACHE_MAX_ROWS)).intValue()).build();
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(URI);
        hashSet.add(DATABASE);
        hashSet.add(COLLECTION);
        return hashSet;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet hashSet = new HashSet();
        hashSet.add(SCAN_FETCH_SIZE);
        hashSet.add(SCAN_EXCLUDE_ID);
        hashSet.add(LOOKUP_CACHE_MAX_ROWS);
        hashSet.add(LOOKUP_CACHE_TTL);
        hashSet.add(LOOKUP_MAX_RETRIES);
        hashSet.add(LOOKUP_EXCLUDE_ID);
        hashSet.add(SINK_BUFFER_FLUSH_MAX_ROWS);
        hashSet.add(SINK_BUFFER_FLUSH_INTERVAL);
        hashSet.add(SINK_MAX_RETRIES);
        return hashSet;
    }

    private void validateConfigOptions(ReadableConfig readableConfig) {
        checkAllOrNone(readableConfig, new ConfigOption[]{LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
    }

    private void checkAllOrNone(ReadableConfig readableConfig, ConfigOption<?>[] configOptionArr) {
        int i = 0;
        for (ConfigOption<?> configOption : configOptionArr) {
            if (readableConfig.getOptional(configOption).isPresent()) {
                i++;
            }
        }
        Preconditions.checkArgument(configOptionArr.length == i || i == 0, "Either all or none of the following options should be provided:\n" + String.join("\n", (String[]) Arrays.stream(configOptionArr).map((v0) -> {
            return v0.key();
        }).toArray(i2 -> {
            return new String[i2];
        })));
    }
}
