package org.apache.nifi.amqp.processors;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.amqp.processors.AMQPWorker;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;

/* loaded from: input_file:org/apache/nifi/amqp/processors/AbstractAMQPProcessor.class */
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder().name("Host Name").description("Network address of AMQP broker (e.g., localhost)").required(true).defaultValue("localhost").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Port").description("Numeric value identifying Port of AMQP broker (e.g., 5671)").required(true).defaultValue("5672").addValidator(StandardValidators.PORT_VALIDATOR).build();
    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder().name("Virtual Host").description("Virtual Host name which segregates AMQP system for enhanced security.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder().name("User Name").description("User Name used for authentication and authorization.").required(true).defaultValue("guest").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("Password").description("Password used for authentication and authorization.").required(true).defaultValue("guest").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder().name("AMQP Version").description("AMQP Version. Currently only supports AMQP v0.9.1.").required(true).allowableValues(new String[]{"0.9.1"}).defaultValue("0.9.1").build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("ssl-context-service").displayName("SSL Context Service").description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.").required(false).identifiesControllerService(SSLContextService.class).build();
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("ssl-client-auth").displayName("Client Auth").description("Client authentication policy when connecting to secure (TLS/SSL) AMQP broker. Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context has been defined and enabled.").required(false).allowableValues(SSLContextService.ClientAuth.values()).defaultValue("REQUIRED").build();
    static List<PropertyDescriptor> descriptors = new ArrayList();
    protected volatile Connection amqpConnection;
    protected volatile T targetResource;

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        synchronized (this) {
            buildTargetResource(processContext);
        }
        rendezvousWithAmqp(processContext, processSession);
    }

    @OnStopped
    public void close() {
        try {
            if (this.targetResource != null) {
                this.targetResource.close();
            }
        } catch (Exception e) {
            getLogger().warn("Failure while closing target resource " + this.targetResource, e);
        }
        try {
            if (this.amqpConnection != null) {
                this.amqpConnection.close();
            }
        } catch (IOException e2) {
            getLogger().warn("Failure while closing connection", e2);
        }
        this.amqpConnection = null;
    }

    protected abstract void rendezvousWithAmqp(ProcessContext processContext, ProcessSession processSession) throws ProcessException;

    protected abstract T finishBuildingTargetResource(ProcessContext processContext);

    private void buildTargetResource(ProcessContext processContext) {
        if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
            this.amqpConnection = createConnection(processContext);
            this.targetResource = finishBuildingTargetResource(processContext);
        }
    }

    private Connection createConnection(ProcessContext processContext) {
        SSLContext sSLContext;
        SSLContextService.ClientAuth valueOf;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(processContext.getProperty(HOST).getValue());
        connectionFactory.setPort(Integer.parseInt(processContext.getProperty(PORT).getValue()));
        connectionFactory.setUsername(processContext.getProperty(USER).getValue());
        connectionFactory.setPassword(processContext.getProperty(PASSWORD).getValue());
        String value = processContext.getProperty(V_HOST).getValue();
        if (value != null) {
            connectionFactory.setVirtualHost(value);
        }
        SSLContextService asControllerService = processContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        String value2 = processContext.getProperty(CLIENT_AUTH).getValue();
        if (asControllerService != null) {
            if (StringUtils.isBlank(value2)) {
                valueOf = SSLContextService.ClientAuth.REQUIRED;
            } else {
                try {
                    valueOf = SSLContextService.ClientAuth.valueOf(value2);
                } catch (IllegalArgumentException e) {
                    throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", value2, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
                }
            }
            sSLContext = asControllerService.createSSLContext(valueOf);
        } else {
            sSLContext = null;
        }
        if (sSLContext != null) {
            connectionFactory.useSslProtocol(sSLContext);
        }
        try {
            return connectionFactory.newConnection();
        } catch (Exception e2) {
            throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + connectionFactory.toString(), e2);
        }
    }

    static {
        descriptors.add(HOST);
        descriptors.add(PORT);
        descriptors.add(V_HOST);
        descriptors.add(USER);
        descriptors.add(PASSWORD);
        descriptors.add(AMQP_VERSION);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(CLIENT_AUTH);
    }
}
