package cern.c2mon.server.cache.listener;

import cern.c2mon.server.cache.C2monBufferedCacheListener;
import cern.c2mon.server.cache.C2monCacheListener;
import cern.c2mon.server.common.component.Lifecycle;
import cern.c2mon.shared.common.Cacheable;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cern/c2mon/server/cache/listener/AbstractBufferedCacheListener.class */
/**
 * Cache listener that buffers incoming notifications and forwards them in
 * batches to a wrapped {@link C2monBufferedCacheListener}.
 *
 * <p>Each notified cache object is converted with {@link #getDerivedObject}
 * and appended to one of two unbounded queues (updates, status confirmations).
 * Once started, a dedicated thread periodically drains both queues and hands
 * the drained batches to the wrapped listener. At most
 * {@value #MAX_TO_LISTENER} elements per queue are delivered per flush.
 *
 * <p>Notifications received while the listener is not enabled are rejected
 * with an {@link IllegalStateException}.
 *
 * @param <T> type of the cache objects this listener is notified about
 * @param <S> type of the derived objects passed on to the wrapped listener
 */
public abstract class AbstractBufferedCacheListener<T extends Cacheable, S> implements C2monCacheListener<T>, Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(AbstractBufferedCacheListener.class);

    /** Maximum number of elements drained from one queue in a single flush. */
    private static final int MAX_TO_LISTENER = 20000;

    /** Duration in milliseconds of each individual sleep of the flushing thread. */
    private static final long CHECK_FREQUENCY = 500;

    /** Minimum time in milliseconds between the start of two consecutive flushes. */
    private final int frequency;

    /** Listener receiving the buffered batches. */
    private final C2monBufferedCacheListener<S> bufferedCacheListener;

    private final LinkedBlockingQueue<S> onUpdateQueue = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<S> statusConfirmationQueue = new LinkedBlockingQueue<>();

    /** Set by {@link #start()}, cleared by {@link #stop()}; read without locking by notifier threads. */
    private volatile boolean enabled = false;

    /**
     * Creates a buffered listener wrapping the given batch listener.
     *
     * @param bufferedCacheListener listener that receives the buffered batches
     * @param frequency minimum time in milliseconds between the start of two flushes
     */
    public AbstractBufferedCacheListener(C2monBufferedCacheListener<S> bufferedCacheListener, int frequency) {
        this.bufferedCacheListener = bufferedCacheListener;
        this.frequency = frequency;
    }

    /**
     * Converts a notified cache object into the object that is queued and later
     * passed to the wrapped listener.
     *
     * @param t the cache object received in the notification
     * @return the object to buffer
     */
    abstract S getDerivedObject(T t);

    /**
     * Buffers a status confirmation for later delivery to the wrapped listener.
     *
     * @param t the cache object whose status is confirmed
     * @throws IllegalStateException if the listener is not enabled
     */
    @Override // cern.c2mon.server.cache.C2monCacheListener
    public void confirmStatus(T t) {
        if (!this.enabled) {
            log.warn("Updated notification received with listener disabled");
            throw new IllegalStateException("Updated notification received with listener disabled");
        }
        // The queue is unbounded, so add() cannot block. Unlike put() it is not
        // interruptible, so an interrupted caller neither loses this element nor
        // has its interrupt status cleared.
        this.statusConfirmationQueue.add(getDerivedObject(t));
    }

    /**
     * Buffers an update notification for later delivery to the wrapped listener.
     *
     * @param t the updated cache object
     * @throws IllegalStateException if the listener is not enabled
     */
    @Override // cern.c2mon.server.cache.C2monCacheListener
    public void notifyElementUpdated(T t) {
        if (!this.enabled) {
            String message = "Update notification received with listener disabled for " + this.bufferedCacheListener.getThreadName();
            log.warn(message);
            throw new IllegalStateException(message);
        }
        // See confirmStatus: add() avoids silently dropping the update on interruption.
        this.onUpdateQueue.add(getDerivedObject(t));
    }

    /**
     * Buffers an update notification for every element of the collection, in
     * iteration order.
     *
     * @param collection the updated cache objects
     * @throws IllegalStateException if the listener is not enabled
     */
    public void notifyElementsUpdated(Collection<T> collection) {
        for (T element : collection) {
            notifyElementUpdated(element);
        }
    }

    /**
     * @return {@code true} if the listener has been started and not stopped since
     */
    public synchronized boolean isRunning() {
        return this.enabled;
    }

    /**
     * Enables the listener and starts the flushing thread. Does nothing if the
     * listener is already enabled.
     */
    public synchronized void start() {
        if (this.enabled) {
            return;
        }
        String threadName = this.bufferedCacheListener.getThreadName();
        log.info("Starting BufferedCacheListener for {}", threadName);
        // The flag must be set BEFORE the thread starts: the loop exits as soon as
        // it observes "disabled and both queues empty", which would otherwise be
        // true on its very first check and leave the listener without a flusher.
        this.enabled = true;
        new Thread(this::runFlushLoop, threadName).start();
    }

    /**
     * Disables the listener and performs one immediate flush. The flushing thread
     * terminates by itself once it observes the disabled flag with both queues
     * empty. Does nothing if the listener is not enabled.
     */
    public synchronized void stop() {
        if (this.enabled) {
            log.info("Shutting down BufferedKeyCacheListener for {}", this.bufferedCacheListener.getThreadName());
            this.enabled = false;
            flush();
        }
    }

    /**
     * Body of the flushing thread: flushes, then sleeps in steps of
     * {@link #CHECK_FREQUENCY} ms until at least {@link #frequency} ms have
     * elapsed since the flush began, and repeats until the listener is disabled
     * and both queues are empty.
     */
    private void runFlushLoop() {
        while (this.enabled || !this.onUpdateQueue.isEmpty() || !this.statusConfirmationQueue.isEmpty()) {
            long cycleStart = System.currentTimeMillis();
            flush();
            while (System.currentTimeMillis() - cycleStart < this.frequency) {
                try {
                    Thread.sleep(CHECK_FREQUENCY);
                } catch (InterruptedException e) {
                    // Interrupts are deliberately not used as a stop signal here; the
                    // flag is not restored because that would make every following
                    // sleep fail immediately and turn this wait into a busy loop.
                    log.error("Sleep interrupted in BufferedCacheListener thread");
                }
            }
        }
    }

    /**
     * Delivers up to {@link #MAX_TO_LISTENER} buffered updates and then up to
     * {@link #MAX_TO_LISTENER} buffered status confirmations to the wrapped
     * listener. Exceptions thrown by the wrapped listener are logged and not
     * propagated; the drained elements are not re-queued.
     */
    private synchronized void flush() {
        LinkedList<S> updates = drainBatch(this.onUpdateQueue);
        if (!updates.isEmpty()) {
            try {
                this.bufferedCacheListener.notifyElementUpdated(updates);
            } catch (Exception e) {
                log.error("Uncaught exception occured in {} whilst notifying for update of {} elements!",
                        this.bufferedCacheListener.getThreadName(), updates.size(), e);
            }
        }

        LinkedList<S> confirmations = drainBatch(this.statusConfirmationQueue);
        if (!confirmations.isEmpty()) {
            try {
                this.bufferedCacheListener.confirmStatus(confirmations);
            } catch (Exception e) {
                log.error("Uncaught exception occured in {} whilst confirming status of {} cache objects!",
                        this.bufferedCacheListener.getThreadName(), confirmations.size(), e);
            }
        }
    }

    /** Removes at most {@link #MAX_TO_LISTENER} elements from the queue and returns them in queue order. */
    private LinkedList<S> drainBatch(LinkedBlockingQueue<S> queue) {
        LinkedList<S> batch = new LinkedList<>();
        queue.drainTo(batch, MAX_TO_LISTENER);
        return batch;
    }

    /**
     * @return {@code true} if the listener currently accepts notifications
     */
    public boolean isEnabled() {
        return this.enabled;
    }
}
