package org.apache.paimon.flink.sorter;

import java.util.stream.IntStream;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.MutableObjectIterator;

/* loaded from: input_file:org/apache/paimon/flink/sorter/SortOperator.class */
public class SortOperator extends TableStreamOperator<InternalRow> implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {
    private final RowType keyType;
    private final RowType rowType;
    private final long maxMemory;
    private final int pageSize;
    private final int arity;
    private final int spillSortMaxNumFiles;
    private final CompressOptions spillCompression;
    private final int sinkParallelism;
    private final MemorySize maxDiskSize;
    private transient BinaryExternalSortBuffer buffer;
    private transient IOManager ioManager;

    public SortOperator(RowType rowType, RowType rowType2, long j, int i, int i2, CompressOptions compressOptions, int i3, MemorySize memorySize) {
        this.keyType = rowType;
        this.rowType = rowType2;
        this.maxMemory = j;
        this.pageSize = i;
        this.arity = rowType2.getFieldCount();
        this.spillSortMaxNumFiles = i2;
        this.spillCompression = compressOptions;
        this.sinkParallelism = i3;
        this.maxDiskSize = memorySize;
    }

    public void open() throws Exception {
        super.open();
        initBuffer();
        if (this.sinkParallelism != getRuntimeContext().getNumberOfParallelSubtasks()) {
            throw new IllegalArgumentException("Please ensure that the runtime parallelism of the sink matches the initial configuration to avoid potential issues with skewed range partitioning.");
        }
    }

    @VisibleForTesting
    void initBuffer() {
        this.ioManager = IOManager.create(getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.buffer = BinaryExternalSortBuffer.create(this.ioManager, this.rowType, IntStream.range(0, this.keyType.getFieldCount()).toArray(), this.maxMemory, this.pageSize, this.spillSortMaxNumFiles, this.spillCompression, this.maxDiskSize);
    }

    public void endInput() throws Exception {
        if (this.buffer.size() <= 0) {
            return;
        }
        MutableObjectIterator<BinaryRow> sortedIterator = this.buffer.sortedIterator();
        BinaryRow binaryRow = new BinaryRow(this.arity);
        while (true) {
            BinaryRow next = sortedIterator.next(binaryRow);
            binaryRow = next;
            if (next == null) {
                return;
            } else {
                this.output.collect(new StreamRecord(binaryRow));
            }
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.buffer != null) {
            this.buffer.clear();
        }
        if (this.ioManager != null) {
            this.ioManager.close();
        }
    }

    public void processElement(StreamRecord<InternalRow> streamRecord) throws Exception {
        this.buffer.write((InternalRow) streamRecord.getValue());
    }

    @VisibleForTesting
    BinaryExternalSortBuffer getBuffer() {
        return this.buffer;
    }
}
