package org.apache.paimon.fs;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.utils.FixLenByteArrayOutputStream;
import org.apache.paimon.utils.ThreadUtils;

/* loaded from: input_file:org/apache/paimon/fs/AsyncPositionOutputStream.class */
/**
 * A {@link PositionOutputStream} that hands buffered data to a background task, which performs
 * the actual writes on the wrapped stream.
 *
 * <p>Callers fill a fixed-length buffer of {@link #BUFFER_SIZE} bytes. When it is full (or on
 * {@link #flush()} / {@link #close()}), the buffer is queued as a {@code DataEvent} and a fresh or
 * recycled buffer takes its place. The background task drains the event queue, writes each buffer
 * to the wrapped stream and returns the byte array to {@code bufferQueue} for reuse. At most
 * {@link #MAX_BUFFER} byte arrays are allocated; once that limit is reached the caller blocks
 * until the background task recycles one.
 *
 * <p>Any {@link Throwable} raised by the background task is stored and rethrown to the caller on
 * its next interaction with this stream.
 *
 * <p>NOTE(review): the caller-side state ({@code position}, {@code buffer}, {@code totalBuffers},
 * {@code closed}) is not synchronized, so this class presumably expects a single caller thread —
 * confirm against usages.
 */
public class AsyncPositionOutputStream extends PositionOutputStream {
    /** Shared pool that runs the background writer task of every instance. */
    public static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(ThreadUtils.newDaemonThreadFactory("AsyncOutputStream"));

    /** Seconds to block on a queue or latch before re-checking for a background failure. */
    public static final int AWAIT_TIMEOUT_SECONDS = 10;

    /** Size in bytes of every buffer handed to the background task. */
    public static final int BUFFER_SIZE = 65536;

    /** Upper bound on the number of buffers allocated by one instance. */
    public static final int MAX_BUFFER = 1024;

    private final PositionOutputStream out;
    private final FixLenByteArrayOutputStream buffer = new FixLenByteArrayOutputStream();
    /** Buffers already written by the background task and ready for reuse. */
    private final LinkedBlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
    /** Work items for the background task, in submission order. */
    private final LinkedBlockingQueue<AsyncEvent> eventQueue = new LinkedBlockingQueue<>();
    /** Failure recorded by the background task, if any. */
    private final AtomicReference<Throwable> exception = new AtomicReference<>();
    private final Future<?> future;
    /** Number of byte arrays allocated so far (the one in use plus all in flight or recycled). */
    private int totalBuffers;
    private long position = 0;
    private boolean closed = false;

    /** Marker type for items placed on the event queue. */
    public interface AsyncEvent {
    }

    /** Asks the background task to write the first {@code length} bytes of {@code data}. */
    public static class DataEvent implements AsyncEvent {
        private final byte[] data;
        private final int length;

        public DataEvent(byte[] bArr, int i) {
            this.data = bArr;
            this.length = i;
        }
    }

    /** Asks the background task to stop; the wrapped stream is closed as the task exits. */
    public static class EndEvent implements AsyncEvent {
        private EndEvent() {
        }
    }

    /** Asks the background task to flush the wrapped stream and then release {@code latch}. */
    public static class FlushEvent implements AsyncEvent {
        private final CountDownLatch latch;

        private FlushEvent() {
            this.latch = new CountDownLatch(1);
        }
    }

    /**
     * Creates the stream and starts its background writer task.
     *
     * @param positionOutputStream the stream that receives the data; it is closed by the
     *     background task when that task terminates
     */
    public AsyncPositionOutputStream(PositionOutputStream positionOutputStream) {
        this.out = positionOutputStream;
        this.buffer.setBuffer(new byte[BUFFER_SIZE]);
        this.totalBuffers = 1;
        // Start the worker last so it never observes a partially constructed instance.
        this.future = EXECUTOR_SERVICE.submit(this::execute);
    }

    @VisibleForTesting
    LinkedBlockingQueue<byte[]> getBufferQueue() {
        return this.bufferQueue;
    }

    /** Entry point of the background task: runs {@link #doWork()} and records any failure. */
    private void execute() {
        try {
            doWork();
        } catch (Throwable th) {
            // Publish the failure so the caller thread sees it in checkException().
            this.exception.set(th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Processes queued events until an {@code EndEvent} arrives, then closes the wrapped stream.
     *
     * @throws InterruptedException if interrupted while waiting for an event
     * @throws IOException if writing, flushing or closing the wrapped stream fails
     */
    private void doWork() throws InterruptedException, IOException {
        try {
            while (true) {
                AsyncEvent event = this.eventQueue.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (event == null) {
                    // Timed out without work; keep waiting.
                    continue;
                }
                if (event instanceof EndEvent) {
                    return;
                }
                if (event instanceof DataEvent) {
                    DataEvent dataEvent = (DataEvent) event;
                    this.out.write(dataEvent.data, 0, dataEvent.length);
                    // Hand the byte array back to the caller side for reuse.
                    this.bufferQueue.add(dataEvent.data);
                }
                if (event instanceof FlushEvent) {
                    this.out.flush();
                    ((FlushEvent) event).latch.countDown();
                }
            }
        } finally {
            // Close exactly once, when the worker exits (normally or exceptionally).
            this.out.close();
        }
    }

    /**
     * Returns the number of bytes accepted by this stream so far, including bytes that are still
     * buffered or queued and not yet written to the wrapped stream.
     *
     * @throws IOException if the background task has failed
     */
    @Override // org.apache.paimon.fs.PositionOutputStream
    public long getPos() throws IOException {
        checkException();
        return this.position;
    }

    /**
     * Queues the current buffer content for writing and installs an empty buffer.
     *
     * <p>Does nothing when the current buffer is empty. While fewer than {@link #MAX_BUFFER}
     * buffers exist, a recycled buffer is taken if one is immediately available and a new one is
     * allocated otherwise. Once the limit is reached, this method blocks until the background
     * task recycles a buffer, re-checking for a background failure before every wait.
     *
     * @throws IOException if the background task has failed
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    private void flushBuffer() throws IOException {
        if (this.buffer.getCount() == 0) {
            return;
        }
        putEvent(new DataEvent(this.buffer.getBuffer(), this.buffer.getCount()));

        byte[] next;
        if (this.totalBuffers >= MAX_BUFFER) {
            // Allocation budget exhausted: wait for the worker to return a buffer.
            while (true) {
                checkException();
                try {
                    next = this.bufferQueue.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    sendEndEvent();
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                if (next != null) {
                    break;
                }
            }
        } else {
            // Non-blocking: reuse a recycled buffer if there is one.
            next = this.bufferQueue.poll();
        }

        if (next == null) {
            next = new byte[BUFFER_SIZE];
            this.totalBuffers++;
        }
        this.buffer.setBuffer(next);
        this.buffer.setCount(0);
    }

    /**
     * Appends a single byte, handing the buffer to the background task first if it is full.
     *
     * @throws IOException if the background task has failed
     */
    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        checkException();
        this.position++;
        // buffer.write returns the number of bytes accepted; anything but 1 means "full".
        while (this.buffer.write((byte) i) != 1) {
            flushBuffer();
        }
    }

    /**
     * Appends the whole array; equivalent to {@code write(bArr, 0, bArr.length)}.
     *
     * @throws IOException if the background task has failed
     */
    @Override // org.apache.paimon.fs.PositionOutputStream, java.io.OutputStream
    public void write(byte[] bArr) throws IOException {
        write(bArr, 0, bArr.length);
    }

    /**
     * Appends {@code i2} bytes of {@code bArr} starting at offset {@code i}, handing full buffers
     * to the background task as needed.
     *
     * @throws IOException if the background task has failed
     */
    @Override // org.apache.paimon.fs.PositionOutputStream, java.io.OutputStream
    public void write(byte[] bArr, int i, int i2) throws IOException {
        checkException();
        this.position += i2;
        while (true) {
            // buffer.write copies as much as fits and reports how many bytes it took.
            int written = this.buffer.write(bArr, i, i2);
            i += written;
            i2 -= written;
            if (i2 == 0) {
                return;
            }
            flushBuffer();
        }
    }

    /**
     * Queues all buffered data, asks the background task to flush the wrapped stream and blocks
     * until that flush has completed.
     *
     * @throws IOException if this stream is already closed or the background task has failed
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    @Override // org.apache.paimon.fs.PositionOutputStream, java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        if (this.closed) {
            throw new IOException("Already closed");
        }
        checkException();
        flushBuffer();
        FlushEvent flushEvent = new FlushEvent();
        putEvent(flushEvent);
        while (true) {
            try {
                if (flushEvent.latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    return;
                }
            } catch (InterruptedException e) {
                sendEndEvent();
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            // Not flushed yet: a failed worker never releases the latch, so surface its error.
            checkException();
        }
    }

    /**
     * Queues all buffered data, tells the background task to stop and waits for it to finish.
     * Calling this method on an already closed stream has no effect.
     *
     * @throws IOException if the background task has already failed
     * @throws RuntimeException if the calling thread is interrupted while waiting, or if the
     *     background task terminated with an error
     */
    @Override // org.apache.paimon.fs.PositionOutputStream, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        checkException();
        flushBuffer();
        sendEndEvent();
        try {
            this.future.get();
            this.closed = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2);
        }
    }

    /** Tells the background task to stop after the events queued so far. */
    private void sendEndEvent() {
        putEvent(new EndEvent());
    }

    /**
     * Places an event on the queue for the background task.
     *
     * @throws RuntimeException if the calling thread is interrupted while enqueueing
     */
    private void putEvent(AsyncEvent asyncEvent) {
        try {
            this.eventQueue.put(asyncEvent);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Rethrows the failure recorded by the background task, if there is one.
     *
     * <p>{@link IOException}s and {@link RuntimeException}s are rethrown as they are; any other
     * throwable is wrapped in an {@link IOException}.
     *
     * @throws IOException if the background task has failed
     */
    private void checkException() throws IOException {
        Throwable th = this.exception.get();
        if (th == null) {
            return;
        }
        if (th instanceof IOException) {
            throw (IOException) th;
        }
        if (th instanceof RuntimeException) {
            throw (RuntimeException) th;
        }
        throw new IOException(th);
    }
}
