package org.apache.paimon.flink.compact.changelog;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;

/* loaded from: input_file:org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.class */
/**
 * Stream operator that separates changelog files from incoming {@link Committable}s and groups
 * them, per partition, into {@link ChangelogCompactTask}s.
 *
 * <p>Output records are {@link Either} values: {@code Left} carries a committable (forwarded
 * as-is, or rebuilt without its changelog files), {@code Right} carries a compaction task built
 * from the changelog files buffered for one partition.
 */
public class ChangelogCompactCoordinateOperator extends AbstractStreamOperator<Either<Committable, ChangelogCompactTask>> implements OneInputStreamOperator<Committable, Either<Committable, ChangelogCompactTask>>, BoundedOneInput {
    private final FileStoreTable table;

    /** Largest checkpoint id seen on any received committable; reset in {@link #open()}. */
    private transient long checkpointId;

    /** Changelog files buffered so far, keyed by partition; created in {@link #open()}. */
    private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;

    /**
     * Changelog files buffered for a single partition, grouped by bucket, together with their
     * accumulated size.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally {@code private}; it
     * is kept {@code public} here so that existing references keep compiling.
     */
    public static class PartitionChangelog {
        /** Sum of {@code fileSize()} over every file added through either add method. */
        private long totalFileSize = 0;
        private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles = new HashMap<>();
        private final Map<Integer, List<DataFileMeta>> compactChangelogFiles = new HashMap<>();

        /**
         * Records a changelog file taken from a commit message's new-files increment.
         *
         * @param bucket bucket the file belongs to
         * @param dataFileMeta changelog file to buffer
         */
        public void addNewChangelogFile(Integer bucket, DataFileMeta dataFileMeta) {
            this.totalFileSize += dataFileMeta.fileSize();
            this.newFileChangelogFiles
                    .computeIfAbsent(bucket, ignored -> new ArrayList<>())
                    .add(dataFileMeta);
        }

        /**
         * Records a changelog file taken from a commit message's compact increment.
         *
         * @param bucket bucket the file belongs to
         * @param dataFileMeta changelog file to buffer
         */
        public void addCompactChangelogFile(Integer bucket, DataFileMeta dataFileMeta) {
            this.totalFileSize += dataFileMeta.fileSize();
            this.compactChangelogFiles
                    .computeIfAbsent(bucket, ignored -> new ArrayList<>())
                    .add(dataFileMeta);
        }
    }

    /**
     * @param table table whose core options supply the target file size used as the flush
     *     threshold
     */
    public ChangelogCompactCoordinateOperator(FileStoreTable table) {
        this.table = table;
    }

    /** Opens the parent operator and initialises the transient per-task state. */
    public void open() throws Exception {
        super.open();
        this.checkpointId = Long.MIN_VALUE;
        this.partitionChangelogs = new HashMap<>();
    }

    /**
     * Handles one incoming committable.
     *
     * <ul>
     *   <li>Committables whose kind is not {@code FILE}, and {@code FILE} committables that carry
     *       no changelog files, are forwarded unchanged.
     *   <li>Otherwise every changelog file is buffered under the message's partition and bucket.
     *       Each time the buffered size of that partition reaches the target file size, a
     *       compaction task for the partition is emitted and its buffer is dropped.
     *   <li>Finally a copy of the commit message with both changelog lists emptied is forwarded.
     * </ul>
     *
     * @param streamRecord record wrapping the committable to process
     */
    public void processElement(StreamRecord<Committable> streamRecord) {
        Committable committable = streamRecord.getValue();
        this.checkpointId = Math.max(this.checkpointId, committable.checkpointId());
        if (committable.kind() != Committable.Kind.FILE) {
            emitCommittable(committable);
            return;
        }

        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
        List<DataFileMeta> newChangelogFiles = message.newFilesIncrement().changelogFiles();
        List<DataFileMeta> compactChangelogFiles = message.compactIncrement().changelogFiles();
        if (newChangelogFiles.isEmpty() && compactChangelogFiles.isEmpty()) {
            emitCommittable(committable);
            return;
        }

        BinaryRow partition = message.partition();
        Integer bucket = message.bucket();
        // NOTE(review): the meaning of the `false` argument is not visible in this file —
        // confirm against CoreOptions#targetFileSize.
        long targetFileSize = this.table.coreOptions().targetFileSize(false);

        // The buffer is looked up again on every iteration because emitting a task removes the
        // partition's entry, so the next file has to start a fresh PartitionChangelog.
        for (DataFileMeta file : newChangelogFiles) {
            PartitionChangelog buffered =
                    this.partitionChangelogs.computeIfAbsent(
                            partition, ignored -> new PartitionChangelog());
            buffered.addNewChangelogFile(bucket, file);
            if (buffered.totalFileSize >= targetFileSize) {
                emitPartitionChanglogCompactTask(partition);
            }
        }
        for (DataFileMeta file : compactChangelogFiles) {
            PartitionChangelog buffered =
                    this.partitionChangelogs.computeIfAbsent(
                            partition, ignored -> new PartitionChangelog());
            buffered.addCompactChangelogFile(bucket, file);
            if (buffered.totalFileSize >= targetFileSize) {
                emitPartitionChanglogCompactTask(partition);
            }
        }

        // Forward the same commit message minus its changelog files, which are now carried by
        // the compaction tasks instead.
        DataIncrement newFilesWithoutChangelog =
                new DataIncrement(
                        message.newFilesIncrement().newFiles(),
                        message.newFilesIncrement().deletedFiles(),
                        Collections.emptyList());
        CompactIncrement compactWithoutChangelog =
                new CompactIncrement(
                        message.compactIncrement().compactBefore(),
                        message.compactIncrement().compactAfter(),
                        Collections.emptyList());
        CommitMessageImpl strippedMessage =
                new CommitMessageImpl(
                        message.partition(),
                        message.bucket(),
                        newFilesWithoutChangelog,
                        compactWithoutChangelog,
                        message.indexIncrement());
        emitCommittable(
                new Committable(committable.checkpointId(), Committable.Kind.FILE, strippedMessage));
    }

    /**
     * Emits a compaction task for every partition that still has buffered changelog files.
     *
     * @param barrierCheckpointId not used by this implementation
     */
    public void prepareSnapshotPreBarrier(long barrierCheckpointId) {
        emitAllPartitionsChanglogCompactTask();
    }

    /** Emits a compaction task for every partition that still has buffered changelog files. */
    public void endInput() {
        emitAllPartitionsChanglogCompactTask();
    }

    /** Sends the given committable downstream as the {@code Left} side of the output. */
    private void emitCommittable(Committable committable) {
        this.output.collect(new StreamRecord<>(Either.Left(committable)));
    }

    /**
     * Emits one compaction task, tagged with the current {@link #checkpointId}, holding
     * everything buffered for the partition, and drops the partition from the buffer. Does
     * nothing if the partition has no buffered files.
     */
    private void emitPartitionChanglogCompactTask(BinaryRow partition) {
        PartitionChangelog buffered = this.partitionChangelogs.remove(partition);
        if (buffered == null) {
            return;
        }
        this.output.collect(
                new StreamRecord<>(
                        Either.Right(
                                new ChangelogCompactTask(
                                        this.checkpointId,
                                        partition,
                                        buffered.newFileChangelogFiles,
                                        buffered.compactChangelogFiles))));
    }

    /** Emits a compaction task for each buffered partition, leaving the buffer empty. */
    private void emitAllPartitionsChanglogCompactTask() {
        // Iterate over a snapshot of the keys: emitting a task removes its entry from the map.
        List<BinaryRow> partitions = new ArrayList<>(this.partitionChangelogs.keySet());
        for (BinaryRow partition : partitions) {
            emitPartitionChanglogCompactTask(partition);
        }
    }
}
