package cern.c2mon.server.daq.update;

import cern.c2mon.server.cache.ClusterCache;
import cern.c2mon.server.cache.ProcessCache;
import cern.c2mon.server.common.process.Process;
import cern.c2mon.server.daq.JmsContainerManager;
import cern.c2mon.server.daq.config.DaqProperties;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.SmartLifecycle;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
@ManagedResource(objectName = "cern.c2mon:name=processJmsContainerManager")
/* loaded from: input_file:cern/c2mon/server/daq/update/JmsContainerManagerImpl.class */
public class JmsContainerManagerImpl implements JmsContainerManager, SmartLifecycle {
    /** Class logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsContainerManagerImpl.class);
    // Not referenced anywhere in the visible class body.
    // NOTE(review): looks like a leftover constant - confirm before removing.
    private static final long THREAD_IDLE_LIMIT = 60000;
    /** Cache of Processes; its keys determine which listener containers must exist. */
    private ProcessCache processCache;
    /** Task executor handed to every listener container; initialized in init() and shut down in stop(). */
    private ThreadPoolTaskExecutor daqThreadPoolTaskExecutor;
    /** JMS connection factory set on every listener container. */
    private ConnectionFactory updateConnectionFactory;
    /** Message listener shared by all listener containers. */
    private SessionAwareMessageListener<Message> listener;
    /** Timer running the periodic SubscriptionCheck; created in start(), so it is null until then. */
    private Timer subscriptionChecker;
    /** Used to take the write lock on CONFIG_LOCK_KEY while the subscription check runs. */
    private ClusterCache clusterCache;
    /** Initial delay and period passed to Timer.schedule for the subscription check (milliseconds). */
    private static final long SUBSCRIPTION_CHECK_INTERVAL = 120000;
    /** Source of the JMS settings (queue prefix, consumer counts, timeouts, warm-up time). */
    private DaqProperties properties;
    /** Lifecycle flag: set in start(), cleared in stop(Runnable); also polled by the warm-up thread. */
    private volatile boolean running = false;
    /** Listener containers keyed by Process id. */
    private ConcurrentHashMap<Long, DefaultMessageListenerContainer> jmsContainers = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cern/c2mon/server/daq/update/JmsContainerManagerImpl$ContainerShutdownTask.class */
    /**
     * Callable that shuts down one listener container, so that shutdowns can be
     * handed to an executor instead of blocking the calling thread.
     */
    public class ContainerShutdownTask implements Callable<DefaultMessageListenerContainer> {
        /** The container this task will shut down. */
        private final DefaultMessageListenerContainer target;

        /**
         * @param container the listener container to shut down when the task is called
         */
        public ContainerShutdownTask(DefaultMessageListenerContainer container) {
            this.target = container;
        }

        /**
         * Shuts the container down.
         *
         * @return the same container instance that was passed to the constructor
         * @throws Exception declared by {@link Callable}; anything thrown by the shutdown propagates
         */
        @Override // java.util.concurrent.Callable
        public DefaultMessageListenerContainer call() throws Exception {
            DefaultMessageListenerContainer stopped = this.target;
            stopped.shutdown();
            return stopped;
        }
    }

    /* loaded from: input_file:cern/c2mon/server/daq/update/JmsContainerManagerImpl$SubscriptionCheck.class */
    private class SubscriptionCheck extends TimerTask {
        private SubscriptionCheck() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            JmsContainerManagerImpl.this.clusterCache.acquireWriteLockOnKey(JmsContainerManager.CONFIG_LOCK_KEY);
            try {
                JmsContainerManagerImpl.LOGGER.debug("Checking JMS subscriptions are up to date.");
                try {
                    for (Long l : JmsContainerManagerImpl.this.processCache.getKeys()) {
                        if (!JmsContainerManagerImpl.this.jmsContainers.containsKey(l)) {
                            JmsContainerManagerImpl.this.subscribe((Process) JmsContainerManagerImpl.this.processCache.get(l));
                        }
                    }
                    for (Map.Entry entry : JmsContainerManagerImpl.this.jmsContainers.entrySet()) {
                        if (!JmsContainerManagerImpl.this.processCache.getKeys().contains(entry.getKey())) {
                            JmsContainerManagerImpl.this.unsubscribe((Long) entry.getKey());
                        }
                    }
                } catch (Exception e) {
                    JmsContainerManagerImpl.LOGGER.error("Unexpected exception caught while updating Process JMS containers", e);
                }
            } finally {
                JmsContainerManagerImpl.this.clusterCache.releaseWriteLockOnKey(JmsContainerManager.CONFIG_LOCK_KEY);
            }
        }
    }

    /**
     * Stores the injected collaborators; no containers are created here (see {@code init()}).
     *
     * @param processCache cache of Processes to listen to
     * @param connectionFactory JMS connection factory (bean "daqInConnectionFactory")
     * @param sessionAwareMessageListener listener receiving the updates (bean "sourceUpdateManager")
     * @param clusterCache cluster cache used for locking (bean "clusterCache")
     * @param threadPoolTaskExecutor executor given to the listener containers
     * @param daqProperties JMS configuration values
     */
    @Autowired
    public JmsContainerManagerImpl(
            ProcessCache processCache,
            @Qualifier("daqInConnectionFactory") ConnectionFactory connectionFactory,
            @Qualifier("sourceUpdateManager") SessionAwareMessageListener<Message> sessionAwareMessageListener,
            @Qualifier("clusterCache") ClusterCache clusterCache,
            ThreadPoolTaskExecutor threadPoolTaskExecutor,
            DaqProperties daqProperties) {
        this.properties = daqProperties;
        this.daqThreadPoolTaskExecutor = threadPoolTaskExecutor;
        this.clusterCache = clusterCache;
        this.listener = sessionAwareMessageListener;
        this.updateConnectionFactory = connectionFactory;
        this.processCache = processCache;
    }

    /**
     * Initializes the shared task executor and creates (but does not start) one
     * listener container per cached Process, capped at the initial consumer count.
     */
    @PostConstruct
    public void init() {
        daqThreadPoolTaskExecutor.initialize();
        for (Object key : processCache.getKeys()) {
            Process process = (Process) processCache.get((Long) key);
            subscribe(process, properties.getJms().getUpdate().getInitialConsumers());
        }
    }

    /**
     * Creates and starts a listener container for the given Process, using the
     * configured maximum consumer count. Logs a warning and does nothing if a
     * container is already registered for the Process id.
     *
     * @param process the Process whose update queue should be listened to
     */
    @Override // cern.c2mon.server.daq.JmsContainerManager
    public void subscribe(Process process) {
        LOGGER.trace("Subscribing to updates from Process " + process.getId());
        boolean alreadySubscribed = this.jmsContainers.containsKey(process.getId());
        if (alreadySubscribed) {
            LOGGER.warn("Attempt at creating a JMS listener container for a Process that already has one.");
            return;
        }
        DefaultMessageListenerContainer created =
                subscribe(process, this.properties.getJms().getUpdate().getMaxConsumers());
        created.start();
    }

    /**
     * Builds, initializes and registers a listener container for the update queue
     * of the given Process. The container is not started here.
     *
     * <p>The container is only added to the registry after {@code initialize()} has
     * succeeded. If initialization throws, nothing is registered, so the periodic
     * subscription check can try again instead of finding a half-built container.
     *
     * @param process the Process whose queue ({@code <prefix>.update.<name>}) is consumed
     * @param maxConsumers upper bound on concurrent consumers for this container
     * @return the initialized, not yet started, container
     */
    private DefaultMessageListenerContainer subscribe(Process process, int maxConsumers) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setBeanName(process.getName() + " update JMS container");

        // Where to listen and who handles the messages.
        container.setConnectionFactory(this.updateConnectionFactory);
        container.setDestination(new ActiveMQQueue(this.properties.getJms().getQueuePrefix() + ".update." + process.getName()));
        container.setMessageListener(this.listener);
        container.setTaskExecutor(this.daqThreadPoolTaskExecutor);

        // Consumer scaling and per-task limits, all taken from the update properties.
        container.setConcurrentConsumers(this.properties.getJms().getUpdate().getInitialConsumers());
        container.setMaxConcurrentConsumers(maxConsumers);
        container.setMaxMessagesPerTask(this.properties.getJms().getUpdate().getMaxMessagesPerTask());
        container.setReceiveTimeout(this.properties.getJms().getUpdate().getReceiveTimeout());
        container.setIdleTaskExecutionLimit(this.properties.getJms().getUpdate().getIdleTaskExecutionLimit());

        container.setSessionTransacted(this.properties.getJms().getUpdate().isTransacted());
        container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);

        // Lifecycle is driven by this manager, not by the container itself.
        container.setAutoStartup(false);
        container.setPhase(10);
        container.setAcceptMessagesWhileStopping(false);

        container.initialize();
        this.jmsContainers.put(process.getId(), container);
        return container;
    }

    /**
     * Removes the listener container belonging to the given Process.
     *
     * @param process the Process whose id identifies the container to remove
     */
    @Override // cern.c2mon.server.daq.JmsContainerManager
    public void unsubscribe(Process process) {
        Long processId = process.getId();
        unsubscribe(processId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the listener container registered under the given Process id and
     * shuts it down on a background thread. Logs a warning and does nothing if
     * the id is {@code null} or has no container.
     *
     * @param l the Process id whose container should be removed
     */
    public void unsubscribe(Long l) {
        LOGGER.trace("Unsubscribing from updates for Process {}", l);
        // remove() is atomic, so two concurrent callers cannot both shut the same container down.
        DefaultMessageListenerContainer container = (l == null) ? null : this.jmsContainers.remove(l);
        if (container == null) {
            LOGGER.warn("Attempt to remove an unrecognized JMS listener container.");
            return;
        }
        ExecutorService shutdownExecutor = Executors.newFixedThreadPool(1);
        try {
            shutdownExecutor.submit(new ContainerShutdownTask(container));
        } finally {
            // Lets the submitted task finish, then releases the worker thread;
            // without this every unsubscribe leaks one idle non-daemon thread.
            shutdownExecutor.shutdown();
        }
    }

    /**
     * JMX operation: stops the listener container of the named Process.
     *
     * @param str the Process name
     * @throws IllegalArgumentException if no listener container is registered for that Process
     */
    @ManagedOperation(description = "Stop this JMS container")
    public void stopContainer(String str) {
        LOGGER.info("Stopping JMS container for Process {}", str);
        Long processId = this.processCache.getProcessId(str);
        DefaultMessageListenerContainer container = (processId == null) ? null : this.jmsContainers.get(processId);
        if (container == null) {
            // Report a meaningful error to the JMX caller instead of a bare NullPointerException.
            throw new IllegalArgumentException("No JMS container registered for Process " + str);
        }
        container.stop();
    }

    /**
     * JMX operation: starts the listener container of the named Process.
     *
     * @param str the Process name
     * @throws IllegalArgumentException if no listener container is registered for that Process
     */
    @ManagedOperation(description = "Start this JMS container.")
    public void startContainer(String str) {
        LOGGER.info("Starting JMS container for Process {}", str);
        Long processId = this.processCache.getProcessId(str);
        DefaultMessageListenerContainer container = (processId == null) ? null : this.jmsContainers.get(processId);
        if (container == null) {
            // Report a meaningful error to the JMX caller instead of a bare NullPointerException.
            throw new IllegalArgumentException("No JMS container registered for Process " + str);
        }
        container.start();
    }

    /**
     * JMX operation reporting the backlog of the shared task executor.
     *
     * @return number of entries currently in the executor's work queue
     */
    @ManagedOperation(description = "Get executor queue size.")
    public int getTaskQueueSize() {
        ThreadPoolExecutor pool = daqThreadPoolTaskExecutor.getThreadPoolExecutor();
        return pool.getQueue().size();
    }

    /**
     * JMX operation reporting the active-thread count of the shared task executor.
     *
     * @return the executor's current active count
     */
    @ManagedOperation(description = "Get the number of active threads listening for JMS updates.")
    public int getNumActiveThreads() {
        int activeThreads = daqThreadPoolTaskExecutor.getActiveCount();
        return activeThreads;
    }

    /**
     * @return {@code true} once {@code start()} has run and the manager has not been stopped since
     */
    public synchronized boolean isRunning() {
        return running;
    }

    /**
     * Starts all registered listener containers, launches the warm-up thread that
     * later raises the consumer limit, and schedules the periodic subscription
     * check. Calling it while already running is a no-op.
     */
    public synchronized void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        LOGGER.info("Starting Process JMS listeners...");
        for (DefaultMessageListenerContainer container : this.jmsContainers.values()) {
            container.start();
        }
        LOGGER.info("Finished starting Process JMS listeners.");
        new Thread(this::warmUpThenRaiseConsumerLimit, "JmsContainer").start();
        this.subscriptionChecker = new Timer();
        this.subscriptionChecker.schedule(new SubscriptionCheck(), SUBSCRIPTION_CHECK_INTERVAL, SUBSCRIPTION_CHECK_INTERVAL);
    }

    /**
     * Waits for the configured warm-up time in one-second steps, then raises the
     * maximum concurrent consumers of every container to the operational value.
     * An interrupt ends the wait early; nothing is raised if the manager has
     * been stopped in the meantime.
     */
    private void warmUpThenRaiseConsumerLimit() {
        for (int elapsed = 0; elapsed < this.properties.getJms().getUpdate().getConsumerWarmupTime() && this.running; elapsed++) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted during warm-up phase; starting all listener threads.", e);
                // Keep the interrupt visible and leave the wait loop, as the log message promises.
                Thread.currentThread().interrupt();
                break;
            }
        }
        if (!this.running) {
            return;
        }
        LOGGER.info("Increasing max concurrent update consumers to operational value.");
        for (DefaultMessageListenerContainer container : this.jmsContainers.values()) {
            container.setMaxConcurrentConsumers(this.properties.getJms().getUpdate().getMaxConsumers());
        }
    }

    /**
     * Stops the manager: cancels the subscription check, shuts all listener
     * containers down in parallel (waiting at most 60 seconds), clears the
     * registry and shuts down the shared task executor. Failures are logged,
     * not thrown.
     */
    public synchronized void stop() {
        // Clear the flag here too, so isRunning() is correct when stop() is called directly.
        this.running = false;
        try {
            LOGGER.info("Stopping JMS update containers listening for tag updates from the DAQ layer.");
            // The timer only exists after start(); guarding avoids an NPE that would skip the shutdown below.
            if (this.subscriptionChecker != null) {
                this.subscriptionChecker.cancel();
            }
            ThreadPoolExecutor shutdownPool = new ThreadPoolExecutor(10, 10, 1L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(), task -> new Thread(task, "StopDaqUpdate"));
            try {
                ArrayList<ContainerShutdownTask> shutdownTasks = new ArrayList<>(this.jmsContainers.size());
                for (DefaultMessageListenerContainer container : this.jmsContainers.values()) {
                    shutdownTasks.add(new ContainerShutdownTask(container));
                }
                shutdownPool.invokeAll(shutdownTasks, 60L, TimeUnit.SECONDS);
            } finally {
                // Release the helper threads even if invokeAll is interrupted or fails.
                shutdownPool.shutdown();
            }
            this.jmsContainers.clear();
            this.daqThreadPoolTaskExecutor.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Exception caught while closing down the Spring listener/JMS thread pool", e);
        } catch (Exception e) {
            LOGGER.error("Exception caught while closing down the Spring listener/JMS thread pool", e);
        }
    }

    /**
     * @return always {@code true}
     */
    public boolean isAutoStartup() {
        final boolean autoStart = true;
        return autoStart;
    }

    /**
     * Clears the running flag, performs the regular {@code stop()} and then
     * invokes the supplied callback.
     *
     * @param runnable callback run once the stop sequence has returned
     */
    public synchronized void stop(Runnable runnable) {
        running = false;
        this.stop();
        runnable.run();
    }

    /**
     * @return the lifecycle phase of this manager, always 10 (the same value set on each container)
     */
    public int getPhase() {
        final int phase = 10;
        return phase;
    }
}
