package org.apache.nifi.processors.mongodb;

import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateManyModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.bson.Document;
import org.bson.conversions.Bson;

@CapabilityDescription("This processor is a record-aware processor for inserting/upserting data into MongoDB. It uses a configured record reader and schema to read an incoming record set from the body of a flowfile and then inserts/upserts batches of those records into a configured MongoDB collection. This processor does not support deletes. The number of documents to insert/upsert at a time is controlled by the \"Batch Size\" configuration property. This value should be set to a reasonable size to ensure that MongoDB is not overloaded with too many operations at once.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"mongodb", "insert", "update", "upsert", "record", "put"})
@ReadsAttribute(attribute = PutMongoRecord.MONGODB_UPDATE_MODE, description = "Configurable parameter for controlling update mode on a per-flowfile basis. Acceptable values are 'one' and 'many' and controls whether a single incoming record should update a single or multiple Mongo documents.")
/* loaded from: input_file:org/apache/nifi/processors/mongodb/PutMongoRecord.class */
public class PutMongoRecord extends AbstractMongoProcessor {
    static final String MONGODB_UPDATE_MODE = "mongodb.update.mode";
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
    static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update One", "Updates only the first document that matches the query.");
    static final AllowableValue UPDATE_MANY = new AllowableValue("many", "Update Many", "Updates every document that matches the query.");
    static final AllowableValue UPDATE_FF_ATTRIBUTE = new AllowableValue("flowfile-attribute", "Use 'mongodb.update.mode' flowfile attribute.", "Use the value of the 'mongodb.update.mode' attribute of the incoming flowfile. Acceptable values are 'one' and 'many'.");
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder().name("insert_count").displayName("Batch Size").description("The number of records to group together for one single insert/upsert operation against MongoDB.").defaultValue("100").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder().name("ordered").displayName("Ordered").description("Perform ordered or unordered operations").allowableValues(new String[]{"True", "False"}).defaultValue("False").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    static final PropertyDescriptor BYPASS_VALIDATION = new PropertyDescriptor.Builder().name("bypass-validation").displayName("Bypass Validation").description("Bypass schema validation during insert/upsert").allowableValues(new String[]{"True", "False"}).defaultValue("True").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    static final PropertyDescriptor UPDATE_KEY_FIELDS = new PropertyDescriptor.Builder().name("update-key-fields").displayName("Update Key Fields").description("Comma separated list of fields based on which to identify documents that need to be updated. If this property is set NiFi will attempt an upsert operation on all documents. If this property is not set all documents will be inserted.").required(false).addValidator(StandardValidators.createListValidator(true, false, StandardValidators.NON_EMPTY_VALIDATOR)).build();
    static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder().name("update-mode").displayName("Update Mode").dependsOn(UPDATE_KEY_FIELDS, new AllowableValue[0]).description("Choose between updating a single document or multiple documents per incoming record.").allowableValues(new AllowableValue[]{UPDATE_ONE, UPDATE_MANY, UPDATE_FF_ATTRIBUTE}).defaultValue(UPDATE_ONE.getValue()).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        UpdateOneModel insertOneModel;
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        WriteConcern writeConcern = this.clientService.getWriteConcern();
        int intValue = processContext.getProperty(INSERT_COUNT).asInteger().intValue();
        int i = 0;
        boolean booleanValue = processContext.getProperty(ORDERED).asBoolean().booleanValue();
        boolean booleanValue2 = processContext.getProperty(BYPASS_VALIDATION).asBoolean().booleanValue();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (processContext.getProperty(UPDATE_KEY_FIELDS).isSet()) {
            Arrays.stream(processContext.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*")).forEach(str -> {
                linkedHashMap.put(str, Arrays.asList(str.split("\\.")));
            });
        }
        BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
        bulkWriteOptions.ordered(booleanValue);
        bulkWriteOptions.bypassDocumentValidation(Boolean.valueOf(booleanValue2));
        try {
            try {
                InputStream read = processSession.read(flowFile);
                try {
                    RecordReader createRecordReader = asControllerService.createRecordReader(flowFile, read, getLogger());
                    try {
                        RecordSchema schema = createRecordReader.getSchema();
                        MongoCollection withWriteConcern = getCollection(processContext, flowFile).withWriteConcern(writeConcern);
                        ArrayList arrayList = new ArrayList();
                        while (true) {
                            Record nextRecord = createRecordReader.nextRecord();
                            if (nextRecord == null) {
                                if (arrayList.size() > 0) {
                                    withWriteConcern.bulkWrite(arrayList, bulkWriteOptions);
                                }
                                if (createRecordReader != null) {
                                    createRecordReader.close();
                                }
                                if (read != null) {
                                    read.close();
                                }
                                if (0 == 0) {
                                    processSession.getProvenanceReporter().send(flowFile, this.clientService.getURI(), String.format("Written %d documents to MongoDB.", Integer.valueOf(i)));
                                    processSession.transfer(flowFile, REL_SUCCESS);
                                    getLogger().info("Written {} records into MongoDB", new Object[]{Integer.valueOf(i)});
                                    return;
                                }
                                return;
                            }
                            Map map = (Map) DataTypeUtils.convertRecordFieldtoObject(nextRecord, RecordFieldType.RECORD.getRecordDataType(nextRecord.getSchema()));
                            Document document = new Document();
                            for (String str2 : schema.getFieldNames()) {
                                document.put(str2, map.get(str2));
                            }
                            Document convertArrays = convertArrays(document);
                            if (processContext.getProperty(UPDATE_KEY_FIELDS).isSet()) {
                                Bson[] buildFilters = buildFilters(linkedHashMap, convertArrays);
                                if (updateModeMatches(UPDATE_ONE.getValue(), processContext, flowFile)) {
                                    insertOneModel = new UpdateOneModel(Filters.and(buildFilters), new Document("$set", convertArrays), new UpdateOptions().upsert(true));
                                } else {
                                    if (!updateModeMatches(UPDATE_MANY.getValue(), processContext, flowFile)) {
                                        throw new ProcessException("Unrecognized 'mongodb.update.mode' value '" + flowFile.getAttribute(MONGODB_UPDATE_MODE) + "'");
                                    }
                                    insertOneModel = new UpdateManyModel(Filters.and(buildFilters), new Document("$set", convertArrays), new UpdateOptions().upsert(true));
                                }
                            } else {
                                insertOneModel = new InsertOneModel(convertArrays);
                            }
                            arrayList.add(insertOneModel);
                            if (arrayList.size() == intValue) {
                                withWriteConcern.bulkWrite(arrayList, bulkWriteOptions);
                                i += arrayList.size();
                                arrayList = new ArrayList();
                            }
                        }
                    } catch (Throwable th) {
                        if (createRecordReader != null) {
                            try {
                                createRecordReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (read != null) {
                        try {
                            read.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } catch (ProcessException | SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
                getLogger().error("PutMongoRecord failed with error:", e);
                processSession.transfer(flowFile, REL_FAILURE);
                if (1 == 0) {
                    processSession.getProvenanceReporter().send(flowFile, this.clientService.getURI(), String.format("Written %d documents to MongoDB.", 0));
                    processSession.transfer(flowFile, REL_SUCCESS);
                    getLogger().info("Written {} records into MongoDB", new Object[]{0});
                }
            }
        } catch (Throwable th5) {
            if (0 == 0) {
                processSession.getProvenanceReporter().send(flowFile, this.clientService.getURI(), String.format("Written %d documents to MongoDB.", 0));
                processSession.transfer(flowFile, REL_SUCCESS);
                getLogger().info("Written {} records into MongoDB", new Object[]{0});
            }
            throw th5;
        }
    }

    private Document convertArrays(Document document) {
        Document document2 = new Document();
        for (Map.Entry entry : document.entrySet()) {
            if (entry.getValue() != null && entry.getValue().getClass().isArray()) {
                document2.put((String) entry.getKey(), convertArrays((Object[]) entry.getValue()));
            } else if (entry.getValue() == null || !((entry.getValue() instanceof Map) || (entry.getValue() instanceof Document))) {
                document2.put((String) entry.getKey(), entry.getValue());
            } else {
                document2.put((String) entry.getKey(), convertArrays(new Document((Map) entry.getValue())));
            }
        }
        return document2;
    }

    private List convertArrays(Object[] objArr) {
        ArrayList arrayList = new ArrayList();
        for (Object obj : objArr) {
            if (obj != null && obj.getClass().isArray()) {
                arrayList.add(convertArrays((Object[]) obj));
            } else if (obj instanceof Map) {
                arrayList.add(convertArrays(new Document((Map) obj)));
            } else {
                arrayList.add(obj);
            }
        }
        return arrayList;
    }

    private Bson[] buildFilters(Map<String, List<String>> map, Document document) {
        return (Bson[]) map.entrySet().stream().map(entry -> {
            String str = (String) entry.getKey();
            Object obj = document;
            String str2 = null;
            for (String str3 : (List) entry.getValue()) {
                if (!(obj instanceof Map)) {
                    throw new ProcessException("field '" + str2 + "' (from field expression '" + str + "') is not an embedded document");
                }
                obj = ((Map) obj).get(str3);
                if (obj == null) {
                    throw new ProcessException("field '" + str3 + "' (from field expression '" + str + "') has no value");
                }
                str2 = str3;
            }
            return Filters.eq(str, obj);
        }).toArray(i -> {
            return new Bson[i];
        });
    }

    private boolean updateModeMatches(String str, ProcessContext processContext, FlowFile flowFile) {
        String value = processContext.getProperty(UPDATE_MODE).getValue();
        return value.equals(str) || (value.equals(UPDATE_FF_ATTRIBUTE.getValue()) && str.equalsIgnoreCase(flowFile.getAttribute(MONGODB_UPDATE_MODE)));
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(descriptors);
        arrayList.add(RECORD_READER_FACTORY);
        arrayList.add(INSERT_COUNT);
        arrayList.add(ORDERED);
        arrayList.add(BYPASS_VALIDATION);
        arrayList.add(UPDATE_KEY_FIELDS);
        arrayList.add(UPDATE_MODE);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(hashSet);
    }
}
