package org.apache.paimon.flink.clone;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/clone/PickFilesForCloneOperator.class */
public class PickFilesForCloneOperator extends AbstractStreamOperator<CloneFileInfo> implements OneInputStreamOperator<Tuple2<String, String>, CloneFileInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(PickFilesForCloneOperator.class);
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private Catalog sourceCatalog;
    private Catalog targetCatalog;

    public PickFilesForCloneOperator(Map<String, String> map, Map<String, String> map2) {
        this.sourceCatalogConfig = map;
        this.targetCatalogConfig = map2;
    }

    public void open() throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) throws Exception {
        String str = (String) ((Tuple2) streamRecord.getValue()).f0;
        Identifier fromString = Identifier.fromString(str);
        String str2 = (String) ((Tuple2) streamRecord.getValue()).f1;
        Identifier fromString2 = Identifier.fromString(str2);
        FileStoreTable fileStoreTable = (FileStoreTable) this.sourceCatalog.getTable(fromString);
        this.targetCatalog.createDatabase(fromString2.getDatabaseName(), true);
        this.targetCatalog.createTable(fromString2, Schema.fromTableSchema(fileStoreTable.schema()), true);
        List<CloneFileInfo> cloneFileInfos = toCloneFileInfos(PickFilesUtil.getUsedFilesForLatestSnapshot(fileStoreTable), fileStoreTable.location(), str, str2);
        if (LOG.isDebugEnabled()) {
            LOG.debug("The CloneFileInfo of table {} is {} : ", fileStoreTable.location(), cloneFileInfos);
        }
        Iterator<CloneFileInfo> it = cloneFileInfos.iterator();
        while (it.hasNext()) {
            this.output.collect(new StreamRecord(it.next()));
        }
    }

    private List<CloneFileInfo> toCloneFileInfos(List<Path> list, Path path, String str, String str2) {
        ArrayList arrayList = new ArrayList();
        Iterator<Path> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new CloneFileInfo(getPathExcludeTableRoot(it.next(), path).toString(), str, str2));
        }
        return arrayList;
    }

    private Path getPathExcludeTableRoot(Path path, Path path2) {
        String uri = path.toUri().toString();
        String path3 = path2.toString();
        Preconditions.checkState(uri.startsWith(path3), "File absolute path does not start with source table root path. This is unexpected. fileAbsolutePath is: " + uri + ", sourceTableRootPath is: " + path3);
        return new Path(uri.substring(path3.length()));
    }

    public void close() throws Exception {
        if (this.sourceCatalog != null) {
            this.sourceCatalog.close();
        }
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}
