package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationMetrics;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotImplementedException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSenderController.class */
public abstract class AMQPFederationSenderController implements SenderController {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    protected final AMQPFederationRemotePolicyManager manager;
    protected final AMQPSessionContext session;
    protected final AMQPSessionCallback sessionSPI;
    protected final AMQPFederation federation;
    protected final AMQPFederationMetrics.ProducerMetrics metrics;
    protected final String controllerId = UUID.randomUUID().toString();
    protected MessageWriter standardMessageWriter;
    protected MessageWriter largeMessageWriter;
    protected MessageWriter coreMessageWriter;
    protected MessageWriter coreLargeMessageWriter;
    protected ProtonServerSenderContext senderContext;
    protected ServerConsumer serverConsumer;
    protected boolean tunnelCoreMessages;
    protected Consumer<ErrorCondition> resourceDeletedAction;
    protected final Consumer<AMQPFederationSenderController> closedListener;

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSenderController$CountedMessageWrites.class */
    private static class CountedMessageWrites implements MessageWriter {
        private final MessageWriter wrapped;
        private final AMQPFederationMetrics.ProducerMetrics metrics;

        CountedMessageWrites(MessageWriter messageWriter, AMQPFederationMetrics.ProducerMetrics producerMetrics) {
            this.wrapped = messageWriter;
            this.metrics = producerMetrics;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
        public void close() {
            this.wrapped.close();
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
        public MessageWriter open(MessageReference messageReference) {
            this.wrapped.open(messageReference);
            return this;
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
        public boolean isWriting() {
            return this.wrapped.isWriting();
        }

        @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter
        public void writeBytes(MessageReference messageReference) {
            try {
                this.wrapped.writeBytes(messageReference);
            } finally {
                this.metrics.incrementMessagesSent();
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSenderController$Role.class */
    public enum Role {
        ADDRESS_PRODUCER,
        QUEUE_PRODUCER
    }

    public AMQPFederationSenderController(AMQPFederationRemotePolicyManager aMQPFederationRemotePolicyManager, AMQPFederationMetrics.ProducerMetrics producerMetrics, Consumer<AMQPFederationSenderController> consumer) throws ActiveMQAMQPException {
        this.manager = aMQPFederationRemotePolicyManager;
        this.federation = aMQPFederationRemotePolicyManager.getFederation();
        this.metrics = producerMetrics;
        this.session = this.federation.getSessionContext();
        this.sessionSPI = this.session.getSessionSPI();
        this.closedListener = consumer;
    }

    public abstract Role getRole();

    public final long getMessagesSent() {
        return this.metrics.getMessagesSent();
    }

    public final ActiveMQServer getServer() {
        return this.session.getServer();
    }

    public final ProtonServerSenderContext getSenderContext() {
        return this.senderContext;
    }

    public final ServerConsumer getServerConsumer() {
        return this.serverConsumer;
    }

    public final AMQPSessionContext getSessionContext() {
        return this.session;
    }

    public final AMQPSessionCallback getSessionCallback() {
        return this.sessionSPI;
    }

    public final AMQPFederation getFederation() {
        return this.federation;
    }

    public final AMQPFederationRemotePolicyManager getPolicyManager() {
        return this.manager;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    /* renamed from: init, reason: merged with bridge method [inline-methods] */
    public final ServerConsumer mo47init(ProtonServerSenderContext protonServerSenderContext) throws Exception {
        Sender sender = protonServerSenderContext.getSender();
        Source remoteSource = sender.getRemoteSource();
        if (this.federation == null) {
            throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
        }
        if (remoteSource == null) {
            throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on federation links.");
        }
        this.senderContext = protonServerSenderContext;
        this.tunnelCoreMessages = AmqpSupport.verifyOfferedCapabilities(sender, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
        this.serverConsumer = createServerConsumer(protonServerSenderContext);
        registerRemoteLinkClosedInterceptor(sender);
        registerSenderManagement();
        return this.serverConsumer;
    }

    protected abstract ServerConsumer createServerConsumer(ProtonServerSenderContext protonServerSenderContext) throws Exception;

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public final void close(boolean z) throws Exception {
        try {
            if (this.federation != null) {
                this.federation.removeLinkClosedInterceptor(this.controllerId);
            }
            if (this.closedListener != null) {
                this.closedListener.accept(this);
            }
        } finally {
            unregisterSenderManagement();
            if (z) {
                handleLinkRemotelyClosed();
            } else {
                handleLinkLocallyClosed(null);
            }
        }
    }

    protected void handleLinkRemotelyClosed() {
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public final void close(ErrorCondition errorCondition) {
        if (errorCondition != null) {
            try {
                if (AmqpError.RESOURCE_DELETED.equals(errorCondition.getCondition()) && this.resourceDeletedAction != null) {
                    this.resourceDeletedAction.accept(errorCondition);
                }
            } finally {
                unregisterSenderManagement();
                handleLinkLocallyClosed(errorCondition);
            }
        }
        if (this.federation != null) {
            this.federation.removeLinkClosedInterceptor(this.controllerId);
        }
        if (this.closedListener != null) {
            this.closedListener.accept(this);
        }
    }

    protected void handleLinkLocallyClosed(ErrorCondition errorCondition) {
    }

    private void registerSenderManagement() {
        try {
            this.federation.registerFederationProducerManagement(this);
        } catch (Exception e) {
            logger.trace("Ignored exception while adding sender to management: ", e);
        }
    }

    private void unregisterSenderManagement() {
        try {
            this.federation.unregisterFederationProducerManagement(this);
        } catch (Exception e) {
            logger.trace("Ignored exception while removing sender from management: ", e);
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public final MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext protonServerSenderContext, MessageReference messageReference) {
        MessageWriter messageWriter;
        MessageWriter messageWriter2;
        MessageWriter messageWriter3;
        MessageWriter messageWriter4;
        MessageWriter messageWriter5;
        MessageWriter messageWriter6;
        Message message = messageReference.getMessage();
        if (message instanceof AMQPMessage) {
            if (message.isLargeMessage()) {
                if (this.largeMessageWriter != null) {
                    messageWriter6 = this.largeMessageWriter;
                } else {
                    CountedMessageWrites countedMessageWrites = new CountedMessageWrites(new AMQPLargeMessageWriter(protonServerSenderContext), this.metrics);
                    messageWriter6 = countedMessageWrites;
                    this.largeMessageWriter = countedMessageWrites;
                }
                messageWriter2 = messageWriter6;
            } else {
                if (this.standardMessageWriter != null) {
                    messageWriter5 = this.standardMessageWriter;
                } else {
                    CountedMessageWrites countedMessageWrites2 = new CountedMessageWrites(new AMQPMessageWriter(protonServerSenderContext), this.metrics);
                    messageWriter5 = countedMessageWrites2;
                    this.standardMessageWriter = countedMessageWrites2;
                }
                messageWriter2 = messageWriter5;
            }
        } else if (!this.tunnelCoreMessages) {
            if (this.standardMessageWriter != null) {
                messageWriter = this.standardMessageWriter;
            } else {
                CountedMessageWrites countedMessageWrites3 = new CountedMessageWrites(new AMQPMessageWriter(protonServerSenderContext), this.metrics);
                messageWriter = countedMessageWrites3;
                this.standardMessageWriter = countedMessageWrites3;
            }
            messageWriter2 = messageWriter;
        } else if (message.isLargeMessage()) {
            if (this.coreLargeMessageWriter != null) {
                messageWriter4 = this.coreLargeMessageWriter;
            } else {
                CountedMessageWrites countedMessageWrites4 = new CountedMessageWrites(new AMQPTunneledCoreLargeMessageWriter(protonServerSenderContext), this.metrics);
                messageWriter4 = countedMessageWrites4;
                this.coreLargeMessageWriter = countedMessageWrites4;
            }
            messageWriter2 = messageWriter4;
        } else {
            if (this.coreMessageWriter != null) {
                messageWriter3 = this.coreMessageWriter;
            } else {
                CountedMessageWrites countedMessageWrites5 = new CountedMessageWrites(new AMQPTunneledCoreMessageWriter(protonServerSenderContext), this.metrics);
                messageWriter3 = countedMessageWrites5;
                this.coreMessageWriter = countedMessageWrites5;
            }
            messageWriter2 = messageWriter3;
        }
        return messageWriter2;
    }

    protected final void registerRemoteLinkClosedInterceptor(Sender sender) {
        Objects.requireNonNull(this.federation, "Initialization should have validated federation state before adding an interceptor");
        this.federation.addLinkClosedInterceptor(this.controllerId, link -> {
            if (link == sender) {
                return link.getRemoteCondition() == null || link.getRemoteCondition().getCondition() == null;
            }
            return false;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static RoutingType getRoutingType(Source source) {
        if (source != null && source.getCapabilities() != null) {
            for (Symbol symbol : source.getCapabilities()) {
                if (AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
                    return RoutingType.MULTICAST;
                }
                if (AmqpSupport.QUEUE_CAPABILITY.equals(symbol)) {
                    return RoutingType.ANYCAST;
                }
            }
        }
        return ActiveMQDefaultConfiguration.getDefaultRoutingType();
    }
}
