package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationMetrics;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromResourcePolicy;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumer.class */
/**
 * Base class for AMQP federation consumers. It holds the state shared by concrete consumer
 * implementations (policy manager, federation instance, configuration, connection, session,
 * message transformer and metrics) and implements the initialize / start / stop / close
 * life-cycle by scheduling work on the owning connection via {@code connection.runLater(...)}.
 * <p>
 * Subclasses supply the actual receiver creation in {@link #doCreateReceiver()} and the idle
 * timeout in {@link #getReceiverIdleTimeout()}.
 */
public abstract class AMQPFederationConsumer implements FederationConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Delivery outcomes made available to subclasses when configuring a receiver link. */
    protected static final Symbol[] OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};

    /** Default outcome made available to subclasses: a {@link Modified} with delivery-failed set. */
    protected static final Modified DEFAULT_OUTCOME = new Modified();

    /** Shared counter made available to subclasses (presumably for unique link names; usage is not visible here). */
    protected static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong();

    static {
        DEFAULT_OUTCOME.setDeliveryFailed(true);
    }

    protected final AMQPFederationLocalPolicyManager manager;
    protected final AMQPFederation federation;
    protected final AMQPFederationConsumerConfiguration configuration;
    protected final FederationConsumerInfo consumerInfo;
    protected final AMQPConnectionContext connection;
    protected final AMQPSessionContext session;
    protected final Transformer transformer;
    protected final AMQPFederationMetrics.ConsumerMetrics metrics;

    /** Set once by {@link #initialize()}; volatile because it is read by the public life-cycle methods. */
    protected volatile boolean initialized;

    // The receiver fields are assigned by subclasses and cleared by close() inside a connection task.
    protected ProtonAbstractReceiver receiver;
    protected Receiver protonReceiver;

    protected Consumer<AMQPFederationConsumer> remoteOpenHandler;
    protected Consumer<AMQPFederationConsumer> remoteCloseHandler;

    /** Predicate form of {@link #remoteLinkClosedInterceptor(Link)} for use by subclasses. */
    protected final Predicate<Link> remoteCloseInterceptor = this::remoteLinkClosedInterceptor;

    protected final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Creates the consumer base state.
     * <p>
     * When the given policy carries a transformer configuration, the transformer is looked up from
     * the server's service registry using the policy name; otherwise an identity transformer that
     * returns the message unchanged is used.
     *
     * @param aMQPFederationLocalPolicyManager the policy manager that owns this consumer; also the source of the federation instance
     * @param aMQPFederationConsumerConfiguration the consumer configuration
     * @param aMQPSessionContext the session this consumer belongs to; also the source of the connection context
     * @param federationConsumerInfo descriptive information for this consumer
     * @param federationReceiveFromResourcePolicy the policy providing the optional transformer configuration
     * @param consumerMetrics the metrics object updated as messages are received
     */
    public AMQPFederationConsumer(AMQPFederationLocalPolicyManager aMQPFederationLocalPolicyManager, AMQPFederationConsumerConfiguration aMQPFederationConsumerConfiguration, AMQPSessionContext aMQPSessionContext, FederationConsumerInfo federationConsumerInfo, FederationReceiveFromResourcePolicy federationReceiveFromResourcePolicy, AMQPFederationMetrics.ConsumerMetrics consumerMetrics) {
        this.manager = aMQPFederationLocalPolicyManager;
        this.federation = aMQPFederationLocalPolicyManager.getFederation();
        this.consumerInfo = federationConsumerInfo;
        this.connection = aMQPSessionContext.getAMQPConnectionContext();
        this.session = aMQPSessionContext;
        this.configuration = aMQPFederationConsumerConfiguration;
        this.metrics = consumerMetrics;

        final TransformerConfiguration transformerConfiguration = federationReceiveFromResourcePolicy.getTransformerConfiguration();
        if (transformerConfiguration != null) {
            this.transformer = this.federation.getServer().getServiceRegistry().getFederationTransformer(federationReceiveFromResourcePolicy.getPolicyName(), transformerConfiguration);
        } else {
            // No transformer configured: pass every message through untouched.
            this.transformer = message -> message;
        }
    }

    /**
     * @return the role reported by this consumer's {@link FederationConsumerInfo}
     */
    public final FederationConsumerInfo.Role getRole() {
        return this.consumerInfo.getRole();
    }

    /**
     * @return the messages-received count reported by this consumer's metrics
     */
    public final long getMessagesReceived() {
        return this.metrics.getMessagesReceived();
    }

    /**
     * @return the policy manager this consumer was created with
     */
    public AMQPFederationLocalPolicyManager getPolicyManager() {
        return this.manager;
    }

    /**
     * @return the configuration this consumer was created with
     */
    public AMQPFederationConsumerConfiguration getConfiguration() {
        return this.configuration;
    }

    /**
     * @return the receiver idle timeout as defined by the concrete consumer type
     */
    public abstract int getReceiverIdleTimeout();

    @Override // org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer
    public final AMQPFederation getFederation() {
        return this.federation;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer
    public final FederationConsumerInfo getConsumerInfo() {
        return this.consumerInfo;
    }

    /**
     * @return {@code true} once {@link #initialize()} has been called
     */
    public final boolean isInitialized() {
        return this.initialized;
    }

    /**
     * Marks this consumer as initialized and schedules {@link #doCreateReceiver()} on the connection.
     *
     * @throws IllegalStateException if the consumer was already initialized
     */
    public void initialize() {
        if (this.initialized) {
            throw new IllegalStateException("A receiver should only be initialized once");
        }
        this.initialized = true;
        this.connection.runLater(this::doCreateReceiver);
    }

    /**
     * Creates the receiver for this consumer. Invoked from a task scheduled on the connection by
     * {@link #initialize()}.
     */
    protected abstract void doCreateReceiver();

    /**
     * Schedules a start of the receiver on the connection and reports the result through the
     * given completion.
     * <p>
     * Any failure inside the scheduled task (including a missing receiver) is delivered to
     * {@code onException} rather than thrown from the connection task; the connection is flushed
     * in either case.
     *
     * @param aMQPFederationAsyncCompletion the completion notified with the outcome; must not be {@code null}
     * @throws NullPointerException if the completion is {@code null}
     * @throws IllegalStateException if the consumer is closed or was never initialized
     */
    public final void startAsync(AMQPFederationAsyncCompletion<AMQPFederationConsumer> aMQPFederationAsyncCompletion) {
        Objects.requireNonNull(aMQPFederationAsyncCompletion, "The asynchronous completion object cannot be null");
        if (this.closed.get()) {
            throw new IllegalStateException("The consumer has already been closed.");
        }
        if (!this.initialized) {
            throw new IllegalStateException("A consumer must be initialized before a start call");
        }
        this.connection.runLater(() -> {
            try {
                if (this.receiver == null) {
                    throw new IllegalStateException("The consumer was either not initialized or the receiver create failed");
                }
                this.receiver.start();
                aMQPFederationAsyncCompletion.onComplete(this);
            } catch (Exception e) {
                // Route every failure to the caller's completion instead of losing it in the connection task.
                aMQPFederationAsyncCompletion.onException(this, e);
            } finally {
                this.connection.flush();
            }
        });
    }

    /**
     * Schedules a stop of the receiver on the connection, using the configured receiver quiesce
     * timeout, and reports the result through the given completion.
     * <p>
     * The completion receives {@code onComplete} when the receiver reports it stopped, and
     * {@code onException} with a {@link TimeoutException} when it did not. Any failure raised while
     * requesting the stop (including a missing receiver) is also delivered to {@code onException};
     * the connection is flushed in either case.
     *
     * @param aMQPFederationAsyncCompletion the completion notified with the outcome; must not be {@code null}
     * @throws NullPointerException if the completion is {@code null}
     * @throws IllegalStateException if the consumer was never initialized
     */
    public final void stopAsync(AMQPFederationAsyncCompletion<AMQPFederationConsumer> aMQPFederationAsyncCompletion) {
        Objects.requireNonNull(aMQPFederationAsyncCompletion, "The asynchronous completion object cannot be null");
        if (!this.initialized) {
            throw new IllegalStateException("A receiver must be initialized before a stop call");
        }
        this.connection.runLater(() -> {
            try {
                if (this.receiver == null) {
                    throw new IllegalStateException("The consumer was either not yet initialized or the receiver create failed");
                }
                this.receiver.stop(this.configuration.getReceiverQuiesceTimeout(), (stoppedReceiver, stopped) -> {
                    try {
                        if (stopped) {
                            aMQPFederationAsyncCompletion.onComplete(this);
                        } else {
                            aMQPFederationAsyncCompletion.onException(this, new TimeoutException("Timed out waiting for the AMQP link to stop"));
                        }
                    } catch (Exception e2) {
                        // A faulty user callback must not break the receiver's stop handling.
                        logger.trace("Caught error running provided completion callback: ", e2);
                    }
                });
            } catch (Exception e) {
                // Route every failure to the caller's completion instead of losing it in the connection task.
                aMQPFederationAsyncCompletion.onException(this, e);
            } finally {
                this.connection.flush();
            }
        });
    }

    /**
     * Closes this consumer. Only the first call has an effect; it schedules a connection task that
     * removes the link-closed interceptor registered under the consumer id, closes and clears the
     * receiver and the proton receiver when present, and flushes the connection.
     */
    public final void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.connection.runLater(() -> {
                this.federation.removeLinkClosedInterceptor(this.consumerInfo.getId());
                if (this.receiver != null) {
                    try {
                        this.receiver.close(false);
                    } catch (ActiveMQAMQPException e) {
                        // Best-effort close: the failure is not propagated, but leave a trace for diagnosis.
                        logger.trace("Ignored error while closing the federation consumer receiver: ", e);
                    } finally {
                        this.receiver = null;
                    }
                }
                if (this.protonReceiver != null) {
                    try {
                        this.protonReceiver.close();
                    } finally {
                        this.protonReceiver = null;
                    }
                }
                this.connection.flush();
            });
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    public boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Sets the handler stored for remote-open notification; subclasses decide when it is invoked.
     *
     * @param consumer the handler to store
     * @return this consumer
     * @throws IllegalStateException if the proton receiver has already been assigned
     */
    public final AMQPFederationConsumer setRemoteOpenHandler(Consumer<AMQPFederationConsumer> consumer) {
        if (this.protonReceiver != null) {
            throw new IllegalStateException("Cannot set a remote opened handler after the consumer is started");
        }
        this.remoteOpenHandler = consumer;
        return this;
    }

    /**
     * Sets the handler stored for remote-close notification; subclasses decide when it is invoked.
     *
     * @param consumer the handler to store
     * @return this consumer
     * @throws IllegalStateException if the proton receiver has already been assigned
     */
    public final AMQPFederationConsumer setRemoteClosedHandler(Consumer<AMQPFederationConsumer> consumer) {
        if (this.protonReceiver != null) {
            throw new IllegalStateException("Cannot set a remote close handler after the consumer is started");
        }
        this.remoteCloseHandler = consumer;
        return this;
    }

    /**
     * Tests whether a remotely closed link is this consumer's link and was closed with one of the
     * conditions {@code RESOURCE_DELETED}, {@code NOT_FOUND} or {@code DETACH_FORCED}.
     *
     * @param link the link that was remotely closed
     * @return {@code true} only if the link is this consumer's proton receiver and carries one of
     *         those remote conditions; {@code false} otherwise, including when no condition is set
     */
    protected final boolean remoteLinkClosedInterceptor(Link link) {
        if (link != this.protonReceiver || link.getRemoteCondition() == null || link.getRemoteCondition().getCondition() == null) {
            return false;
        }
        final Symbol condition = link.getRemoteCondition().getCondition();
        return AmqpSupport.RESOURCE_DELETED.equals(condition) || AmqpSupport.NOT_FOUND.equals(condition) || AmqpSupport.DETACH_FORCED.equals(condition);
    }

    /**
     * Records the receipt of a federated message by incrementing the messages-received metric.
     *
     * @param message the received message (not inspected by this method)
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public final void recordFederatedMessageReceived(Message message) {
        this.metrics.incrementMessagesReceived();
    }

    /**
     * Invokes {@code beforeFederationConsumerMessageHandled} on every registered broker plugin that
     * is an {@link ActiveMQServerAMQPFederationPlugin}. An {@link ActiveMQException} raised by the
     * plugin run is logged through {@link ActiveMQServerLogger} and not rethrown.
     *
     * @param message the message about to be handled
     * @throws ActiveMQException declared by the signature; the visible body catches and logs it
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public final void signalPluginBeforeFederationConsumerMessageHandled(Message message) throws ActiveMQException {
        try {
            this.federation.getServer().callBrokerAMQPFederationPlugins(plugin -> {
                if (plugin instanceof ActiveMQServerAMQPFederationPlugin) {
                    ((ActiveMQServerAMQPFederationPlugin) plugin).beforeFederationConsumerMessageHandled(this, message);
                }
            });
        } catch (ActiveMQException e) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError("beforeFederationConsumerMessageHandled", e);
        }
    }

    /**
     * Invokes {@code afterFederationConsumerMessageHandled} on every registered broker plugin that
     * is an {@link ActiveMQServerAMQPFederationPlugin}. An {@link ActiveMQException} raised by the
     * plugin run is logged through {@link ActiveMQServerLogger} and not rethrown.
     *
     * @param message the message that was handled
     * @throws ActiveMQException declared by the signature; the visible body catches and logs it
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public final void signalPluginAfterFederationConsumerMessageHandled(Message message) throws ActiveMQException {
        try {
            this.federation.getServer().callBrokerAMQPFederationPlugins(plugin -> {
                if (plugin instanceof ActiveMQServerAMQPFederationPlugin) {
                    ((ActiveMQServerAMQPFederationPlugin) plugin).afterFederationConsumerMessageHandled(this, message);
                }
            });
        } catch (ActiveMQException e) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError("afterFederationConsumerMessageHandled", e);
        }
    }
}
