package org.apache.nifi.amqp.processors;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import org.apache.nifi.logging.ComponentLog;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/nifi/amqp/processors/AMQPConsumer.class */
public final class AMQPConsumer extends AMQPWorker {
    private final String queueName;
    private final BlockingQueue<GetResponse> responseQueue;
    private final boolean autoAcknowledge;
    private final Consumer consumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AMQPConsumer(Connection connection, String str, boolean z, int i, final ComponentLog componentLog) throws IOException {
        super(connection, componentLog);
        validateStringProperty("queueName", str);
        this.queueName = str;
        this.autoAcknowledge = z;
        this.responseQueue = new LinkedBlockingQueue(10);
        componentLog.info("Successfully connected AMQPConsumer to {} and '{}' queue", new Object[]{connection, str});
        Channel channel = getChannel();
        this.consumer = new DefaultConsumer(channel) { // from class: org.apache.nifi.amqp.processors.AMQPConsumer.1
            public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                if (AMQPConsumer.this.closed) {
                    componentLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{Long.valueOf(envelope.getDeliveryTag())});
                    return;
                }
                try {
                    AMQPConsumer.this.responseQueue.put(new GetResponse(envelope, basicProperties, bArr, Integer.MAX_VALUE));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            public void handleCancel(String str2) throws IOException {
                componentLog.error("Consumer has been cancelled by the broker, eg. due to deleted queue.");
                try {
                    AMQPConsumer.this.close();
                } catch (Exception e) {
                    componentLog.error("Failed to close consumer.", e);
                }
            }
        };
        channel.basicQos(i);
        channel.basicConsume(str, z, this.consumer);
    }

    int getResponseQueueSize() {
        return this.responseQueue.size();
    }

    public GetResponse consume() {
        return this.responseQueue.poll();
    }

    public void acknowledge(GetResponse getResponse) {
        if (this.autoAcknowledge) {
            return;
        }
        try {
            getChannel().basicAck(getResponse.getEnvelope().getDeliveryTag(), true);
        } catch (Exception e) {
            throw new AMQPException("Failed to acknowledge message", e);
        }
    }

    @Override // org.apache.nifi.amqp.processors.AMQPWorker, java.lang.AutoCloseable
    public void close() throws TimeoutException, IOException {
        try {
            super.close();
            while (true) {
                try {
                    GetResponse poll = this.responseQueue.poll();
                    if (poll == null) {
                        return;
                    } else {
                        this.processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{Long.valueOf(poll.getEnvelope().getDeliveryTag())});
                    }
                } catch (Exception e) {
                    this.processorLog.error("Failed to drain response queue.");
                    return;
                }
            }
        } catch (Throwable th) {
            while (true) {
                try {
                    GetResponse poll2 = this.responseQueue.poll();
                    if (poll2 == null) {
                        break;
                    } else {
                        this.processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{Long.valueOf(poll2.getEnvelope().getDeliveryTag())});
                    }
                } catch (Exception e2) {
                    this.processorLog.error("Failed to drain response queue.");
                }
            }
            throw th;
        }
    }

    @Override // org.apache.nifi.amqp.processors.AMQPWorker
    public String toString() {
        return super.toString() + ", QUEUE:" + this.queueName;
    }
}
