package net.pincette.rs;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Flow;
import net.pincette.util.ScheduledCompletionStage;
import net.pincette.util.Util;

/* loaded from: input_file:net/pincette/rs/BackpressureTimout.class */
public class BackpressureTimout<T> extends ProcessorBase<T, T> {
    private long requested;
    private Instant requestedTime = Instant.now();
    private boolean started;

    public BackpressureTimout(Duration duration) {
        checkTimeout(duration);
    }

    public static <T> Flow.Processor<T, T> backpressureTimeout(Duration duration) {
        return new BackpressureTimout(duration);
    }

    private void checkTimeout(Duration duration) {
        ScheduledCompletionStage.runAsyncAfter(() -> {
            if (finished()) {
                return;
            }
            if (this.started && this.requested == 0 && expired(duration)) {
                super.onError(new Util.GeneralException("Backpressure timed out"));
            } else {
                checkTimeout(duration);
            }
        }, duration);
    }

    @Override // net.pincette.rs.ProcessorBase
    protected void emit(long j) {
        this.started = true;
        this.requested += j;
        this.requestedTime = Instant.now();
        this.subscription.request(j);
    }

    private boolean expired(Duration duration) {
        return Duration.between(this.requestedTime, Instant.now()).compareTo(duration) > 0;
    }

    private boolean finished() {
        return completed() || cancelled() || getError();
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(T t) {
        this.requested--;
        this.subscriber.onNext(t);
    }
}
