package cern.c2mon.client.core.jms.impl;

import cern.c2mon.client.common.listener.ClientRequestReportListener;
import cern.c2mon.client.core.config.C2monClientProperties;
import cern.c2mon.client.core.jms.AlarmListener;
import cern.c2mon.client.core.jms.BroadcastMessageListener;
import cern.c2mon.client.core.jms.ConnectionListener;
import cern.c2mon.client.core.jms.HeartbeatListener;
import cern.c2mon.client.core.jms.JmsProxy;
import cern.c2mon.client.core.jms.SupervisionListener;
import cern.c2mon.client.core.jms.TopicRegistrationDetails;
import cern.c2mon.client.core.listener.TagUpdateListener;
import cern.c2mon.shared.client.request.ClientRequestReport;
import cern.c2mon.shared.client.request.ClientRequestResult;
import cern.c2mon.shared.client.request.JsonRequest;
import com.google.gson.JsonSyntaxException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.PreDestroy;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Component;

@Component("jmsProxy")
@ManagedResource(objectName = "cern.c2mon:type=JMS,name=JmsProxy")
/* loaded from: input_file:cern/c2mon/client/core/jms/impl/JmsProxyImpl.class */
public final class JmsProxyImpl implements JmsProxy {
    private static final Logger log = LoggerFactory.getLogger(JmsProxyImpl.class);

    /** Pause, in milliseconds, between two connection attempts in {@link #connect()}. */
    private static final long SLEEP_BETWEEN_CONNECTION_ATTEMPTS = 5000;

    /** Time-to-live, in milliseconds, for messages sent by this proxy. */
    private static final int JMS_MESSAGE_TIMEOUT = 600000;

    /** Size argument handed to the broadcast and heartbeat listener wrappers. */
    private static final int DEFAULT_LISTENER_QUEUE_SIZE = 100;

    /** Size argument handed to the supervision, alarm and tag-update listener wrappers. */
    private static final int HIGH_LISTENER_QUEUE_SIZE = 10000;

    /** Factory used by {@link #connect()} to open the JMS connection. */
    private final ConnectionFactory jmsConnectionFactory;

    /** Connection opened by the latest connection attempt; {@code null} until the first attempt. */
    private ActiveMQConnection connection;

    // Wrappers created and started once in the constructor; they are re-attached to fresh
    // consumers every time the subscriptions are refreshed.
    private final SupervisionListenerWrapper supervisionListenerWrapper;
    private final BroadcastMessageListenerWrapper broadcastMessageListenerWrapper;
    private final HeartbeatListenerWrapper heartbeatListenerWrapper;
    private final AlarmListenerWrapper alarmListenerWrapper;

    /** Passed to every listener wrapper this class creates. */
    private final SlowConsumerListener slowConsumerListener;

    /** Session and consumer of the alarm topic subscription; {@code null} while unsubscribed. */
    private Session alarmSession;
    private MessageConsumer alarmConsumer;

    // Topics whose names come from the client properties.
    private final Destination supervisionTopic;
    private final Destination heartbeatTopic;
    private final Destination alarmTopic;

    /** Set at most once through {@code setBroadcastMessageTopic}; {@code null} until then. */
    private Destination adminMessageTopic = null;

    /** {@code true} once a connection attempt has completed, reset by {@code disconnectQuietly()}. */
    private volatile boolean connected = false;

    /** {@code true} between {@code init()} and the end of {@code stop()}. */
    private volatile boolean running = false;

    /** Set by {@code stop()}; makes the connection and reply-wait loops terminate. */
    private volatile boolean shutdownRequested = false;

    /** Held by {@code startReconnectThread()} for the duration of the disconnect/reconnect hand-over. */
    private final ReentrantReadWriteLock.WriteLock connectingWriteLock = new ReentrantReadWriteLock().writeLock();

    /**
     * Write lock: held by {@code refreshSubscriptions()}. Read lock: held by the tag listener
     * register/replace/unregister methods.
     */
    private final ReentrantReadWriteLock refreshLock = new ReentrantReadWriteLock();

    /** Executor handed to every listener wrapper; all of its threads are daemon threads. */
    private final ExecutorService topicPollingExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
        // A single delegate is kept so that all threads come from the same default factory instance.
        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable runnable) {
            Thread newThread = this.defaultFactory.newThread(runnable);
            newThread.setDaemon(true);
            return newThread;
        }
    });

    /** JMS session backing each tag-update listener wrapper. */
    private final Map<MessageListenerWrapper, Session> sessions = new ConcurrentHashMap<>();

    /** Tag-update listener wrapper per topic name. */
    private final Map<String, MessageListenerWrapper> topicToWrapper = new ConcurrentHashMap<>();

    /** Every tag-update listener that must be (re-)subscribed, with its registration details. */
    private final Map<TagUpdateListener, TopicRegistrationDetails> registeredListeners = new ConcurrentHashMap<>();

    /** Held while the tag-update listener maps above are modified. */
    private final ReentrantReadWriteLock.WriteLock listenerLock = new ReentrantReadWriteLock().writeLock();

    /** Listeners informed about connection and disconnection; guarded by {@link #connectionListenersLock}. */
    private final Collection<ConnectionListener> connectionListeners = new ArrayList<>();
    private final ReentrantReadWriteLock connectionListenersLock = new ReentrantReadWriteLock();

    /**
     * Stores the collaborators, resolves the supervision, heartbeat and alarm topics from the
     * client properties and creates and starts the four fixed listener wrappers. No JMS connection
     * is opened here.
     *
     * @param connectionFactory     factory later used to open the JMS connection
     * @param slowConsumerListener  listener handed to every listener wrapper
     * @param c2monClientProperties source of the topic names
     */
    @Autowired
    public JmsProxyImpl(@Qualifier("clientJmsConnectionFactory") ConnectionFactory connectionFactory, SlowConsumerListener slowConsumerListener, C2monClientProperties c2monClientProperties) {
        // Collaborators.
        this.jmsConnectionFactory = connectionFactory;
        this.slowConsumerListener = slowConsumerListener;

        // Fixed topics.
        this.supervisionTopic = new ActiveMQTopic(c2monClientProperties.getJms().getSupervisionTopic());
        this.heartbeatTopic = new ActiveMQTopic(c2monClientProperties.getJms().getHeartbeatTopic());
        this.alarmTopic = new ActiveMQTopic(c2monClientProperties.getJms().getAlarmTopic());

        // Each wrapper is started as soon as it has been created.
        this.supervisionListenerWrapper =
                new SupervisionListenerWrapper(HIGH_LISTENER_QUEUE_SIZE, slowConsumerListener, this.topicPollingExecutor);
        this.supervisionListenerWrapper.start();

        this.broadcastMessageListenerWrapper =
                new BroadcastMessageListenerWrapper(DEFAULT_LISTENER_QUEUE_SIZE, slowConsumerListener, this.topicPollingExecutor);
        this.broadcastMessageListenerWrapper.start();

        this.heartbeatListenerWrapper =
                new HeartbeatListenerWrapper(DEFAULT_LISTENER_QUEUE_SIZE, slowConsumerListener, this.topicPollingExecutor);
        this.heartbeatListenerWrapper.start();

        this.alarmListenerWrapper =
                new AlarmListenerWrapper(HIGH_LISTENER_QUEUE_SIZE, slowConsumerListener, this.topicPollingExecutor);
        this.alarmListenerWrapper.start();
    }

    /**
     * Sets the client-id prefix {@code C2MON-CLIENT-<user>@<host>[<process>]} on the connection
     * factory. Does nothing when the factory is not an {@link ActiveMQConnectionFactory}.
     */
    private void setActiveMQConnectionPrefix() {
        if (!(this.jmsConnectionFactory instanceof ActiveMQConnectionFactory)) {
            // The prefix setter only exists on the ActiveMQ factory; nothing to configure.
            return;
        }
        // Part of the runtime MXBean name before '@' (conventionally the process id).
        String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        String hostName = null;
        try {
            hostName = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // Best effort: the prefix is still built, with "null" in place of the host name.
            log.info("Couldn't get hostname", e);
        }
        String prefix = "C2MON-CLIENT-" + System.getProperty("user.name") + "@" + hostName + "[" + processId + "]";
        // The field is typed as the generic JMS ConnectionFactory, so the cast is required.
        ((ActiveMQConnectionFactory) this.jmsConnectionFactory).setClientIDPrefix(prefix);
    }

    /**
     * Opens the JMS connection and refreshes all subscriptions, retrying every
     * {@link #SLEEP_BETWEEN_CONNECTION_ATTEMPTS} milliseconds until it succeeds or a shutdown is
     * requested. Connection listeners are notified once the proxy is connected.
     *
     * <p>A connection opened by an attempt that subsequently fails is closed before the next
     * attempt, so failed attempts do not accumulate open connections. An interrupt received while
     * waiting does not abort the retry loop; the interrupt status is restored on exit instead.
     */
    public synchronized void connect() {
        boolean interrupted = false;
        try {
            while (!this.connected && !this.shutdownRequested) {
                ActiveMQConnection attempt = null;
                try {
                    attempt = (ActiveMQConnection) this.jmsConnectionFactory.createConnection();
                    this.connection = attempt;
                    attempt.start();
                    // NOTE(review): the method reference needs a functional listener type as its
                    // target; the decompiler appears to have dropped a cast here - confirm.
                    attempt.addTransportListener(this::startReconnectThread);
                    refreshSubscriptions();
                    this.connected = true;
                } catch (Exception e) {
                    log.error("Exception caught while trying to refresh the JMS connection; sleeping 5s before retrying.", e);
                    closeFailedAttempt(attempt);
                    try {
                        // wait() rather than sleep(): the monitor is released while pausing.
                        wait(SLEEP_BETWEEN_CONNECTION_ATTEMPTS);
                    } catch (InterruptedException e2) {
                        log.error("InterruptedException caught while waiting to reconnect.", e2);
                        interrupted = true;
                    }
                }
            }
            if (this.connected) {
                notifyConnectionListenerOnConnection();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Closes the connection opened by a failed attempt, logging (not propagating) any failure. */
    private void closeFailedAttempt(ActiveMQConnection attempt) {
        if (attempt == null) {
            return;
        }
        try {
            attempt.close();
        } catch (JMSException | RuntimeException closeFailure) {
            log.debug("Unable to close the JMS connection of a failed connection attempt", closeFailure);
        }
    }

    /** Calls {@link #init()} unless the proxy is already flagged as running. */
    private void ensureConnection() {
        if (!this.running) {
            init();
        }
    }

    /** Calls {@code onConnection()} on every registered connection listener, under the read lock. */
    private void notifyConnectionListenerOnConnection() {
        final ReentrantReadWriteLock.ReadLock shared = this.connectionListenersLock.readLock();
        shared.lock();
        try {
            for (ConnectionListener listener : this.connectionListeners) {
                listener.onConnection();
            }
        } finally {
            shared.unlock();
        }
    }

    /** Calls {@code onDisconnection()} on every registered connection listener, under the read lock. */
    private void notifyConnectionListenerOnDisconnection() {
        final ReentrantReadWriteLock.ReadLock shared = this.connectionListenersLock.readLock();
        shared.lock();
        try {
            for (ConnectionListener listener : this.connectionListeners) {
                listener.onDisconnection();
            }
        } finally {
            shared.unlock();
        }
    }

    /** Tears the connection down via {@link #disconnectQuietly()}, then informs the connection listeners. */
    private synchronized void disconnect() {
        this.disconnectQuietly();
        this.notifyConnectionListenerOnDisconnection();
    }

    /**
     * Marks the proxy as disconnected and stops every tag-update listener wrapper, without
     * notifying the connection listeners. The connection itself is closed only when it exists, is
     * not yet closed and reports a failed transport; a failure to close is logged, not thrown.
     */
    private synchronized void disconnectQuietly() {
        this.connected = false;

        for (MessageListenerWrapper wrapper : this.topicToWrapper.values()) {
            wrapper.stop();
        }

        final boolean mustClose = this.connection != null
                && !this.connection.isClosed()
                && this.connection.isTransportFailed();
        if (mustClose) {
            try {
                this.connection.close();
            } catch (JMSException closeFailure) {
                final String description =
                        "Exception caught while attempting to disconnect from JMS: " + closeFailure.getMessage();
                log.error(description);
                log.debug(description, closeFailure);
            }
        }
    }

    /**
     * If the proxy is currently connected, disconnects it and starts a new thread that runs
     * {@link #connect()}. Does nothing when the proxy is already disconnected.
     */
    private void startReconnectThread() {
        this.connectingWriteLock.lock();
        try {
            if (!this.connected) {
                return;
            }
            disconnect();
            final Thread reconnector = new Thread(this::connect);
            reconnector.start();
        } finally {
            this.connectingWriteLock.unlock();
        }
    }

    /**
     * Re-creates all subscriptions on the current connection while holding the refresh write lock:
     * every registered tag-update listener, the alarm topic (only if alarm listeners exist), and
     * the supervision, heartbeat and admin message topics.
     *
     * @throws JMSException if any subscription fails; the failure is logged before being rethrown
     */
    private void refreshSubscriptions() throws JMSException {
        final ReentrantReadWriteLock.WriteLock exclusive = this.refreshLock.writeLock();
        exclusive.lock();
        try {
            if (!this.registeredListeners.isEmpty()) {
                // The session and wrapper maps are rebuilt from scratch by the registrations below.
                this.sessions.clear();
                this.topicToWrapper.clear();
                for (Map.Entry<TagUpdateListener, TopicRegistrationDetails> registration : this.registeredListeners.entrySet()) {
                    registerUpdateListener(registration.getKey(), registration.getValue());
                }
            }
            if (this.alarmListenerWrapper.getListenerCount() > 0) {
                subscribeToAlarmTopic();
            }
            subscribeToSupervisionTopic();
            subscribeToHeartbeatTopic();
            subscribeToAdminMessageTopic();
        } catch (JMSException e) {
            log.error("Did not manage to refresh Topic subscriptions", e);
            throw e;
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Opens a dedicated auto-acknowledge session and consumer on the alarm topic and attaches the
     * alarm listener wrapper to it.
     *
     * @throws JMSException if the session or consumer cannot be created
     */
    private void subscribeToAlarmTopic() throws JMSException {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.alarmSession = session;
        final MessageConsumer consumer = session.createConsumer(this.alarmTopic);
        this.alarmConsumer = consumer;
        consumer.setMessageListener(this.alarmListenerWrapper);
        log.debug("Successfully subscribed to alarm topic");
    }

    /**
     * Closes the alarm consumer and its session, if present, and clears both references.
     *
     * <p>The references are cleared before closing so that a failed close cannot leave stale
     * objects behind, and the session is closed even if closing the consumer fails. Calling this
     * method while no alarm subscription exists is a no-op.
     *
     * @throws JMSException if closing the consumer or the session fails
     */
    private void unsubscribeFromAlarmTopic() throws JMSException {
        final MessageConsumer consumer = this.alarmConsumer;
        final Session session = this.alarmSession;
        this.alarmConsumer = null;
        this.alarmSession = null;
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            if (session != null) {
                session.close();
            }
        }
        log.debug("Successfully unsubscribed from alarm topic");
    }

    /**
     * Attaches the heartbeat listener wrapper to a new consumer on the heartbeat topic.
     *
     * @throws JMSException if the session or consumer cannot be created
     */
    private void subscribeToHeartbeatTopic() throws JMSException {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = session.createConsumer(this.heartbeatTopic);
        consumer.setMessageListener(this.heartbeatListenerWrapper);
    }

    /**
     * Attaches the broadcast message listener wrapper to a new consumer on the admin message
     * topic. Does nothing while no admin message topic has been set.
     *
     * @throws JMSException if the session or consumer cannot be created
     */
    private void subscribeToAdminMessageTopic() throws JMSException {
        if (this.adminMessageTopic == null) {
            return;
        }
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = session.createConsumer(this.adminMessageTopic);
        consumer.setMessageListener(this.broadcastMessageListenerWrapper);
    }

    /**
     * Attaches the supervision listener wrapper to a new consumer on the supervision topic.
     *
     * @throws JMSException if the session or consumer cannot be created
     */
    private void subscribeToSupervisionTopic() throws JMSException {
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = session.createConsumer(this.supervisionTopic);
        consumer.setMessageListener(this.supervisionListenerWrapper);
    }

    /**
     * Tells whether the given tag-update listener is currently recorded as registered.
     *
     * @param tagUpdateListener the listener to look up; must not be {@code null}
     * @return {@code true} if the listener is recorded in the registration map
     * @throws NullPointerException if {@code tagUpdateListener} is {@code null}
     */
    @Override
    public boolean isRegisteredListener(TagUpdateListener tagUpdateListener) {
        if (tagUpdateListener != null) {
            return this.registeredListeners.containsKey(tagUpdateListener);
        }
        throw new NullPointerException("isRegisteredListener() method called with null parameter!");
    }

    /**
     * Subscribes a tag-update listener to the topic named in its registration details.
     *
     * <p>If a wrapper already exists for the topic the listener is added to it; otherwise a new
     * session, consumer and wrapper are created. When the subscription fails, or the proxy is not
     * connected, the listener is still recorded so that the next subscription refresh picks it
     * up, and the {@link JMSException} is rethrown. A listener that is already registered is left
     * untouched.
     *
     * @param tagUpdateListener        the listener to register; must not be {@code null}
     * @param topicRegistrationDetails topic name and id to register under; must not be {@code null}
     * @throws JMSException         if not connected or if the subscription cannot be created
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public void registerUpdateListener(TagUpdateListener tagUpdateListener, TopicRegistrationDetails topicRegistrationDetails) throws JMSException {
        if (topicRegistrationDetails == null) {
            throw new NullPointerException("Trying to register a TagUpdateListener with null RegistrationDetails!");
        }
        if (tagUpdateListener == null) {
            throw new NullPointerException("TagUpdateListener must not be null!");
        }
        ensureConnection();
        this.refreshLock.readLock().lock();
        try {
            this.listenerLock.lock();
            try {
                // This thread holds the read lock, so the write lock can only be held by this same
                // thread, i.e. this call comes from refreshSubscriptions().
                final boolean refreshing = this.refreshLock.isWriteLocked();
                if (!refreshing && isRegisteredListener(tagUpdateListener)) {
                    log.debug("Update listener already registered; skipping registration (for Tag {})", topicRegistrationDetails.getId());
                    return;
                }
                try {
                    if (!refreshing && !this.connected) {
                        throw new JMSException("Not currently connected - will attempt to subscribe on reconnection. Attempting to reconnect.");
                    }
                    subscribeToTagTopic(tagUpdateListener, topicRegistrationDetails);
                } catch (JMSException e) {
                    log.error("Failed to subscribe to topic - will do so on reconnection.", e);
                    if (!refreshing) {
                        // Remember the listener so that the next refresh subscribes it.
                        this.registeredListeners.put(tagUpdateListener, topicRegistrationDetails);
                    }
                    throw e;
                }
                if (!refreshing) {
                    this.registeredListeners.put(tagUpdateListener, topicRegistrationDetails);
                }
            } finally {
                this.listenerLock.unlock();
            }
        } finally {
            this.refreshLock.readLock().unlock();
        }
    }

    /** Adds the listener to the wrapper of its topic, creating session, consumer and wrapper if needed. */
    private void subscribeToTagTopic(TagUpdateListener tagUpdateListener, TopicRegistrationDetails topicRegistrationDetails) throws JMSException {
        final String topicName = topicRegistrationDetails.getTopicName();
        final MessageListenerWrapper existing = this.topicToWrapper.get(topicName);
        if (existing != null) {
            existing.addListener(tagUpdateListener, topicRegistrationDetails.getId());
            return;
        }
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = session.createConsumer(session.createTopic(topicName));
        final MessageListenerWrapper wrapper = new MessageListenerWrapper(topicRegistrationDetails.getId(), tagUpdateListener, HIGH_LISTENER_QUEUE_SIZE, this.slowConsumerListener, this.topicPollingExecutor);
        wrapper.start();
        consumer.setMessageListener(wrapper);
        this.topicToWrapper.put(topicName, wrapper);
        this.sessions.put(wrapper, session);
    }

    /**
     * Replaces a registered tag-update listener by another one, keeping the registration details.
     *
     * <p>Both arguments are validated before any state is touched, so a {@code null} replacement
     * can no longer remove the registered listener. If no wrapper exists for the topic only the
     * registration record is swapped.
     *
     * @param tagUpdateListener  the currently registered listener; must not be {@code null}
     * @param tagUpdateListener2 the listener taking its place; must not be {@code null}
     * @throws NullPointerException if an argument is {@code null} or the first listener is not
     *                              registered
     */
    @Override
    public void replaceListener(TagUpdateListener tagUpdateListener, TagUpdateListener tagUpdateListener2) {
        if (tagUpdateListener == null || tagUpdateListener2 == null) {
            throw new NullPointerException("replaceListener(..) method called with null argument");
        }
        this.refreshLock.readLock().lock();
        try {
            this.listenerLock.lock();
            try {
                final TopicRegistrationDetails details = this.registeredListeners.get(tagUpdateListener);
                if (details == null) {
                    // Same exception type as before, but with a message that names the cause.
                    throw new NullPointerException("replaceListener(..) method called with a listener that is not registered");
                }
                final MessageListenerWrapper wrapper = this.topicToWrapper.get(details.getTopicName());
                if (wrapper != null) {
                    wrapper.addListener(tagUpdateListener2, details.getId());
                }
                // With no wrapper (presumably a subscription that has not succeeded yet - confirm)
                // only the record is swapped; refreshSubscriptions() subscribes from that record.
                this.registeredListeners.remove(tagUpdateListener);
                this.registeredListeners.put(tagUpdateListener2, details);
            } finally {
                this.listenerLock.unlock();
            }
        } finally {
            this.refreshLock.readLock().unlock();
        }
    }

    /**
     * Sends a text message to the named topic using a short-lived session.
     *
     * <p>NOTE(review): the third argument is never read; the time-to-live is always
     * {@link #JMS_MESSAGE_TIMEOUT}. Confirm against the {@code JmsProxy} contract.
     *
     * @param str  the message text; must not be {@code null}
     * @param str2 the name of the destination topic; must not be {@code null}
     * @param j    not used by this implementation
     * @throws JMSException         if not connected or if sending fails
     * @throws NullPointerException if the message or the destination name is {@code null}
     */
    @Override
    public void publish(String str, String str2, long j) throws JMSException {
        if (str2 == null) {
            throw new NullPointerException("publish(..) method called with null queue name argument");
        }
        if (str == null) {
            throw new NullPointerException("publish(..) method called with null message argument");
        }
        if (!this.connected) {
            throw new JMSException("Not currently connected: unable to send message at this time.");
        }
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            final TextMessage payload = session.createTextMessage(str);
            final MessageProducer producer = session.createProducer(new ActiveMQTopic(str2));
            producer.setDeliveryMode(1); // 1 is the value of DeliveryMode.NON_PERSISTENT
            producer.setTimeToLive(JMS_MESSAGE_TIMEOUT);
            producer.send(payload);
        } finally {
            session.close();
        }
    }

    /**
     * Sends a request to the named queue and waits for the reply on a temporary queue.
     *
     * <p>An object reply is returned as is. A text reply is decoded by
     * {@code handleJsonResponse}; while that yields {@code null} (an intermediate report) the
     * method keeps waiting for the next reply. The consumer, the temporary queue and the session
     * are each released exactly once, whatever the outcome.
     *
     * @param jsonRequest                 the request to send; must not be {@code null}
     * @param str                         the name of the request queue; must not be {@code null}
     * @param i                           timeout in milliseconds for each wait on a reply
     * @param clientRequestReportListener listener informed about reports; may be {@code null}
     * @return the result collection sent back by the server
     * @throws JMSException         if not connected or if a JMS operation fails
     * @throws NullPointerException if the request or the queue name is {@code null}
     * @throws RuntimeException     if no reply arrives in time, an error report is received, or
     *                              the proxy disconnects while waiting
     */
    @Override
    public <T extends ClientRequestResult> Collection<T> sendRequest(JsonRequest<T> jsonRequest, String str, int i, ClientRequestReportListener clientRequestReportListener) throws JMSException {
        if (str == null) {
            throw new NullPointerException("sendRequest(..) method called with null queue name argument");
        }
        if (jsonRequest == null) {
            throw new NullPointerException("sendRequest(..) method called with null request argument");
        }
        ensureConnection();
        if (!this.connected) {
            throw new JMSException("Not currently connected: unable to send request at this time.");
        }
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            final Message request;
            if (jsonRequest.isObjectRequest()) {
                request = session.createObjectMessage((Serializable) jsonRequest.getObjectParameter());
            } else {
                request = session.createTextMessage(jsonRequest.toJson());
            }
            final TemporaryQueue replyQueue = session.createTemporaryQueue();
            try {
                final MessageConsumer consumer = session.createConsumer(replyQueue);
                try {
                    request.setJMSReplyTo(replyQueue);
                    final MessageProducer producer = session.createProducer(new ActiveMQQueue(str));
                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                    producer.setTimeToLive(JMS_MESSAGE_TIMEOUT);
                    producer.send(request);

                    while (this.connected && !this.shutdownRequested) {
                        final Message reply = consumer.receive(i);
                        if (reply == null) {
                            log.error("No reply received from server on ClientRequest. I was waiting for {} milliseconds..", i);
                            throw new RuntimeException("No reply received from server - possible timeout?");
                        }
                        if (reply instanceof ObjectMessage) {
                            // The payload type is fixed by the request/response protocol and cannot
                            // be checked at runtime because of erasure.
                            @SuppressWarnings("unchecked")
                            final Collection<T> objectResult = (Collection<T>) ((ObjectMessage) reply).getObject();
                            return objectResult;
                        }
                        final Collection<T> jsonResult = handleJsonResponse((TextMessage) reply, jsonRequest, clientRequestReportListener);
                        if (jsonResult != null) {
                            return jsonResult;
                        }
                        // null means an intermediate report was handled: wait for the next reply.
                    }
                    throw new RuntimeException("Disconnected from JMS, so unable to process request.");
                } finally {
                    consumer.close();
                }
            } finally {
                replyQueue.delete();
            }
        } finally {
            session.close();
        }
    }

    /**
     * Sends a request without a report listener; delegates to the four-argument overload.
     *
     * @param jsonRequest the request to send
     * @param str         the name of the request queue
     * @param i           timeout in milliseconds for each wait on a reply
     * @return the result collection sent back by the server
     * @throws JMSException if not connected or if a JMS operation fails
     */
    @Override
    public <T extends ClientRequestResult> Collection<T> sendRequest(JsonRequest<T> jsonRequest, String str, int i) throws JMSException {
        final ClientRequestReportListener noReportListener = null;
        return sendRequest(jsonRequest, str, i, noReportListener);
    }

    /**
     * Decodes a text reply. The decoded collection is returned unless its first element is a
     * {@link ClientRequestReport} that is not a result; such a report is passed to
     * {@code handleJsonReportResponse} and {@code null} is returned.
     *
     * @return the decoded collection, or {@code null} after an intermediate report was handled
     */
    private <T extends ClientRequestResult> Collection<T> handleJsonResponse(TextMessage textMessage, JsonRequest<T> jsonRequest, ClientRequestReportListener clientRequestReportListener) throws JsonSyntaxException, JMSException {
        final Collection<T> decoded = jsonRequest.fromJsonResponse(textMessage.getText());
        if (!decoded.isEmpty()) {
            final T first = decoded.iterator().next();
            if (first instanceof ClientRequestReport) {
                final ClientRequestReport report = (ClientRequestReport) first;
                if (!isResult(report)) {
                    handleJsonReportResponse(report, clientRequestReportListener);
                    return null;
                }
            }
        }
        return decoded;
    }

    /**
     * Forwards a report to the listener. Error reports are forwarded and then raised as a
     * {@link RuntimeException}; progress reports are forwarded; other reports are logged and
     * ignored. Without a listener the report is ignored.
     */
    private void handleJsonReportResponse(ClientRequestReport clientRequestReport, ClientRequestReportListener clientRequestReportListener) {
        if (clientRequestReportListener == null) {
            log.debug("handleJsonReportResponse(): Received a report, but no reportListener is registered. Ignoring..");
            return;
        }
        if (clientRequestReport.isErrorReport()) {
            log.debug("handleJsonReportResponse(): Received an error report. Informing listener.");
            clientRequestReportListener.onErrorReportReceived(clientRequestReport);
            throw new RuntimeException("Error report received from server on client request: " + clientRequestReport.getErrorMessage());
        }
        if (clientRequestReport.isProgressReport()) {
            log.debug("handleJsonReportResponse(): Received a progress report. Informing listener.");
            clientRequestReportListener.onProgressReportReceived(clientRequestReport);
        } else {
            log.warn("handleJsonReportResponse(): Received a report of unknown type. Ignoring..");
        }
    }

    /** Returns whether the report flags itself as a result. */
    private boolean isResult(ClientRequestReport clientRequestReport) {
        final boolean result = clientRequestReport.isResult();
        return result;
    }

    /**
     * Removes a tag-update listener. When its topic wrapper has no listeners left, the wrapper is
     * stopped and its session closed. A listener with no wrapper for its topic is simply removed
     * from the registration map. Unregistered listeners are ignored.
     *
     * @param tagUpdateListener the listener to remove; must not be {@code null}
     * @throws NullPointerException if {@code tagUpdateListener} is {@code null}
     */
    @Override
    public void unregisterUpdateListener(TagUpdateListener tagUpdateListener) {
        this.refreshLock.readLock().lock();
        try {
            this.listenerLock.lock();
            try {
                if (!isRegisteredListener(tagUpdateListener)) {
                    return;
                }
                final TopicRegistrationDetails details = this.registeredListeners.get(tagUpdateListener);
                final String topicName = details.getTopicName();
                final MessageListenerWrapper wrapper = this.topicToWrapper.get(topicName);
                // A listener can be recorded without a wrapper: registerUpdateListener records it
                // even when the subscription failed.
                if (wrapper != null) {
                    wrapper.removeListener(details.getId());
                    if (wrapper.isEmpty()) {
                        log.trace("No listeners registered to topic {} so closing down MessageListenerWrapper", topicName);
                        closeTopicSubscription(topicName, wrapper);
                    }
                }
                this.registeredListeners.remove(tagUpdateListener);
            } finally {
                this.listenerLock.unlock();
            }
        } finally {
            this.refreshLock.readLock().unlock();
        }
    }

    /**
     * Closes the session behind a wrapper, then stops the wrapper and forgets it. A failure to
     * close the session triggers a reconnect instead of being propagated.
     */
    private void closeTopicSubscription(String topicName, MessageListenerWrapper wrapper) {
        try {
            final Session session = this.sessions.get(wrapper);
            if (session != null) {
                session.close();
            }
        } catch (JMSException e) {
            log.error("Failed to unregister properly from a Tag update; subscriptions will be refreshed.", e);
            startReconnectThread();
        } finally {
            wrapper.stop();
            this.sessions.remove(wrapper);
            this.topicToWrapper.remove(topicName);
        }
    }

    /**
     * Adds a connection listener and immediately tells it the current connection state.
     *
     * @param connectionListener the listener to add; must not be {@code null}
     * @throws NullPointerException if {@code connectionListener} is {@code null}
     */
    @Override
    public void registerConnectionListener(ConnectionListener connectionListener) {
        if (connectionListener == null) {
            throw new NullPointerException("registerConnectionListener(..) method called with null listener argument");
        }
        ensureConnection();
        final ReentrantReadWriteLock.WriteLock exclusive = this.connectionListenersLock.writeLock();
        exclusive.lock();
        try {
            this.connectionListeners.add(connectionListener);
            if (!this.connected) {
                connectionListener.onDisconnection();
            } else {
                connectionListener.onConnection();
            }
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Adds a supervision listener, initialising the proxy first if it is not running.
     *
     * @param supervisionListener the listener to add; must not be {@code null}
     * @throws NullPointerException if {@code supervisionListener} is {@code null}
     */
    @Override
    public void registerSupervisionListener(SupervisionListener supervisionListener) {
        final boolean missing = supervisionListener == null;
        if (missing) {
            throw new NullPointerException("Trying to register null Supervision listener with JmsProxy.");
        }
        ensureConnection();
        this.supervisionListenerWrapper.addListener(supervisionListener);
    }

    /**
     * Removes a supervision listener.
     *
     * @param supervisionListener the listener to remove; must not be {@code null}
     * @throws NullPointerException if {@code supervisionListener} is {@code null}
     */
    @Override
    public void unregisterSupervisionListener(SupervisionListener supervisionListener) {
        final boolean missing = supervisionListener == null;
        if (missing) {
            throw new NullPointerException("Trying to unregister null Supervision listener from JmsProxy.");
        }
        this.supervisionListenerWrapper.removeListener(supervisionListener);
    }

    /**
     * Adds a broadcast message listener. The admin message topic must have been set beforehand.
     *
     * @param broadcastMessageListener the listener to add; must not be {@code null}
     * @throws NullPointerException  if {@code broadcastMessageListener} is {@code null}
     * @throws IllegalStateException if no admin message topic has been set
     */
    @Override
    public void registerBroadcastMessageListener(BroadcastMessageListener broadcastMessageListener) {
        if (broadcastMessageListener == null) {
            throw new NullPointerException("Trying to register null BroadcastMessage listener with JmsProxy.");
        }
        if (this.adminMessageTopic == null) {
            final String listenerType = BroadcastMessageListener.class.getSimpleName();
            throw new IllegalStateException(String.format("Cannot register '%s' without having the admin message topic", listenerType));
        }
        this.broadcastMessageListenerWrapper.addListener(broadcastMessageListener);
    }

    /**
     * Removes a broadcast message listener.
     *
     * @param broadcastMessageListener the listener to remove; must not be {@code null}
     * @throws NullPointerException if {@code broadcastMessageListener} is {@code null}
     */
    @Override
    public void unregisterBroadcastMessageListener(BroadcastMessageListener broadcastMessageListener) {
        final boolean missing = broadcastMessageListener == null;
        if (missing) {
            throw new NullPointerException("Trying to unregister null BroadcastMessage listener from JmsProxy.");
        }
        this.broadcastMessageListenerWrapper.removeListener(broadcastMessageListener);
    }

    /**
     * Adds an alarm listener. The alarm topic is subscribed to first when the wrapper has no
     * listeners yet; if that subscription fails the listener is not added.
     *
     * @param alarmListener the listener to add; must not be {@code null}
     * @throws JMSException         if subscribing to the alarm topic fails
     * @throws NullPointerException if {@code alarmListener} is {@code null}
     */
    @Override
    public void registerAlarmListener(AlarmListener alarmListener) throws JMSException {
        if (alarmListener == null) {
            throw new NullPointerException("Trying to register null alarm listener with JmsProxy.");
        }
        ensureConnection();
        final boolean firstListener = this.alarmListenerWrapper.getListenerCount() == 0;
        if (firstListener) {
            try {
                subscribeToAlarmTopic();
            } catch (JMSException subscriptionFailure) {
                log.error("Did not manage to subscribe To Alarm Topic.", subscriptionFailure);
                throw subscriptionFailure;
            }
        }
        this.alarmListenerWrapper.addListener(alarmListener);
    }

    /**
     * Removes an alarm listener. The alarm topic subscription is closed first when the wrapper
     * holds exactly one listener; if closing fails the listener is not removed.
     *
     * @param alarmListener the listener to remove; must not be {@code null}
     * @throws JMSException         if unsubscribing from the alarm topic fails
     * @throws NullPointerException if {@code alarmListener} is {@code null}
     */
    @Override
    public void unregisterAlarmListener(AlarmListener alarmListener) throws JMSException {
        if (alarmListener == null) {
            throw new NullPointerException("Trying to unregister null alarm listener from JmsProxy.");
        }
        if (this.alarmListenerWrapper.getListenerCount() == 1) {
            try {
                unsubscribeFromAlarmTopic();
            } catch (JMSException e) {
                // This path is an unsubscribe failure; the message used to say "subscribe".
                log.error("Did not manage to unsubscribe from Alarm Topic.", e);
                throw e;
            }
        }
        this.alarmListenerWrapper.removeListener(alarmListener);
    }

    /**
     * Adds a heartbeat listener, initialising the proxy first if it is not running.
     *
     * @param heartbeatListener the listener to add; must not be {@code null}
     * @throws NullPointerException if {@code heartbeatListener} is {@code null}
     */
    @Override
    public void registerHeartbeatListener(HeartbeatListener heartbeatListener) {
        final boolean missing = heartbeatListener == null;
        if (missing) {
            throw new NullPointerException("Trying to register null Heartbeat listener with JmsProxy.");
        }
        ensureConnection();
        this.heartbeatListenerWrapper.addListener(heartbeatListener);
    }

    /**
     * Removes a heartbeat listener.
     *
     * @param heartbeatListener the listener to remove; must not be {@code null}
     * @throws NullPointerException if {@code heartbeatListener} is {@code null}
     */
    @Override
    public void unregisterHeartbeatListener(HeartbeatListener heartbeatListener) {
        final boolean missing = heartbeatListener == null;
        if (missing) {
            throw new NullPointerException("Trying to unregister null Heartbeat listener from JmsProxy.");
        }
        this.heartbeatListenerWrapper.removeListener(heartbeatListener);
    }

    /**
     * Sets the admin message topic, which may be done only once. When the proxy is connected the
     * topic is subscribed to immediately; a subscription failure is logged, not thrown.
     *
     * @param destination the admin message topic
     * @throws IllegalStateException if the topic has already been set
     */
    @Override
    public void setBroadcastMessageTopic(Destination destination) {
        if (this.adminMessageTopic != null) {
            throw new IllegalStateException("Cannot set the admin message topic more than one time");
        }
        this.adminMessageTopic = destination;
        if (!this.connected) {
            return;
        }
        try {
            subscribeToAdminMessageTopic();
        } catch (JMSException subscriptionFailure) {
            log.error("Unable to subscribe to the admin message topic, this functionality may not function properly.", subscriptionFailure);
        }
    }

    /**
     * Flags the proxy as running, clears any earlier shutdown request, applies the client-id
     * prefix and connects. Returns once {@link #connect()} returns.
     */
    public void init() {
        this.running = true;
        this.shutdownRequested = false;
        this.setActiveMQConnectionPrefix();
        this.connect();
    }

    /**
     * Shuts the proxy down: requests shutdown, stops the four fixed listener wrappers, shuts down
     * the polling executor, disconnects without notifying listeners, and finally clears the
     * connection listeners and resets the running flag.
     */
    @PreDestroy
    public void stop() {
        log.debug("Stopping JmsProxy and dependent listeners");
        this.shutdownRequested = true;

        this.supervisionListenerWrapper.stop();
        this.alarmListenerWrapper.stop();
        this.broadcastMessageListenerWrapper.stop();
        this.heartbeatListenerWrapper.stop();
        this.topicPollingExecutor.shutdown();

        disconnectQuietly();

        final ReentrantReadWriteLock.WriteLock exclusive = this.connectionListenersLock.writeLock();
        exclusive.lock();
        try {
            this.connectionListeners.clear();
            this.running = false;
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Reports the queue size of every listener wrapper, keyed by topic: the tag-update wrappers
     * first, then supervision, alarm, admin message (if a topic is set) and heartbeat.
     *
     * @return a new map from topic to the queue size of its wrapper
     */
    @ManagedOperation(description = "Get size of current internal listener queues")
    public Map<String, Integer> getQueueSizes() {
        final Map<String, Integer> sizes = new HashMap<>();
        for (Map.Entry<String, MessageListenerWrapper> tagTopic : this.topicToWrapper.entrySet()) {
            sizes.put(tagTopic.getKey(), tagTopic.getValue().getQueueSize());
        }
        sizes.put(this.supervisionTopic.toString(), this.supervisionListenerWrapper.getQueueSize());
        sizes.put(this.alarmTopic.toString(), this.alarmListenerWrapper.getQueueSize());
        if (this.adminMessageTopic != null) {
            sizes.put(this.adminMessageTopic.toString(), this.broadcastMessageListenerWrapper.getQueueSize());
        }
        sizes.put(this.heartbeatTopic.toString(), this.heartbeatListenerWrapper.getQueueSize());
        return sizes;
    }

    /**
     * Returns the connection created by the latest connection attempt.
     *
     * @return the current connection, or {@code null} if no connection has been created yet
     */
    public ActiveMQConnection getConnection() {
        final ActiveMQConnection current = this.connection;
        return current;
    }
}
