package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Terminus;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventProcessor.class */
/**
 * Receiver that consumes federation event messages arriving on an AMQP link and
 * forwards each decoded event to the owning {@link AMQPFederation}.
 *
 * <p>Two event types are understood: "queue added" and "address added". Any other
 * event type, and any failure while decoding or dispatching an event, is reported
 * to the federation through {@code signalError}.
 */
public class AMQPFederationEventProcessor extends ProtonAbstractReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** First credit value handed to the inherited credit-runnable factory. */
    private static final int PROCESSOR_RECEIVER_CREDITS = 10;

    /**
     * Second credit value handed to the inherited credit-runnable factory.
     * NOTE(review): presumably the low-water mark that triggers a top-up; confirm
     * against {@code ProtonAbstractReceiver#createCreditRunnable}.
     */
    private static final int PROCESSOR_RECEIVER_CREDITS_LOW = 3;

    private final ActiveMQServer server;
    private final AMQPFederation federation;

    /**
     * Creates an event processor bound to the given session and receiver link.
     *
     * @param aMQPFederation     federation instance that receives the decoded events and error signals
     * @param aMQPSessionContext session supplying the session SPI and connection context for the superclass
     * @param receiver           proton receiver link the events arrive on
     */
    public AMQPFederationEventProcessor(AMQPFederation aMQPFederation, AMQPSessionContext aMQPSessionContext, Receiver receiver) {
        super(aMQPSessionContext.getSessionSPI(), aMQPSessionContext.getAMQPConnectionContext(), aMQPSessionContext, receiver);
        this.server = this.protonSession.getServer();
        this.federation = aMQPFederation;
    }

    /**
     * Configures the receiver link, registers this processor with the federation and
     * requests an initial credit top-up.
     *
     * <p>When the link's local endpoint state is not yet {@code ACTIVE}, the event link
     * capability is offered and the remote target must be a dynamic node; its address is
     * then set to the receiver's link name.
     *
     * @throws ActiveMQAMQPInternalErrorException if the link is not locally active and the
     *         remote target is missing or not dynamic
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
    public void initialize() throws Exception {
        this.initialized = true;

        // Mirror whatever sender settle mode the remote asked for.
        this.receiver.setSenderSettleMode(this.receiver.getRemoteSenderSettleMode());
        // The receiver settle mode is always answered as FIRST.
        this.receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);

        if (this.receiver.getLocalState() != EndpointState.ACTIVE) {
            this.receiver.setOfferedCapabilities(new Symbol[] {AMQPFederationConstants.FEDERATION_EVENT_LINK});

            // Explicit cast: the link API exposes the remote target through the transport-level
            // Target type, whereas the dynamic flag and address live on the messaging Terminus.
            final Terminus remoteTerminus = (Terminus) this.receiver.getRemoteTarget();
            if (remoteTerminus == null || !remoteTerminus.getDynamic()) {
                throw new ActiveMQAMQPInternalErrorException("Remote Terminus did not arrive as dynamic node: " + remoteTerminus);
            }

            // The dynamic node is addressed by the link name.
            remoteTerminus.setAddress(this.receiver.getName());
        }

        this.federation.registerEventReceiver(this);
        topUpCreditIfNeeded();
    }

    /**
     * Decodes a single incoming event message and dispatches it to the federation.
     *
     * <p>On a recognized event the delivery is accepted and settled, credit is topped up
     * if needed and the connection is flushed. On an unrecognized event type an error is
     * signalled to the federation and the method returns without applying a disposition
     * or settling the delivery. Any {@link Throwable} raised while processing is logged at
     * warn level and converted into an error signal on the federation.
     *
     * @param message             incoming message; cast to {@link AMQPMessage}
     * @param delivery            proton delivery carrying the message
     * @param deliveryAnnotations not used by this implementation
     * @param receiver            not used by this implementation
     * @param transaction         not used by this implementation
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction transaction) {
        logger.trace("{}::actualdelivery called for {}", this.server, message);

        final AMQPMessage eventMessage = (AMQPMessage) message;
        delivery.setContext(message);

        try {
            final Object eventType = AMQPMessageBrokerAccessor.getMessageAnnotationProperty(eventMessage, AMQPFederationConstants.EVENT_TYPE);

            if (AMQPFederationConstants.REQUESTED_QUEUE_ADDED.equals(eventType)) {
                final Map<String, Object> eventData = AMQPFederationEventSupport.decodeQueueAddedEvent(eventMessage);
                final String addressName = eventData.get(AMQPFederationConstants.REQUESTED_ADDRESS_NAME).toString();
                final String queueName = eventData.get(AMQPFederationConstants.REQUESTED_QUEUE_NAME).toString();

                logger.trace("Remote event indicates Queue added that matched a previous request [{}::{}]", addressName, queueName);
                this.federation.processRemoteQueueAdded(addressName, queueName);
            } else if (AMQPFederationConstants.REQUESTED_ADDRESS_ADDED.equals(eventType)) {
                final String addressName = AMQPFederationEventSupport.decodeAddressAddedEvent(eventMessage)
                    .get(AMQPFederationConstants.REQUESTED_ADDRESS_NAME)
                    .toString();

                logger.trace("Remote event indicates Address added that matched a previous request [{}]", addressName);
                this.federation.processRemoteAddressAdded(addressName);
            } else {
                // Unknown event: report it and leave the delivery without a disposition.
                this.federation.signalError(new ActiveMQAMQPInternalErrorException("Remote sent unknown event."));
                return;
            }

            delivery.disposition(Accepted.getInstance());
            delivery.settle();
            topUpCreditIfNeeded();
            this.connection.flush();
        } catch (Throwable error) {
            // Deliberately broad: any failure here is surfaced to the federation rather than propagated.
            logger.warn(error.getMessage(), error);
            this.federation.signalError(new ActiveMQAMQPInternalErrorException("Error while processing incoming event message: " + error.getMessage()));
        }
    }

    /**
     * Builds the credit runnable for this receiver from the fixed class-level credit
     * constants; no credit setting is read from the supplied connection context here.
     *
     * @param aMQPConnectionContext connection context passed through to the inherited factory
     * @return the runnable produced by the inherited factory
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected Runnable createCreditRunnable(AMQPConnectionContext aMQPConnectionContext) {
        return createCreditRunnable(PROCESSOR_RECEIVER_CREDITS, PROCESSOR_RECEIVER_CREDITS_LOW, this.receiver, aMQPConnectionContext, this);
    }

    /** Invokes the credit runnable directly via {@code run()}. */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected void doCreditTopUpRun() {
        this.creditRunnable.run();
    }

    /**
     * @return the receiver link's name wrapped as a {@link SimpleString}
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected SimpleString getAddressInUse() {
        return SimpleString.of(this.receiver.getName());
    }
}
