package org.apache.paimon.flink.clone;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/clone/CopyFileOperator.class */
public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo> implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(CopyFileOperator.class);
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private Catalog sourceCatalog;
    private Catalog targetCatalog;

    public CopyFileOperator(Map<String, String> map, Map<String, String> map2) {
        this.sourceCatalogConfig = map;
        this.targetCatalogConfig = map2;
    }

    public void open() throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
        CloneFileInfo cloneFileInfo = (CloneFileInfo) streamRecord.getValue();
        FileIO fileIO = this.sourceCatalog.fileIO();
        FileIO fileIO2 = this.targetCatalog.fileIO();
        Path tableLocation = this.sourceCatalog.getTableLocation(Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
        Path tableLocation2 = this.targetCatalog.getTableLocation(Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
        String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
        Path path = new Path(tableLocation + filePathExcludeTableRoot);
        Path path2 = new Path(tableLocation2 + filePathExcludeTableRoot);
        if (fileIO2.exists(path2) && fileIO2.getFileSize(path2) == fileIO.getFileSize(path)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping clone target file {} because it already exists and has the same size.", path2);
            }
            this.output.collect(streamRecord);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Begin copy file from {} to {}.", path, path2);
            }
            IOUtils.copyBytes(fileIO.newInputStream(path), fileIO2.newOutputStream(path2, true));
            if (LOG.isDebugEnabled()) {
                LOG.debug("End copy file from {} to {}.", path, path2);
            }
            this.output.collect(streamRecord);
        }
    }

    public void close() throws Exception {
        if (this.sourceCatalog != null) {
            this.sourceCatalog.close();
        }
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}
