package io.smallrye.reactive.messaging.providers.helpers;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/* loaded from: input_file:io/smallrye/reactive/messaging/providers/helpers/PausablePollingStream.class */
/**
 * Turns a repeatable "poll" {@link Uni} into a {@link Multi} backed by a {@link UnicastProcessor}.
 *
 * <p>Polling starts on the first downstream request. Each poll result is handed, together with the
 * processor, to the emit callback supplied at construction time. When pause/resume is enabled,
 * polling is paused once the backing queue holds at least {@code maxQueueSize} items and is resumed
 * once the queue has drained to {@code maxQueueSize / 2} items or fewer.
 *
 * @param <P> type of the item produced by one poll
 * @param <T> type of the items emitted by the resulting stream
 */
public class PausablePollingStream<P, T> {
    /** No request received yet; polling starts on the first request. */
    private static final int STATE_NEW = 0;
    /** Actively polling. */
    private static final int STATE_POLLING = 1;
    /** Polling suspended because the queue reached {@code maxQueueSize}. */
    private static final int STATE_PAUSED = 2;
    /** A failure was reported; no further polling happens. */
    private static final int STATE_CANCELLED = 3;

    /** Delay before polling again when a poll yielded no item (including while paused). */
    private static final Duration IDLE_POLL_DELAY = Duration.ofMillis(2L);

    private final AtomicInteger state = new AtomicInteger(STATE_NEW);
    private final Queue<T> queue;
    private final ScheduledExecutorService pollerExecutor;
    private final String channel;
    private final int maxQueueSize;
    private final Uni<P> pollUni;
    private final UnicastProcessor<T> processor;
    private final Multi<T> stream;
    private final int halfMaxQueueSize;
    private final boolean pauseResumeEnabled;

    /**
     * Creates the stream. Nothing is polled until the returned stream receives its first request.
     *
     * @param channel channel name, used only in the pause/resume log messages
     * @param pollUni the poll operation; it is subscribed to once per polling round
     * @param emit callback receiving each poll result and the processor to push items into
     * @param pollerExecutor executor on which subsequent polling rounds are scheduled
     * @param maxQueueSize queue size at which polling is paused; also passed to the queue factory
     * @param pauseResumeEnabled whether the queue size is checked before each poll
     */
    public PausablePollingStream(String channel, Uni<P> pollUni, BiConsumer<P, Flow.Processor<T, T>> emit,
            ScheduledExecutorService pollerExecutor, int maxQueueSize, boolean pauseResumeEnabled) {
        this.channel = channel;
        this.maxQueueSize = maxQueueSize;
        // Resume threshold: half of the pause threshold.
        this.halfMaxQueueSize = maxQueueSize / 2;
        this.pollerExecutor = pollerExecutor;
        this.pauseResumeEnabled = pauseResumeEnabled;
        this.queue = Queues.createSpscUnboundedArrayQueue(maxQueueSize);
        this.processor = UnicastProcessor.create(this.queue, (Runnable) null);
        // Deferred so that the state is re-evaluated on every subscription, i.e. on every round.
        this.pollUni = Uni.createFrom().deferred(() -> {
            if (this.state.get() == STATE_POLLING) {
                return pollUni.onItem().invoke(polled -> emit.accept(polled, this.processor));
            }
            // Not polling (paused, cancelled, ...): yield null so poll() retries after a delay.
            return Uni.createFrom().nullItem();
        });
        this.stream = this.processor.onRequest().invoke(requested -> {
            // Only the very first request moves NEW -> POLLING and kicks off the polling loop.
            if (this.state.compareAndSet(STATE_NEW, STATE_POLLING)) {
                poll();
            }
        });
    }

    /**
     * @return the stream fed by the polling loop
     */
    public Multi<T> getStream() {
        return this.stream;
    }

    /**
     * Runs one polling round and schedules the next one. Does nothing when the stream has not been
     * requested yet or has been cancelled by a failure.
     */
    private void poll() {
        int current = this.state.get();
        if (current == STATE_CANCELLED || current == STATE_NEW) {
            return;
        }
        if (this.pauseResumeEnabled) {
            pauseResume();
        }
        this.pollUni.subscribe().with(polled -> {
            if (polled == null) {
                // Nothing was polled: back off briefly before trying again.
                executeWithDelay(this::poll, IDLE_POLL_DELAY).subscribe().with(this::emptyConsumer, this::report);
            } else {
                runOnRequestThread(this::poll).subscribe().with(this::emptyConsumer, this::report);
            }
        }, this::report);
    }

    /** No-op item callback used where only the failure callback matters. */
    private <I> void emptyConsumer(I ignored) {
    }

    /**
     * Moves the state to {@code STATE_CANCELLED} and signals the failure to the processor. If the
     * state is already cancelled the failure is dropped, so the processor is failed at most once
     * through this method.
     *
     * @param failure the failure to propagate downstream
     */
    private void report(Throwable failure) {
        int current;
        do {
            current = this.state.get();
            if (current == STATE_CANCELLED) {
                return;
            }
        } while (!this.state.compareAndSet(current, STATE_CANCELLED));
        this.processor.onError(failure);
    }

    /**
     * Builds a {@link Uni} that runs {@code action} when subscribed, with the subscription
     * performed on the poller executor.
     */
    private Uni<Void> runOnRequestThread(Runnable action) {
        return Uni.createFrom().voidItem().invoke(action).runSubscriptionOn(this.pollerExecutor);
    }

    /**
     * Builds a {@link Uni} that, when subscribed, schedules {@code action} on the poller executor
     * after {@code delay}. The Uni completes once the action returns and fails if it throws an
     * {@link Exception}.
     */
    private Uni<Void> executeWithDelay(Runnable action, Duration delay) {
        return Uni.createFrom().emitter(emitter -> {
            this.pollerExecutor.schedule(() -> {
                try {
                    action.run();
                    emitter.complete(null);
                } catch (Exception e) {
                    emitter.fail(e);
                }
            }, delay.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Applies the pause/resume thresholds: POLLING -> PAUSED when the queue holds at least
     * {@code maxQueueSize} items, PAUSED -> POLLING when it holds at most half of that. A log entry
     * is written only when a transition actually happens.
     */
    private void pauseResume() {
        int size = this.queue.size();
        if (size >= this.maxQueueSize && this.state.compareAndSet(STATE_POLLING, STATE_PAUSED)) {
            ProviderLogging.log.pausingRequestingMessages(this.channel, size, this.maxQueueSize);
        } else if (size <= this.halfMaxQueueSize && this.state.compareAndSet(STATE_PAUSED, STATE_POLLING)) {
            ProviderLogging.log.resumingRequestingMessages(this.channel, size, this.halfMaxQueueSize);
        }
    }
}
