package fr.inria.eventcloud.proxies;

import com.hp.hpl.jena.graph.Node;
import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.PublishSubscribeConstants;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.api.QuadruplePattern;
import fr.inria.eventcloud.api.SubscriptionId;
import fr.inria.eventcloud.api.listeners.BindingNotificationListener;
import fr.inria.eventcloud.api.listeners.CompoundEventNotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListenerType;
import fr.inria.eventcloud.api.listeners.SignalNotificationListener;
import fr.inria.eventcloud.api.properties.AlterableElaProperty;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.formatters.QuadruplesFormatter;
import fr.inria.eventcloud.messages.request.can.ReconstructCompoundEventRequest;
import fr.inria.eventcloud.messages.request.can.RemoveEphemeralSubscriptionRequest;
import fr.inria.eventcloud.messages.request.can.UnsubscribeRequest;
import fr.inria.eventcloud.messages.response.can.QuadruplePatternResponse;
import fr.inria.eventcloud.pubsub.PublishSubscribeUtils;
import fr.inria.eventcloud.pubsub.Subscription;
import fr.inria.eventcloud.pubsub.Subsubscription;
import fr.inria.eventcloud.pubsub.notifications.BindingNotification;
import fr.inria.eventcloud.pubsub.notifications.Notification;
import fr.inria.eventcloud.pubsub.notifications.NotificationId;
import fr.inria.eventcloud.pubsub.notifications.PollingSignalNotification;
import fr.inria.eventcloud.pubsub.notifications.QuadruplesNotification;
import fr.inria.eventcloud.pubsub.notifications.SignalNotification;
import fr.inria.eventcloud.pubsub.solutions.BindingSolution;
import fr.inria.eventcloud.pubsub.solutions.QuadruplesSolution;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.annotation.multiactivity.DefineGroups;
import org.objectweb.proactive.annotation.multiactivity.Group;
import org.objectweb.proactive.annotation.multiactivity.MemberOf;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.api.PAFuture;
import org.objectweb.proactive.core.component.body.ComponentEndActive;
import org.objectweb.proactive.extensions.p2p.structured.proxies.Proxies;
import org.objectweb.proactive.multiactivity.MultiActiveService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DefineGroups({@Group(name = "parallel", selfCompatible = true)})
/* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl.class */
public class SubscribeProxyImpl extends AbstractProxy implements ComponentEndActive, SubscribeProxy, SubscribeProxyAttributeController {
    private static final long serialVersionUID = 130;
    /** Name of the hash map, stored in {@link #notificationsDeliveredDB}, that maps a delivered notification id to its subscription id. */
    private static final String NOTIFICATIONS_DELIVERED_MAP_NAME = "notificationsDelivered";
    // NOTE(review): presumably the ADL definition name used to instantiate this component — confirm against the factories.
    public static final String SUBSCRIBE_PROXY_ADL = "fr.inria.eventcloud.proxies.SubscribeProxy";
    // NOTE(review): presumably the name of the component interface exposing the subscribe operations — confirm.
    public static final String SUBSCRIBE_SERVICES_ITF = "subscribe-services";
    private static final Logger log = LoggerFactory.getLogger(SubscribeProxyImpl.class);
    /** Subscriptions registered through this proxy along with their listener, keyed by subscription id. */
    private ConcurrentMap<SubscriptionId, SubscriptionEntry<?>> subscriptions;
    /** Binding solutions still being assembled from received notifications, keyed by notification id. */
    private ConcurrentMap<NotificationId, BindingSolution> bindingSolutions;
    /** Quadruple solutions still being assembled from received notifications, keyed by notification id. */
    private ConcurrentMap<NotificationId, QuadruplesSolution> quadruplesSolutions;
    /** File-backed MapDB database holding the map of notifications that were already delivered. */
    private DB notificationsDeliveredDB;
    /** URI of this component; assigned in {@code setAttributes} and passed along when internal subscriptions are created. */
    private String componentUri;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl$SubscriptionEntry.class */
    /**
     * Immutable pair associating an internal subscription with the listener
     * that has to be notified for it.
     *
     * @param <T> the concrete notification listener type.
     */
    public static final class SubscriptionEntry<T extends NotificationListener<?>> {
        /** Internal representation of the subscription. */
        private final Subscription subscription;
        /** Listener registered along with the subscription. */
        private final T listener;

        /**
         * Creates a new entry.
         *
         * @param subscription the internal subscription.
         * @param listener the listener bound to the subscription.
         */
        public SubscriptionEntry(Subscription subscription, T listener) {
            this.listener = listener;
            this.subscription = subscription;
        }
    }

    /**
     * Initializes the component activity: delegates to the parent class,
     * updates the immediate-service setting of two methods and then creates
     * the database recording delivered notifications.
     *
     * @param body the body of the active object.
     */
    @Override // fr.inria.eventcloud.proxies.AbstractProxy
    public void initComponentActivity(Body body) {
        super.initComponentActivity(body);

        final String[] methodNames = {"setImmediateServices", "setAttributes"};
        for (String methodName : methodNames) {
            body.setImmediateService(methodName, false);
        }

        this.createAndRegisterNotificationsDeliveredDB(body);
    }

    /**
     * Serves incoming requests through a multi-active service whose first
     * argument is the {@code MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES} property value.
     *
     * @param body the body of the active object.
     */
    public void runComponentActivity(Body body) {
        final MultiActiveService service = new MultiActiveService(body);
        final Integer softLimit = (Integer) EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue();
        service.multiActiveServing(softLimit.intValue(), false, false);
    }

    /**
     * Closes the database of delivered notifications when the component
     * activity ends.
     *
     * @param body the body of the active object (unused).
     */
    public void endComponentActivity(Body body) {
        this.closeNotificationsDeliveredDb();
    }

    /**
     * Creates the file-backed database used to remember which notifications
     * were already delivered, and creates its hash map. The database file
     * lives in a {@code jdbm} directory under the default temporary path and
     * is named after the body identifier.
     *
     * @param body the body whose identifier names the database file.
     */
    private void createAndRegisterNotificationsDeliveredDB(Body body) {
        final String dbDirectoryPath =
                EventCloudProperties.getDefaultTemporaryPath() + "jdbm" + File.separatorChar;
        final File dbDirectory = new File(dbDirectoryPath);
        dbDirectory.mkdirs();

        final File dbFile = new File(dbDirectoryPath + body.getID());
        this.notificationsDeliveredDB =
                DBMaker.newFileDB(dbFile)
                        .cacheSoftRefEnable()
                        .closeOnJvmShutdown()
                        .deleteFilesAfterClose()
                        .transactionDisable()
                        .make();

        // keys are notification ids, values are subscription ids
        this.notificationsDeliveredDB.createHashMap(
                NOTIFICATIONS_DELIVERED_MAP_NAME,
                new NotificationId.Serializer(),
                new SubscriptionId.Serializer());
    }

    /**
     * Sets the attributes of this proxy. Only the first invocation has an
     * effect: once the EventCloud cache is set, later calls are ignored.
     *
     * @param eventCloudCache the cache giving access to the trackers.
     * @param str the URI of this component.
     * @param alterableElaPropertyArr not used by this implementation.
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxyAttributeController
    public void setAttributes(EventCloudCache eventCloudCache, String str, AlterableElaProperty[] alterableElaPropertyArr) {
        if (this.eventCloudCache != null) {
            // already configured
            return;
        }

        this.eventCloudCache = eventCloudCache;
        this.proxy = Proxies.newProxy(this.eventCloudCache.getTrackers());
        this.componentUri = str;
        this.subscriptions = new ConcurrentHashMap<>(100, 0.9f, 2);

        final int softLimit =
                ((Integer) EventCloudProperties.MAO_SOFT_LIMIT_SUBSCRIBE_PROXIES.getValue()).intValue();
        final int averageNbQuadruples =
                ((Integer) EventCloudProperties.AVERAGE_NB_QUADRUPLES_PER_COMPOUND_EVENT.getValue()).intValue();
        final int initialCapacity = softLimit * averageNbQuadruples;

        this.bindingSolutions = new ConcurrentHashMap<>(initialCapacity, 0.75f, softLimit);
        this.quadruplesSolutions = new ConcurrentHashMap<>(initialCapacity, 0.75f, softLimit);

        // make sure the delivered notifications database gets closed at JVM shutdown
        final Thread shutdownHook = new Thread(new Runnable() { // from class: fr.inria.eventcloud.proxies.SubscribeProxyImpl.1
            @Override // java.lang.Runnable
            public void run() {
                SubscribeProxyImpl.this.closeNotificationsDeliveredDb();
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the database of delivered notifications, if one was created.
     */
    public synchronized void closeNotificationsDeliveredDb() {
        final DB db = this.notificationsDeliveredDB;
        if (db == null) {
            return;
        }
        db.close();
    }

    /**
     * Registers a subscription whose solutions are delivered as bindings.
     *
     * @param subscription the subscription to register.
     * @param bindingNotificationListener the listener to notify.
     */
    @MemberOf("parallel")
    public void subscribe(fr.inria.eventcloud.api.Subscription subscription, BindingNotificationListener bindingNotificationListener) {
        final String sparqlQuery = subscription.getSparqlQuery();
        this.indexSubscription(subscription, sparqlQuery, bindingNotificationListener);
    }

    /**
     * Registers a subscription whose solutions are delivered as compound
     * events. When the SBCE1 or SBCE2 publish/subscribe algorithm is in use,
     * the SPARQL query is first rewritten by
     * {@code PublishSubscribeUtils.removeResultVarsExceptGraphVar}.
     *
     * @param subscription the subscription to register.
     * @param compoundEventNotificationListener the listener to notify.
     */
    @MemberOf("parallel")
    public void subscribe(fr.inria.eventcloud.api.Subscription subscription, CompoundEventNotificationListener compoundEventNotificationListener) {
        final String sparqlQuery;
        if (EventCloudProperties.isSbce1PubSubAlgorithmUsed() || EventCloudProperties.isSbce2PubSubAlgorithmUsed()) {
            sparqlQuery = PublishSubscribeUtils.removeResultVarsExceptGraphVar(subscription.getSparqlQuery());
        } else {
            sparqlQuery = subscription.getSparqlQuery();
        }
        this.indexSubscription(subscription, sparqlQuery, compoundEventNotificationListener);
    }

    /**
     * Registers a subscription for which only a signal is delivered.
     *
     * @param subscription the subscription to register.
     * @param signalNotificationListener the listener to notify.
     */
    @MemberOf("parallel")
    public void subscribe(fr.inria.eventcloud.api.Subscription subscription, SignalNotificationListener signalNotificationListener) {
        final String sparqlQuery = subscription.getSparqlQuery();
        this.indexSubscription(subscription, sparqlQuery, signalNotificationListener);
    }

    /**
     * Indexes the subscription using its own, unmodified SPARQL query.
     *
     * @param subscription the subscription to index.
     * @param notificationListener the listener to associate with it.
     */
    private void indexSubscription(fr.inria.eventcloud.api.Subscription subscription, NotificationListener<?> notificationListener) {
        final String sparqlQuery = subscription.getSparqlQuery();
        this.indexSubscription(subscription, sparqlQuery, notificationListener);
    }

    /**
     * Registers the subscription locally and indexes it on a selected peer.
     *
     * @param subscription the subscription received from the caller.
     * @param str the SPARQL query to use for the internal subscription.
     * @param notificationListener the listener to associate with it.
     *
     * @throws IllegalArgumentException if a subscription with the same id is
     *         already registered.
     */
    private void indexSubscription(fr.inria.eventcloud.api.Subscription subscription, String str, NotificationListener<?> notificationListener) {
        final Subscription internalSubscription =
                createInternalSubscription(subscription, this.componentUri, str, notificationListener.getType());
        final SubscriptionEntry<?> newEntry =
                new SubscriptionEntry<NotificationListener<?>>(internalSubscription, notificationListener);

        final SubscriptionEntry<?> previousEntry = this.subscriptions.putIfAbsent(subscription.getId(), newEntry);
        if (previousEntry != null) {
            throw new IllegalArgumentException("Subscription already registered for subscription id: " + internalSubscription.getId());
        }

        super.selectPeer().indexSubscription(internalSubscription);
        log.info(
                "New subscription has been registered from {} with id {}",
                PAActiveObject.getBodyOnThis().getUrl(),
                internalSubscription.getId());
    }

    /**
     * Builds the internal subscription matching a subscription received
     * through the public API. The API subscription id is passed both as first
     * and third constructor argument, and the second one is left {@code null}.
     *
     * @param subscription the API subscription.
     * @param str the URI of the component creating the subscription.
     * @param str2 the SPARQL query of the internal subscription.
     * @param notificationListenerType the type of the associated listener.
     *
     * @return the new internal subscription.
     */
    private static Subscription createInternalSubscription(fr.inria.eventcloud.api.Subscription subscription, String str, String str2, NotificationListenerType notificationListenerType) {
        final SubscriptionId id = subscription.getId();
        return new Subscription(
                id,
                null,
                id,
                subscription.getCreationTime(),
                str2,
                str,
                subscription.getSubscriptionDestination(),
                notificationListenerType);
    }

    /**
     * Removes the subscription identified by the given id: an
     * {@link UnsubscribeRequest} is sent for each of its sub-subscriptions and
     * the records of notifications already delivered for it are dropped.
     *
     * @param subscriptionId the identifier of the subscription to remove.
     *
     * @throws IllegalArgumentException if no subscription is registered with
     *         the specified identifier.
     */
    @MemberOf("parallel")
    public void unsubscribe(SubscriptionId subscriptionId) {
        SubscriptionEntry<?> removedEntry = this.subscriptions.remove(subscriptionId);
        if (removedEntry == null) {
            throw new IllegalArgumentException("No subscription registered with the specified subscription id: " + subscriptionId);
        }

        Subscription subscription = removedEntry.subscription;
        boolean bindingListener = subscription.getType() == NotificationListenerType.BINDING;
        for (Subsubscription subsubscription : subscription.getSubSubscriptions()) {
            super.selectPeer().send(new UnsubscribeRequest(subscription.getOriginalId(), subsubscription.getAtomicQuery(), bindingListener));
        }

        // The map associates notification ids with subscription ids, hence
        // several entries may share this subscription id as value.
        // Collection#remove(Object) drops a single occurrence only, whereas
        // removeAll purges every record of the subscription.
        this.notificationsDeliveredDB.getHashMap(NOTIFICATIONS_DELIVERED_MAP_NAME)
                .values()
                .removeAll(java.util.Collections.singleton(subscriptionId));
    }

    /**
     * Handles a binding notification: its content is merged into the solution
     * being assembled for the notification id and, once the solution is
     * ready, the solution is delivered to the listener and discarded.
     * Notifications for unknown subscriptions are ignored.
     *
     * @param bindingNotification the notification received.
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallel")
    public void receive(BindingNotification bindingNotification) {
        final SubscriptionEntry<?> entry = this.subscriptions.get(bindingNotification.getSubscriptionId());
        if (entry == null) {
            return;
        }
        logNotificationReception(bindingNotification);

        BindingSolution solution = this.bindingSolutions.get(bindingNotification.getId());
        boolean contentAlreadyIncluded = false;
        if (solution == null) {
            final BindingSolution freshSolution =
                    new BindingSolution(entry.subscription.getSubSubscriptions().length, bindingNotification.getContent());
            final BindingSolution concurrentSolution =
                    this.bindingSolutions.putIfAbsent(bindingNotification.getId(), freshSolution);
            if (concurrentSolution == null) {
                // the fresh solution was built from the content received
                solution = freshSolution;
                contentAlreadyIncluded = true;
            } else {
                // another thread registered a solution in the meantime
                solution = concurrentSolution;
            }
        }
        if (!contentAlreadyIncluded) {
            solution.merge(bindingNotification.getContent());
        }

        if (!solution.isReady()) {
            return;
        }
        @SuppressWarnings("unchecked")
        final SubscriptionEntry<BindingNotificationListener> bindingEntry =
                (SubscriptionEntry<BindingNotificationListener>) entry;
        deliver(bindingEntry, solution);
        this.bindingSolutions.remove(bindingNotification.getId());
    }

    /**
     * Notifies a binding listener with the chunks of a binding solution.
     *
     * @param subscriptionEntry the subscription and its listener.
     * @param bindingSolution the solution to deliver.
     */
    private void deliver(SubscriptionEntry<BindingNotificationListener> subscriptionEntry, BindingSolution bindingSolution) {
        final BindingNotificationListener listener = subscriptionEntry.listener;
        final SubscriptionId id = subscriptionEntry.subscription.getId();
        listener.onNotification(id, bindingSolution.getChunks());
    }

    /**
     * Handles a quadruples notification: the quadruples received are merged
     * into the solution being assembled for the notification id and, once the
     * solution is ready, the resulting compound event is delivered to the
     * listener. A {@link RemoveEphemeralSubscriptionRequest} is sent once the
     * event is delivered or when a duplicate is detected.
     * <p>
     * Quadruples received for a compound event that has already been
     * delivered are dropped: they must neither be accumulated again nor
     * trigger a second delivery.
     *
     * @param quadruplesNotification the notification received; the graph
     *        value is read from the first quadruple of its content.
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallel")
    public void receive(QuadruplesNotification quadruplesNotification) {
        Node graph = quadruplesNotification.getContent().get(0).getGraph();
        SubscriptionId subscriptionId = quadruplesNotification.getSubscriptionId();

        if (this.notificationsDeliveredDB.getHashMap(NOTIFICATIONS_DELIVERED_MAP_NAME).containsKey(quadruplesNotification.getId())) {
            log.warn("Received some quadruple duplicates for a CE that has already been delivered:\n{}", quadruplesNotification);
            super.selectPeer().sendv(new RemoveEphemeralSubscriptionRequest(graph, quadruplesNotification.getSubscriptionId()));
            // already delivered: do not start assembling a new solution
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Received quadruples notification subscriptionId=" + subscriptionId + ", contentSize=" + quadruplesNotification.getContent().size() + ", from=" + quadruplesNotification.getSource() + "\n" + QuadruplesFormatter.toString(quadruplesNotification.getContent(), true));
        }

        SubscriptionEntry<?> subscriptionEntry = this.subscriptions.get(subscriptionId);
        if (subscriptionEntry == null) {
            return;
        }
        logNotificationReception(quadruplesNotification);

        QuadruplesSolution quadruplesSolution = this.quadruplesSolutions.get(quadruplesNotification.getId());
        if (quadruplesSolution == null) {
            quadruplesSolution = new QuadruplesSolution(quadruplesNotification.getContent());
            QuadruplesSolution putIfAbsent = this.quadruplesSolutions.putIfAbsent(quadruplesNotification.getId(), quadruplesSolution);
            if (putIfAbsent != null) {
                // another thread registered a solution in the meantime
                quadruplesSolution = putIfAbsent;
                quadruplesSolution.merge((Collection<Quadruple>) quadruplesNotification.getContent());
            }
        } else {
            quadruplesSolution.merge((Collection<Quadruple>) quadruplesNotification.getContent());
        }

        if (!quadruplesSolution.isReady()) {
            return;
        }

        if (markAsDelivered(quadruplesNotification.getId(), subscriptionId) != null) {
            log.warn("Received some quadruple duplicates for a CE that has already been delivered:\n{}", quadruplesNotification);
            this.quadruplesSolutions.remove(quadruplesNotification.getId());
            super.selectPeer().sendv(new RemoveEphemeralSubscriptionRequest(graph, subscriptionId));
            // the compound event was delivered by another thread
            return;
        }

        CompoundEvent compoundEvent = new CompoundEvent(quadruplesSolution.getChunks());
        @SuppressWarnings("unchecked")
        SubscriptionEntry<CompoundEventNotificationListener> compoundEventEntry =
                (SubscriptionEntry<CompoundEventNotificationListener>) subscriptionEntry;
        deliver(compoundEventEntry, compoundEvent.getGraph().toString(), compoundEvent);
        this.quadruplesSolutions.remove(quadruplesNotification.getId());
        super.selectPeer().sendv(new RemoveEphemeralSubscriptionRequest(compoundEvent.getGraph(), subscriptionId));
    }

    /**
     * Handles a signal notification by notifying the listener of the
     * associated subscription. Notifications for unknown subscriptions are
     * ignored.
     *
     * @param signalNotification the notification received.
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallel")
    public void receive(SignalNotification signalNotification) {
        final SubscriptionEntry<?> entry = this.subscriptions.get(signalNotification.getSubscriptionId());
        if (entry != null) {
            logNotificationReception(signalNotification);
            @SuppressWarnings("unchecked")
            final SubscriptionEntry<SignalNotificationListener> signalEntry =
                    (SubscriptionEntry<SignalNotificationListener>) entry;
            deliver(signalEntry);
        }
    }

    /**
     * Notifies a signal listener that its subscription has been matched.
     *
     * @param subscriptionEntry the subscription and its listener.
     */
    private void deliver(SubscriptionEntry<SignalNotificationListener> subscriptionEntry) {
        final SignalNotificationListener listener = subscriptionEntry.listener;
        final SubscriptionId id = subscriptionEntry.subscription.getId();
        listener.onNotification(id);
    }

    /**
     * Handles a polling signal notification: the compound event identified by
     * the meta event id is reconstructed and, when the reconstruction yields
     * an event and the subscription is known, delivered to the listener.
     *
     * @param pollingSignalNotification the notification received.
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallel")
    public void receive(PollingSignalNotification pollingSignalNotification) {
        final SubscriptionId subscriptionId = pollingSignalNotification.getSubscriptionId();
        final Node graph = Node.createURI(pollingSignalNotification.getMetaEventId());
        final CompoundEvent compoundEvent =
                reconstructCompoundEvent(pollingSignalNotification.getId(), subscriptionId, graph);
        logNotificationReception(pollingSignalNotification);

        if (compoundEvent == null) {
            return;
        }
        final SubscriptionEntry<?> entry = this.subscriptions.get(subscriptionId);
        if (entry == null) {
            return;
        }

        @SuppressWarnings("unchecked")
        final SubscriptionEntry<CompoundEventNotificationListener> compoundEventEntry =
                (SubscriptionEntry<CompoundEventNotificationListener>) entry;
        deliver(compoundEventEntry, pollingSignalNotification.getMetaEventId(), compoundEvent);
    }

    /**
     * Notifies a compound event listener, then sends an input/output
     * monitoring report and logs the integration information.
     *
     * @param subscriptionEntry the subscription and its listener.
     * @param str the event id from which the publication source and the
     *        publication time are extracted.
     * @param compoundEvent the compound event to deliver.
     */
    private void deliver(SubscriptionEntry<CompoundEventNotificationListener> subscriptionEntry, String str, CompoundEvent compoundEvent) {
        final SubscriptionId subscriptionId = subscriptionEntry.subscription.getId();
        final CompoundEventNotificationListener listener = subscriptionEntry.listener;
        listener.onNotification(subscriptionId, compoundEvent);

        final String publicationSource = Quadruple.getPublicationSource(str);
        final String subscriberUrl = listener.getSubscriberUrl();
        final long publicationTime = Quadruple.getPublicationTime(str);
        sendInputOutputMonitoringReport(publicationSource, subscriberUrl, publicationTime);

        logIntegrationInformation(str);
    }

    /**
     * Reconstructs the compound event identified by the given graph value by
     * repeatedly querying a peer until the number of quadruples gathered
     * equals the number announced by the quadruple whose predicate is
     * {@link PublishSubscribeConstants#EVENT_NB_QUADRUPLES_NODE}.
     * <p>
     * The thread waits {@code RECONSTRUCTION_RETRY_THRESHOLD} milliseconds
     * between two attempts only; no wait is performed once the compound event
     * is complete.
     *
     * @param notificationId the identifier of the notification that triggers
     *        the reconstruction.
     * @param subscriptionId the identifier of the matched subscription.
     * @param node the graph value shared by the quadruples of the event.
     *
     * @return the reconstructed compound event, or {@code null} if the
     *         notification was already marked as delivered.
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallel")
    public final CompoundEvent reconstructCompoundEvent(NotificationId notificationId, SubscriptionId subscriptionId, Node node) {
        if (markAsDelivered(notificationId, subscriptionId) != null) {
            return null;
        }

        // unknown until the meta quadruple announcing the count is received
        int expectedNbQuadruples = -1;
        ArrayList<Quadruple> quadruplesReceived = new ArrayList<Quadruple>();
        // hash values of the quadruples already received; sent along with each
        // request (presumably so that peers do not return them again — confirm)
        HashSet quadrupleHashesReceived = new HashSet();
        QuadruplePattern reconstructPattern = new QuadruplePattern(node, Node.ANY, Node.ANY, Node.ANY);

        while (quadruplesReceived.size() != expectedNbQuadruples) {
            if (log.isInfoEnabled()) {
                log.info("Reconstructing compound event for subscription {} and graph value {} ({}/{})", new Object[]{subscriptionId, node, Integer.valueOf(quadruplesReceived.size()), Integer.valueOf(expectedNbQuadruples)});
            }

            QuadruplePatternResponse response = (QuadruplePatternResponse) PAFuture.getFutureValue(
                    super.selectPeer().send(new ReconstructCompoundEventRequest(reconstructPattern, quadrupleHashesReceived)));
            for (Quadruple quadruple : response.getResult()) {
                if (quadruple.getPredicate().equals(PublishSubscribeConstants.EVENT_NB_QUADRUPLES_NODE)) {
                    expectedNbQuadruples = ((Integer) quadruple.getObject().getLiteralValue()).intValue();
                } else {
                    quadruplesReceived.add(quadruple);
                }
                quadrupleHashesReceived.add(quadruple.hashValue());
            }

            if (quadruplesReceived.size() == expectedNbQuadruples) {
                // complete: deliver right away instead of waiting once more
                break;
            }

            try {
                Thread.sleep(((Integer) EventCloudProperties.RECONSTRUCTION_RETRY_THRESHOLD.getValue()).intValue());
            } catch (InterruptedException e) {
                // restore the interrupt status for the callers
                Thread.currentThread().interrupt();
            }
        }

        return new CompoundEvent(quadruplesReceived);
    }

    /**
     * Sends an input/output monitoring report if a monitoring manager is set.
     *
     * @param str the source; {@code "http://0.0.0.0"} is used when
     *        {@code null}.
     * @param str2 the destination; the component URI is used when
     *        {@code null}.
     * @param j the publication time forwarded to the monitoring manager.
     */
    private void sendInputOutputMonitoringReport(String str, String str2, long j) {
        if (this.monitoringManager == null) {
            return;
        }

        final String source = (str != null) ? str : "http://0.0.0.0";
        final String destination = (str2 != null) ? str2 : this.componentUri;
        this.monitoringManager.sendInputOutputMonitoringReport(source, destination, j);
    }

    /**
     * Logs, when the {@code INTEGRATION_LOG} property is enabled, a line made
     * of {@code "EventCloud Exit"}, the event id stripped of its meta
     * information (if an id is given) and the stream URL.
     *
     * @param str the event id, possibly {@code null}.
     */
    private void logIntegrationInformation(String str) {
        final boolean integrationLogEnabled =
                ((Boolean) EventCloudProperties.INTEGRATION_LOG.getValue()).booleanValue();
        if (!integrationLogEnabled) {
            return;
        }

        String message = "EventCloud Exit";
        if (str != null) {
            message += " " + Quadruple.removeMetaInformation(Node.createURI(str));
        }
        message += " " + this.eventCloudCache.getId().getStreamUrl();

        log.info(message);
    }

    /**
     * Logs, at debug level, the reception of a notification.
     *
     * @param notification the notification received.
     */
    private void logNotificationReception(Notification<?> notification) {
        if (!log.isDebugEnabled()) {
            return;
        }

        final Object[] arguments = {notification.getId(), this.componentUri, notification.getSubscriptionId()};
        log.debug("New notification received {} on {} for subscription id {}", arguments);
    }

    /**
     * Looks up the internal subscription registered with the given id.
     *
     * @param subscriptionId the identifier of the subscription to look for.
     *
     * @return the internal subscription, or {@code null} if no subscription
     *         is registered with the specified identifier.
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    @MemberOf("parallel")
    public Subscription find(SubscriptionId subscriptionId) {
        SubscriptionEntry<?> entry = this.subscriptions.get(subscriptionId);
        // an unknown identifier used to end up in a NullPointerException
        if (entry == null) {
            return null;
        }
        return entry.subscription;
    }

    /**
     * Returns the URI assigned to this component.
     *
     * @return the component URI, as set by {@code setAttributes}.
     */
    @MemberOf("parallel")
    public String getComponentUri() {
        final String uri = this.componentUri;
        return uri;
    }

    /**
     * Records the notification as delivered unless it is already recorded.
     *
     * @param notificationId the identifier of the notification.
     * @param subscriptionId the identifier of the subscription matched.
     *
     * @return the subscription id previously recorded for the notification,
     *         or {@code null} if the notification was not recorded yet.
     */
    private SubscriptionId markAsDelivered(NotificationId notificationId, SubscriptionId subscriptionId) {
        final Object previousValue =
                this.notificationsDeliveredDB
                        .getHashMap(NOTIFICATIONS_DELIVERED_MAP_NAME)
                        .putIfAbsent(notificationId, subscriptionId);
        return (SubscriptionId) previousValue;
    }
}
