package org.apache.nifi.amqp.processors;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.amqp.processors.AMQPWorker;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextService;

/* loaded from: input_file:org/apache/nifi/amqp/processors/AbstractAMQPProcessor.class */
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
    /** Optional comma-separated {@code host:port} broker list; when set, {@link #HOST} and {@link #PORT} are not used to connect. */
    public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder().name("Brokers").description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR).build();
    /** Broker host name (default {@code localhost}); only consulted when {@link #BROKERS} is not set. */
    public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder().name("Host Name").description("Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.").required(false).defaultValue("localhost").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    /** Broker port (default {@code 5672}); only consulted when {@link #BROKERS} is not set. */
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Port").description("Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.").required(false).defaultValue("5672").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.PORT_VALIDATOR).build();
    /** Optional AMQP virtual host; applied to the connection factory only when a value is present. */
    public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder().name("Virtual Host").description("Virtual Host name which segregates AMQP system for enhanced security.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    /** User name for user/password authentication; validation rejects it in combination with certificate authentication. */
    public static final PropertyDescriptor USER = new PropertyDescriptor.Builder().name("User Name").description("User Name used for authentication and authorization.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    /** Password for user/password authentication; marked sensitive and declared without expression language support. */
    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder().name("Password").description("Password used for authentication and authorization.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
    /** AMQP protocol version; {@code 0.9.1} is the only allowable value. Not read elsewhere in this class. */
    public static final PropertyDescriptor AMQP_VERSION = new PropertyDescriptor.Builder().name("AMQP Version").description("AMQP Version. Currently only supports AMQP v0.9.1.").required(true).allowableValues(new String[]{"0.9.1"}).defaultValue("0.9.1").build();
    /** Optional SSL context service; when present the connection is created with its SSL context. */
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("ssl-context-service").displayName("SSL Context Service").description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.").required(false).identifiesControllerService(SSLContextService.class).build();
    /** Switches authentication to the client certificate (default {@code false}); validation requires {@link #SSL_CONTEXT_SERVICE} when enabled. */
    public static final PropertyDescriptor USE_CERT_AUTHENTICATION = new PropertyDescriptor.Builder().name("cert-authentication").displayName("Use Client Certificate Authentication").description("Authenticate using the SSL certificate rather than user name/password.").required(false).defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    /** Per its own description this property has no effect; it is still part of the common descriptor list. */
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("ssl-client-auth").displayName("Client Auth").description("The property has no effect and therefore deprecated.").required(false).allowableValues(ClientAuth.values()).defaultValue("NONE").build();
    /** Unmodifiable list of the descriptors above, populated once in the static initializer of this class. */
    private static final List<PropertyDescriptor> propertyDescriptors;
    /** Pool of reusable connection/worker resources; allocated in {@code onScheduled} and drained and nulled in {@code close}. */
    private BlockingQueue<AMQPResource<T>> resourceQueue;

    /**
     * Returns the common property descriptors defined by this class.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this method was
     * changed from {@code protected}; it is left {@code public} here.
     *
     * @return the unmodifiable descriptor list assembled in the static initializer
     */
    public static List<PropertyDescriptor> getCommonPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Allocates the resource pool when the processor is scheduled.
     *
     * <p>The pool is bounded by the configured maximum number of concurrent tasks, so
     * {@code onTrigger} closes any resource that does not fit back into it.
     *
     * @param processContext context supplying the maximum number of concurrent tasks
     */
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        // Diamond instead of a raw LinkedBlockingQueue so the assignment to the generic field is type-checked.
        this.resourceQueue = new LinkedBlockingQueue<>(processContext.getMaxConcurrentTasks());
    }

    /**
     * Validates that exactly one authentication mechanism is configured.
     *
     * <p>On top of the results of the superclass validation, three rules are enforced:
     * <ul>
     *   <li>certificate authentication cannot be combined with a user name or a password;</li>
     *   <li>without certificate authentication both user name and password must be set;</li>
     *   <li>certificate authentication requires an SSL context service.</li>
     * </ul>
     *
     * @param validationContext context giving access to the configured property values
     * @return the superclass results plus one invalid result per violated rule
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));

        final boolean userConfigured = validationContext.getProperty(USER).isSet();
        final boolean passwordConfigured = validationContext.getProperty(PASSWORD).isSet();
        final boolean sslServiceConfigured = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
        // Boolean.TRUE.equals guards against a null Boolean instead of failing validation with an NPE.
        final boolean useCertAuthentication = Boolean.TRUE.equals(validationContext.getProperty(USE_CERT_AUTHENTICATION).asBoolean());

        if (useCertAuthentication && (userConfigured || passwordConfigured)) {
            results.add(new ValidationResult.Builder()
                    .subject("Authentication configuration")
                    .valid(false)
                    .explanation(String.format("'%s' with '%s' and '%s' cannot be configured at the same time", USER.getDisplayName(), PASSWORD.getDisplayName(), USE_CERT_AUTHENTICATION.getDisplayName()))
                    .build());
        }
        if (!useCertAuthentication && (!userConfigured || !passwordConfigured)) {
            results.add(new ValidationResult.Builder()
                    .subject("Authentication configuration")
                    .valid(false)
                    .explanation(String.format("either '%s' with '%s' or '%s' must be configured", USER.getDisplayName(), PASSWORD.getDisplayName(), USE_CERT_AUTHENTICATION.getDisplayName()))
                    .build());
        }
        if (useCertAuthentication && !sslServiceConfigured) {
            results.add(new ValidationResult.Builder()
                    .subject("SSL configuration")
                    .valid(false)
                    .explanation(String.format("'%s' has been set but no '%s' configured", USE_CERT_AUTHENTICATION.getDisplayName(), SSL_CONTEXT_SERVICE.getDisplayName()))
                    .build());
        }
        return results;
    }

    /**
     * Runs one unit of work with a pooled AMQP resource.
     *
     * <p>A resource is taken from the pool, or created when the pool is empty, and handed to
     * {@link #processResource}. After a successful run it is offered back to the pool and closed
     * if the pool is full. After any failure the resource is closed so that its connection is
     * never left open outside the pool; a fresh one is created on a later trigger.
     *
     * @param processContext the processor context for this invocation
     * @param processSession the session for this invocation
     * @throws ProcessException declared by the overridden method; failures of resource creation
     *         and of {@code processResource} are caught, logged and answered with a yield
     */
    @Override
    public final void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        AMQPResource<T> resource = this.resourceQueue.poll();
        if (resource == null) {
            try {
                resource = createResource(processContext);
            } catch (Exception e) {
                getLogger().error("Failed to initialize AMQP client", e);
                processContext.yield();
                return;
            }
        }

        try {
            processResource(resource.getConnection(), resource.getWorker(), processContext, processSession);
            if (!this.resourceQueue.offer(resource)) {
                getLogger().info("Worker queue is full, closing AMQP client");
                closeResource(resource);
            }
        } catch (AMQPException | AMQPRollbackException e) {
            getLogger().error("AMQP failure, dropping the client", e);
            processContext.yield();
            closeResource(resource);
        } catch (Exception e) {
            getLogger().error("Processor failure", e);
            processContext.yield();
            // The resource was removed from the pool above; without closing it here it would
            // be leaked, because nothing else holds a reference to it any more.
            closeResource(resource);
        }
    }

    /**
     * Drains the resource pool when the processor is stopped, closing every pooled resource,
     * and then discards the pool. Does nothing when the pool was never allocated.
     */
    @OnStopped
    public void close() {
        if (this.resourceQueue != null) {
            AMQPResource<T> pooled;
            while ((pooled = this.resourceQueue.poll()) != null) {
                closeResource(pooled);
            }
            this.resourceQueue = null;
        }
    }

    /**
     * Closes a resource on a best-effort basis: a failure is logged and not propagated.
     *
     * @param resource the resource to release
     */
    private void closeResource(AMQPResource<T> resource) {
        try {
            resource.close();
        } catch (Exception closeFailure) {
            getLogger().error("Failed to close AMQP Connection", closeFailure);
        }
    }

    /**
     * Performs the processor-specific work for one trigger.
     *
     * <p>Called by {@code onTrigger} with the connection and worker of the resource it took from
     * the pool or created. {@code onTrigger} logs and yields on any exception thrown from here,
     * and closes the resource when the exception is an {@code AMQPException} or
     * {@code AMQPRollbackException}.
     *
     * @param connection the connection belonging to the resource in use
     * @param t the worker belonging to the resource in use
     * @param processContext the processor context of the current trigger
     * @param processSession the session of the current trigger
     * @throws ProcessException declared for implementations; the conditions are implementation-defined
     */
    protected abstract void processResource(Connection connection, T t, ProcessContext processContext, ProcessSession processSession) throws ProcessException;

    /**
     * Creates the worker that operates on the given connection.
     *
     * <p>Called by {@code createResource} right after the connection has been established; the
     * returned worker is bundled with that connection and its executor into an
     * {@code AMQPResource}. If this method throws, {@code createResource} closes the connection
     * and rethrows.
     *
     * @param processContext the processor context used to configure the worker
     * @param connection the newly created connection
     * @return the worker to pair with {@code connection}
     */
    protected abstract T createAMQPWorker(ProcessContext processContext, Connection connection);

    /**
     * Creates a new resource consisting of a connection, its worker and the single-thread
     * executor handed to the connection.
     *
     * <p>If the connection or the worker cannot be created, everything allocated so far is
     * released before the exception is rethrown: an open connection is closed and the executor
     * is shut down. Without the shutdown, the executor would be left running with no owner.
     *
     * @param processContext the processor context used to configure connection and worker
     * @return the fully initialised resource
     */
    private AMQPResource<T> createResource(ProcessContext processContext) {
        final ExecutorService executor = Executors.newSingleThreadExecutor(
                new BasicThreadFactory.Builder().namingPattern("AMQP Consumer: " + getIdentifier()).build());
        Connection connection = null;
        try {
            connection = createConnection(processContext, executor);
            final T worker = createAMQPWorker(processContext, connection);
            return new AMQPResource<>(connection, worker, executor);
        } catch (Exception e) {
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception closeFailure) {
                    getLogger().error("Failed to close AMQP Connection", closeFailure);
                }
            }
            // No AMQPResource took ownership of the executor, so it has to be released here.
            executor.shutdownNow();
            throw e;
        }
    }

    /**
     * Parses the evaluated value of the {@code Brokers} property into broker addresses.
     *
     * @param processContext the processor context holding the {@code Brokers} property
     * @return the addresses produced by {@code Address.parseAddresses}
     */
    private Address[] createHostsList(ProcessContext processContext) {
        final String brokerList = processContext.getProperty(BROKERS).evaluateAttributeExpressions().getValue();
        return Address.parseAddresses(brokerList);
    }

    /**
     * Opens a new connection to the AMQP broker according to the processor configuration.
     *
     * <p>Credentials and the optional virtual host are applied first. When an SSL context
     * service is configured its context is used, and the SASL EXTERNAL mechanism is selected if
     * certificate authentication is enabled. Automatic recovery is disabled. The connection
     * targets the {@code Brokers} list when that property is set, otherwise the configured host
     * and port.
     *
     * @param processContext the processor context holding the connection properties
     * @param executorService the executor handed to the connection
     * @return the established connection
     * @throws IllegalStateException if the connection cannot be established; the message names
     *         the broker list or the host and port that were actually targeted
     */
    protected Connection createConnection(ProcessContext processContext, ExecutorService executorService) {
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername(processContext.getProperty(USER).evaluateAttributeExpressions().getValue());
        connectionFactory.setPassword(processContext.getProperty(PASSWORD).getValue());

        final String virtualHost = processContext.getProperty(V_HOST).evaluateAttributeExpressions().getValue();
        if (virtualHost != null) {
            connectionFactory.setVirtualHost(virtualHost);
        }

        final SSLContextService sslService = processContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        // Boolean.TRUE.equals guards against a null Boolean instead of unboxing it blindly.
        final boolean useCertAuthentication = Boolean.TRUE.equals(processContext.getProperty(USE_CERT_AUTHENTICATION).asBoolean());
        if (sslService != null) {
            connectionFactory.useSslProtocol(sslService.createContext());
            if (useCertAuthentication) {
                connectionFactory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
            }
        }

        connectionFactory.setAutomaticRecoveryEnabled(false);
        connectionFactory.setExceptionHandler(new DefaultExceptionHandler() {
            @Override
            public void handleUnexpectedConnectionDriverException(Connection connection, Throwable th) {
                AbstractAMQPProcessor.this.getLogger().error("Connection lost to server {}:{}.", new Object[]{connection.getAddress(), Integer.valueOf(connection.getPort()), th});
            }
        });

        // Remembered for the failure message: when a broker list is used, the factory's own
        // host/port are never set and would only show its defaults.
        String brokerList = null;
        try {
            if (processContext.getProperty(BROKERS).isSet()) {
                brokerList = processContext.getProperty(BROKERS).evaluateAttributeExpressions().getValue();
                return connectionFactory.newConnection(executorService, createHostsList(processContext));
            }
            connectionFactory.setHost(processContext.getProperty(HOST).evaluateAttributeExpressions().getValue());
            connectionFactory.setPort(Integer.parseInt(processContext.getProperty(PORT).evaluateAttributeExpressions().getValue()));
            return connectionFactory.newConnection(executorService);
        } catch (Exception e) {
            final String target = brokerList != null
                    ? brokerList
                    : String.format("%s:%s", connectionFactory.getHost(), connectionFactory.getPort());
            throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + target, e);
        }
    }

    static {
        ArrayList arrayList = new ArrayList();
        arrayList.add(BROKERS);
        arrayList.add(HOST);
        arrayList.add(PORT);
        arrayList.add(V_HOST);
        arrayList.add(USER);
        arrayList.add(PASSWORD);
        arrayList.add(AMQP_VERSION);
        arrayList.add(SSL_CONTEXT_SERVICE);
        arrayList.add(USE_CERT_AUTHENTICATION);
        arrayList.add(CLIENT_AUTH);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
    }
}
