package org.apache.activemq.artemis.protocol.amqp.proton;

import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.class */
public class AMQPLargeMessageReader implements MessageReader {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final ProtonAbstractReceiver serverReceiver;
    private volatile AMQPLargeMessage currentMessage;
    private DeliveryAnnotations deliveryAnnotations;
    private boolean closed = true;

    public AMQPLargeMessageReader(ProtonAbstractReceiver protonAbstractReceiver) {
        this.serverReceiver = protonAbstractReceiver;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public DeliveryAnnotations getDeliveryAnnotations() {
        return this.deliveryAnnotations;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public void close() {
        if (this.closed) {
            return;
        }
        try {
            AMQPSessionCallback sessionSPI = this.serverReceiver.getSessionContext().getSessionSPI();
            if (this.currentMessage != null) {
                sessionSPI.execute(() -> {
                    try {
                    } catch (Throwable th) {
                        ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(th);
                    } finally {
                        this.currentMessage = null;
                    }
                    if (this.currentMessage != null) {
                        this.currentMessage.deleteFile();
                    }
                });
            }
        } catch (Exception e) {
            logger.trace("AMQP Large Message reader close ignored error: ", e);
        }
        this.deliveryAnnotations = null;
        this.closed = true;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public AMQPLargeMessageReader open() {
        if (!this.closed) {
            throw new IllegalStateException("Reader was not closed before call to open.");
        }
        this.closed = false;
        return this;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public Message readBytes(Delivery delivery) throws Exception {
        if (this.closed) {
            throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
        }
        try {
            this.serverReceiver.connection.requireInHandler();
            ReadableBuffer recv = delivery.getLink().recv();
            AMQPSessionCallback sessionSPI = this.serverReceiver.getSessionContext().getSessionSPI();
            if (this.currentMessage == null) {
                long generateID = sessionSPI.getStorageManager().generateID();
                AMQPLargeMessage aMQPLargeMessage = new AMQPLargeMessage(generateID, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
                aMQPLargeMessage.parseHeader(recv);
                sessionSPI.getStorageManager().onLargeMessageCreate(generateID, aMQPLargeMessage);
                this.currentMessage = aMQPLargeMessage;
            }
            this.serverReceiver.getConnection().disableAutoRead();
            boolean isPartial = delivery.isPartial();
            sessionSPI.execute(() -> {
                addBytes(delivery, recv, isPartial);
            });
            return null;
        } catch (Exception e) {
            this.serverReceiver.getConnection().enableAutoRead();
            throw e;
        }
    }

    private void addBytes(Delivery delivery, ReadableBuffer readableBuffer, boolean z) {
        AMQPLargeMessage aMQPLargeMessage = this.currentMessage;
        if (aMQPLargeMessage == null) {
            return;
        }
        try {
            try {
                aMQPLargeMessage.addBytes(readableBuffer);
                if (!z) {
                    aMQPLargeMessage.releaseResources(this.serverReceiver.getConnection().isLargeMessageSync(), true);
                    this.currentMessage = null;
                    this.serverReceiver.connection.runNow(() -> {
                        this.serverReceiver.onMessageComplete(delivery, aMQPLargeMessage, aMQPLargeMessage.getDeliveryAnnotations());
                    });
                }
            } catch (Throwable th) {
                this.serverReceiver.onExceptionWhileReading(th);
                AMQPConnectionContext aMQPConnectionContext = this.serverReceiver.connection;
                AMQPConnectionContext connection = this.serverReceiver.getConnection();
                Objects.requireNonNull(connection);
                aMQPConnectionContext.runNow(connection::enableAutoRead);
            }
        } finally {
            AMQPConnectionContext aMQPConnectionContext2 = this.serverReceiver.connection;
            AMQPConnectionContext connection2 = this.serverReceiver.getConnection();
            Objects.requireNonNull(connection2);
            aMQPConnectionContext2.runNow(connection2::enableAutoRead);
        }
    }
}
