package org.apache.jena.riot.system;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.atlas.iterator.IteratorSlotted;
import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.logging.FmtLog;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RDFParserBuilder;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.SysRIOT;
import org.apache.jena.sparql.core.Quad;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jena/riot/system/AsyncParser.class */
public class AsyncParser {
    static final int dftChunkSize = 100000;
    static final int dftQueueSize = 10;
    private static final int RUNNING = 0;
    private static final int ABORTING = 1;
    private static final int STOPPED = 2;
    static final Logger LOG = LoggerFactory.getLogger(AsyncParser.class);
    static final ErrorHandler dftErrorHandler = createDefaultErrorhandler(LOG);
    static final List<EltStreamRDF> END = List.of();
    private static final StreamRDF alwaysFailingStreamRdf = new StreamToElements(eltStreamRDF -> {
        throw new RuntimeException(new InterruptedException());
    });
    static Function<EltStreamRDF, Triple> elt2Triple = eltStreamRDF -> {
        if (eltStreamRDF.isException()) {
            raiseException(eltStreamRDF.exception());
        }
        if (!eltStreamRDF.isQuad()) {
            return eltStreamRDF.triple();
        }
        Quad quad = eltStreamRDF.quad();
        Node graph = quad.getGraph();
        if (graph == Quad.tripleInQuad || Quad.isDefaultGraph(graph)) {
            return quad.asTriple();
        }
        return null;
    };
    static Function<EltStreamRDF, Quad> elt2Quad = eltStreamRDF -> {
        if (eltStreamRDF.isException()) {
            raiseException(eltStreamRDF.exception());
        }
        return eltStreamRDF.isTriple() ? Quad.create(Quad.defaultGraphIRI, eltStreamRDF.triple()) : eltStreamRDF.quad();
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/jena/riot/system/AsyncParser$EltStreamBatcher.class */
    public static class EltStreamBatcher<T> implements Consumer<T> {
        private final int batchSize;
        private final Consumer<List<T>> batchDestination;
        private final List<T> endMarker;
        private List<T> elements = null;
        private int count = 0;

        public EltStreamBatcher(Consumer<List<T>> consumer, List<T> list, int i) {
            this.batchDestination = consumer;
            this.batchSize = i;
            this.endMarker = list;
        }

        public void startBatching() {
        }

        public void flush() {
            if (this.elements != null) {
                dispatch(this.elements);
                this.elements = null;
            }
        }

        public void finishBatching() {
            try {
                flush();
            } finally {
                dispatch(this.endMarker);
            }
        }

        private <X> boolean isEmpty(List<X> list) {
            return list == null || list.isEmpty();
        }

        @Override // java.util.function.Consumer
        public void accept(T t) {
            if (this.elements == null) {
                this.elements = allocChunk();
            }
            this.elements.add(t);
            maybeDispatch();
        }

        private void maybeDispatch() {
            if (this.elements.size() < this.batchSize) {
                return;
            }
            dispatch(this.elements);
            this.elements = null;
        }

        private void dispatch(List<T> list) {
            this.count += list.size();
            this.batchDestination.accept(list);
        }

        private List<T> allocChunk() {
            return new ArrayList(this.batchSize);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/jena/riot/system/AsyncParser$StreamToElements.class */
    public static class StreamToElements implements StreamRDF {
        private final Consumer<EltStreamRDF> destination;

        public StreamToElements(Consumer<EltStreamRDF> consumer) {
            this.destination = consumer;
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void start() {
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void finish() {
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void triple(Triple triple) {
            deliver(EltStreamRDF.triple(triple));
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void quad(Quad quad) {
            deliver(EltStreamRDF.quad(quad));
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void base(String str) {
            deliver(EltStreamRDF.base(str));
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void prefix(String str, String str2) {
            deliver(EltStreamRDF.prefix(str, str2));
        }

        private void deliver(EltStreamRDF eltStreamRDF) {
            this.destination.accept(eltStreamRDF);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/jena/riot/system/AsyncParser$Task.class */
    public static class Task implements Runnable {
        private Logger logger;
        private List<RDFParserBuilder> sources;
        private BlockingQueue<List<EltStreamRDF>> queue;
        private int chunkSize;
        private Predicate<EltStreamRDF> prematureDispatch;
        private EltStreamBatcher<EltStreamRDF> batcher;
        private StreamRDF generatorStreamRdf;
        private AtomicInteger destinationState = new AtomicInteger(0);
        private boolean errorEncountered = false;

        public Task(List<RDFParserBuilder> list, BlockingQueue<List<EltStreamRDF>> blockingQueue, int i, Predicate<EltStreamRDF> predicate, Logger logger) {
            this.sources = list;
            this.queue = blockingQueue;
            this.chunkSize = i;
            this.prematureDispatch = predicate;
            this.logger = logger;
        }

        @Override // java.lang.Runnable
        public void run() {
            initDestination();
            try {
                start();
                int size = this.sources.size();
                for (int i = 0; i < size; i++) {
                    parse(this.sources.get(i));
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Finish parsing");
                }
            } finally {
                finish();
            }
        }

        public void abort() {
            this.destinationState.compareAndSet(0, 1);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v9, types: [void] */
        private void initDestination() {
            this.batcher = new EltStreamBatcher<>(list -> {
                if (this.destinationState.get() != 0) {
                    handleAbortingState();
                    throw new RuntimeException(new InterruptedException());
                }
                try {
                    this.queue.put(list);
                } catch (InterruptedException e) {
                    handleAbortingState();
                    throw new RuntimeException(e);
                }
            }, AsyncParser.END, this.chunkSize);
            EltStreamBatcher<EltStreamRDF> eltStreamBatcher = this.batcher;
            if (this.prematureDispatch != null) {
                eltStreamBatcher = eltStreamRDF -> {
                    boolean test = this.prematureDispatch.test(eltStreamRDF);
                    this.batcher.accept(eltStreamRDF);
                    if (test) {
                        this.batcher.flush();
                    }
                };
            }
            this.generatorStreamRdf = new StreamToElements(eltStreamBatcher);
        }

        private void start() {
            this.batcher.startBatching();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Start parsing");
            }
        }

        private void parse(RDFParserBuilder rDFParserBuilder) {
            try {
                rDFParserBuilder.parse(this.errorEncountered ? AsyncParser.alwaysFailingStreamRdf : this.generatorStreamRdf);
            } catch (RuntimeException e) {
                Throwable cause = e.getCause();
                if (this.errorEncountered) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Suppressed exception", e);
                    }
                } else {
                    if (!(cause instanceof InterruptedException)) {
                        this.batcher.accept(EltStreamRDF.exception(e));
                    } else if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Parsing was interrupted");
                    }
                    this.errorEncountered = true;
                }
            } catch (Throwable th) {
                if (!this.errorEncountered) {
                    this.batcher.accept(EltStreamRDF.exception(new RuntimeException(th)));
                    this.errorEncountered = true;
                } else if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Suppressed exception", th);
                }
            }
        }

        private void finish() {
            try {
                if (this.destinationState.get() != 2) {
                    this.batcher.finishBatching();
                }
            } catch (Throwable th) {
                if (this.errorEncountered) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Suppressed exception", th);
                    }
                } else {
                    if (!(th.getCause() instanceof InterruptedException)) {
                        throw new RuntimeException(th);
                    }
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Parsing was interrupted");
                    }
                }
            } finally {
                this.destinationState.set(2);
            }
        }

        private void handleAbortingState() {
            if (!this.destinationState.compareAndSet(1, 2)) {
                return;
            }
            while (true) {
                try {
                    this.queue.clear();
                    this.queue.put(AsyncParser.END);
                    return;
                } catch (InterruptedException e) {
                }
            }
        }
    }

    private AsyncParser() {
    }

    public static void asyncParse(String str, StreamRDF streamRDF) {
        of(str).asyncParseSources(streamRDF);
    }

    public static void asyncParse(List<String> list, StreamRDF streamRDF) {
        ofLocations(list).asyncParseSources(streamRDF);
    }

    public static void asyncParse(InputStream inputStream, Lang lang, String str, StreamRDF streamRDF) {
        of(inputStream, lang, str).asyncParseSources(streamRDF);
    }

    public static void asyncParseSources(List<RDFParserBuilder> list, StreamRDF streamRDF) {
        ofSources(list).asyncParseSources(streamRDF);
    }

    public static IteratorCloseable<Triple> asyncParseTriples(String str) {
        return of(str).asyncParseTriples();
    }

    public static IteratorCloseable<Triple> asyncParseTriples(List<String> list) {
        return ofLocations(list).asyncParseTriples();
    }

    public static IteratorCloseable<Triple> asyncParseTriples(InputStream inputStream, Lang lang, String str) {
        return of(inputStream, lang, str).asyncParseTriples();
    }

    public static IteratorCloseable<Quad> asyncParseQuads(String str) {
        return of(str).asyncParseQuads();
    }

    public static IteratorCloseable<Quad> asyncParseQuads(List<String> list) {
        return ofLocations(list).asyncParseQuads();
    }

    public static IteratorCloseable<Quad> asyncParseQuads(InputStream inputStream, Lang lang, String str) {
        return of(inputStream, lang, str).asyncParseQuads();
    }

    public static AsyncParserBuilder of(String str) {
        Objects.requireNonNull(str);
        return ofLocations(List.of(str));
    }

    public static AsyncParserBuilder ofLocations(List<String> list) {
        Objects.requireNonNull(list);
        LOG.debug("Parse: " + String.valueOf(list));
        return new AsyncParserBuilder(urlsToSource(list));
    }

    public static AsyncParserBuilder of(InputStream inputStream, Lang lang, String str) {
        Objects.requireNonNull(inputStream);
        Objects.requireNonNull(lang);
        return ofSources(inputStreamToSource(inputStream, lang, str));
    }

    public static AsyncParserBuilder of(RDFParserBuilder rDFParserBuilder) {
        Objects.requireNonNull(rDFParserBuilder);
        return ofSources(Arrays.asList(rDFParserBuilder));
    }

    public static AsyncParserBuilder ofSources(List<RDFParserBuilder> list) {
        Objects.requireNonNull(list);
        return new AsyncParserBuilder(list);
    }

    private static List<RDFParserBuilder> urlsToSource(List<String> list) {
        return list.stream().map(str -> {
            return RDFParser.source(str).errorHandler(dftErrorHandler);
        }).toList();
    }

    private static List<RDFParserBuilder> inputStreamToSource(InputStream inputStream, Lang lang, String str) {
        return List.of(RDFParser.source(inputStream).lang(lang).base(str).errorHandler(dftErrorHandler));
    }

    private static void raiseException(Throwable th) {
        if (!(th instanceof RuntimeException)) {
            throw new RuntimeException("Encountered error element from parse thread", th);
        }
        RuntimeException runtimeException = (RuntimeException) th;
        runtimeException.addSuppressed(new RuntimeException("Encountered error element from parse thread"));
        throw runtimeException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <X> IteratorCloseable<X> blockingIterator(final Runnable runnable, final BlockingQueue<X> blockingQueue, final Predicate<X> predicate) {
        return new IteratorSlotted<X>() { // from class: org.apache.jena.riot.system.AsyncParser.1
            boolean ended = false;

            /* JADX WARN: Multi-variable type inference failed */
            protected X moveToNext() {
                try {
                    X x = null;
                    if (!this.ended) {
                        x = blockingQueue.take();
                    }
                    if (!predicate.test(x)) {
                        return x;
                    }
                    this.ended = true;
                    return null;
                } catch (InterruptedException e) {
                    this.ended = true;
                    return null;
                }
            }

            protected boolean hasMore() {
                return !this.ended;
            }

            protected void closeIterator() {
                runnable.run();
            }
        };
    }

    static ErrorHandler createDefaultErrorhandler(final Logger logger) {
        return new ErrorHandler() { // from class: org.apache.jena.riot.system.AsyncParser.2
            @Override // org.apache.jena.riot.system.ErrorHandler
            public void warning(String str, long j, long j2) {
                logger.warn(SysRIOT.fmtMessage(str, j, j2));
            }

            @Override // org.apache.jena.riot.system.ErrorHandler
            public void error(String str, long j, long j2) {
                throw new RiotException(SysRIOT.fmtMessage(str, j, j2));
            }

            @Override // org.apache.jena.riot.system.ErrorHandler
            public void fatal(String str, long j, long j2) {
                throw new RiotException(SysRIOT.fmtMessage(str, j, j2));
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Runnable startParserThread(Logger logger, List<RDFParserBuilder> list, BlockingQueue<List<EltStreamRDF>> blockingQueue, int i, Predicate<EltStreamRDF> predicate, boolean z) {
        Task task = new Task(list, blockingQueue, i, predicate, logger);
        Thread thread = new Thread(task, "AsyncParser");
        thread.setDaemon(z);
        thread.start();
        return () -> {
            task.abort();
            thread.interrupt();
            try {
                thread.join();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private static void dispatch(EltStreamRDF eltStreamRDF, StreamRDF streamRDF) {
        switch (eltStreamRDF.getType()) {
            case TRIPLE:
                streamRDF.triple(eltStreamRDF.triple());
                return;
            case QUAD:
                streamRDF.quad(eltStreamRDF.quad());
                return;
            case PREFIX:
                streamRDF.prefix(eltStreamRDF.prefix(), eltStreamRDF.iri());
                return;
            case BASE:
                streamRDF.base(eltStreamRDF.iri());
                return;
            case EXCEPTION:
                raiseException(eltStreamRDF.exception());
                return;
            default:
                throw new InternalErrorException("Bad EltStreamRDF");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void receiver(Runnable runnable, Logger logger, BlockingQueue<List<EltStreamRDF>> blockingQueue, StreamRDF streamRDF) {
        int i = 0;
        while (true) {
            try {
                try {
                    List<EltStreamRDF> take = blockingQueue.take();
                    if (take == END) {
                        FmtLog.debug(logger, "Receive: END (%,d)", new Object[]{Integer.valueOf(i)});
                        return;
                    }
                    i += take.size();
                    if (LOG.isDebugEnabled()) {
                        FmtLog.debug(logger, "Receive: Batch : %,d (%,d)", new Object[]{Integer.valueOf(take.size()), Integer.valueOf(i)});
                    }
                    dispatch(take, streamRDF);
                } catch (InterruptedException e) {
                    FmtLog.error(logger, e, "Interrupted", new Object[0]);
                    runnable.run();
                    return;
                }
            } finally {
                runnable.run();
            }
        }
    }

    private static void dispatch(List<EltStreamRDF> list, StreamRDF streamRDF) {
        Iterator<EltStreamRDF> it = list.iterator();
        while (it.hasNext()) {
            dispatch(it.next(), streamRDF);
        }
    }
}
