package cern.c2mon.server.daq.update;

import cern.c2mon.server.cache.ControlTagFacade;
import cern.c2mon.server.cache.DataTagFacade;
import cern.c2mon.server.cache.ProcessCache;
import cern.c2mon.server.cache.ProcessFacade;
import cern.c2mon.server.cache.exception.CacheElementNotFoundException;
import cern.c2mon.server.common.config.ServerProperties;
import cern.c2mon.server.common.process.Process;
import cern.c2mon.server.supervision.SupervisionManager;
import cern.c2mon.shared.common.datatag.DataTagValueUpdate;
import cern.c2mon.shared.common.datatag.SourceDataTagValue;
import cern.c2mon.shared.daq.datatag.DataTagValueUpdateConverter;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;

@Service("sourceUpdateManager")
@ManagedResource(objectName = "cern.c2mon:name=sourceUpdateManager")
/* loaded from: input_file:cern/c2mon/server/daq/update/SourceUpdateManagerImpl.class */
public class SourceUpdateManagerImpl implements SourceUpdateManager, SessionAwareMessageListener<Message> {
    private long lastEmailLog;
    private static final long EMAIL_FREQUENCY_MILLIS = 600000;
    private final DataTagValueUpdateConverter converter;
    private final DataTagFacade dataTagFacade;
    private final ControlTagFacade controlTagFacade;
    private final SupervisionManager supervisionManager;
    private final ProcessFacade processFacade;
    private final ProcessCache processCache;
    private final ServerProperties properties;
    private static final int WARNING_FREQUENCY = 100000;
    private static final int SWITCH_OFF_COUNTDOWN = 10000;
    private static final Logger log = LoggerFactory.getLogger(SourceUpdateManagerImpl.class);
    private static final Logger SMSLOGGER = LoggerFactory.getLogger("AdminSmsLogger");
    private static final Logger EMAILLOGGER = LoggerFactory.getLogger("AdminEmailLogger");
    private static final Boolean IGNORE_UPDATE = false;
    private static final Boolean ACCEPT_UPDATE = true;
    private volatile AtomicInteger activeUpdateThreads = new AtomicInteger(0);
    private volatile AtomicInteger warningCount = new AtomicInteger(0);
    private volatile boolean alarmActive = false;
    private volatile AtomicInteger switchOffCountDown = new AtomicInteger(SWITCH_OFF_COUNTDOWN);

    @Autowired
    public SourceUpdateManagerImpl(DataTagFacade dataTagFacade, ControlTagFacade controlTagFacade, SupervisionManager supervisionManager, DataTagValueUpdateConverter dataTagValueUpdateConverter, ProcessFacade processFacade, ProcessCache processCache, ServerProperties serverProperties) {
        this.dataTagFacade = dataTagFacade;
        this.controlTagFacade = controlTagFacade;
        this.supervisionManager = supervisionManager;
        this.converter = dataTagValueUpdateConverter;
        this.processFacade = processFacade;
        this.processCache = processCache;
        this.properties = serverProperties;
    }

    @Override // cern.c2mon.server.daq.update.SourceUpdateManager
    public void processUpdates(DataTagValueUpdate dataTagValueUpdate) {
        try {
            this.activeUpdateThreads.getAndIncrement();
            Collection<SourceDataTagValue> values = dataTagValueUpdate.getValues();
            if (values != null) {
                for (SourceDataTagValue sourceDataTagValue : values) {
                    if (sourceDataTagValue.isControlTag()) {
                        processControl(sourceDataTagValue);
                    } else {
                        processDataTag(sourceDataTagValue);
                    }
                    sourceDataTagValue.log();
                }
            }
            this.activeUpdateThreads.getAndDecrement();
            if (this.activeUpdateThreads.get() > 100) {
                this.alarmActive = true;
                this.switchOffCountDown = new AtomicInteger(SWITCH_OFF_COUNTDOWN);
                if (this.warningCount.getAndIncrement() % WARNING_FREQUENCY == 0) {
                    SMSLOGGER.warn("Over 100 source update threads active.");
                    return;
                }
                return;
            }
            if (this.alarmActive && this.switchOffCountDown.getAndDecrement() == 0) {
                this.alarmActive = false;
                SMSLOGGER.warn("Number of active update threads back to normal.");
                this.warningCount = new AtomicInteger(0);
            }
        } catch (Throwable th) {
            this.activeUpdateThreads.getAndDecrement();
            if (this.activeUpdateThreads.get() > 100) {
                this.alarmActive = true;
                this.switchOffCountDown = new AtomicInteger(SWITCH_OFF_COUNTDOWN);
                if (this.warningCount.getAndIncrement() % WARNING_FREQUENCY == 0) {
                    SMSLOGGER.warn("Over 100 source update threads active.");
                }
            } else if (this.alarmActive && this.switchOffCountDown.getAndDecrement() == 0) {
                this.alarmActive = false;
                SMSLOGGER.warn("Number of active update threads back to normal.");
                this.warningCount = new AtomicInteger(0);
            }
            throw th;
        }
    }

    public void onMessage(Message message, Session session) throws JMSException {
        try {
            DataTagValueUpdate dataTagValueUpdate = (DataTagValueUpdate) this.converter.fromMessage(message);
            if (checkProcessPIK(dataTagValueUpdate).booleanValue()) {
                processUpdates(dataTagValueUpdate);
            } else {
                log.warn("Received update(s) for Process #" + dataTagValueUpdate.getProcessId() + " with wrong PIK: Ignoring " + dataTagValueUpdate.getValues().size() + " updates");
            }
        } catch (MessageConversionException e) {
            log.error("Error processing incoming update from DAQ: message is being discarded!", e);
            if (System.currentTimeMillis() - this.lastEmailLog > EMAIL_FREQUENCY_MILLIS) {
                EMAILLOGGER.error("Error processing incoming update from DAQ: message is being discarded!", e);
                this.lastEmailLog = System.currentTimeMillis();
            }
        }
    }

    private void processControl(SourceDataTagValue sourceDataTagValue) {
        try {
            log.trace("Processing incoming update for control tag #" + sourceDataTagValue.getId());
            if (((Boolean) this.controlTagFacade.updateFromSource(sourceDataTagValue.getId(), sourceDataTagValue).getReturnValue()).booleanValue()) {
                this.supervisionManager.processControlTag(sourceDataTagValue);
            }
        } catch (CacheElementNotFoundException e) {
            log.warn("Received unrecognized control tag #" + sourceDataTagValue.getId() + ": ignoring the update");
        }
    }

    private void processDataTag(SourceDataTagValue sourceDataTagValue) {
        try {
            log.trace("Processing incoming update for datatag #" + sourceDataTagValue.getId());
            this.dataTagFacade.updateFromSource(sourceDataTagValue.getId(), sourceDataTagValue);
        } catch (CacheElementNotFoundException e) {
            log.warn("Received unrecognized data tag #" + sourceDataTagValue.getId() + ": ignoring the update");
        }
    }

    @ManagedAttribute(description = "Number of JMS container threads currently running in the server")
    public final AtomicInteger getActiveUpdateThreads() {
        return this.activeUpdateThreads;
    }

    private Boolean checkProcessPIK(DataTagValueUpdate dataTagValueUpdate) {
        this.processCache.acquireWriteLockOnKey(dataTagValueUpdate.getProcessId());
        try {
            try {
                Process process = (Process) this.processCache.get(dataTagValueUpdate.getProcessId());
                if (process.getProcessPIK() != null) {
                    if (dataTagValueUpdate.getProcessPIK() == null) {
                        log.warn(" Processing incoming update for Process " + process.getName() + ": PIK registered (" + process.getProcessPIK() + ") but no PIK received from update: Ignoring the update");
                        Boolean bool = IGNORE_UPDATE;
                        this.processCache.releaseWriteLockOnKey(dataTagValueUpdate.getProcessId());
                        return bool;
                    }
                    if (!process.getProcessPIK().equals(dataTagValueUpdate.getProcessPIK())) {
                        log.warn("Processing incoming updates for Process " + process.getName() + ": Received wrong PIK - cache vs update (" + process.getProcessPIK() + " vs " + dataTagValueUpdate.getProcessPIK() + "): Ignoring the update");
                        Boolean bool2 = IGNORE_UPDATE;
                        this.processCache.releaseWriteLockOnKey(dataTagValueUpdate.getProcessId());
                        return bool2;
                    }
                } else {
                    if (dataTagValueUpdate.getProcessPIK() == null) {
                        log.warn("Processing incoming update for Process " + process.getName() + " with no PIK: Ignoring the update");
                        Boolean bool3 = IGNORE_UPDATE;
                        this.processCache.releaseWriteLockOnKey(dataTagValueUpdate.getProcessId());
                        return bool3;
                    }
                    if (this.properties.isTestMode()) {
                        log.trace("[TEST] Processing incoming update for Process " + process.getName());
                    } else {
                        log.trace("Processing incoming update for Process " + process.getName() + " and saving PIK " + dataTagValueUpdate.getProcessPIK());
                        this.processFacade.setProcessPIK(process.getId(), dataTagValueUpdate.getProcessPIK());
                    }
                }
                this.processCache.releaseWriteLockOnKey(dataTagValueUpdate.getProcessId());
            } catch (CacheElementNotFoundException e) {
                log.warn("Receive updates from unrecognized Process #" + dataTagValueUpdate.getProcessId() + ": Ignoring the updates", e);
                this.processCache.releaseWriteLockOnKey(dataTagValueUpdate.getProcessId());
            }
            return ACCEPT_UPDATE;
        } catch (Throwable th) {
            this.processCache.releaseWriteLockOnKey(dataTagValueUpdate.getProcessId());
            throw th;
        }
    }
}
