package org.apache.nifi.amqp.processors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * NiFi processor that consumes messages from an AMQP queue and routes one FlowFile per received
 * message to the {@code success} relationship. The attributes listed in {@link WritesAttributes}
 * below are populated from the message properties, envelope and headers.
 */
@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be emitted as its own FlowFile to the 'success' relationship.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
@WritesAttributes({
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, description = "The App ID field from the AMQP Message"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, description = "The Content Encoding reported by the AMQP Message"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, description = "The Content Type reported by the AMQP Message"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, description = "The headers present on the AMQP Message. Added only if processor is configured to output this attribute."),
    @WritesAttribute(attribute = "<Header Key Prefix>.<attribute>", description = "Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, description = "The numeric indicator for the Message's Delivery Mode"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, description = "The Message priority"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, description = "The Message's Correlation ID"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, description = "The value of the Message's Reply-To field"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, description = "The Message Expiration"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, description = "The unique ID of the Message"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, description = "The timestamp of the Message, as the number of milliseconds since epoch"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, description = "The type of message"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, description = "The ID of the user"),
    @WritesAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"),
    @WritesAttribute(attribute = ConsumeAMQP.AMQP_ROUTING_KEY_ATTRIBUTE, description = "The routingKey of the AMQP Message"),
    @WritesAttribute(attribute = ConsumeAMQP.AMQP_EXCHANGE_ATTRIBUTE, description = "The exchange from which AMQP Message was received")
})
/* loaded from: input_file:org/apache/nifi/amqp/processors/ConsumeAMQP.class */
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
    // FlowFile attribute that receives the routing key taken from the message envelope.
    public static final String AMQP_ROUTING_KEY_ATTRIBUTE = "amqp$routingKey";
    // FlowFile attribute that receives the exchange name taken from the message envelope.
    public static final String AMQP_EXCHANGE_ATTRIBUTE = "amqp$exchange";
    // Required, non-empty queue name; handed to the AMQPConsumer and appended to the provenance transit URI.
    public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder().name("Queue").description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Boolean flag (default "false") forwarded to the AMQPConsumer when the worker is created.
    public static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder().name("auto.acknowledge").displayName("Auto-Acknowledge Messages").description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. If true (Auto-Acknowledge), messages that are delivered to the AMQP Client will be auto-acknowledged by the AMQP Broker just after sending them out. This generally will provide better throughput but will also result in messages being lost upon restart/crash of the AMQP Broker, NiFi or the processor. Auto-Acknowledge mode provides 'at-most-once' delivery semantics and it is recommended only if loosing messages is acceptable.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    // Positive integer (default "10") used as the upper bound on messages handled per processResource call.
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("batch.size").displayName("Batch Size").description("The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), the messages received will be transferred to the 'success' relationship and the messages will be acknowledged to the AMQP Broker. Setting this value to a larger number could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("10").required(true).build();
    // Validated to the range 0..65535 (default "0"); forwarded to the AMQPConsumer when the worker is created.
    static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder().name("prefetch.count").displayName("Prefetch Count").description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will no longer send new messages until consumer acknowledges some of the messages already delivered to it.Allowed values: from 0 to 65535. 0 means no limit").addValidator(StandardValidators.createLongValidator(0, 65535, true)).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("0").required(true).build();
    // Selects which OutputHeaderFormat branch is used when message headers are turned into attributes.
    public static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder().name("header.format").displayName("Header Output Format").description("Defines how to output headers from the received message").allowableValues(OutputHeaderFormat.class).defaultValue(OutputHeaderFormat.COMMA_SEPARATED_STRING).required(true).build();
    // Default value of HEADER_KEY_PREFIX.
    public static final String DEFAULT_HEADERS_KEY_PREFIX = "consume.amqp";
    // Only applicable when HEADER_FORMAT is ATTRIBUTES (see dependsOn); prefix joined to each header key with '.'.
    public static final PropertyDescriptor HEADER_KEY_PREFIX = new PropertyDescriptor.Builder().name("header.key.prefix").displayName("Header Key Prefix").description("Text to be prefixed to header keys as the are added to the FlowFile attributes. Processor will append '.' to the value of this property").defaultValue(DEFAULT_HEADERS_KEY_PREFIX).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dependsOn(HEADER_FORMAT, OutputHeaderFormat.ATTRIBUTES, new DescribedValue[0]).required(true).build();
    // Only applicable when HEADER_FORMAT is COMMA_SEPARATED_STRING; single character placed between header entries.
    public static final PropertyDescriptor HEADER_SEPARATOR = new PropertyDescriptor.Builder().name("header.separator").displayName("Header Separator").description("The character that is used to separate key-value for header in String. The value must be only one character.").addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR).defaultValue(",").dependsOn(HEADER_FORMAT, OutputHeaderFormat.COMMA_SEPARATED_STRING, new DescribedValue[0]).required(false).build();
    // Only applicable when HEADER_FORMAT is COMMA_SEPARATED_STRING; when false the joined header string is wrapped in '{' and '}'.
    static final PropertyDescriptor REMOVE_CURLY_BRACES = new PropertyDescriptor.Builder().name("remove.curly.braces").displayName("Remove Curly Braces").description("If true Remove Curly Braces, Curly Braces in the header will be automatically remove.").addValidator(StandardValidators.BOOLEAN_VALIDATOR).defaultValue("False").allowableValues(new String[]{"True", "False"}).dependsOn(HEADER_FORMAT, OutputHeaderFormat.COMMA_SEPARATED_STRING, new DescribedValue[0]).required(false).build();
    // The only relationship of this processor; every FlowFile created from a consumed message is transferred here.
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received from the AMQP queue are routed to this relationship").build();
    /**
     * Processor-specific descriptors followed by the descriptors shared by all AMQP processors.
     * The elements are passed to {@code Stream.of} directly (no {@code Object[]} cast) so the
     * stream is typed as {@code Stream<PropertyDescriptor>} and the resulting list is assignable
     * to {@code List<PropertyDescriptor>}.
     */
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(
            Stream.of(
                    QUEUE,
                    AUTO_ACKNOWLEDGE,
                    BATCH_SIZE,
                    PREFETCH_COUNT,
                    HEADER_FORMAT,
                    HEADER_KEY_PREFIX,
                    HEADER_SEPARATOR,
                    REMOVE_CURLY_BRACES,
                    MAX_INBOUND_MESSAGE_BODY_SIZE),
            getCommonPropertyDescriptors().stream()
    ).toList();

    /** Immutable set holding the single {@code success} relationship. */
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    /** Shared mapper used to serialise message headers when the JSON header format is selected. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /* loaded from: input_file:org/apache/nifi/amqp/processors/ConsumeAMQP$OutputHeaderFormat.class */
    /**
     * Allowable values of the header output format property. Each constant carries the value,
     * display name and description exposed through {@link DescribedValue}.
     */
    public enum OutputHeaderFormat implements DescribedValue {
        /** All headers joined into one string stored in a single attribute. */
        COMMA_SEPARATED_STRING(
                "Comma-Separated String",
                "Comma-Separated String",
                "Put all headers as a string with the specified separator in the attribute 'amqp$headers'."),

        /** All headers serialised as one JSON document stored in a single attribute. */
        JSON_STRING(
                "JSON String",
                "JSON String",
                "Format all headers as JSON string and output in the attribute 'amqp$headers'. It will include keys with null value as well."),

        /** Every header written as its own prefixed FlowFile attribute. */
        ATTRIBUTES(
                "FlowFile Attributes",
                "FlowFile Attributes",
                "Put each header as attribute of the flow file with a prefix specified in the properties");

        private final String value;
        private final String displayName;
        private final String description;

        OutputHeaderFormat(String value, String displayName, String description) {
            this.value = value;
            this.displayName = displayName;
            this.description = description;
        }

        /** @return the value stored for this option */
        public String getValue() {
            return value;
        }

        /** @return the label for this option */
        public String getDisplayName() {
            return displayName;
        }

        /** @return the explanatory text for this option */
        public String getDescription() {
            return description;
        }
    }

    /**
     * Consumes up to {@code BATCH_SIZE} messages from the broker, turning each one into a FlowFile
     * that is routed to {@link #REL_SUCCESS}.
     *
     * <p>The batch ends as soon as the consumer has no message available. Previously a
     * {@code null} response neither ended the loop nor advanced the counter, so the method kept
     * polling (and repeatedly yielding) until the batch filled up. If no message at all was
     * received the context is yielded exactly once. Otherwise the session is committed
     * asynchronously and, on success, the last received response is passed to
     * {@code AMQPConsumer.acknowledge}.
     *
     * @param connection     the AMQP connection; must be open
     * @param aMQPConsumer   the consumer to poll; its channel must be open
     * @param processContext supplies the batch size and queue name properties
     * @param processSession session used to create, write, transfer and commit FlowFiles
     * @throws AMQPException if the connection or the consumer channel is no longer open
     */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    public void processResource(Connection connection, AMQPConsumer aMQPConsumer, ProcessContext processContext, ProcessSession processSession) {
        if (!connection.isOpen() || !aMQPConsumer.getChannel().isOpen()) {
            throw new AMQPException("AMQP client has lost connection.");
        }

        // The property does not support expression language, so read it once per invocation.
        final int batchSize = processContext.getProperty(BATCH_SIZE).asInteger();

        GetResponse lastReceived = null;
        for (int received = 0; received < batchSize; received++) {
            final GetResponse response = aMQPConsumer.consume();
            if (response == null) {
                if (lastReceived == null) {
                    // Nothing was received at all during this invocation.
                    processContext.yield();
                }
                // No more messages are readily available: finish the batch instead of spinning.
                break;
            }

            FlowFile flowFile = processSession.create();
            flowFile = processSession.write(flowFile, outputStream -> outputStream.write(response.getBody()));
            flowFile = processSession.putAllAttributes(flowFile, buildAttributes(response.getProps(), response.getEnvelope(), processContext));

            processSession.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + processContext.getProperty(QUEUE).getValue());
            processSession.transfer(flowFile, REL_SUCCESS);
            lastReceived = response;
        }

        if (lastReceived != null) {
            // Effectively-final copy for the lambda; acknowledge only after the session commit succeeds.
            final GetResponse toAcknowledge = lastReceived;
            processSession.commitAsync(() -> aMQPConsumer.acknowledge(toAcknowledge), null);
        }
    }

    /**
     * Builds the FlowFile attribute map for one consumed message from its AMQP properties,
     * envelope and headers. Properties whose value is {@code null} are omitted.
     *
     * <p>The message type is stored under {@code AMQP_TYPE_ATTRIBUTE}. It was previously written
     * to {@code AMQP_CONTENT_TYPE_ATTRIBUTE}, which overwrote the content type and left the
     * documented type attribute unset.
     *
     * @param basicProperties the AMQP basic properties of the message
     * @param envelope        the delivery envelope providing routing key and exchange
     * @param processContext  supplies the header output format and its dependent properties
     * @return a new mutable map of attribute names to string values
     */
    private Map<String, String> buildAttributes(AMQP.BasicProperties basicProperties, Envelope envelope, ProcessContext processContext) {
        final Map<String, String> attributes = new HashMap<>();
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_APPID_ATTRIBUTE, basicProperties.getAppId());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_ENCODING_ATTRIBUTE, basicProperties.getContentEncoding());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_CONTENT_TYPE_ATTRIBUTE, basicProperties.getContentType());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_DELIVERY_MODE_ATTRIBUTE, basicProperties.getDeliveryMode());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_PRIORITY_ATTRIBUTE, basicProperties.getPriority());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_CORRELATION_ID_ATTRIBUTE, basicProperties.getCorrelationId());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_REPLY_TO_ATTRIBUTE, basicProperties.getReplyTo());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_EXPIRATION_ATTRIBUTE, basicProperties.getExpiration());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_MESSAGE_ID_ATTRIBUTE, basicProperties.getMessageId());
        // The timestamp is exported as epoch milliseconds; a missing timestamp yields no attribute.
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_TIMESTAMP_ATTRIBUTE, basicProperties.getTimestamp() == null ? null : Long.valueOf(basicProperties.getTimestamp().getTime()));
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_TYPE_ATTRIBUTE, basicProperties.getType());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_USER_ID_ATTRIBUTE, basicProperties.getUserId());
        addAttribute(attributes, AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, basicProperties.getClusterId());
        addAttribute(attributes, AMQP_ROUTING_KEY_ATTRIBUTE, envelope.getRoutingKey());
        addAttribute(attributes, AMQP_EXCHANGE_ATTRIBUTE, envelope.getExchange());

        final Map<String, Object> headers = basicProperties.getHeaders();
        if (headers != null) {
            final OutputHeaderFormat headerFormat = processContext.getProperty(HEADER_FORMAT).asAllowableValue(OutputHeaderFormat.class);
            addHeaderAttributes(attributes, headers, headerFormat, processContext);
        }
        return attributes;
    }

    /**
     * Writes the message headers into the attribute map using the selected output format.
     *
     * <ul>
     *   <li>{@code COMMA_SEPARATED_STRING}: headers joined with the configured separator and,
     *       unless brace removal is enabled, wrapped in curly braces.</li>
     *   <li>{@code JSON_STRING}: headers serialised as JSON; on serialisation failure a warning is
     *       logged and no attribute is added.</li>
     *   <li>{@code ATTRIBUTES}: one attribute per header, named {@code <prefix>.<header key>}.</li>
     * </ul>
     *
     * @param map                target attribute map
     * @param map2               message headers
     * @param outputHeaderFormat selected header output format
     * @param processContext     supplies separator, brace-removal flag and key prefix
     */
    private void addHeaderAttributes(Map<String, String> map, Map<String, Object> map2, OutputHeaderFormat outputHeaderFormat, ProcessContext processContext) {
        switch (outputHeaderFormat) {
            case COMMA_SEPARATED_STRING -> {
                final String separator = processContext.getProperty(HEADER_SEPARATOR).toString();
                final String joined = convertMapToString(map2, separator);
                final boolean stripBraces = processContext.getProperty(REMOVE_CURLY_BRACES).asBoolean();
                final String headerValue = stripBraces ? joined : "{" + joined + "}";
                addAttribute(map, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, headerValue);
            }
            case JSON_STRING -> {
                String json;
                try {
                    json = convertMapToJSONString(map2);
                } catch (JsonProcessingException e) {
                    getLogger().warn("Header formatting as JSON failed", e);
                    // A null value makes addAttribute skip the headers attribute.
                    json = null;
                }
                addAttribute(map, AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, json);
            }
            case ATTRIBUTES -> {
                final String keyPrefix = processContext.getProperty(HEADER_KEY_PREFIX).getValue();
                for (Map.Entry<String, Object> header : map2.entrySet()) {
                    addAttribute(map, keyPrefix + "." + header.getKey(), header.getValue());
                }
            }
            default -> {
                // No other formats are handled.
            }
        }
    }

    /**
     * Stores {@code obj.toString()} under {@code str}; does nothing when {@code obj} is
     * {@code null}.
     *
     * @param map target attribute map
     * @param str attribute name
     * @param obj attribute value, may be {@code null}
     */
    private void addAttribute(Map<String, String> map, String str, Object obj) {
        if (obj != null) {
            map.put(str, obj.toString());
        }
    }

    /**
     * Joins the entries of {@code map} into a single string separated by {@code str}. An entry
     * with a non-null value is rendered as {@code key=value}; an entry with a {@code null} value
     * is rendered as the key alone.
     *
     * @param map headers to render, in the map's iteration order
     * @param str separator placed between entries
     * @return the joined representation; empty when the map is empty
     */
    private static String convertMapToString(Map<String, Object> map, String str) {
        return map.entrySet().stream()
                .map(header -> {
                    final Object headerValue = header.getValue();
                    if (headerValue == null) {
                        return header.getKey();
                    }
                    return header.getKey() + "=" + headerValue;
                })
                .collect(Collectors.joining(str));
    }

    /**
     * Serialises the given header map to a JSON string using the shared {@code OBJECT_MAPPER}.
     *
     * @param map headers to serialise
     * @return the JSON text produced by the mapper
     * @throws JsonProcessingException if the mapper cannot serialise the map
     */
    private static String convertMapToJSONString(Map<String, Object> map) throws JsonProcessingException {
        return OBJECT_MAPPER.writeValueAsString(map);
    }

    /**
     * Creates the {@link AMQPConsumer} for this processor from the configured queue name,
     * auto-acknowledge flag and prefetch count.
     *
     * @param processContext supplies the consumer-related property values
     * @param connection     the AMQP connection the consumer is created on
     * @return the newly created consumer
     * @throws ProcessException if constructing the consumer fails with an {@link IOException}
     */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    public synchronized AMQPConsumer createAMQPWorker(ProcessContext processContext, Connection connection) {
        final String queueName = processContext.getProperty(QUEUE).getValue();
        final boolean autoAcknowledge = processContext.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
        final int prefetchCount = processContext.getProperty(PREFETCH_COUNT).asInteger();
        try {
            return new AMQPConsumer(connection, queueName, autoAcknowledge, prefetchCount, getLogger());
        } catch (IOException e) {
            throw new ProcessException("Failed to connect to AMQP Broker", e);
        }
    }

    /**
     * Returns the property descriptors supported by this processor.
     *
     * @return the shared {@code PROPERTY_DESCRIPTORS} list
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Returns the relationships of this processor.
     *
     * @return the shared {@code RELATIONSHIPS} set, which contains only {@code REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Invoked when the processor is stopped ({@code @OnStopped}); delegates entirely to the
     * superclass implementation. Marked bridge/synthetic by the decompiler.
     */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    @OnStopped
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }

    /**
     * Invoked when the processor is scheduled ({@code @OnScheduled}); delegates entirely to the
     * superclass implementation. Marked bridge/synthetic by the decompiler.
     *
     * @param processContext the context passed through to the superclass
     */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    @OnScheduled
    public /* bridge */ /* synthetic */ void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);
    }

    /**
     * Delegates property migration entirely to the superclass implementation. Marked
     * bridge/synthetic by the decompiler.
     *
     * @param propertyConfiguration the configuration passed through to the superclass
     */
    @Override // org.apache.nifi.amqp.processors.AbstractAMQPProcessor
    public /* bridge */ /* synthetic */ void migrateProperties(PropertyConfiguration propertyConfiguration) {
        super.migrateProperties(propertyConfiguration);
    }
}
