package io.micronaut.http.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.body.ReactiveByteBufferByteBody;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.body.stream.BufferConsumer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;

@Internal
/* loaded from: input_file:io/micronaut/http/body/ConcatenatingSubscriber.class */
public abstract class ConcatenatingSubscriber implements BufferConsumer.Upstream, CoreSubscriber<ByteBody>, BufferConsumer {
    private long forwarded;
    private long consumed;
    private Subscription subscription;
    private boolean cancelled;
    private volatile boolean disregardBackpressure;
    private BufferConsumer.Upstream currentComponent;
    private boolean first = true;
    private boolean start = false;
    private boolean delayedSubscriberCompletion = false;
    private boolean currentComponentDone = false;

    /* loaded from: input_file:io/micronaut/http/body/ConcatenatingSubscriber$ByteBufferConcatenatingSubscriber.class */
    public static class ByteBufferConcatenatingSubscriber extends ConcatenatingSubscriber implements ReactiveByteBufferByteBody.ByteBufferConsumer {
        final ReactiveByteBufferByteBody.SharedBuffer sharedBuffer = new ReactiveByteBufferByteBody.SharedBuffer(BodySizeLimits.UNLIMITED, this);

        private ByteBufferConcatenatingSubscriber() {
        }

        public static CloseableByteBody concatenate(Publisher<ByteBody> publisher) {
            ByteBufferConcatenatingSubscriber byteBufferConcatenatingSubscriber = new ByteBufferConcatenatingSubscriber();
            publisher.subscribe(byteBufferConcatenatingSubscriber);
            return new ReactiveByteBufferByteBody(byteBufferConcatenatingSubscriber.sharedBuffer);
        }

        @Override // io.micronaut.http.body.ConcatenatingSubscriber
        protected BufferConsumer.Upstream forward(ByteBody byteBody) {
            return ByteBufferBodyAdapter.adapt(Flux.from(byteBody.mo38toByteArrayPublisher()).map(ByteBuffer::wrap)).primary(this);
        }

        @Override // io.micronaut.http.body.ReactiveByteBufferByteBody.ByteBufferConsumer
        public void add(@NonNull ByteBuffer byteBuffer) {
            onForward(byteBuffer.remaining());
            this.sharedBuffer.add(byteBuffer);
        }

        @Override // io.micronaut.http.body.ConcatenatingSubscriber
        protected void forwardComplete() {
            this.sharedBuffer.complete();
        }

        @Override // io.micronaut.http.body.ConcatenatingSubscriber
        protected void forwardError(Throwable th) {
            this.sharedBuffer.error(th);
        }

        @Override // io.micronaut.http.body.ConcatenatingSubscriber
        public /* bridge */ /* synthetic */ void onNext(Object obj) {
            super.onNext((ByteBody) obj);
        }
    }

    /* loaded from: input_file:io/micronaut/http/body/ConcatenatingSubscriber$JsonByteBufferConcatenatingSubscriber.class */
    public static final class JsonByteBufferConcatenatingSubscriber extends ByteBufferConcatenatingSubscriber {
        private static final ByteBuffer START_ARRAY = ByteBuffer.wrap("[".getBytes(StandardCharsets.UTF_8));
        private static final ByteBuffer END_ARRAY = ByteBuffer.wrap("]".getBytes(StandardCharsets.UTF_8));
        private static final ByteBuffer SEPARATOR = ByteBuffer.wrap(",".getBytes(StandardCharsets.UTF_8));
        private static final ByteBuffer EMPTY_ARRAY = ByteBuffer.wrap("[]".getBytes(StandardCharsets.UTF_8));

        private JsonByteBufferConcatenatingSubscriber() {
        }

        public static CloseableByteBody concatenateJson(Publisher<ByteBody> publisher) {
            JsonByteBufferConcatenatingSubscriber jsonByteBufferConcatenatingSubscriber = new JsonByteBufferConcatenatingSubscriber();
            publisher.subscribe(jsonByteBufferConcatenatingSubscriber);
            return new ReactiveByteBufferByteBody(jsonByteBufferConcatenatingSubscriber.sharedBuffer);
        }

        @Override // io.micronaut.http.body.ConcatenatingSubscriber
        protected long emitLeadingSeparator(boolean z) {
            this.sharedBuffer.add((z ? START_ARRAY : SEPARATOR).asReadOnlyBuffer());
            return 1L;
        }

        @Override // io.micronaut.http.body.ConcatenatingSubscriber
        protected long emitFinalSeparator(boolean z) {
            this.sharedBuffer.add((z ? EMPTY_ARRAY : END_ARRAY).asReadOnlyBuffer());
            return z ? 2L : 1L;
        }
    }

    public final void onSubscribe(Subscription subscription) {
        boolean z;
        boolean z2;
        synchronized (this) {
            this.subscription = subscription;
            z = this.cancelled;
            z2 = this.start;
        }
        if (z) {
            subscription.cancel();
        } else if (z2) {
            subscription.request(1L);
        }
    }

    protected long emitLeadingSeparator(boolean z) {
        return 0L;
    }

    protected long emitFinalSeparator(boolean z) {
        return 0L;
    }

    public final void onComplete() {
        synchronized (this) {
            if (this.currentComponent != null) {
                this.delayedSubscriberCompletion = true;
                return;
            }
            long emitFinalSeparator = emitFinalSeparator(this.first);
            if (emitFinalSeparator != 0) {
                synchronized (this) {
                    this.forwarded += emitFinalSeparator;
                }
            }
            forwardComplete();
        }
    }

    public final void onError(Throwable th) {
        forwardError(th);
    }

    @Nullable
    protected abstract BufferConsumer.Upstream forward(ByteBody byteBody);

    protected final void onForward(long j) {
        synchronized (this) {
            this.forwarded += j;
        }
    }

    @Override // 
    public final void onNext(ByteBody byteBody) {
        long j;
        onForward(emitLeadingSeparator(this.first));
        this.first = false;
        BufferConsumer.Upstream forward = forward(byteBody);
        if (forward == null) {
            return;
        }
        synchronized (this) {
            j = this.consumed - this.forwarded;
            this.currentComponent = forward;
        }
        forward.start();
        if (this.disregardBackpressure) {
            forward.disregardBackpressure();
        } else if (j > 0) {
            forward.onBytesConsumed(j);
        }
    }

    @Override // io.micronaut.http.body.stream.BufferConsumer.Upstream
    public final void start() {
        Subscription subscription;
        synchronized (this) {
            subscription = this.subscription;
            this.start = true;
        }
        if (subscription != null) {
            subscription.request(1L);
        }
    }

    @Override // io.micronaut.http.body.stream.BufferConsumer.Upstream
    public final void onBytesConsumed(long j) {
        long j2;
        BufferConsumer.Upstream upstream;
        boolean z;
        synchronized (this) {
            long j3 = this.consumed + j;
            if (j3 < this.consumed) {
                j3 = Long.MAX_VALUE;
            }
            j2 = j3 - this.consumed;
            this.consumed = j3;
            upstream = this.currentComponent;
            z = upstream == null && this.currentComponentDone && j3 >= this.forwarded;
        }
        if (upstream != null && j2 > 0) {
            upstream.onBytesConsumed(j);
        } else if (z) {
            this.subscription.request(1L);
        }
    }

    @Override // io.micronaut.http.body.stream.BufferConsumer.Upstream
    public final void allowDiscard() {
        BufferConsumer.Upstream upstream;
        Subscription subscription;
        synchronized (this) {
            upstream = this.currentComponent;
            subscription = this.subscription;
            this.cancelled = true;
        }
        if (subscription != null) {
            subscription.cancel();
        }
        if (upstream != null) {
            upstream.allowDiscard();
        }
    }

    @Override // io.micronaut.http.body.stream.BufferConsumer.Upstream
    public final void disregardBackpressure() {
        BufferConsumer.Upstream upstream;
        synchronized (this) {
            upstream = this.currentComponent;
            this.disregardBackpressure = true;
        }
        if (upstream != null) {
            upstream.disregardBackpressure();
        }
    }

    @Override // io.micronaut.http.body.stream.BufferConsumer
    public final void complete() {
        boolean z;
        boolean z2;
        synchronized (this) {
            this.currentComponent = null;
            z = this.delayedSubscriberCompletion;
            z2 = !z && (this.disregardBackpressure || this.consumed >= this.forwarded);
            this.currentComponentDone = !z2;
        }
        if (z) {
            onComplete();
        } else if (z2) {
            this.subscription.request(1L);
        }
    }

    @Override // io.micronaut.http.body.stream.BufferConsumer
    public final void error(Throwable th) {
        this.subscription.cancel();
        forwardError(th);
    }

    protected abstract void forwardComplete();

    protected abstract void forwardError(Throwable th);
}
