package fr.inria.eventcloud.pubsub;

import com.hp.hpl.jena.datatypes.xsd.XSDDatatype;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.sparql.engine.binding.Binding;
import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.EventCloudId;
import fr.inria.eventcloud.api.PublishApi;
import fr.inria.eventcloud.api.PublishSubscribeConstants;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.api.SubscribeApi;
import fr.inria.eventcloud.api.Subscription;
import fr.inria.eventcloud.api.SubscriptionId;
import fr.inria.eventcloud.api.generators.NodeGenerator;
import fr.inria.eventcloud.api.generators.QuadrupleGenerator;
import fr.inria.eventcloud.api.listeners.BindingNotificationListener;
import fr.inria.eventcloud.api.listeners.CompoundEventNotificationListener;
import fr.inria.eventcloud.api.listeners.SignalNotificationListener;
import fr.inria.eventcloud.api.properties.AlterableElaProperty;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.deployment.JunitEventCloudInfrastructureDeployer;
import fr.inria.eventcloud.exceptions.EventCloudIdNotManaged;
import fr.inria.eventcloud.factories.NotificationListenerFactory;
import fr.inria.eventcloud.factories.ProxyFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.objectweb.proactive.core.util.MutableInteger;
import org.objectweb.proactive.extensions.p2p.structured.utils.ComponentUtils;
import org.objectweb.proactive.extensions.p2p.structured.utils.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/inria/eventcloud/pubsub/SubscribeProxyTest.class */
public class SubscribeProxyTest {
    private static final Logger log = LoggerFactory.getLogger(SubscribeProxyTest.class);
    private static List<CompoundEvent> events = new ArrayList();
    private static List<Binding> bindings = new ArrayList();
    private static MutableInteger signals = new MutableInteger();
    private static Node eventId = NodeGenerator.randomUri((String) EventCloudProperties.EVENT_CLOUD_ID_PREFIX.getValue());
    private EventCloudId eventCloudId;
    private JunitEventCloudInfrastructureDeployer deployer;
    private SubscribeApi subscribeProxy;
    private PublishApi publishProxy;

    /* loaded from: input_file:fr/inria/eventcloud/pubsub/SubscribeProxyTest$CustomBindingNotificationListener.class */
    private static class CustomBindingNotificationListener extends BindingNotificationListener {
        private static final long serialVersionUID = 1;

        private CustomBindingNotificationListener() {
        }

        public void onNotification(SubscriptionId subscriptionId, Binding binding) {
            synchronized (SubscribeProxyTest.bindings) {
                SubscribeProxyTest.bindings.add(binding);
                SubscribeProxyTest.bindings.notifyAll();
            }
            SubscribeProxyTest.log.info("New binding received:\n{}", binding);
        }
    }

    /* loaded from: input_file:fr/inria/eventcloud/pubsub/SubscribeProxyTest$CustomCompoundEventNotificationListener.class */
    private static class CustomCompoundEventNotificationListener extends CompoundEventNotificationListener {
        private static final long serialVersionUID = 1;

        private CustomCompoundEventNotificationListener() {
        }

        public void onNotification(SubscriptionId subscriptionId, CompoundEvent compoundEvent) {
            synchronized (SubscribeProxyTest.events) {
                SubscribeProxyTest.events.add(compoundEvent);
                SubscribeProxyTest.events.notifyAll();
            }
            SubscribeProxyTest.log.info("New event received:\n{}", compoundEvent);
        }
    }

    /* loaded from: input_file:fr/inria/eventcloud/pubsub/SubscribeProxyTest$CustomSignalNotificationListener.class */
    private static class CustomSignalNotificationListener extends SignalNotificationListener {
        private static final long serialVersionUID = 1;

        private CustomSignalNotificationListener() {
        }

        public void onNotification(SubscriptionId subscriptionId) {
            synchronized (SubscribeProxyTest.signals) {
                SubscribeProxyTest.signals.add(1);
                SubscribeProxyTest.signals.notifyAll();
            }
            SubscribeProxyTest.log.info("New signal received");
        }
    }

    @Before
    public void setUp() throws EventCloudIdNotManaged {
        this.deployer = new JunitEventCloudInfrastructureDeployer();
        this.eventCloudId = this.deployer.newEventCloud(1, 5);
        this.subscribeProxy = ProxyFactory.newSubscribeProxy(this.deployer.getEventCloudsRegistryUrl(), this.eventCloudId, new AlterableElaProperty[0]);
        this.publishProxy = ProxyFactory.newPublishProxy(this.deployer.getEventCloudsRegistryUrl(), this.eventCloudId);
    }

    @Test
    public void testSubscribeWithConcurrentPublications() throws EventCloudIdNotManaged {
        List<PublishApi> createPublishProxies = createPublishProxies(10);
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(10);
        ArrayList arrayList = new ArrayList();
        final AtomicInteger atomicInteger = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            final PublishApi publishApi = createPublishProxies.get(i);
            final int i2 = i;
            newScheduledThreadPool.scheduleAtFixedRate(new Runnable() { // from class: fr.inria.eventcloud.pubsub.SubscribeProxyTest.1
                @Override // java.lang.Runnable
                public void run() {
                    if (atomicInteger.incrementAndGet() <= 100) {
                        ArrayList arrayList2 = new ArrayList();
                        Node createURI = Node.createURI(((String) EventCloudProperties.EVENT_CLOUD_ID_PREFIX.getValue()) + "/" + UUID.randomUUID().toString());
                        for (int i3 = 0; i3 < 1 + RandomUtils.nextInt(30); i3++) {
                            arrayList2.add(QuadrupleGenerator.random(createURI));
                        }
                        SubscribeProxyTest.log.debug("Publishing an event composed of {} quadruples from thread {}", Integer.valueOf(arrayList2.size()), Integer.valueOf(i2));
                        publishApi.publish(new CompoundEvent(arrayList2));
                    }
                }
            }, 0L, (i + 1) * 200, TimeUnit.MILLISECONDS);
        }
        this.subscribeProxy.subscribe(new Subscription("SELECT ?g ?s ?p ?o WHERE { GRAPH ?g { ?s ?p ?o } }"), new CustomCompoundEventNotificationListener());
        synchronized (events) {
            while (events.size() < 100) {
                try {
                    events.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((ScheduledFuture) it.next()).cancel(true);
        }
        newScheduledThreadPool.shutdown();
    }

    private List<PublishApi> createPublishProxies(int i) throws EventCloudIdNotManaged {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(ProxyFactory.newPublishProxy(this.deployer.getEventCloudsRegistryUrl(), this.eventCloudId));
        }
        return arrayList;
    }

    @Test(timeout = 60000)
    public void testSubscribeBindingNotificationListener() {
        this.subscribeProxy.subscribe(new Subscription("PREFIX foaf: <http://xmlns.com/foaf/0.1/> SELECT ?name ?email ?g WHERE { GRAPH ?g { ?id foaf:name ?name . ?id foaf:email ?email } }"), new CustomBindingNotificationListener());
        long currentTimeMillis = System.currentTimeMillis();
        Quadruple quadruple = new Quadruple(Node.createURI("https://plus.google.com/825349613"), Node.createURI("https://plus.google.com/107234124364605485774"), Node.createURI("http://xmlns.com/foaf/0.1/email"), Node.createLiteral("user1@company.com"));
        quadruple.setPublicationTime(currentTimeMillis);
        this.publishProxy.publish(quadruple);
        Quadruple quadruple2 = new Quadruple(Node.createURI("https://plus.google.com/825349613"), Node.createURI("https://plus.google.com/107234124364605485774"), Node.createURI("http://xmlns.com/foaf/0.1/name"), Node.createLiteral("User1"));
        quadruple2.setPublicationTime(currentTimeMillis);
        this.publishProxy.publish(quadruple2);
        Quadruple quadruple3 = new Quadruple(Node.createURI("https://plus.google.com/825349613"), Node.createURI("https://plus.google.com/107234124364605485774"), Node.createURI("http://xmlns.com/foaf/0.1/email"), Node.createLiteral("user1.new.email@company.com"));
        quadruple3.setPublicationTime(currentTimeMillis);
        this.publishProxy.publish(quadruple3);
        long currentTimeMillis2 = System.currentTimeMillis();
        Quadruple quadruple4 = new Quadruple(Node.createURI("https://plus.google.com/3283940594/2011-2012-08-30-18:13:05"), Node.createURI("https://plus.google.com/107545688688906540962"), Node.createURI("http://xmlns.com/foaf/0.1/email"), Node.createLiteral("user2@company.com"));
        quadruple4.setPublicationTime(currentTimeMillis2);
        this.publishProxy.publish(quadruple4);
        Quadruple quadruple5 = new Quadruple(Node.createURI("https://plus.google.com/124324034/2011-2012-08-30-19:04:54"), Node.createURI("https://plus.google.com/14023231238123495031/"), Node.createURI("http://xmlns.com/foaf/0.1/name"), Node.createLiteral("User 3"));
        quadruple5.setPublicationTime();
        this.publishProxy.publish(quadruple5);
        Quadruple quadruple6 = new Quadruple(Node.createURI("https://plus.google.com/3283940594/2011-2012-08-30-18:13:05"), Node.createURI("https://plus.google.com/107545688688906540962"), Node.createURI("http://xmlns.com/foaf/0.1/name"), Node.createLiteral("User 2"));
        quadruple6.setPublicationTime(currentTimeMillis2);
        this.publishProxy.publish(quadruple6);
        synchronized (bindings) {
            while (bindings.size() != 3) {
                try {
                    bindings.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test(timeout = 60000)
    public void testSubscribeSignalNotificationListener() throws InterruptedException {
        this.subscribeProxy.subscribe(new Subscription("SELECT ?g ?s ?p ?o WHERE { GRAPH ?g { ?s ?p ?o } }"), new CustomSignalNotificationListener());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(QuadrupleGenerator.random(eventId));
        }
        CompoundEvent compoundEvent = new CompoundEvent(arrayList);
        this.publishProxy.publish(compoundEvent);
        synchronized (signals) {
            while (signals.getValue() != compoundEvent.getQuadruples().size()) {
                try {
                    signals.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test(timeout = 60000)
    public void testSubscribeEventNotificationListener() throws InterruptedException {
        this.subscribeProxy.subscribe(new Subscription("SELECT ?g ?s ?p ?o WHERE { GRAPH ?g { ?s ?p ?o } }"), new CustomCompoundEventNotificationListener());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(QuadrupleGenerator.random(eventId));
        }
        this.publishProxy.publish(new CompoundEvent(arrayList));
        synchronized (events) {
            while (events.size() != 1) {
                try {
                    events.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        this.publishProxy.publish(events.get(0));
        synchronized (events) {
            while (events.size() != 2) {
                try {
                    events.wait();
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

    @Test(timeout = 60000)
    public void testSubscribeEventNotificationListenerActiveObject() throws InterruptedException {
        Subscription subscription = new Subscription("SELECT ?g ?s ?p ?o WHERE { GRAPH ?g { ?s ?p ?o } }");
        CustomCompoundEventNotificationListenerActiveObject newNotificationListener = NotificationListenerFactory.newNotificationListener(CustomCompoundEventNotificationListenerActiveObject.class, new Object[0]);
        this.subscribeProxy.subscribe(subscription, newNotificationListener);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(QuadrupleGenerator.random(eventId));
        }
        this.publishProxy.publish(new CompoundEvent(arrayList));
        while (newNotificationListener.getEvents().size() != 1) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Test(timeout = 60000)
    public void testSubscribeEventNotificationListenerWeatherUsecase() throws InterruptedException {
        this.subscribeProxy.subscribe(new Subscription("SELECT ?g WHERE { GRAPH ?g { <urn:city:Nice> <urn:weather:avgtemp> ?temperature .<urn:city:Nice> <urn:weather:datetime> ?date } }"), new CustomCompoundEventNotificationListener());
        Node createURI = Node.createURI("urn:city:Nice");
        DateTime dateTime = new DateTime();
        for (int i = 0; i < 10; i++) {
            Node createURI2 = Node.createURI(((String) EventCloudProperties.EVENT_CLOUD_ID_PREFIX.getValue()) + Integer.toString(i));
            ArrayList arrayList = new ArrayList(2);
            arrayList.add(new Quadruple(createURI2, createURI, Node.createURI("urn:weather:avgtemp"), Node.createLiteral(Integer.toString(RandomUtils.nextInt(30)), XSDDatatype.XSDint)));
            arrayList.add(new Quadruple(createURI2, createURI, Node.createURI("urn:weather:datetime"), Node.createLiteral(dateTime.toString())));
            this.publishProxy.publish(new CompoundEvent(arrayList));
            dateTime = dateTime.plusDays(1);
        }
        synchronized (events) {
            while (events.size() != 10) {
                try {
                    events.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Test(timeout = 60000)
    public void testSubscribeEventNotificationListenerSimulatingNetworkCongestion() throws InterruptedException {
        this.subscribeProxy.subscribe(new Subscription("SELECT ?g ?s ?p ?o WHERE { GRAPH ?g { ?s ?p ?o } }"), new CustomCompoundEventNotificationListener());
        Thread.sleep(500L);
        long currentTimeMillis = System.currentTimeMillis();
        Quadruple quadruple = new Quadruple(eventId, eventId, PublishSubscribeConstants.EVENT_NB_QUADRUPLES_NODE, Node.createLiteral("9", XSDDatatype.XSDint));
        quadruple.setPublicationTime(currentTimeMillis);
        this.publishProxy.publish(quadruple);
        for (int i = 0; i < 4; i++) {
            Quadruple random = QuadrupleGenerator.random(eventId);
            random.setPublicationTime(currentTimeMillis);
            this.publishProxy.publish(random);
        }
        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i2 = 0; i2 < 4; i2++) {
            Quadruple random2 = QuadrupleGenerator.random(eventId);
            random2.setPublicationTime(currentTimeMillis);
            this.publishProxy.publish(random2);
        }
        synchronized (events) {
            while (events.size() != 1) {
                try {
                    events.wait();
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

    @After
    public void tearDown() {
        this.deployer.undeploy();
        ComponentUtils.terminateComponent(this.publishProxy);
        ComponentUtils.terminateComponent(this.subscribeProxy);
        signals.setValue(0);
        bindings.clear();
        events.clear();
    }
}
