package fr.inria.eventcloud.pubsub;

import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.hash.HashCode;
import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.query.QueryExecution;
import com.hp.hpl.jena.query.QueryExecutionFactory;
import com.hp.hpl.jena.query.QueryFactory;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.sparql.algebra.Algebra;
import com.hp.hpl.jena.sparql.algebra.Op;
import com.hp.hpl.jena.sparql.algebra.OpAsQuery;
import com.hp.hpl.jena.sparql.algebra.TransformBase;
import com.hp.hpl.jena.sparql.algebra.Transformer;
import com.hp.hpl.jena.sparql.algebra.op.OpProject;
import com.hp.hpl.jena.sparql.core.Var;
import com.hp.hpl.jena.sparql.engine.binding.Binding;
import com.hp.hpl.jena.sparql.syntax.ElementNamedGraph;
import com.hp.hpl.jena.sparql.syntax.ElementVisitorBase;
import com.hp.hpl.jena.sparql.syntax.ElementWalker;
import fr.inria.eventcloud.api.PublishSubscribeConstants;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.api.SubscriptionId;
import fr.inria.eventcloud.api.listeners.NotificationListenerType;
import fr.inria.eventcloud.api.wrappers.BindingWrapper;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.datastore.AccessMode;
import fr.inria.eventcloud.datastore.TransactionalDatasetGraph;
import fr.inria.eventcloud.datastore.TransactionalTdbDatastore;
import fr.inria.eventcloud.datastore.Vars;
import fr.inria.eventcloud.messages.request.can.IndexEphemeralSubscriptionRequest;
import fr.inria.eventcloud.messages.request.can.IndexSubscriptionRequest;
import fr.inria.eventcloud.messages.request.can.UnsubscribeRequest;
import fr.inria.eventcloud.operations.can.RetrieveSubSolutionOperation;
import fr.inria.eventcloud.overlay.SemanticCanOverlay;
import fr.inria.eventcloud.overlay.SemanticPeer;
import fr.inria.eventcloud.proxies.SubscribeProxy;
import fr.inria.eventcloud.pubsub.Subscription;
import fr.inria.eventcloud.pubsub.notifications.BindingNotification;
import fr.inria.eventcloud.pubsub.notifications.PollingSignalNotification;
import fr.inria.eventcloud.pubsub.notifications.QuadruplesNotification;
import fr.inria.eventcloud.pubsub.notifications.SignalNotification;
import fr.inria.eventcloud.reasoner.AtomicQuery;
import fr.inria.eventcloud.utils.SparqlResultSerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.mutable.MutableObject;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.api.PAFuture;
import org.objectweb.proactive.extensions.p2p.structured.configuration.P2PStructuredProperties;
import org.objectweb.proactive.extensions.p2p.structured.utils.Pair;
import org.openjena.riot.out.NodeFmtLib;
import org.openjena.riot.out.NodeToLabel;
import org.openjena.riot.out.OutputLangUtils;
import org.openjena.riot.system.Prologue;
import org.openjena.riot.tokens.Tokenizer;
import org.openjena.riot.tokens.TokenizerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/inria/eventcloud/pubsub/PublishSubscribeUtils.class */
public final class PublishSubscribeUtils {
    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeUtils.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: fr.inria.eventcloud.pubsub.PublishSubscribeUtils$3, reason: invalid class name */
    /* loaded from: input_file:fr/inria/eventcloud/pubsub/PublishSubscribeUtils$3.class */
    /**
     * Lookup table, recovered by the decompiler, that maps the ordinal of each
     * {@link NotificationListenerType} constant to a small integer label:
     * BINDING = 1, COMPOUND_EVENT = 2, SIGNAL = 3, UNKNOWN = 4. Slots that are
     * never assigned keep the default value 0.
     */
    public static /* synthetic */ class AnonymousClass3 {
        // One slot per enum constant known when this class is initialized.
        static final /* synthetic */ int[] $SwitchMap$fr$inria$eventcloud$api$listeners$NotificationListenerType = new int[NotificationListenerType.values().length];

        static {
            // Each constant is registered inside its own try block so that a
            // NoSuchFieldError raised for one constant is ignored and does not
            // prevent the remaining constants from being registered.
            try {
                $SwitchMap$fr$inria$eventcloud$api$listeners$NotificationListenerType[NotificationListenerType.BINDING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot simply keeps its default 0.
            }
            try {
                $SwitchMap$fr$inria$eventcloud$api$listeners$NotificationListenerType[NotificationListenerType.COMPOUND_EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot simply keeps its default 0.
            }
            try {
                $SwitchMap$fr$inria$eventcloud$api$listeners$NotificationListenerType[NotificationListenerType.SIGNAL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot simply keeps its default 0.
            }
            try {
                $SwitchMap$fr$inria$eventcloud$api$listeners$NotificationListenerType[NotificationListenerType.UNKNOWN.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: the slot simply keeps its default 0.
            }
        }
    }

    /* loaded from: input_file:fr/inria/eventcloud/pubsub/PublishSubscribeUtils$BindingMap.class */
    /**
     * Mutable, serializable binding backed by a {@link HashMap} from variables
     * to nodes.
     *
     * <p>The map itself is {@code transient}: on serialization the content is
     * written through {@link SparqlResultSerializer}, and on deserialization it
     * is read back through the same helper and copied into a fresh map.
     * Compression is switched on or off by {@code EventCloudProperties.COMPRESSION}.
     */
    public static final class BindingMap implements com.hp.hpl.jena.sparql.engine.binding.BindingMap, Serializable {
        private static final long serialVersionUID = 130L;

        // Transient on purpose: see writeObject/readObject for the custom wire format.
        private transient Map<Var, Node> content = new HashMap<Var, Node>();

        /** Returns an iterator over the variables currently bound. */
        public Iterator<Var> vars() {
            return this.content.keySet().iterator();
        }

        /** Tells whether {@code var} is bound in this binding. */
        public boolean contains(Var var) {
            return this.content.containsKey(var);
        }

        /** Returns the node bound to {@code var}, or {@code null} when there is none. */
        public Node get(Var var) {
            return this.content.get(var);
        }

        /** Returns the number of bound variables. */
        public int size() {
            return this.content.size();
        }

        /** Tells whether no variable is bound. */
        public boolean isEmpty() {
            return this.content.isEmpty();
        }

        /** Binds {@code var} to {@code node}, replacing any previous value. */
        public void add(Var var, Node node) {
            this.content.put(var, node);
        }

        /** Copies every (variable, node) pair of {@code binding} into this binding. */
        public void addAll(Binding binding) {
            Iterator<Var> variables = binding.vars();
            while (variables.hasNext()) {
                Var variable = variables.next();
                this.content.put(variable, binding.get(variable));
            }
        }

        /** Renders the binding as {@code (var=node, var=node, ...)}. */
        @Override
        public String toString() {
            StringBuilder rendered = new StringBuilder("(");
            String separator = "";
            for (Map.Entry<Var, Node> entry : this.content.entrySet()) {
                rendered.append(separator);
                rendered.append(entry.getKey());
                rendered.append('=');
                rendered.append(entry.getValue());
                separator = ", ";
            }
            rendered.append(')');
            return rendered.toString();
        }

        /** Writes the default fields, then the binding itself via {@link SparqlResultSerializer}. */
        private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
            objectOutputStream.defaultWriteObject();
            SparqlResultSerializer.serialize(objectOutputStream, this, ((Boolean) EventCloudProperties.COMPRESSION.getValue()).booleanValue());
        }

        /** Reads the default fields, then rebuilds the map from the serialized binding. */
        private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
            objectInputStream.defaultReadObject();
            Binding restored = SparqlResultSerializer.deserializeBinding(objectInputStream, ((Boolean) EventCloudProperties.COMPRESSION.getValue()).booleanValue());
            // The field initializer does not run during deserialization, so the
            // transient map has to be recreated before it is filled.
            this.content = new HashMap<Var, Node>();
            addAll(restored);
        }
    }

    /** Private constructor: this class only exposes static helpers and is not meant to be instantiated. */
    private PublishSubscribeUtils() {
    }

    /**
     * Builds the meta-quadruple that records a quadruple together with a
     * second node in a single literal.
     *
     * <p>The resulting quadruple has the hash URI of {@code quadruple} as
     * graph, {@code node} as subject,
     * {@code PublishSubscribeConstants.QUADRUPLE_MATCHES_SUBSCRIPTION_NODE} as
     * predicate, and a plain literal as object. The literal's lexical form is
     * {@code node2}, one space, then the meta graph node, subject, predicate
     * and object of {@code quadruple}, all in the textual form emitted by
     * {@link OutputLangUtils}.
     *
     * <p>The text is assembled in a {@link StringWriter}, so no byte encoding
     * is involved and the result does not depend on the platform default
     * charset.
     *
     * @param quadruple the quadruple to embed in the literal
     * @param node      the subject of the meta-quadruple
     * @param node2     the node written first in the literal
     * @return the meta-quadruple described above
     */
    public static final Quadruple createMetaQuadruple(Quadruple quadruple, Node node, Node node2) {
        StringWriter serialized = new StringWriter();
        OutputLangUtils.output(serialized, node2, (Prologue) null);
        serialized.write(' ');
        OutputLangUtils.output(serialized, quadruple.createMetaGraphNode(), quadruple.getSubject(), quadruple.getPredicate(), quadruple.getObject(), (Prologue) null, (NodeToLabel) null);
        return new Quadruple(createQuadrupleHashUri(quadruple), node, PublishSubscribeConstants.QUADRUPLE_MATCHES_SUBSCRIPTION_NODE, Node.createLiteral(serialized.toString()));
    }

    /**
     * Looks up, in the subscriptions graph, the ids of all subscriptions whose
     * original id is {@code subscriptionId}.
     *
     * <p>The lookup runs a SPARQL SELECT inside a read-only transaction. The
     * query execution is always closed and the transaction always ended, even
     * when the query or the result parsing fails.
     *
     * @param transactionalTdbDatastore the datastore to query
     * @param subscriptionId            the original subscription id to match
     * @return the matching subscription ids, possibly empty, never {@code null}
     */
    public static final List<SubscriptionId> findSubscriptionIds(TransactionalTdbDatastore transactionalTdbDatastore, SubscriptionId subscriptionId) {
        String subscriptionIdVar = Vars.SUBSCRIPTION_ID.toString();
        String sparql = "SELECT " + subscriptionIdVar + " WHERE {\n    GRAPH " + NodeFmtLib.str(PublishSubscribeConstants.SUBSCRIPTION_NS_NODE) + " {\n        ?sIdUri " + NodeFmtLib.str(PublishSubscribeConstants.SUBSCRIPTION_ORIGINAL_ID_NODE) + ' ' + NodeFmtLib.str(subscriptionId.toJenaNode()) + " .\n        ?sIdUri " + NodeFmtLib.str(PublishSubscribeConstants.SUBSCRIPTION_ID_NODE) + ' ' + subscriptionIdVar + " .\n    }\n}";

        List<SubscriptionId> subscriptionIds = new ArrayList<SubscriptionId>();
        TransactionalDatasetGraph txnGraph = transactionalTdbDatastore.begin(AccessMode.READ_ONLY);
        try {
            QueryExecution queryExecution = QueryExecutionFactory.create(sparql, txnGraph.getUnderlyingDataset());
            try {
                ResultSet solutions = queryExecution.execSelect();
                while (solutions.hasNext()) {
                    subscriptionIds.add(SubscriptionId.parseSubscriptionId(solutions.nextBinding().get(Vars.SUBSCRIPTION_ID).getLiteralLexicalForm()));
                }
            } finally {
                queryExecution.close();
            }
        } finally {
            // Nested finally blocks: the transaction is ended even if close() throws.
            txnGraph.end();
        }
        return subscriptionIds;
    }

    /**
     * Decodes the object literal of a meta-quadruple back into the quadruple
     * and subscription id it embeds.
     *
     * <p>The literal is expected to have the layout written by
     * {@link #createMetaQuadruple(Quadruple, Node, Node)}: first the
     * subscription id literal, then the graph, subject, predicate and object
     * nodes of the quadruple. Tokens are therefore consumed in that order: the
     * subscription id is read <em>before</em> the four quadruple nodes.
     *
     * <p>The lexical form is encoded as UTF-8 explicitly so that it matches
     * the UTF-8 tokenizer whatever the platform default charset is.
     *
     * @param quadruple a meta-quadruple whose object is the literal to decode
     * @return the embedded quadruple paired with the embedded subscription id
     */
    public static final Pair<Quadruple, SubscriptionId> extractMetaInformation(Quadruple quadruple) {
        byte[] utf8 = quadruple.getObject().getLiteralLexicalForm().getBytes(StandardCharsets.UTF_8);
        Tokenizer tokenizer = TokenizerFactory.makeTokenizerUTF8(new ByteArrayInputStream(utf8));

        // First token: the subscription id written ahead of the quadruple.
        SubscriptionId subscriptionId = SubscriptionId.parseSubscriptionId(tokenizer.next().asNode().getLiteralLexicalForm());

        // Next four tokens: graph, subject, predicate, object.
        Node graph = tokenizer.next().asNode();
        Node subject = tokenizer.next().asNode();
        Node predicate = tokenizer.next().asNode();
        Node object = tokenizer.next().asNode();

        return Pair.create(new Quadruple(graph, subject, predicate, object), subscriptionId);
    }

    /**
     * Creates the hash URI node of a quadruple from its {@code hashValue()}.
     *
     * @param quadruple the quadruple whose hash identifies the URI
     * @return a URI node of the form {@code urn:ec:quad:<hash>}
     */
    public static final Node createQuadrupleHashUri(Quadruple quadruple) {
        HashCode quadrupleHash = quadruple.hashValue();
        return createQuadrupleHashUri(quadrupleHash);
    }

    /**
     * Creates a quadruple hash URI node from an already computed hash.
     *
     * @param hashCode the hash to embed in the URI
     * @return a URI node of the form {@code urn:ec:quad:<hash>}
     */
    public static final Node createQuadrupleHashUri(HashCode hashCode) {
        String uri = "urn:ec:quad:" + hashCode.toString();
        return Node.createURI(uri);
    }

    /**
     * Creates the URI node identifying a subscription.
     *
     * @param subscriptionId the subscription id to embed in the URI
     * @return a URI node of the form {@code urn:ec:s:<id>}
     */
    public static final Node createSubscriptionIdUri(SubscriptionId subscriptionId) {
        String id = subscriptionId.toString();
        return createSubscriptionIdUri(id);
    }

    /**
     * Creates the URI node identifying a sub-subscription from its textual id.
     *
     * @param str the textual sub-subscription id
     * @return a URI node of the form {@code urn:ec:ss:<id>}
     */
    public static final Node createSubSubscriptionIdUri(String str) {
        String uri = "urn:ec:ss:" + str;
        return Node.createURI(uri);
    }

    /**
     * Creates the URI node identifying a sub-subscription.
     *
     * @param subscriptionId the sub-subscription id to embed in the URI
     * @return a URI node of the form {@code urn:ec:ss:<id>}
     */
    public static final Node createSubSubscriptionIdUri(SubscriptionId subscriptionId) {
        String id = subscriptionId.toString();
        return createSubSubscriptionIdUri(id);
    }

    /**
     * Creates the URI node identifying a subscription from its textual id.
     *
     * @param str the textual subscription id
     * @return a URI node of the form {@code urn:ec:s:<id>}
     */
    public static final Node createSubscriptionIdUri(String str) {
        String uri = "urn:ec:s:" + str;
        return Node.createURI(uri);
    }

    /**
     * Removes a subscription and its sub-subscriptions from the datastore.
     *
     * <p>Works in two phases, each in its own transaction that is ended exactly
     * once:
     * <ol>
     * <li>read-only: collect the quadruples linking the subscription to its
     * sub-subscriptions;</li>
     * <li>write: delete every quadruple whose subject is one of those
     * sub-subscriptions, then every quadruple whose subject is the
     * subscription itself, and commit.</li>
     * </ol>
     * Failures are logged and not propagated. If the first phase fails, nothing
     * is deleted.
     *
     * @param transactionalTdbDatastore the datastore holding the subscription
     * @param subscriptionId            the id of the subscription to delete
     */
    public static final void deleteSubscription(TransactionalTdbDatastore transactionalTdbDatastore, SubscriptionId subscriptionId) {
        Node subscriptionUri = createSubscriptionIdUri(subscriptionId);

        List<Quadruple> subSubscriptionLinks;
        TransactionalDatasetGraph readTxn = transactionalTdbDatastore.begin(AccessMode.READ_ONLY);
        try {
            subSubscriptionLinks = Lists.newArrayList(readTxn.find(PublishSubscribeConstants.SUBSCRIPTION_NS_NODE, subscriptionUri, PublishSubscribeConstants.SUBSCRIPTION_HAS_SUBSUBSCRIPTION_NODE, Node.ANY));
        } catch (Exception e) {
            log.error("Unable to look up the sub-subscriptions of subscription " + subscriptionId, e);
            // Without the list of sub-subscriptions nothing can be deleted safely.
            return;
        } finally {
            readTxn.end();
        }

        TransactionalDatasetGraph writeTxn = transactionalTdbDatastore.begin(AccessMode.WRITE);
        try {
            for (Quadruple link : subSubscriptionLinks) {
                writeTxn.delete(Node.ANY, createSubSubscriptionIdUri(link.getObject().getLiteralLexicalForm()), Node.ANY, Node.ANY);
            }
            writeTxn.delete(Node.ANY, subscriptionUri, Node.ANY, Node.ANY);
            writeTxn.commit();
        } catch (Exception e) {
            log.error("Unable to delete subscription " + subscriptionId, e);
        } finally {
            writeTxn.end();
        }
    }

    /**
     * Parses the subscription id embedded in a subscription URI node.
     *
     * @param node a URI node whose URI starts with {@code urn:ec:s:}
     * @return the subscription id parsed from the text after the last {@code ':'}
     * @throws IllegalArgumentException if {@code node} is not a URI or its URI
     *                                  does not start with {@code urn:ec:s:}
     */
    public static final SubscriptionId extractSubscriptionId(Node node) {
        if (!node.isURI() || !node.getURI().startsWith("urn:ec:s:")) {
            throw new IllegalArgumentException("The specified subscription id URI is not valid: " + node);
        }
        String uri = node.getURI();
        String idPart = uri.substring(uri.lastIndexOf(':') + 1);
        return SubscriptionId.parseSubscriptionId(idPart);
    }

    /**
     * Returns the part of a subscription URI that follows its last {@code ':'}.
     *
     * @param str the subscription id URI in textual form
     * @return the text after the last {@code ':'}, possibly empty
     * @throws IllegalArgumentException if {@code str} contains no {@code ':'}
     */
    public static final String extractSubscriptionId(String str) {
        int separator = str.lastIndexOf(':');
        if (separator >= 0) {
            return str.substring(separator + 1);
        }
        throw new IllegalArgumentException("The specified subscription id URI is not valid: " + str);
    }

    /**
     * Builds a binding for the variables of {@code atomicQuery} that also
     * belong to {@code set}, taking the values from {@code quadruple}.
     *
     * <p>The atomic query and the quadruple are walked position by position.
     * For the first position the bound value is the quadruple's meta graph
     * node rather than the first element of its array form.
     *
     * @param quadruple   the quadruple supplying the values
     * @param set         the variables allowed in the result
     * @param atomicQuery the pattern supplying variable names and positions
     * @return a binding holding only the selected variables
     */
    public static final Binding filter(Quadruple quadruple, Set<Var> set, AtomicQuery atomicQuery) {
        Set<Var> selected = Sets.intersection(set, FluentIterable.from(atomicQuery.getVars()).toImmutableSet());
        Node[] values = quadruple.toArray();
        Node[] pattern = atomicQuery.toArray();
        BindingMap solution = new BindingMap();
        for (int position = 0; position < pattern.length; position++) {
            Node patternNode = pattern[position];
            if (!patternNode.isVariable()) {
                continue;
            }
            Var candidate = Var.alloc(patternNode.getName());
            if (selected.contains(candidate)) {
                Node value = (position == 0) ? quadruple.createMetaGraphNode() : values[position];
                solution.add(candidate, value);
            }
        }
        return solution;
    }

    /**
     * Rewrites a SPARQL query so that it projects only its graph variable.
     *
     * <p>The query pattern is walked to capture the name node of the
     * {@code GRAPH} element (the last one visited wins). The query algebra is
     * then transformed so that every projection is replaced by a projection on
     * that single variable, and the result is converted back to query text.
     *
     * @param str the SPARQL query to rewrite
     * @return the rewritten query as a string
     * @throws IllegalArgumentException if a {@code GRAPH} element of the query
     *                                  does not use a variable as graph name
     */
    public static final String removeResultVarsExceptGraphVar(String str) {
        Query parsedQuery = QueryFactory.create(str);
        final MutableObject graphVariable = new MutableObject();

        ElementVisitorBase graphVariableCollector = new ElementVisitorBase() {
            @Override
            public void visit(ElementNamedGraph elementNamedGraph) {
                Node graphName = elementNamedGraph.getGraphNameNode();
                if (!graphName.isVariable()) {
                    throw new IllegalArgumentException("The specified subscription does not have a graph variable: " + elementNamedGraph.getGraphNameNode());
                }
                graphVariable.setValue(elementNamedGraph.getGraphNameNode());
            }
        };
        ElementWalker.walk(parsedQuery.getQueryPattern(), graphVariableCollector);

        TransformBase projectOnGraphVariable = new TransformBase() {
            @Override
            public Op transform(OpProject opProject, Op op) {
                // The captured value is read lazily, when the projection is rewritten.
                return new OpProject(op, Lists.newArrayList(new Var[]{(Var) graphVariable.getValue()}));
            }
        };
        Op rewritten = Transformer.transform(projectOnGraphVariable, Algebra.compile(parsedQuery));
        return OpAsQuery.asQuery(rewritten).toString();
    }

    /**
     * Handles a quadruple that matched a subscription: a subscription with
     * exactly one sub-subscription triggers a notification to the subscriber;
     * any other subscription is rewritten and indexed again.
     *
     * @param semanticCanOverlay the overlay of the peer handling the match
     * @param subscription       the matched subscription
     * @param quadruple          the matching quadruple
     */
    public static void rewriteSubscriptionOrNotifySender(SemanticCanOverlay semanticCanOverlay, Subscription subscription, Quadruple quadruple) {
        boolean cannotBeRewritten = subscription.getSubSubscriptions().length == 1;
        if (cannotBeRewritten) {
            log.debug("{} matches a subscription which cannot be rewritten, a notification will be delivered", quadruple);
            notifySubscriberAboutSolution(semanticCanOverlay, subscription, quadruple);
            return;
        }
        rewriteAndIndexSubscription(semanticCanOverlay, subscription, quadruple);
    }

    /**
     * Delivers a notification for {@code quadruple} to the subscriber of
     * {@code subscription}.
     *
     * <p>When the overlay has a social filter, the relationship strength
     * between the publication source and the subscription destination is
     * computed first; the notification is dropped when the strength is below
     * {@code EventCloudProperties.SOCIAL_FILTER_THRESHOLD}.
     *
     * <p>The notification depends on the subscription's listener type:
     * <ul>
     * <li>{@code BINDING}: a {@link BindingNotification} is sent, then each
     * peer referenced by the subscription's stubs is asked to retrieve its
     * sub-solution;</li>
     * <li>{@code COMPOUND_EVENT}: a {@link PollingSignalNotification} under
     * SBCE1; under SBCE2 a {@link QuadruplesNotification} (only if the overlay
     * marks it as sent) followed by an ephemeral subscription request;
     * nothing under any other algorithm;</li>
     * <li>{@code SIGNAL}: a {@link SignalNotification};</li>
     * <li>{@code UNKNOWN}: rejected with {@link IllegalStateException}.</li>
     * </ul>
     *
     * <p>An {@link ExecutionException} raised while delivering is logged and
     * counted as a subscriber connection failure.
     *
     * @param semanticCanOverlay the overlay of the peer sending the notification
     * @param subscription       the matched subscription
     * @param quadruple          the matching quadruple
     * @throws IllegalStateException if the listener type is {@code UNKNOWN}
     */
    private static void notifySubscriberAboutSolution(SemanticCanOverlay semanticCanOverlay, Subscription subscription, Quadruple quadruple) {
        if (semanticCanOverlay.hasSocialFilter()) {
            double strength = semanticCanOverlay.getSocialFilter().getRelationshipStrength(quadruple.getPublicationSource(), subscription.getSubscriptionDestination()).getStrength();
            logSocialFilterAnswer(subscription, quadruple, strength);
            if (strength < ((Double) EventCloudProperties.SOCIAL_FILTER_THRESHOLD.getValue()).doubleValue()) {
                return;
            }
        }
        try {
            SubscribeProxy subscriberProxy = subscription.getSubscriberProxy();
            String sourceUrl = PAActiveObject.getUrl(semanticCanOverlay.getStub());
            // Switch on the enum itself rather than on an ordinal lookup table.
            switch (subscription.getType()) {
                case BINDING:
                    BindingNotification bindingNotification = new BindingNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), sourceUrl, (Binding) new BindingWrapper(createBindingSolution(subscription, quadruple)));
                    subscriberProxy.receive(bindingNotification);
                    for (Subscription.Stub stub : subscription.getStubs()) {
                        SemanticPeer peerStub = semanticCanOverlay.findPeerStub(stub.peerUrl);
                        if (peerStub != null) {
                            peerStub.receive(new RetrieveSubSolutionOperation(bindingNotification.getId(), stub.quadrupleHash));
                        } else {
                            log.error("Error while retrieving peer stub for url: {}", stub.peerUrl);
                        }
                    }
                    break;
                case COMPOUND_EVENT:
                    if (EventCloudProperties.isSbce1PubSubAlgorithmUsed()) {
                        subscriberProxy.receive(new PollingSignalNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), sourceUrl));
                    } else if (EventCloudProperties.isSbce2PubSubAlgorithmUsed()) {
                        QuadruplesNotification quadruplesNotification = new QuadruplesNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), sourceUrl, ImmutableList.of(quadruple));
                        if (semanticCanOverlay.markAsSent(quadruplesNotification.getId(), quadruple)) {
                            subscriberProxy.receive(quadruplesNotification);
                        }
                        semanticCanOverlay.getStub().sendv(new IndexEphemeralSubscriptionRequest(quadruple.createMetaGraphNode(), subscription.getOriginalId(), subscription.getSubscriberUrl()));
                    }
                    break;
                case SIGNAL:
                    subscriberProxy.receive(new SignalNotification(subscription.getOriginalId(), quadruple.createMetaGraphNode(), sourceUrl));
                    break;
                case UNKNOWN:
                    throw new IllegalStateException("Unknown notification listener type for subscription " + subscription.getId());
            }
        } catch (ExecutionException e) {
            // Keep the cause in the log so the lookup failure can be diagnosed.
            log.warn("Notification cannot be sent because no SubscribeProxy found under URL: " + subscription.getSubscriberUrl(), e);
            handleSubscriberConnectionFailure(semanticCanOverlay, subscription);
        }
    }

    /**
     * Records one more failed attempt to reach the subscriber of
     * {@code subscription}.
     *
     * <p>The per-subscription failure record is created on first use and
     * updated under its own monitor. When the number of attempts equals
     * {@code EventCloudProperties.PROXY_MAX_LOOKUP_ATTEMPTS}, an unsubscribe
     * request is sent and awaited for every sub-subscription; after each one
     * the failure record is removed and an informational message is logged.
     *
     * @param semanticCanOverlay the overlay holding the failure records
     * @param subscription       the subscription whose subscriber is unreachable
     */
    private static void handleSubscriberConnectionFailure(SemanticCanOverlay semanticCanOverlay, Subscription subscription) {
        SubscriberConnectionFailure fresh = new SubscriberConnectionFailure();
        SubscriberConnectionFailure existing = semanticCanOverlay.getSubscriberConnectionFailures().putIfAbsent(subscription.getOriginalId(), fresh);
        // Use the record already registered by another caller, if any.
        SubscriberConnectionFailure failure = (existing == null) ? fresh : existing;
        synchronized (failure) {
            failure.incNbAttempts();
            int attempts = failure.getNbAttempts();
            int maxAttempts = ((Integer) EventCloudProperties.PROXY_MAX_LOOKUP_ATTEMPTS.getValue()).intValue();
            if (attempts != maxAttempts) {
                return;
            }
            Subsubscription[] subSubscriptions = subscription.getSubSubscriptions();
            for (int index = 0; index < subSubscriptions.length; index++) {
                Subsubscription subSubscription = subSubscriptions[index];
                PAFuture.waitFor(semanticCanOverlay.getStub().send(new UnsubscribeRequest(subscription.getOriginalId(), subSubscription.getAtomicQuery(), subscription.getType() == NotificationListenerType.BINDING)));
                semanticCanOverlay.getSubscriberConnectionFailures().remove(subscription.getOriginalId());
                log.info("Removed subscription {} due to subscriber which is not reachable under URL {}", subscription.getId(), subscription.getSubscriberUrl());
            }
        }
    }

    /**
     * Logs, at debug level only, the outcome of a social filter evaluation:
     * source, destination, configured threshold, computed strength and the
     * four components of the quadruple.
     *
     * @param subscription the subscription being evaluated
     * @param quadruple    the quadruple being evaluated
     * @param d            the relationship strength returned by the filter
     */
    private static void logSocialFilterAnswer(Subscription subscription, Quadruple quadruple, double d) {
        if (!log.isDebugEnabled()) {
            return;
        }
        Object[] details = new Object[]{
            quadruple.getPublicationSource(),
            subscription.getSubscriptionDestination(),
            EventCloudProperties.SOCIAL_FILTER_THRESHOLD.getValue(),
            Double.valueOf(d),
            quadruple.getGraph(),
            quadruple.getSubject(),
            quadruple.getPredicate(),
            quadruple.getObject()
        };
        log.debug("SocialFilterAnswer[source={}, destination={}, threshold={}, relationship_strengh={}, quadruple={}|{}|{}|{}]", details);
    }

    /**
     * Builds the binding solution for a quadruple using the subscription's
     * result variables and the atomic query of its first sub-subscription.
     *
     * @param subscription the matched subscription
     * @param quadruple    the matching quadruple
     * @return the binding restricted to the subscription's result variables
     */
    private static Binding createBindingSolution(Subscription subscription, Quadruple quadruple) {
        Set<Var> resultVars = subscription.getResultVars();
        AtomicQuery firstAtomicQuery = subscription.getSubSubscriptions()[0].getAtomicQuery();
        return filter(quadruple, resultVars, firstAtomicQuery);
    }

    /**
     * Rewrites {@code subscription} according to the matching
     * {@code quadruple} and dispatches the rewritten subscription for
     * indexing.
     *
     * <p>For {@code BINDING} subscriptions a meta-quadruple recording the match
     * is first stored in the subscriptions datastore within a write
     * transaction. A failure to store it is logged and does not prevent the
     * rewrite. The transaction is ended exactly once in all cases.
     *
     * <p>The rewritten subscription receives a stub made of this peer's URL
     * and the quadruple's hash before being dispatched.
     *
     * @param semanticCanOverlay the overlay of the peer handling the match
     * @param subscription       the matched subscription
     * @param quadruple          the matching quadruple
     */
    private static void rewriteAndIndexSubscription(SemanticCanOverlay semanticCanOverlay, Subscription subscription, Quadruple quadruple) {
        if (subscription.getType() == NotificationListenerType.BINDING) {
            Quadruple metaQuadruple = createMetaQuadruple(quadruple, createSubscriptionIdUri(subscription.getId()), Node.createLiteral(subscription.getId().toString(), XSDDatatype.XSDlong));
            TransactionalDatasetGraph txnGraph = semanticCanOverlay.getSubscriptionsDatastore().begin(AccessMode.WRITE);
            try {
                txnGraph.add(metaQuadruple);
                txnGraph.commit();
            } catch (Exception e) {
                log.error("Unable to store the meta-quadruple for subscription " + subscription.getId(), e);
            } finally {
                txnGraph.end();
            }
        }
        Subscription rewrittenSubscription = SubscriptionRewriter.rewrite(subscription, quadruple);
        rewrittenSubscription.addStub(new Subscription.Stub(PAActiveObject.getUrl(semanticCanOverlay.getStub()), quadruple.hashValue()));
        if (((Boolean) P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION.getValue()).booleanValue()) {
            log.info("Peer " + semanticCanOverlay + " is about to dispatch a rewritten subscription, creation time = " + rewrittenSubscription.getCreationTime() + " , subscription: " + rewrittenSubscription.getSparqlQuery());
        }
        semanticCanOverlay.dispatchv(new IndexSubscriptionRequest(rewrittenSubscription));
    }
}
