package org.apache.paimon.flink.compact.changelog;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.class */
/**
 * A serializable unit of changelog compaction work for one partition at one checkpoint.
 *
 * <p>The task holds, per bucket, the changelog files listed under "new file" changelogs and
 * under "compact" changelogs. {@link #doCompact(FileStoreTable)} concatenates the bytes of all of
 * these files into a single file and returns committables whose file metas are renamed so that
 * each name records where the original content sits inside the merged file.
 */
public class ChangelogCompactTask implements Serializable {
    /** Buffer size, in bytes, handed to {@code IOUtils.copyBytes} for every file copy. */
    private static final int COPY_BUFFER_SIZE = 4096;

    private final long checkpointId;
    private final BinaryRow partition;
    private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
    private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;

    /**
     * Lazily initialized holder for the merged output file: the stream is only created once the
     * first source file is about to be copied, because its location is derived from that file.
     */
    private static class OutputStream {
        private Path path;
        private PositionOutputStream out;
        private boolean isInitialized;

        private OutputStream() {
            this.isInitialized = false;
        }

        /** Records the temporary path and its stream and marks the holder as usable. */
        private void init(Path path, PositionOutputStream positionOutputStream) {
            this.path = path;
            this.out = positionOutputStream;
            this.isInitialized = true;
        }
    }

    /** Describes where one copied source file ended up inside the merged file. */
    private static class Result {
        private final int bucket;
        private final boolean isCompactResult;
        private final DataFileMeta meta;
        /** Position in the merged file at which the copied bytes start. */
        private final long offset;
        /** Number of bytes copied for this source file. */
        private final long length;

        private Result(
                int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) {
            this.bucket = bucket;
            this.isCompactResult = isCompactResult;
            this.meta = meta;
            this.offset = offset;
            this.length = length;
        }
    }

    /**
     * Creates a task.
     *
     * @param checkpointId checkpoint id stamped on every produced {@link Committable}
     * @param partition partition whose changelog files are merged
     * @param newFileChangelogFiles bucket to changelog files reported through the data increment
     * @param compactChangelogFiles bucket to changelog files reported through the compact
     *     increment
     */
    public ChangelogCompactTask(
            long checkpointId,
            BinaryRow partition,
            Map<Integer, List<DataFileMeta>> newFileChangelogFiles,
            Map<Integer, List<DataFileMeta>> compactChangelogFiles) {
        this.checkpointId = checkpointId;
        this.partition = partition;
        this.newFileChangelogFiles = newFileChangelogFiles;
        this.compactChangelogFiles = compactChangelogFiles;
    }

    public long checkpointId() {
        return this.checkpointId;
    }

    public BinaryRow partition() {
        return this.partition;
    }

    public Map<Integer, List<DataFileMeta>> newFileChangelogFiles() {
        return this.newFileChangelogFiles;
    }

    public Map<Integer, List<DataFileMeta>> compactChangelogFiles() {
        return this.compactChangelogFiles;
    }

    /**
     * Merges every changelog file of this task into one file and builds the committables that
     * describe the result.
     *
     * <p>Files from {@code newFileChangelogFiles} are copied first, then files from {@code
     * compactChangelogFiles}. Each source file is deleted (quietly) right after it was copied.
     *
     * @param fileStoreTable table providing the file IO and the path factory
     * @return one {@link Committable} per bucket that contributed at least one file; an empty
     *     list when the task contains no files at all
     * @throws Exception if copying, closing or renaming fails; when a copy fails the merged
     *     output stream is closed before the failure is rethrown
     */
    public List<Committable> doCompact(FileStoreTable fileStoreTable) throws Exception {
        FileStorePathFactory pathFactory = fileStoreTable.store().pathFactory();
        OutputStream outputStream = new OutputStream();
        List<Result> results = new ArrayList<>();

        try {
            copyBucketFiles(
                    outputStream,
                    results,
                    fileStoreTable,
                    pathFactory,
                    this.newFileChangelogFiles,
                    false);
            copyBucketFiles(
                    outputStream,
                    results,
                    fileStoreTable,
                    pathFactory,
                    this.compactChangelogFiles,
                    true);
        } catch (Throwable failure) {
            // Do not leak the merged output stream when a copy fails half-way through.
            if (outputStream.isInitialized) {
                try {
                    outputStream.out.close();
                } catch (Throwable closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
            }
            throw failure;
        }

        if (results.isEmpty()) {
            // Nothing was copied, so the output stream was never created and there is no file to
            // rename or commit.
            return new ArrayList<>();
        }

        outputStream.out.close();
        return produceNewCommittables(results, fileStoreTable, pathFactory, outputStream.path);
    }

    /** Copies every file of every bucket in {@code filesByBucket} into the merged output. */
    private void copyBucketFiles(
            OutputStream outputStream,
            List<Result> results,
            FileStoreTable fileStoreTable,
            FileStorePathFactory pathFactory,
            Map<Integer, List<DataFileMeta>> filesByBucket,
            boolean isCompactResult)
            throws Exception {
        for (Map.Entry<Integer, List<DataFileMeta>> entry : filesByBucket.entrySet()) {
            int bucket = entry.getKey();
            DataFilePathFactory dataFilePathFactory =
                    pathFactory.createDataFilePathFactory(this.partition, bucket);
            for (DataFileMeta meta : entry.getValue()) {
                copyFile(
                        outputStream,
                        results,
                        fileStoreTable,
                        dataFilePathFactory.toPath(meta),
                        bucket,
                        isCompactResult,
                        meta);
            }
        }
    }

    /**
     * Appends the content of {@code path} to the merged output, deletes {@code path} quietly and
     * records the offset and length of the copied range in {@code results}.
     *
     * <p>On the first call the merged output is created as a uniquely named temporary file next
     * to {@code path}.
     */
    private void copyFile(
            OutputStream outputStream,
            List<Result> results,
            FileStoreTable fileStoreTable,
            Path path,
            int bucket,
            boolean isCompactResult,
            DataFileMeta meta)
            throws Exception {
        if (!outputStream.isInitialized) {
            Path outputPath =
                    new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
            outputStream.init(
                    outputPath, fileStoreTable.fileIO().newOutputStream(outputPath, false));
        }

        long offset = outputStream.out.getPos();
        // try-with-resources closes the input exactly once and keeps the copy failure as the
        // primary exception if closing fails as well.
        try (SeekableInputStream in = fileStoreTable.fileIO().newInputStream(path)) {
            // 'false': the shared output stream must stay open for the following files.
            IOUtils.copyBytes(in, outputStream.out, COPY_BUFFER_SIZE, false);
        }
        fileStoreTable.fileIO().deleteQuietly(path);
        results.add(
                new Result(
                        bucket, isCompactResult, meta, offset, outputStream.out.getPos() - offset));
    }

    /**
     * Renames the temporary merged file to its final name and builds one committable per bucket.
     *
     * <p>The final name is derived from the first result (which must start at offset 0): a
     * random UUID followed by that result's bucket and length. Metas of results at offset 0 are
     * renamed to exactly this name; metas of all other results additionally get their own
     * {@code -offset-length} suffix. Every name ends with the identifier returned by {@code
     * CompactedChangelogReadOnlyFormat.getIdentifier} for the meta's file format.
     *
     * @param results non-empty copy results in the order the files were written
     * @param changelogTempPath path of the temporary merged file
     * @throws IOException if the rename fails
     */
    private List<Committable> produceNewCommittables(
            List<Result> results,
            FileStoreTable fileStoreTable,
            FileStorePathFactory fileStorePathFactory,
            Path changelogTempPath)
            throws IOException {
        Result baseResult = results.get(0);
        Preconditions.checkArgument(baseResult.offset == 0);
        DataFilePathFactory dataFilePathFactory =
                fileStorePathFactory.createDataFilePathFactory(this.partition, baseResult.bucket);
        String realName =
                "compacted-changelog-"
                        + UUID.randomUUID()
                        + Catalog.SYSTEM_TABLE_SPLITTER
                        + baseResult.bucket
                        + "-"
                        + baseResult.length;
        fileStoreTable
                .fileIO()
                .rename(
                        changelogTempPath,
                        dataFilePathFactory.toAlignedPath(
                                realName + formatSuffix(baseResult.meta), baseResult.meta));

        Map<Integer, List<Result>> resultsByBucket = new HashMap<>();
        for (Result result : results) {
            resultsByBucket.computeIfAbsent(result.bucket, bucket -> new ArrayList<>()).add(result);
        }

        List<Committable> committables = new ArrayList<>();
        for (Map.Entry<Integer, List<Result>> entry : resultsByBucket.entrySet()) {
            List<DataFileMeta> newFilesChangelog = new ArrayList<>();
            List<DataFileMeta> compactChangelog = new ArrayList<>();
            for (Result result : entry.getValue()) {
                String baseName =
                        result.offset == 0
                                ? realName
                                : realName + "-" + result.offset + "-" + result.length;
                DataFileMeta renamed = result.meta.rename(baseName + formatSuffix(result.meta));
                if (result.isCompactResult) {
                    compactChangelog.add(renamed);
                } else {
                    newFilesChangelog.add(renamed);
                }
            }
            CommitMessageImpl message =
                    new CommitMessageImpl(
                            this.partition,
                            entry.getKey(),
                            new DataIncrement(
                                    Collections.emptyList(),
                                    Collections.emptyList(),
                                    newFilesChangelog),
                            new CompactIncrement(
                                    Collections.emptyList(),
                                    Collections.emptyList(),
                                    compactChangelog));
            committables.add(new Committable(this.checkpointId, Committable.Kind.FILE, message));
        }
        return committables;
    }

    /** Returns the file-name tail (separator plus format identifier) for {@code meta}. */
    private static String formatSuffix(DataFileMeta meta) {
        // NOTE(review): Path.CUR_DIR is used purely as the extension separator here; the
        // constant reference is kept as found in this file.
        return Path.CUR_DIR + CompactedChangelogReadOnlyFormat.getIdentifier(meta.fileFormat());
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                this.checkpointId,
                this.partition,
                this.newFileChangelogFiles,
                this.compactChangelogFiles);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        ChangelogCompactTask that = (ChangelogCompactTask) obj;
        return this.checkpointId == that.checkpointId
                && Objects.equals(this.partition, that.partition)
                && Objects.equals(this.newFileChangelogFiles, that.newFileChangelogFiles)
                && Objects.equals(this.compactChangelogFiles, that.compactChangelogFiles);
    }

    @Override
    public String toString() {
        // The text (including the "ChangelogCompactionTask" label and the absence of the
        // checkpoint id) is kept unchanged so existing log output stays stable.
        return String.format(
                "ChangelogCompactionTask {partition = %s, newFileChangelogFiles = %s, compactChangelogFiles = %s}",
                this.partition, this.newFileChangelogFiles, this.compactChangelogFiles);
    }
}
