package io.micronaut.http.body;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.io.buffer.ByteArrayBufferFactory;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.stream.AvailableByteArrayBody;
import io.micronaut.http.body.stream.BaseSharedBuffer;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.body.stream.BufferConsumer;
import io.micronaut.http.body.stream.PublisherAsBlocking;
import io.micronaut.http.body.stream.UpstreamBalancer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Internal
/* loaded from: input_file:io/micronaut/http/body/ReactiveByteBufferByteBody.class */
public final class ReactiveByteBufferByteBody implements CloseableByteBody, InternalByteBody {
    /** Buffer backing this body; the same instance is handed to bodies created by {@code split} and {@code move}. */
    private final SharedBuffer sharedBuffer;
    /**
     * Upstream handle still owned by this body. It is set to {@code null} as soon as the body is
     * claimed (see {@code primary}, {@code move}, {@code bufferFlow} and {@code close}).
     */
    private BufferConsumer.Upstream upstream;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/micronaut/http/body/ReactiveByteBufferByteBody$AsFlux.class */
    /**
     * {@link ByteBufferConsumer} built on {@link BaseSharedBuffer.AsFlux} for NIO
     * {@link ByteBuffer} elements.
     */
    public static final class AsFlux extends BaseSharedBuffer.AsFlux<ByteBuffer> implements ByteBufferConsumer {
        AsFlux(BaseSharedBuffer<?, ?> baseSharedBuffer) {
            super(baseSharedBuffer);
        }

        /**
         * Measures a buffer by the number of bytes between its position and its limit.
         *
         * @param byteBuffer the buffer to measure
         * @return the buffer's remaining byte count
         */
        @Override
        public int size(ByteBuffer byteBuffer) {
            final int unread = byteBuffer.remaining();
            return unread;
        }

        /**
         * Accepts a buffer by delegating to the inherited {@code add0}.
         *
         * @param byteBuffer the buffer to pass on
         */
        @Override
        public void add(ByteBuffer byteBuffer) {
            this.add0(byteBuffer);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/micronaut/http/body/ReactiveByteBufferByteBody$ByteBufferConsumer.class */
    /**
     * {@link BufferConsumer} that additionally accepts NIO {@link ByteBuffer}s.
     */
    public interface ByteBufferConsumer extends BufferConsumer {
        /**
         * Delivers one buffer to this consumer.
         *
         * @param byteBuffer the buffer to deliver; annotated as non-null
         */
        void add(@NonNull ByteBuffer byteBuffer);
    }

    /* loaded from: input_file:io/micronaut/http/body/ReactiveByteBufferByteBody$SharedBuffer.class */
    public static final class SharedBuffer extends BaseSharedBuffer<ByteBufferConsumer, ByteBuffer> implements ByteBufferConsumer {
        private final ReentrantLock lock;
        private final ConcurrentLinkedQueue<Runnable> workQueue;
        private SnapshotByteArrayOutputStream buffer;
        private ByteBuffer adding;

        public SharedBuffer(BodySizeLimits bodySizeLimits, BufferConsumer.Upstream upstream) {
            super(bodySizeLimits, upstream);
            this.lock = new ReentrantLock();
            this.workQueue = new ConcurrentLinkedQueue<>();
        }

        private void submit(Runnable runnable) {
            this.workQueue.add(runnable);
            while (!this.workQueue.isEmpty() && this.lock.tryLock()) {
                try {
                    Runnable poll = this.workQueue.poll();
                    if (poll != null) {
                        poll.run();
                    }
                } finally {
                    this.lock.unlock();
                }
            }
        }

        void reserve() {
            submit(() -> {
                this.reserve0();
            });
        }

        void subscribe(@Nullable ByteBufferConsumer byteBufferConsumer, BufferConsumer.Upstream upstream) {
            submit(() -> {
                subscribe0(byteBufferConsumer, upstream);
            });
        }

        public DelayedExecutionFlow<ByteBuffer> subscribeFull(BufferConsumer.Upstream upstream) {
            DelayedExecutionFlow<ByteBuffer> create = DelayedExecutionFlow.create();
            submit(() -> {
                subscribeFull0(create, upstream, false);
            });
            return create;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.micronaut.http.body.stream.BaseSharedBuffer
        public void forwardInitialBuffer(@Nullable ByteBufferConsumer byteBufferConsumer, boolean z) {
            if (this.buffer != null) {
                if (byteBufferConsumer != null) {
                    byteBufferConsumer.add(this.buffer.snapshot());
                }
                if (z) {
                    this.buffer = null;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.micronaut.http.body.stream.BaseSharedBuffer
        public ByteBuffer subscribeFullResult(boolean z) {
            if (this.buffer == null) {
                return ByteBuffer.allocate(0);
            }
            ByteBuffer snapshot = this.buffer.snapshot();
            if (z) {
                this.buffer = null;
            }
            return snapshot;
        }

        @Override // io.micronaut.http.body.stream.BaseSharedBuffer
        protected void addForward(List<ByteBufferConsumer> list) {
            Iterator<ByteBufferConsumer> it = list.iterator();
            while (it.hasNext()) {
                it.next().add(this.adding.asReadOnlyBuffer());
            }
        }

        @Override // io.micronaut.http.body.stream.BaseSharedBuffer
        protected void addBuffer() {
            if (this.buffer == null) {
                this.buffer = new SnapshotByteArrayOutputStream();
            }
            this.buffer.write(this.adding);
        }

        @Override // io.micronaut.http.body.stream.BaseSharedBuffer
        protected void discardBuffer() {
            this.buffer = null;
        }

        @Override // io.micronaut.http.body.ReactiveByteBufferByteBody.ByteBufferConsumer
        public void add(ByteBuffer byteBuffer) {
            submit(() -> {
                this.adding = byteBuffer;
                add(byteBuffer.remaining());
                this.adding = null;
            });
        }

        @Override // io.micronaut.http.body.stream.BaseSharedBuffer, io.micronaut.http.body.stream.BufferConsumer
        public void error(Throwable th) {
            submit(() -> {
                super.error(th);
            });
        }

        @Override // io.micronaut.http.body.stream.BaseSharedBuffer, io.micronaut.http.body.stream.BufferConsumer
        public void complete() {
            submit(() -> {
                super.complete();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/micronaut/http/body/ReactiveByteBufferByteBody$SnapshotByteArrayOutputStream.class */
    /**
     * {@link ByteArrayOutputStream} that can expose its current content as a read-only
     * {@link ByteBuffer} without copying, and that can append the content of a {@link ByteBuffer}.
     */
    public static final class SnapshotByteArrayOutputStream extends ByteArrayOutputStream {
        private SnapshotByteArrayOutputStream() {
        }

        /**
         * Wraps the bytes written so far in a read-only buffer backed by the stream's current
         * array.
         *
         * @return a read-only view of the first {@code count} bytes
         */
        public ByteBuffer snapshot() {
            final ByteBuffer view = ByteBuffer.wrap(this.buf, 0, this.count);
            return view.asReadOnlyBuffer();
        }

        /**
         * Appends the remaining bytes of the given buffer. The buffer's position is not changed.
         *
         * @param byteBuffer the source buffer
         */
        public void write(ByteBuffer byteBuffer) {
            final int length = byteBuffer.remaining();
            if (!byteBuffer.hasArray()) {
                // No accessible backing array: copy out with an absolute bulk get.
                final byte[] copy = new byte[length];
                byteBuffer.get(byteBuffer.position(), copy);
                write(copy, 0, length);
            } else {
                final int start = byteBuffer.arrayOffset() + byteBuffer.position();
                write(byteBuffer.array(), start, length);
            }
        }
    }

    /* loaded from: input_file:io/micronaut/http/body/ReactiveByteBufferByteBody$WorkState.class */
    /**
     * Work-tracking states.
     * NOTE(review): this enum is not referenced anywhere else in the visible part of this class;
     * confirm whether it is still needed.
     */
    private enum WorkState {
        CLEAN,
        WORKING_THEN_CLEAN,
        WORKING_THEN_DIRTY
    }

    /**
     * Creates a body over the given shared buffer, using the upstream returned by
     * {@code sharedBuffer.getRootUpstream()}.
     *
     * @param sharedBuffer the shared buffer backing this body
     */
    public ReactiveByteBufferByteBody(SharedBuffer sharedBuffer) {
        this(sharedBuffer, sharedBuffer.getRootUpstream());
    }

    /**
     * Creates a body over the given shared buffer with an explicit upstream handle.
     *
     * @param sharedBuffer the shared buffer backing this body
     * @param upstream     the upstream handle this body owns until it is claimed
     */
    private ReactiveByteBufferByteBody(SharedBuffer sharedBuffer, BufferConsumer.Upstream upstream) {
        this.upstream = upstream;
        this.sharedBuffer = sharedBuffer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Claims this body for the given consumer: the owned upstream handle is taken, the claim is
     * logged, and the consumer is subscribed to the shared buffer with that handle.
     * {@code BaseSharedBuffer.failClaim()} is invoked when the body has already been claimed
     * (presumably it throws; confirm against {@code BaseSharedBuffer}).
     *
     * @param byteBufferConsumer the consumer to subscribe
     * @return the upstream handle that was owned by this body
     */
    public BufferConsumer.Upstream primary(ByteBufferConsumer byteBufferConsumer) {
        final BufferConsumer.Upstream claimed = this.upstream;
        if (claimed == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        this.sharedBuffer.subscribe(byteBufferConsumer, claimed);
        return claimed;
    }

    /**
     * Splits this body in two. The current upstream is wrapped in a balancer; this body keeps the
     * left side, and a new body over the same shared buffer receives the right side. A
     * reservation is registered on the shared buffer for the new body.
     *
     * @param splitBackpressureMode the backpressure mode passed to the balancer
     * @return the new body sharing this body's buffer
     */
    @Override
    @NonNull
    public CloseableByteBody split(@NonNull ByteBody.SplitBackpressureMode splitBackpressureMode) {
        final BufferConsumer.Upstream current = this.upstream;
        if (current == null) {
            BaseSharedBuffer.failClaim();
        }
        final UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(current, splitBackpressureMode);
        this.upstream = pair.left();
        this.sharedBuffer.reserve();
        final BufferConsumer.Upstream other = pair.right();
        return new ReactiveByteBufferByteBody(this.sharedBuffer, other);
    }

    /**
     * Reports the expected length as known to the shared buffer.
     *
     * @return the value of {@code sharedBuffer.getExpectedLength()}
     */
    @Override
    @NonNull
    public OptionalLong expectedLength() {
        final OptionalLong length = this.sharedBuffer.getExpectedLength();
        return length;
    }

    /**
     * Claims this body with a fresh {@link AsFlux} consumer and returns the flux that consumer
     * produces for the claimed upstream.
     *
     * @return a flux of NIO buffers
     */
    private Flux<ByteBuffer> toNioBufferPublisher() {
        final AsFlux consumer = new AsFlux(this.sharedBuffer);
        final BufferConsumer.Upstream claimed = primary(consumer);
        return consumer.asFlux(claimed);
    }

    /**
     * Claims this body and exposes it as a blocking {@link InputStream}. The NIO buffer publisher
     * is subscribed with a {@code PublisherAsBlocking}; each read takes buffers from it as needed.
     *
     * <p>The returned stream follows the {@link InputStream#read(byte[], int, int)} contract:
     * arguments are validated up front and a zero-length read returns {@code 0} without waiting
     * for data. If the reading thread is interrupted while waiting, its interrupt flag is
     * restored and an {@link InterruptedIOException} carrying the original exception as cause is
     * thrown.
     *
     * @return a blocking stream over the body
     */
    @Override
    @NonNull
    public InputStream toInputStream() {
        final PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking();
        toNioBufferPublisher().subscribe(publisherAsBlocking);
        return new InputStream() {
            /** Buffer currently being drained; {@code null} when a new one must be taken. */
            private ByteBuffer buffer;

            @Override
            public int read() throws IOException {
                byte[] single = new byte[1];
                if (read(single, 0, 1) == -1) {
                    return -1;
                }
                return single[0] & 255;
            }

            @Override
            public int read(byte[] dst, int off, int len) throws IOException {
                Objects.checkFromIndexSize(off, len, dst.length);
                if (len == 0) {
                    // Contract: a zero-length read must not block.
                    return 0;
                }
                while (this.buffer == null) {
                    try {
                        ByteBuffer next = (ByteBuffer) publisherAsBlocking.take();
                        if (next == null) {
                            Throwable failure = publisherAsBlocking.getFailure();
                            if (failure == null) {
                                return -1;
                            }
                            throw new IOException(failure);
                        }
                        // Skip empty buffers so that a positive len never yields a 0 result.
                        if (next.hasRemaining()) {
                            this.buffer = next;
                        }
                    } catch (InterruptedException e) {
                        // Keep the interruption visible to callers further up the stack.
                        Thread.currentThread().interrupt();
                        InterruptedIOException interrupted = new InterruptedIOException();
                        interrupted.initCause(e);
                        throw interrupted;
                    }
                }
                int count = Math.min(len, this.buffer.remaining());
                this.buffer.get(dst, off, count);
                if (this.buffer.remaining() == 0) {
                    this.buffer = null;
                }
                return count;
            }

            @Override
            public void close() {
                publisherAsBlocking.close();
            }
        };
    }

    /**
     * Claims this body and publishes its content as byte arrays, one array per NIO buffer.
     *
     * <p>Decompiler note: this method was renamed from {@code toByteArrayPublisher} (merged with
     * its bridge method).
     *
     * @return a flux of byte arrays
     */
    @Override
    @NonNull
    public Flux<byte[]> mo38toByteArrayPublisher() {
        final Flux<ByteBuffer> buffers = toNioBufferPublisher();
        return buffers.map(chunk -> toByteArray(chunk));
    }

    /**
     * Copies the remaining bytes of a buffer into a new array. The buffer's position is advanced
     * to its limit as a result.
     *
     * @param byteBuffer the buffer to drain
     * @return a new array holding the bytes that were remaining
     */
    private static byte[] toByteArray(ByteBuffer byteBuffer) {
        final int length = byteBuffer.remaining();
        final byte[] copy = new byte[length];
        byteBuffer.get(copy, 0, length);
        return copy;
    }

    /**
     * Claims this body and publishes its content as Micronaut byte buffers, each one wrapping a
     * byte array via {@code ByteArrayBufferFactory.INSTANCE}.
     *
     * @return a publisher of wrapped byte arrays
     */
    @Override
    @NonNull
    public Publisher<io.micronaut.core.io.buffer.ByteBuffer<?>> toByteBufferPublisher() {
        // A bound method reference null-checks its receiver once, when the reference is created.
        return mo38toByteArrayPublisher().map(ByteArrayBufferFactory.INSTANCE::wrap);
    }

    /**
     * Transfers ownership of the upstream handle to a new body over the same shared buffer. This
     * body gives up its handle in the process.
     *
     * @return the new body that now owns the upstream handle
     */
    @Override
    @NonNull
    public CloseableByteBody move() {
        final BufferConsumer.Upstream transferred = this.upstream;
        if (transferred == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        final ReactiveByteBufferByteBody successor = new ReactiveByteBufferByteBody(this.sharedBuffer, transferred);
        return successor;
    }

    /**
     * Claims this body and buffers it completely. The upstream is started and told that
     * {@code Long.MAX_VALUE} bytes have been consumed, then the shared buffer is subscribed for
     * its full content. The resulting buffer is copied into a byte array and wrapped in an
     * available body.
     *
     * @return a flow that yields the fully buffered body
     */
    @Override
    @NonNull
    public ExecutionFlow<? extends CloseableAvailableByteBody> bufferFlow() {
        final BufferConsumer.Upstream claimed = this.upstream;
        if (claimed == null) {
            BaseSharedBuffer.failClaim();
        }
        this.upstream = null;
        BaseSharedBuffer.logClaim();
        claimed.start();
        claimed.onBytesConsumed(Long.MAX_VALUE);
        return this.sharedBuffer
            .subscribeFull(claimed)
            .map(full -> AvailableByteArrayBody.create(ByteArrayBufferFactory.INSTANCE, toByteArray(full)));
    }

    /**
     * Closes this body. Nothing happens if it has already been claimed. Otherwise the upstream
     * handle is taken, marked as discardable and as disregarding backpressure, started, and
     * subscribed to the shared buffer with no consumer.
     */
    @Override
    public void close() {
        final BufferConsumer.Upstream released = this.upstream;
        if (released != null) {
            this.upstream = null;
            BaseSharedBuffer.logClaim();
            released.allowDiscard();
            released.disregardBackpressure();
            released.start();
            this.sharedBuffer.subscribe(null, released);
        }
    }

    /**
     * Marks the upstream of this body as discardable. The body stays unclaimed;
     * {@code BaseSharedBuffer.failClaim()} is invoked if it has already been claimed.
     *
     * @return this body
     */
    @Override
    @NonNull
    public CloseableByteBody allowDiscard() {
        final BufferConsumer.Upstream current = this.upstream;
        if (current == null) {
            BaseSharedBuffer.failClaim();
        }
        current.allowDiscard();
        return this;
    }
}
