package org.apache.nifi.amqp.processors;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be emitted as its own FlowFile to the 'success' relationship.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
@WritesAttributes({@WritesAttribute(attribute = "amqp$appId", description = "The App ID field from the AMQP Message"), @WritesAttribute(attribute = "amqp$contentEncoding", description = "The Content Encoding reported by the AMQP Message"), @WritesAttribute(attribute = "amqp$contentType", description = "The Content Type reported by the AMQP Message"), @WritesAttribute(attribute = "amqp$headers", description = "The headers present on the AMQP Message"), @WritesAttribute(attribute = "amqp$deliveryMode", description = "The numeric indicator for the Message's Delivery Mode"), @WritesAttribute(attribute = "amqp$priority", description = "The Message priority"), @WritesAttribute(attribute = "amqp$correlationId", description = "The Message's Correlation ID"), @WritesAttribute(attribute = "amqp$replyTo", description = "The value of the Message's Reply-To field"), @WritesAttribute(attribute = "amqp$expiration", description = "The Message Expiration"), @WritesAttribute(attribute = "amqp$messageId", description = "The unique ID of the Message"), @WritesAttribute(attribute = "amqp$timestamp", description = "The timestamp of the Message, as the number of milliseconds since epoch"), @WritesAttribute(attribute = "amqp$type", description = "The type of message"), @WritesAttribute(attribute = "amqp$userId", description = "The ID of the user"), @WritesAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster")})
/* loaded from: input_file:org/apache/nifi/amqp/processors/ConsumeAMQP.class */
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
    // Prefix carried by every FlowFile attribute name this processor writes.
    // NOTE(review): not referenced by name in the visible code; the attribute keys in
    // buildAttributes() spell the prefix out literally (possibly inlined by the compiler).
    private static final String ATTRIBUTES_PREFIX = "amqp$";
    // Required, non-empty name of the queue to consume from.
    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder().name("Queue").description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Required "true"/"false" switch (default "false"); its value is handed to the AMQPConsumer constructor.
    static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder().name("auto.acknowledge").displayName("Auto-Acknowledge messages").description("If true, messages that are received will be auto-acknowledged by the AMQP Broker. This generally will provide better throughput but could result in messages being lost upon restart of NiFi").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    // Required positive integer (default "10") bounding how many messages processResource() pulls per invocation.
    // Expression language is not supported for this property.
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("batch.size").displayName("Batch Size").description("The maximum number of messages that should be pulled in a single session. Once this many messages have been received (or once no more messages are readily available), the messages received will be transferred to the 'success' relationship and the messages will be acknowledged with the AMQP Broker. Setting this value to a larger number could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("10").required(true).build();
    // The only relationship of this processor; every FlowFile created from a message is transferred here.
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received from the AMQP queue are routed to this relationship").build();
    // Unmodifiable list of supported properties, populated once in the static initializer.
    private static final List<PropertyDescriptor> propertyDescriptors;
    // Unmodifiable set of relationships, populated once in the static initializer.
    private static final Set<Relationship> relationships;

    /**
     * Pulls up to {@code BATCH_SIZE} messages from the consumer, turns each one into a
     * FlowFile routed to {@link #REL_SUCCESS}, commits the session and then acknowledges
     * the last message that was received.
     *
     * <p>The batch ends early as soon as {@code consume()} returns {@code null}, i.e. when
     * no further message is readily available. If that happens before any message was
     * received, the processor yields so it is not rescheduled immediately.</p>
     *
     * @param connection     AMQP connection; its string form is used in the provenance transit URI
     * @param aMQPConsumer   consumer the messages are pulled from and acknowledged through
     * @param processContext supplies the {@code BATCH_SIZE} and {@code QUEUE} property values
     * @param processSession session used to create, write, transfer and commit the FlowFiles
     * @throws ProcessException if acknowledging the last received message fails with an I/O error
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    public void processResource(Connection connection, AMQPConsumer aMQPConsumer, ProcessContext processContext, ProcessSession processSession) {
        // The batch size cannot change during a single invocation, so read it once.
        final int batchSize = processContext.getProperty(BATCH_SIZE).asInteger();
        GetResponse lastReceived = null;

        for (int received = 0; received < batchSize; received++) {
            final GetResponse response = aMQPConsumer.consume();
            if (response == null) {
                if (lastReceived == null) {
                    // Nothing at all was available: back off instead of being rescheduled right away.
                    processContext.yield();
                }
                // Stop the batch here. Without this break an empty (or drained) queue would
                // keep the loop spinning forever, because the counter only advances on a message.
                break;
            }

            FlowFile flowFile = processSession.create();
            flowFile = processSession.write(flowFile, out -> out.write(response.getBody()));
            flowFile = processSession.putAllAttributes(flowFile, buildAttributes(response.getProps()));

            processSession.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + processContext.getProperty(QUEUE).getValue());
            processSession.transfer(flowFile, REL_SUCCESS);
            lastReceived = response;
        }

        // Commit before acknowledging so a message is only acknowledged once its FlowFile is persisted.
        processSession.commit();

        if (lastReceived != null) {
            try {
                // Only the last response is passed on.
                // NOTE(review): presumably AMQPConsumer acknowledges cumulatively up to this delivery — confirm.
                aMQPConsumer.acknowledge(lastReceived);
            } catch (IOException e) {
                throw new ProcessException("Failed to acknowledge message", e);
            }
        }
    }

    /**
     * Converts the properties of a received AMQP message into FlowFile attributes.
     *
     * <p>Each property that is not {@code null} is stored under an {@code amqp$}-prefixed
     * key using its {@code toString()} form; the timestamp is stored as epoch milliseconds.
     * Properties that are {@code null} produce no attribute.</p>
     *
     * @param basicProperties properties of the received message
     * @return a new mutable map holding one entry per non-null property
     */
    private Map<String, String> buildAttributes(AMQP.BasicProperties basicProperties) {
        final Map<String, String> attributes = new HashMap<>();

        // The timestamp is exposed as epoch milliseconds rather than as a date string.
        Long epochMillis = null;
        if (basicProperties.getTimestamp() != null) {
            epochMillis = basicProperties.getTimestamp().getTime();
        }

        addAttribute(attributes, "amqp$appId", basicProperties.getAppId());
        addAttribute(attributes, "amqp$contentEncoding", basicProperties.getContentEncoding());
        addAttribute(attributes, "amqp$contentType", basicProperties.getContentType());
        addAttribute(attributes, "amqp$headers", basicProperties.getHeaders());
        addAttribute(attributes, "amqp$deliveryMode", basicProperties.getDeliveryMode());
        addAttribute(attributes, "amqp$priority", basicProperties.getPriority());
        addAttribute(attributes, "amqp$correlationId", basicProperties.getCorrelationId());
        addAttribute(attributes, "amqp$replyTo", basicProperties.getReplyTo());
        addAttribute(attributes, "amqp$expiration", basicProperties.getExpiration());
        addAttribute(attributes, "amqp$messageId", basicProperties.getMessageId());
        addAttribute(attributes, "amqp$timestamp", epochMillis);
        addAttribute(attributes, "amqp$type", basicProperties.getType());
        addAttribute(attributes, "amqp$userId", basicProperties.getUserId());
        addAttribute(attributes, "amqp$clusterId", basicProperties.getClusterId());
        return attributes;
    }

    /**
     * Stores {@code obj.toString()} under {@code str} in {@code map}; does nothing when
     * {@code obj} is {@code null}, so absent message properties yield no attribute.
     *
     * @param map target attribute map
     * @param str attribute name
     * @param obj attribute value, may be {@code null}
     */
    private void addAttribute(Map<String, String> map, String str, Object obj) {
        if (obj != null) {
            map.put(str, obj.toString());
        }
    }

    /**
     * Creates the {@link AMQPConsumer} for the configured queue and auto-acknowledge setting.
     *
     * <p>If the consumer cannot be created, the connection is closed before the failure is
     * reported. Should closing fail as well, the close failure becomes the cause of the
     * thrown exception and the original creation failure is attached as a suppressed
     * exception, so neither stack trace is lost.</p>
     *
     * @param processContext supplies the {@code QUEUE} and {@code AUTO_ACKNOWLEDGE} property values
     * @param connection     connection the consumer is created on; closed if creation fails
     * @return the newly created consumer
     * @throws ProcessException if the consumer cannot be created, or if the connection
     *                          cannot be closed afterwards
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    public synchronized AMQPConsumer createAMQPWorker(ProcessContext processContext, Connection connection) {
        try {
            return new AMQPConsumer(connection, processContext.getProperty(QUEUE).getValue(), processContext.getProperty(AUTO_ACKNOWLEDGE).asBoolean().booleanValue());
        } catch (IOException e) {
            try {
                connection.close();
                getLogger().warn("Closed connection at port " + connection.getPort());
            } catch (IOException closeFailure) {
                // Keep both failures: the close error as cause, the creation error as suppressed.
                final ProcessException failure = new ProcessException("Failed to close connection at port " + connection.getPort(), closeFailure);
                failure.addSuppressed(e);
                throw failure;
            }
            throw new ProcessException("Failed to connect to AMQP Broker", e);
        }
    }

    /**
     * Returns the unmodifiable property list built in the static initializer: QUEUE,
     * AUTO_ACKNOWLEDGE and BATCH_SIZE followed by the descriptors returned by
     * {@code getCommonPropertyDescriptors()}.
     *
     * @return the supported property descriptors
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Returns the unmodifiable relationship set built in the static initializer; it
     * contains only {@link #REL_SUCCESS}.
     *
     * @return the relationships of this processor
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Delegates to {@code super.close()}. The method carries the {@code @OnStopped}
     * lifecycle annotation and is marked by the decompiler as a compiler-generated bridge.
     * NOTE(review): what the superclass actually releases is not visible here — see
     * AbstractAMQPProcessor.
     */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    @OnStopped
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }

    // Builds the immutable property and relationship collections once, at class load.
    static {
        // Processor-specific properties come first, followed by whatever
        // getCommonPropertyDescriptors() contributes.
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(QUEUE);
        descriptors.add(AUTO_ACKNOWLEDGE);
        descriptors.add(BATCH_SIZE);
        descriptors.addAll(getCommonPropertyDescriptors());
        propertyDescriptors = Collections.unmodifiableList(descriptors);

        // 'success' is the only relationship this processor exposes.
        final Set<Relationship> supportedRelationships = new HashSet<>();
        supportedRelationships.add(REL_SUCCESS);
        relationships = Collections.unmodifiableSet(supportedRelationships);
    }
}
