package org.apache.paimon.flink.sink;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
import org.apache.paimon.mergetree.localmerge.LocalMerger;
import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.UserDefinedSeqComparator;

/**
 * Stream operator that buffers incoming rows in a {@link LocalMerger}, keyed by the table's
 * primary key, before handing them to the downstream operator.
 *
 * <p>The buffer is flushed downstream when it cannot accept another row, right before a
 * checkpoint barrier is forwarded, and at the end of a bounded input. Watermarks are not
 * forwarded when they arrive; the latest one is remembered and emitted after the next flush that
 * actually emits buffered rows.
 *
 * <p>Only tables with primary keys are supported.
 */
public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
        implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {

    private static final long serialVersionUID = 1L;

    private final TableSchema schema;
    private final boolean ignoreDelete;

    private transient Projection keyProjection;
    private transient RowKindGenerator rowKindGenerator;
    private transient LocalMerger merger;

    /** Latest watermark received but not yet forwarded; {@code Long.MIN_VALUE} means none. */
    private transient long currentWatermark;

    private transient boolean endOfInput;

    /** Factory creating {@link LocalMergeOperator} instances; always chains with its input. */
    public static class Factory extends AbstractStreamOperatorFactory<InternalRow>
            implements OneInputStreamOperatorFactory<InternalRow, InternalRow> {

        private final TableSchema schema;

        /**
         * @param schema schema of the target table, passed to every created operator
         */
        public Factory(TableSchema schema) {
            this.chainingStrategy = ChainingStrategy.ALWAYS;
            this.schema = schema;
        }

        /**
         * Creates the operator for the given runtime parameters.
         *
         * @param parameters task, stream config and output the operator is set up with
         * @return a new {@link LocalMergeOperator}
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<InternalRow>> T createStreamOperator(
                StreamOperatorParameters<InternalRow> parameters) {
            // The cast is required by the generic factory contract; this factory only ever
            // produces LocalMergeOperator, which is a StreamOperator<InternalRow>.
            return (T) new LocalMergeOperator(parameters, schema);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return LocalMergeOperator.class;
        }
    }

    /**
     * @param parameters runtime parameters used to set up the operator
     * @param schema schema of the target table; must declare at least one primary key
     * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument}
     *     when the table has no primary key (exception type not visible here)
     */
    private LocalMergeOperator(
            StreamOperatorParameters<InternalRow> parameters, TableSchema schema) {
        Preconditions.checkArgument(
                !schema.primaryKeys().isEmpty(),
                "LocalMergeOperator currently only support tables with primary keys");
        this.schema = schema;
        this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
        setup(
                parameters.getContainingTask(),
                parameters.getStreamConfig(),
                parameters.getOutput());
    }

    /**
     * Builds the key projection, row kind generator and the local merger, and resets the
     * watermark / end-of-input state.
     */
    @Override
    public void open() throws Exception {
        super.open();

        List<String> primaryKeys = schema.primaryKeys();
        RowType valueType = schema.logicalRowType();
        CoreOptions options = new CoreOptions(schema.options());

        keyProjection = CodeGenUtils.newProjection(valueType, schema.projection(primaryKeys));
        rowKindGenerator = RowKindGenerator.create(schema, options);

        MergeFunction<KeyValue> mergeFunction =
                PrimaryKeyTableUtils.createMergeFunctionFactory(
                                schema,
                                new KeyValueFieldsExtractor() {
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public List<DataField> keyFields(TableSchema tableSchema) {
                                        return PrimaryKeyTableUtils.addKeyNamePrefix(
                                                tableSchema.primaryKeysFields());
                                    }

                                    @Override
                                    public List<DataField> valueFields(TableSchema tableSchema) {
                                        return tableSchema.fields();
                                    }
                                })
                        .create();

        HeapMemorySegmentPool pool =
                new HeapMemorySegmentPool(options.localMergeBufferSize(), options.pageSize());
        UserDefinedSeqComparator udsComparator =
                UserDefinedSeqComparator.create(valueType, options);

        if (allNonKeyFieldsFixedLength(valueType, primaryKeys)) {
            merger =
                    new HashMapLocalMerger(
                            valueType, primaryKeys, pool, mergeFunction, udsComparator);
        } else {
            RowType keyType =
                    PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
            SortBufferWriteBuffer sortBuffer =
                    new SortBufferWriteBuffer(
                            keyType,
                            valueType,
                            udsComparator,
                            pool,
                            false,
                            MemorySize.MAX_VALUE,
                            options.localSortMaxNumFileHandles(),
                            options.spillCompressOptions(),
                            null);
            merger =
                    new SortBufferLocalMerger(
                            sortBuffer, new KeyComparatorSupplier(keyType).get(), mergeFunction);
        }

        currentWatermark = Long.MIN_VALUE;
        endOfInput = false;
    }

    /**
     * Tells whether every field that is not a primary key is stored in the fixed-length part of
     * a {@link BinaryRow}; only then is the hash-map based merger selected.
     */
    private static boolean allNonKeyFieldsFixedLength(
            RowType valueType, List<String> primaryKeys) {
        return valueType.getFields().stream()
                .allMatch(
                        field ->
                                primaryKeys.contains(field.name())
                                        || BinaryRow.isInFixedLengthPart(field.type()));
    }

    /**
     * Adds the row to the local merger, flushing the buffer once if the merger rejects it. A row
     * that is still rejected after the flush is forwarded downstream unmerged.
     *
     * @param record the incoming record; its row kind is temporarily rewritten
     */
    @Override
    public void processElement(StreamRecord<InternalRow> record) throws Exception {
        InternalRow row = record.getValue();

        RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
        if (ignoreDelete && rowKind.isRetract()) {
            return;
        }

        // The row is handed to the merger as INSERT; the real kind is passed separately.
        row.setRowKind(RowKind.INSERT);

        BinaryRow key = keyProjection.apply(row);
        if (!merger.put(rowKind, key, row)) {
            // Buffer rejected the row: make room and retry once.
            flushBuffer();
            if (!merger.put(rowKind, key, row)) {
                // Still rejected: restore the original kind and pass the record through.
                row.setRowKind(rowKind);
                output.collect(record);
            }
        }
    }

    /**
     * Remembers the watermark instead of forwarding it; it is emitted by {@link #flushBuffer()}
     * after buffered rows have been sent downstream.
     */
    @Override
    public void processWatermark(Watermark mark) {
        currentWatermark = mark.getTimestamp();
    }

    /** Flushes buffered rows ahead of the checkpoint barrier unless the input has ended. */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // After endInput() the buffer has already been flushed; nothing more is emitted.
        if (!endOfInput) {
            flushBuffer();
        }
    }

    /** Marks the bounded input as finished and flushes whatever is still buffered. */
    @Override
    public void endInput() throws Exception {
        endOfInput = true;
        flushBuffer();
    }

    /** Clears the merger (if it was created) and closes the operator. */
    @Override
    public void close() throws Exception {
        if (merger != null) {
            merger.clear();
        }
        super.close();
    }

    /**
     * Emits every row held by the merger, clears it, and then forwards the pending watermark, if
     * any. Does nothing when the merger is empty.
     */
    private void flushBuffer() throws Exception {
        // NOTE(review): with an empty buffer a pending watermark is kept rather than forwarded,
        // so it is only emitted on a later non-empty flush — confirm this is intended.
        if (merger.size() == 0) {
            return;
        }

        merger.forEach(row -> output.collect(new StreamRecord<>(row)));
        merger.clear();

        if (currentWatermark != Long.MIN_VALUE) {
            super.processWatermark(new Watermark(currentWatermark));
            // Reset so the same watermark is not emitted again by the next flush.
            currentWatermark = Long.MIN_VALUE;
        }
    }

    @VisibleForTesting
    LocalMerger merger() {
        return merger;
    }

    @VisibleForTesting
    void setOutput(Output<StreamRecord<InternalRow>> output) {
        this.output = output;
    }
}
