package io.micronaut.http.body.stream;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.stream.BufferConsumer;
import io.micronaut.http.exceptions.BufferLengthExceededException;
import io.micronaut.http.exceptions.ContentLengthExceededException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Contract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Internal
/* loaded from: input_file:io/micronaut/http/body/stream/BaseSharedBuffer.class */
public abstract class BaseSharedBuffer<C extends BufferConsumer, F> {
    private static final Class<ByteBody> SPLIT_LOG_CLASS;
    private static final Logger SPLIT_LOG;
    private final BodySizeLimits limits;
    private final BufferConsumer.Upstream rootUpstream;
    private boolean complete;
    private Throwable error;
    private List<C> subscribers;
    private List<DelayedExecutionFlow<F>> fullSubscribers;
    static final /* synthetic */ boolean $assertionsDisabled;
    private int reserved = 1;
    private boolean working = false;
    private long lengthSoFar = 0;
    private volatile long expectedLength = -1;

    /* loaded from: input_file:io/micronaut/http/body/stream/BaseSharedBuffer$AsFlux.class */
    public static abstract class AsFlux<B> implements BufferConsumer {
        private final BaseSharedBuffer<?, ?> sharedBuffer;
        private final AtomicLong unconsumed = new AtomicLong(0);
        private final Sinks.Many<B> sink = Sinks.many().unicast().onBackpressureBuffer();

        public AsFlux(BaseSharedBuffer<?, ?> baseSharedBuffer) {
            this.sharedBuffer = baseSharedBuffer;
        }

        protected abstract int size(B b);

        public final boolean add0(B b) {
            long addAndGet = this.unconsumed.addAndGet(size(b));
            if (addAndGet <= this.sharedBuffer.getLimits().maxBufferSize()) {
                return this.sink.tryEmitNext(b) == Sinks.EmitResult.OK;
            }
            this.sink.tryEmitError(new BufferLengthExceededException(this.sharedBuffer.getLimits().maxBufferSize(), addAndGet));
            return false;
        }

        @Override // io.micronaut.http.body.stream.BufferConsumer
        public final void complete() {
            this.sink.tryEmitComplete();
        }

        @Override // io.micronaut.http.body.stream.BufferConsumer
        public final void error(Throwable th) {
            this.sink.tryEmitError(th);
        }

        public final Flux<B> asFlux(BufferConsumer.Upstream upstream) {
            return this.sink.asFlux().doOnSubscribe(subscription -> {
                upstream.start();
            }).doOnNext(obj -> {
                int size = size(obj);
                this.unconsumed.addAndGet(-size);
                upstream.onBytesConsumed(size);
            }).doOnCancel(() -> {
                upstream.allowDiscard();
                upstream.disregardBackpressure();
            });
        }
    }

    public BaseSharedBuffer(BodySizeLimits bodySizeLimits, BufferConsumer.Upstream upstream) {
        this.limits = bodySizeLimits;
        this.rootUpstream = upstream;
    }

    @Contract("-> fail")
    public static void failClaim() {
        throw new IllegalStateException("Request body has already been claimed: Two conflicting sites are trying to access the request body. If this is intentional, the first user must ByteBody#split the body. To find out where the body was claimed, turn on TRACE logging for " + SPLIT_LOG_CLASS.getName() + ".");
    }

    public static void logClaim() {
        if (SPLIT_LOG.isTraceEnabled()) {
            SPLIT_LOG.trace("Body split at this location. This is not an error, but may aid in debugging other errors", new Exception());
        }
    }

    public final OptionalLong getExpectedLength() {
        long j = this.expectedLength;
        return j < 0 ? OptionalLong.empty() : OptionalLong.of(j);
    }

    public final BodySizeLimits getLimits() {
        return this.limits;
    }

    public final BufferConsumer.Upstream getRootUpstream() {
        return this.rootUpstream;
    }

    public final void setExpectedLengthFrom(String str) {
        if (str == null) {
            return;
        }
        try {
            long parseLong = Long.parseLong(str);
            if (parseLong < 0) {
                return;
            }
            if (parseLong > this.limits.maxBodySize()) {
                error(new ContentLengthExceededException(this.limits.maxBodySize(), parseLong));
            }
            setExpectedLength(parseLong);
        } catch (NumberFormatException e) {
        }
    }

    public final void setExpectedLength(long j) {
        if (j < 0) {
            throw new IllegalArgumentException("Should be > 0");
        }
        this.expectedLength = j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reserve0() {
        if (this.reserved == 0) {
            throw new IllegalStateException("Cannot go from streaming state back to buffering state");
        }
        this.reserved++;
    }

    protected abstract void forwardInitialBuffer(@Nullable C c, boolean z);

    protected void afterSubscribe(boolean z) {
    }

    protected abstract F subscribeFullResult(boolean z);

    /* JADX INFO: Access modifiers changed from: protected */
    public final void subscribe0(@Nullable C c, BufferConsumer.Upstream upstream) {
        if (!$assertionsDisabled && this.working) {
            throw new AssertionError();
        }
        if (this.reserved == 0) {
            throw new IllegalStateException("Need to reserve a spot first");
        }
        this.working = true;
        int i = this.reserved - 1;
        this.reserved = i;
        boolean z = i == 0;
        if (c != null) {
            if (this.subscribers == null) {
                this.subscribers = new ArrayList(1);
            }
            this.subscribers.add(c);
            forwardInitialBuffer(c, z);
            if (this.error != null) {
                c.error(this.error);
            } else if (this.lengthSoFar > this.limits.maxBufferSize()) {
                c.error(new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar));
                upstream.allowDiscard();
            }
            if (this.complete) {
                c.complete();
            }
        } else {
            forwardInitialBuffer(null, z);
        }
        afterSubscribe(z);
        this.working = false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final ExecutionFlow<F> subscribeFull0(DelayedExecutionFlow<F> delayedExecutionFlow, BufferConsumer.Upstream upstream, boolean z) {
        if (!$assertionsDisabled && this.working) {
            throw new AssertionError();
        }
        if (this.reserved <= 0) {
            throw new IllegalStateException("Need to reserve a spot first. This should not happen, StreamingNettyByteBody should guard against it");
        }
        DelayedExecutionFlow<F> delayedExecutionFlow2 = delayedExecutionFlow;
        this.working = true;
        int i = this.reserved - 1;
        this.reserved = i;
        boolean z2 = i == 0;
        Throwable th = this.error;
        if (th == null && this.lengthSoFar > this.limits.maxBufferSize()) {
            th = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
            upstream.allowDiscard();
        }
        if (th != null) {
            if (z) {
                delayedExecutionFlow2 = ExecutionFlow.error(th);
            } else {
                delayedExecutionFlow.completeExceptionally(th);
            }
        } else if (this.complete) {
            F subscribeFullResult = subscribeFullResult(z2);
            if (z) {
                delayedExecutionFlow2 = ExecutionFlow.just(subscribeFullResult);
            } else {
                delayedExecutionFlow.complete(subscribeFullResult);
            }
        } else {
            if (this.fullSubscribers == null) {
                this.fullSubscribers = new ArrayList(1);
            }
            this.fullSubscribers.add(delayedExecutionFlow);
        }
        afterSubscribe(z2);
        this.working = false;
        return delayedExecutionFlow2;
    }

    protected abstract void addForward(List<C> list);

    protected void addDoNotBuffer() {
    }

    protected abstract void addBuffer();

    protected abstract void discardBuffer();

    /* JADX INFO: Access modifiers changed from: protected */
    public final void add(int i) {
        if (!$assertionsDisabled && this.working) {
            throw new AssertionError();
        }
        long j = this.lengthSoFar + i;
        long j2 = this.expectedLength;
        if (j2 != -1 && j > j2) {
            throw new IllegalStateException("Received more bytes than specified by Content-Length");
        }
        this.lengthSoFar = j;
        if (this.complete || this.error != null) {
            addDoNotBuffer();
            return;
        }
        if (j > this.limits.maxBodySize()) {
            addDoNotBuffer();
            error(new ContentLengthExceededException(this.limits.maxBodySize(), j));
            this.rootUpstream.allowDiscard();
            return;
        }
        this.working = true;
        if (this.subscribers != null) {
            addForward(this.subscribers);
        }
        if (this.reserved <= 0 && this.fullSubscribers == null) {
            addDoNotBuffer();
        } else if (j > this.limits.maxBufferSize()) {
            addDoNotBuffer();
            discardBuffer();
            if (this.fullSubscribers != null) {
                BufferLengthExceededException bufferLengthExceededException = new BufferLengthExceededException(this.limits.maxBufferSize(), this.lengthSoFar);
                Iterator<DelayedExecutionFlow<F>> it = this.fullSubscribers.iterator();
                while (it.hasNext()) {
                    it.next().completeExceptionally(bufferLengthExceededException);
                }
            }
        } else {
            addBuffer();
        }
        this.working = false;
    }

    public void complete() {
        if (this.expectedLength > this.lengthSoFar) {
            throw new IllegalStateException("Received fewer bytes than specified by Content-Length");
        }
        this.complete = true;
        this.expectedLength = this.lengthSoFar;
        if (this.subscribers != null) {
            Iterator<C> it = this.subscribers.iterator();
            while (it.hasNext()) {
                it.next().complete();
            }
        }
        if (this.fullSubscribers != null) {
            boolean z = this.reserved <= 0;
            Iterator<DelayedExecutionFlow<F>> it2 = this.fullSubscribers.iterator();
            while (it2.hasNext()) {
                it2.next().complete(subscribeFullResult(z && !it2.hasNext()));
            }
        }
    }

    public void error(Throwable th) {
        if (this.error != null) {
            this.error.addSuppressed(th);
            return;
        }
        this.error = th;
        discardBuffer();
        if (this.subscribers != null) {
            Iterator<C> it = this.subscribers.iterator();
            while (it.hasNext()) {
                it.next().error(th);
            }
        }
        if (this.fullSubscribers != null) {
            Iterator<DelayedExecutionFlow<F>> it2 = this.fullSubscribers.iterator();
            while (it2.hasNext()) {
                it2.next().completeExceptionally(th);
            }
        }
    }

    static {
        $assertionsDisabled = !BaseSharedBuffer.class.desiredAssertionStatus();
        SPLIT_LOG_CLASS = ByteBody.class;
        SPLIT_LOG = LoggerFactory.getLogger(SPLIT_LOG_CLASS);
    }
}
