package fr.inria.eventcloud.overlay;

import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.rdf.model.Model;
import fr.inria.eventcloud.api.responses.SparqlAskResponse;
import fr.inria.eventcloud.api.responses.SparqlConstructResponse;
import fr.inria.eventcloud.api.responses.SparqlSelectResponse;
import fr.inria.eventcloud.api.wrappers.ModelWrapper;
import fr.inria.eventcloud.api.wrappers.ResultSetWrapper;
import fr.inria.eventcloud.datastore.TransactionalTdbDatastore;
import fr.inria.eventcloud.messages.request.can.SparqlAtomicRequest;
import fr.inria.eventcloud.messages.response.can.QuadruplePatternResponse;
import fr.inria.eventcloud.reasoner.SparqlColander;
import fr.inria.eventcloud.reasoner.SparqlReasoner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.objectweb.proactive.extensions.p2p.structured.configuration.P2PStructuredProperties;
import org.objectweb.proactive.extensions.p2p.structured.messages.request.Request;
import org.objectweb.proactive.extensions.p2p.structured.overlay.StructuredOverlay;
import org.objectweb.proactive.extensions.p2p.structured.overlay.can.CanRequestResponseManager;
import org.objectweb.proactive.extensions.p2p.structured.utils.converters.ObjectToByteConverter;

/* loaded from: input_file:fr/inria/eventcloud/overlay/SemanticRequestResponseManager.class */
/**
 * Request/response manager that executes SPARQL queries over the overlay.
 *
 * <p>Each query string is parsed by {@link SparqlReasoner#parse} into a list of
 * {@link SparqlAtomicRequest}s. The atomic requests are dispatched concurrently
 * on an internal thread pool, and the collected
 * {@link QuadruplePatternResponse}s are then handed to a {@link SparqlColander}
 * together with the original query string to produce the final result.
 *
 * <p>Instances own a thread pool and a colander; call {@link #close()} to
 * release both.
 */
public class SemanticRequestResponseManager extends CanRequestResponseManager {
    private static final long serialVersionUID = 1;
    private SparqlColander colander;
    private final ConcurrentHashMap<UUID, Future<? extends Object>> pendingResults = new ConcurrentHashMap<>();
    private ExecutorService threadPool = Executors.newFixedThreadPool(30);

    /**
     * Creates a manager whose colander is backed by the given datastore.
     *
     * @param transactionalTdbDatastore the datastore passed to the {@link SparqlColander} constructor
     */
    public SemanticRequestResponseManager(TransactionalTdbDatastore transactionalTdbDatastore) {
        this.colander = new SparqlColander(transactionalTdbDatastore);
    }

    /**
     * Executes a SPARQL ASK query.
     *
     * @param str the SPARQL query string
     * @param structuredOverlay the overlay the atomic requests are dispatched from
     * @return a response carrying the aggregated measurements and the boolean answer
     * @throws IllegalStateException if at least one atomic request failed while being dispatched
     */
    public SparqlAskResponse executeSparqlAsk(String str, StructuredOverlay structuredOverlay) {
        List<QuadruplePatternResponse> responses = dispatch(SparqlReasoner.parse(str), structuredOverlay);
        boolean answer = getColander().filterSparqlAsk(str, responses);
        long[] measurements = aggregateMeasurements(responses);
        return new SparqlAskResponse(measurements[0], measurements[1], measurements[2], measurements[3], Boolean.valueOf(answer));
    }

    /**
     * Executes a SPARQL CONSTRUCT query.
     *
     * @param str the SPARQL query string
     * @param structuredOverlay the overlay the atomic requests are dispatched from
     * @return a response carrying the aggregated measurements and the wrapped model
     * @throws IllegalStateException if at least one atomic request failed while being dispatched
     */
    public SparqlConstructResponse executeSparqlConstruct(String str, StructuredOverlay structuredOverlay) {
        List<QuadruplePatternResponse> responses = dispatch(SparqlReasoner.parse(str), structuredOverlay);
        Model model = getColander().filterSparqlConstruct(str, responses);
        long[] measurements = aggregateMeasurements(responses);
        return new SparqlConstructResponse(measurements[0], measurements[1], measurements[2], measurements[3], new ModelWrapper(model));
    }

    /**
     * Executes a SPARQL SELECT query.
     *
     * <p>When {@code P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION} is
     * enabled, the response is additionally populated with the number of
     * results per sub-query, the total number of intermediate results and
     * their cumulated serialized size in bytes.
     *
     * @param str the SPARQL query string
     * @param structuredOverlay the overlay the atomic requests are dispatched from
     * @return a response carrying the aggregated measurements and the wrapped result set
     * @throws IllegalStateException if at least one atomic request failed while being
     *         dispatched, or if an intermediate result cannot be serialized while
     *         benchmark information is being collected
     */
    public SparqlSelectResponse executeSparqlSelect(String str, StructuredOverlay structuredOverlay) {
        List<QuadruplePatternResponse> responses = dispatch(SparqlReasoner.parse(str), structuredOverlay);
        ResultSet resultSet = getColander().filterSparqlSelect(str, responses);
        long[] measurements = aggregateMeasurements(responses);
        SparqlSelectResponse sparqlSelectResponse = new SparqlSelectResponse(measurements[0], measurements[1], measurements[2], measurements[3], new ResultSetWrapper(resultSet));
        if (isBenchmarkInformationEnabled()) {
            // NOTE(review): kept as a raw type because the parameter type of
            // setMapSubQueryNbResults is not visible from this file.
            HashMap resultsPerSubQuery = new HashMap();
            long intermediateResultsSizeInBytes = 0;
            int intermediateResultsCount = 0;
            for (QuadruplePatternResponse response : responses) {
                int resultCount = response.getResult().size();
                resultsPerSubQuery.put(response.getInitialRequestForThisResponse(), Integer.valueOf(resultCount));
                intermediateResultsCount += resultCount;
                for (int k = 0; k < resultCount; k++) {
                    intermediateResultsSizeInBytes += responseToBytes(response.getResult().get(k));
                }
            }
            sparqlSelectResponse.setMapSubQueryNbResults(resultsPerSubQuery);
            sparqlSelectResponse.setNbIntermediateResults(intermediateResultsCount);
            sparqlSelectResponse.setSizeOfIntermediateResultsInBytes(intermediateResultsSizeInBytes);
        }
        return sparqlSelectResponse;
    }

    /**
     * Indicates whether benchmark information must be collected.
     *
     * @return the current value of {@code P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION}
     */
    private static boolean isBenchmarkInformationEnabled() {
        return ((Boolean) P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION.getValue()).booleanValue();
    }

    /**
     * Aggregates the measurements carried by the given responses.
     *
     * @param list the responses to aggregate
     * @return a four element array: index 0 and index 1 both hold the sum of the
     *         outbound hop counts, index 2 holds the maximum latency and index 3
     *         holds the sum of the action times
     */
    private long[] aggregateMeasurements(List<QuadruplePatternResponse> list) {
        long outboundHopCount = 0;
        long maxLatency = 0;
        long totalActionTime = 0;
        for (QuadruplePatternResponse response : list) {
            if (response.getLatency() > maxLatency) {
                maxLatency = response.getLatency();
            }
            outboundHopCount += response.getOutboundHopCount();
            totalActionTime += response.getActionTime();
        }
        // NOTE(review): the outbound hop count is reported twice. The first slot
        // may have been meant to carry an inbound hop count - confirm against the
        // response constructors before changing it.
        return new long[]{outboundHopCount, outboundHopCount, maxLatency, totalActionTime};
    }

    /**
     * Dispatches every atomic request on the thread pool and waits until all of
     * them have completed.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag
     * is restored and the responses collected so far are returned.
     *
     * @param list the atomic requests to dispatch
     * @param structuredOverlay the overlay the requests are dispatched from
     * @return a synchronized list of the responses, in completion order
     * @throws IllegalStateException if at least one atomic request failed; the
     *         first recorded failure is attached as the cause
     */
    private List<QuadruplePatternResponse> dispatch(List<SparqlAtomicRequest> list, final StructuredOverlay structuredOverlay) {
        final List<QuadruplePatternResponse> responses = Collections.synchronizedList(new ArrayList<QuadruplePatternResponse>(list.size()));
        final List<RuntimeException> failures = Collections.synchronizedList(new ArrayList<RuntimeException>());
        final CountDownLatch remaining = new CountDownLatch(list.size());
        for (final SparqlAtomicRequest sparqlAtomicRequest : list) {
            getThreadPool().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        boolean tagWithQuery = isBenchmarkInformationEnabled();
                        QuadruplePatternResponse response = SemanticRequestResponseManager.this.dispatch((Request) sparqlAtomicRequest, structuredOverlay);
                        if (tagWithQuery) {
                            response.setInitialRequestForThisResponse(sparqlAtomicRequest.getQuery());
                        }
                        responses.add(response);
                    } catch (RuntimeException e) {
                        // Remember the failure so that it can be reported on the calling thread.
                        failures.add(e);
                    } finally {
                        // Always release the latch: a failing task must not leave the caller blocked forever.
                        remaining.countDown();
                    }
                }
            });
        }
        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!failures.isEmpty()) {
            throw new IllegalStateException("Failed to dispatch " + failures.size() + " of " + list.size() + " atomic SPARQL request(s)", failures.get(0));
        }
        return responses;
    }

    /**
     * Returns the map of pending results.
     *
     * @return the live map of futures keyed by UUID
     */
    public ConcurrentHashMap<UUID, Future<? extends Object>> getPendingResults() {
        return this.pendingResults;
    }

    /**
     * Returns the thread pool used to dispatch atomic requests.
     *
     * @return the thread pool
     */
    public ExecutorService getThreadPool() {
        return this.threadPool;
    }

    /**
     * Returns the colander used to filter the collected responses.
     *
     * @return the colander
     */
    public SparqlColander getColander() {
        return this.colander;
    }

    /**
     * Computes the size of the given object once converted to bytes.
     *
     * @param obj the object to measure
     * @return the length of the byte array produced by {@link ObjectToByteConverter#convert}
     * @throws IllegalStateException if the conversion fails; the underlying
     *         {@link IOException} is attached as the cause
     */
    public int responseToBytes(Object obj) {
        try {
            return ObjectToByteConverter.convert(obj).length;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Closes the colander and shuts the thread pool down.
     *
     * <p>The thread pool is shut down even when closing the colander fails.
     *
     * @throws IOException if closing the colander fails
     */
    public void close() throws IOException {
        try {
            this.colander.close();
        } finally {
            this.threadPool.shutdown();
        }
    }
}
