package org.osgi.util.pushstream;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/osgi/util/pushstream/SimplePushEventSourceImpl.class */
/**
 * Queue-backed implementation of {@link SimplePushEventSource}.
 *
 * <p>Published events are offered to a {@link BlockingQueue} through the
 * configured {@link QueuePolicy}. Worker tasks submitted to the executor of
 * the supplied {@link PromiseFactory} poll that queue and push every event to
 * a snapshot of the currently connected consumers. A {@link Semaphore} with
 * {@code parallelism} permits bounds the number of concurrently active
 * workers; each running worker owns exactly one permit.
 *
 * <p>The mutable state ({@code connected}, {@code closed},
 * {@code connectPromise}, {@code waitForFinishes}) is guarded by {@code lock}.
 *
 * @param <T> the type of the event payload
 * @param <U> the concrete queue type used to buffer events
 */
public class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent<? extends T>>> implements SimplePushEventSource<T> {
    /** Factory whose executors run the workers and the asynchronous consumer calls. */
    private final PromiseFactory promiseFactory;
    /** Factory with an inline callback executor, used for the back-pressure promises. */
    private final PromiseFactory sameThread;
    private final QueuePolicy<T, U> queuePolicy;
    private final U queue;
    /** Total number of worker permits; also the initial size of {@link #semaphore}. */
    private final int parallelism;
    private final Semaphore semaphore;
    /** Callback run at the end of every {@code close} call. */
    private final Runnable onClose;
    /** True while a terminal event is being delivered; other workers park until it is reset. */
    private boolean waitForFinishes;
    private final Object lock = new Object();
    private final List<PushEventConsumer<? super T>> connected = new ArrayList<>();
    private boolean closed = false;
    /** Pending promise handed out by {@link #connectPromise()} while nobody is connected. */
    private Deferred<Void> connectPromise = null;

    /**
     * Creates a new source.
     *
     * @param promiseFactory supplies the executor and scheduled executor used for delivery
     * @param queuePolicy    strategy used to offer events to the queue
     * @param u              the queue buffering the published events
     * @param i              the parallelism, i.e. the number of worker permits
     * @param runnable       callback invoked whenever this source is closed
     */
    public SimplePushEventSourceImpl(PromiseFactory promiseFactory, QueuePolicy<T, U> queuePolicy, U u, int i, Runnable runnable) {
        this.promiseFactory = promiseFactory;
        this.sameThread = new PromiseFactory(PromiseFactory.inlineExecutor(), promiseFactory.scheduledExecutor());
        this.queuePolicy = queuePolicy;
        this.queue = u;
        this.parallelism = i;
        this.semaphore = new Semaphore(i);
        this.onClose = runnable;
    }

    /**
     * Connects a consumer and resolves a pending connect promise, if any.
     *
     * @param pushEventConsumer the consumer to register
     * @return a handle that disconnects the consumer and sends it a close event
     * @throws IllegalStateException if this source has already been closed
     */
    @Override // org.osgi.util.pushstream.PushEventSource
    public AutoCloseable open(PushEventConsumer<? super T> pushEventConsumer) throws Exception {
        Deferred<Void> toResolve;
        synchronized (this.lock) {
            if (this.closed) {
                // It is the source, not the consumer, that is closed; the wording
                // matches the failure used by closedConnectPromise().
                throw new IllegalStateException("This SimplePushEventSource is closed");
            }
            toResolve = this.connectPromise;
            this.connectPromise = null;
            this.connected.add(pushEventConsumer);
        }
        // Resolve outside the lock so promise callbacks never run while it is held.
        if (toResolve != null) {
            toResolve.resolve(null);
        }
        return () -> closeConsumer(pushEventConsumer, PushEvent.close());
    }

    /**
     * Removes the consumer from the connected list and, only if it was still
     * registered, sends it the given (terminal) event.
     */
    private void closeConsumer(PushEventConsumer<? super T> pushEventConsumer, PushEvent<T> pushEvent) {
        boolean removed;
        synchronized (this.lock) {
            removed = this.connected.remove(pushEventConsumer);
        }
        if (removed) {
            doSend(pushEventConsumer, pushEvent);
        }
    }

    /**
     * Pushes the event on the executor. If the executor rejects the task, a
     * terminal event is pushed on the calling thread instead, while a
     * non-terminal event closes the whole source with the rejection as error.
     */
    private void doSend(PushEventConsumer<? super T> pushEventConsumer, PushEvent<T> pushEvent) {
        try {
            this.promiseFactory.executor().execute(() -> safePush(pushEventConsumer, pushEvent));
        } catch (RejectedExecutionException e) {
            if (pushEvent.isTerminal()) {
                safePush(pushEventConsumer, pushEvent);
            } else {
                close(PushEvent.error(e));
            }
        }
    }

    /**
     * Like {@link #doSend} but reports back pressure.
     *
     * @return a promise of the {@link System#nanoTime()} based instant before
     *         which no further event should be delivered
     */
    private Promise<Long> doSendWithBackPressure(PushEventConsumer<? super T> pushEventConsumer, PushEvent<T> pushEvent) {
        Deferred<Long> deferred = this.sameThread.deferred();
        try {
            this.promiseFactory.executor().execute(() -> {
                deferred.resolve(Long.valueOf(System.nanoTime() + safePush(pushEventConsumer, pushEvent)));
            });
        } catch (RejectedExecutionException e) {
            if (pushEvent.isTerminal()) {
                deferred.resolve(Long.valueOf(System.nanoTime() + safePush(pushEventConsumer, pushEvent)));
            } else {
                close(PushEvent.error(e));
                deferred.resolve(Long.valueOf(System.nanoTime()));
            }
        }
        return deferred.getPromise();
    }

    /**
     * Invokes the consumer without letting its exceptions escape.
     *
     * <p>A negative return value for a non-terminal event disconnects the
     * consumer with a close event; an exception for a non-terminal event
     * disconnects it with an error event.
     *
     * @return the consumer's return value multiplied by 1,000,000 (used by the
     *         callers as a nanosecond delay), or {@code -1} when the event was
     *         terminal, the consumer asked to stop, or the consumer threw
     */
    private long safePush(PushEventConsumer<? super T> pushEventConsumer, PushEvent<T> pushEvent) {
        try {
            long backPressure = pushEventConsumer.accept(pushEvent) * 1000000;
            if (backPressure < 0 && !pushEvent.isTerminal()) {
                closeConsumer(pushEventConsumer, PushEvent.close());
                return -1L;
            }
            if (pushEvent.isTerminal()) {
                return -1L;
            }
            return backPressure;
        } catch (Exception e) {
            if (!pushEvent.isTerminal()) {
                closeConsumer(pushEventConsumer, PushEvent.error(e));
            }
            return -1L;
        }
    }

    /** Closes this source, sending a close event to every connected consumer. */
    @Override // org.osgi.util.pushstream.SimplePushEventSource, java.lang.AutoCloseable
    public void close() {
        close(PushEvent.close());
    }

    /**
     * Marks the source closed (first call only), drops queued events, sends the
     * given terminal event to the consumers that were connected and fails a
     * pending connect promise. The {@code onClose} callback runs on every call.
     */
    private void close(PushEvent<T> pushEvent) {
        List<PushEventConsumer<? super T>> toClose;
        Deferred<Void> toFail = null;
        synchronized (this.lock) {
            if (this.closed) {
                toClose = Collections.emptyList();
            } else {
                this.closed = true;
                toClose = new ArrayList<>(this.connected);
                this.connected.clear();
                this.queue.clear();
                if (this.connectPromise != null) {
                    toFail = this.connectPromise;
                    this.connectPromise = null;
                }
            }
        }
        // Consumers and callbacks are notified outside the lock.
        toClose.forEach(consumer -> doSend(consumer, pushEvent));
        if (toFail != null) {
            toFail.resolveWith(closedConnectPromise());
        }
        this.onClose.run();
    }

    /** Queues a data event carrying {@code t}. */
    @Override // org.osgi.util.pushstream.SimplePushEventSource
    public void publish(T t) {
        enqueueEvent(PushEvent.data(t));
    }

    /** Queues a close event. */
    @Override // org.osgi.util.pushstream.SimplePushEventSource
    public void endOfStream() {
        enqueueEvent(PushEvent.close());
    }

    /** Queues an error event carrying {@code th}. */
    @Override // org.osgi.util.pushstream.SimplePushEventSource
    public void error(Throwable th) {
        enqueueEvent(PushEvent.error(th));
    }

    /**
     * Offers the event to the queue and starts a worker when a permit is free.
     * The event is silently dropped when the source is closed or has no
     * connected consumer.
     *
     * @throws IllegalStateException if the queue policy throws; the source is
     *         closed with that error first
     */
    private void enqueueEvent(PushEvent<T> pushEvent) {
        synchronized (this.lock) {
            if (this.closed || this.connected.isEmpty()) {
                return;
            }
        }
        // The lock is deliberately NOT held from here on: the queue policy may
        // block (e.g. on a full queue) and the workers need the lock to drain
        // the queue, and close() runs the external onClose callback.
        try {
            this.queuePolicy.doOffer(this.queue, pushEvent);
            boolean start;
            synchronized (this.lock) {
                start = !this.waitForFinishes && this.semaphore.tryAcquire();
            }
            if (start) {
                startWorker();
            }
        } catch (Exception e) {
            close(PushEvent.error(e));
            throw new IllegalStateException("The queue policy threw an exception", e);
        }
    }

    /**
     * Submits a worker that drains the queue. The caller must already own one
     * semaphore permit on behalf of the worker. The permit is handed over to
     * the follow-up worker when delivery continues asynchronously or after a
     * back-pressure delay, and is released when the worker stops because there
     * is nothing left to deliver.
     *
     * <p>NOTE(review): delivery currently happens while {@code lock} is held,
     * so synchronous consumer calls are serialised across workers. The lock
     * scope is left untouched here because the terminal-event hand-over below
     * depends on the exact ordering; consider narrowing it separately.
     */
    private void startWorker() {
        this.promiseFactory.executor().execute(() -> {
            while (true) {
                try {
                    synchronized (this.lock) {
                        if (this.waitForFinishes) {
                            // A terminal event is in flight: lend our permit to the
                            // worker delivering it and park until it has finished.
                            this.semaphore.release();
                            while (this.waitForFinishes) {
                                this.lock.notifyAll();
                                this.lock.wait();
                            }
                            this.semaphore.acquire();
                        }
                        @SuppressWarnings("unchecked") // events are only ever read as PushEvent<T>
                        PushEvent<T> event = (PushEvent<T>) this.queue.poll();
                        if (event == null) {
                            break;
                        }
                        if (this.connected.isEmpty()) {
                            // Nobody is listening any more; drop what is left.
                            this.queue.clear();
                            break;
                        }
                        List<PushEventConsumer<? super T>> targets = new ArrayList<>(this.connected);
                        final boolean terminal = event.isTerminal();
                        if (terminal) {
                            this.waitForFinishes = true;
                            this.connected.clear();
                            // Collect every other permit so that no other worker is
                            // delivering while the terminal event goes out.
                            while (!this.semaphore.tryAcquire(this.parallelism - 1)) {
                                this.lock.wait();
                            }
                        }
                        Promise<Long> backPressure = deliver(targets, event);
                        if (!backPressure.isDone()) {
                            // Continue once all consumers have answered; the permit
                            // travels with the follow-up worker.
                            backPressure.then(resolved -> {
                                handleReset(terminal);
                                long delay = resolved.getValue().longValue() - System.nanoTime();
                                if (delay > 0) {
                                    this.promiseFactory.scheduledExecutor().schedule(this::startWorker, delay, TimeUnit.NANOSECONDS);
                                } else {
                                    startWorker();
                                }
                                return resolved;
                            }, failed -> {
                                close(PushEvent.error(failed.getFailure()));
                            });
                            return;
                        } else {
                            handleReset(terminal);
                            long delay = backPressure.getValue().longValue() - System.nanoTime();
                            if (delay > 0) {
                                this.promiseFactory.scheduledExecutor().schedule(this::startWorker, delay, TimeUnit.NANOSECONDS);
                                return;
                            }
                        }
                    }
                } catch (Exception e) {
                    // close() empties the queue, so the next iteration leaves the loop.
                    close(PushEvent.error(e));
                }
            }
            // This worker is done: give its permit back, otherwise no worker could
            // ever be started again once 'parallelism' workers have drained the queue.
            releaseWorkerPermit();
            // An event may have been offered between our last poll and the release;
            // its publisher could not get a permit, so pick the event up here.
            if (this.queue.peek() == null || !this.semaphore.tryAcquire()) {
                return;
            }
            try {
                startWorker();
            } catch (Exception e2) {
                close(PushEvent.error(e2));
            }
        });
    }

    /**
     * Returns the permit of a finishing worker and wakes up threads waiting on
     * {@code lock}, so that a worker collecting permits for a terminal event
     * re-checks the semaphore.
     */
    private void releaseWorkerPermit() {
        synchronized (this.lock) {
            this.semaphore.release();
            this.lock.notifyAll();
        }
    }

    /**
     * Ends the exclusive phase of a terminal delivery.
     *
     * @param z {@code true} if the calling worker delivered a terminal event
     *          and therefore owns the extra {@code parallelism - 1} permits
     */
    private void handleReset(boolean z) {
        if (z) {
            synchronized (this.lock) {
                this.waitForFinishes = false;
                // Return the extra permits collected for the terminal event;
                // the parked workers re-acquire theirs as soon as they wake up.
                this.semaphore.release(this.parallelism - 1);
                this.lock.notifyAll();
            }
        }
    }

    /**
     * Pushes the event to all given consumers. A single consumer is called on
     * the current thread. With several consumers each call is moved to the
     * executor while a spare permit is available and made inline otherwise.
     *
     * @return a promise of the latest back-pressure deadline (in
     *         {@link System#nanoTime()} terms) reported for any consumer
     */
    private Promise<Long> deliver(List<PushEventConsumer<? super T>> list, PushEvent<T> pushEvent) {
        if (list.size() == 1) {
            return doCall(pushEvent, list.get(0));
        }
        List<Promise<Long>> calls = list.stream().map(consumer -> {
            if (this.semaphore.tryAcquire()) {
                return doSendWithBackPressure(consumer, pushEvent).onResolve(() -> this.semaphore.release());
            }
            return doCall(pushEvent, consumer);
        }).collect(Collectors.toList());
        return this.sameThread.all(calls)
                .map(deadlines -> deadlines.stream().max(Long::compareTo).orElseGet(() -> Long.valueOf(System.nanoTime())));
    }

    /** Calls the consumer on the current thread and wraps the resulting deadline in a resolved promise. */
    private Promise<Long> doCall(PushEvent<T> pushEvent, PushEventConsumer<? super T> pushEventConsumer) {
        return this.sameThread.resolved(Long.valueOf(System.nanoTime() + safePush(pushEventConsumer, pushEvent)));
    }

    /** @return {@code true} if at least one consumer is currently connected */
    @Override // org.osgi.util.pushstream.SimplePushEventSource
    public boolean isConnected() {
        synchronized (this.lock) {
            return !this.connected.isEmpty();
        }
    }

    /**
     * @return a failed promise if this source is closed, a resolved promise if
     *         a consumer is already connected, otherwise a shared pending
     *         promise that is resolved by the next {@link #open} call or failed
     *         when the source is closed
     */
    @Override // org.osgi.util.pushstream.SimplePushEventSource
    public Promise<Void> connectPromise() {
        synchronized (this.lock) {
            if (this.closed) {
                return closedConnectPromise();
            }
            if (!this.connected.isEmpty()) {
                return this.promiseFactory.resolved(null);
            }
            if (this.connectPromise == null) {
                this.connectPromise = this.promiseFactory.deferred();
            }
            return this.connectPromise.getPromise();
        }
    }

    /** Creates the failed promise reported to connect-promise holders once the source is closed. */
    private Promise<Void> closedConnectPromise() {
        return this.promiseFactory.failed(new IllegalStateException("This SimplePushEventSource is closed"));
    }
}
