package org.apache.paimon.flink.clone;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

/* loaded from: input_file:org/apache/paimon/flink/clone/SnapshotHintOperator.class */
public class SnapshotHintOperator extends AbstractStreamOperator<CloneFileInfo> implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo>, BoundedOneInput {
    private final Map<String, String> targetCatalogConfig;
    private Catalog targetCatalog;
    private Set<String> identifiers;

    public SnapshotHintOperator(Map<String, String> map) {
        this.targetCatalogConfig = map;
    }

    public void open() throws Exception {
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
        this.identifiers = new HashSet();
    }

    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
        this.identifiers.add(((CloneFileInfo) streamRecord.getValue()).getTargetIdentifier());
    }

    public void endInput() throws Exception {
        Iterator<String> it = this.identifiers.iterator();
        while (it.hasNext()) {
            commitSnapshotHintInTargetTable(((FileStoreTable) this.targetCatalog.getTable(Identifier.fromString(it.next()))).snapshotManager());
        }
    }

    private void commitSnapshotHintInTargetTable(SnapshotManager snapshotManager) throws IOException {
        OptionalLong max = snapshotManager.safelyGetAllSnapshots().stream().mapToLong((v0) -> {
            return v0.id();
        }).max();
        if (max.isPresent()) {
            long asLong = max.getAsLong();
            snapshotManager.commitEarliestHint(asLong);
            snapshotManager.commitLatestHint(asLong);
            for (Snapshot snapshot : snapshotManager.safelyGetAllSnapshots()) {
                if (snapshot.id() != asLong) {
                    snapshotManager.deleteSnapshot(snapshot.id());
                }
            }
        }
    }

    public void close() throws Exception {
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}
