package org.apache.paimon.flink.orphan;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.flink.utils.BoundedTwoInputOperator;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableConsumer;

/**
 * Flink-based {@link OrphanFilesClean}.
 *
 * <p>It builds a bounded batch job that:
 *
 * <ol>
 *   <li>collects every file name referenced by the snapshots of the table's valid branches,
 *   <li>lists candidate files under the table's Paimon file directories, and
 *   <li>passes each candidate whose name is not referenced to {@code fileCleaner}.
 * </ol>
 *
 * <p>The snapshot directory itself is handled on the client through {@code cleanSnapshotDir}
 * before the job is built.
 */
public class FlinkOrphanFilesClean extends OrphanFilesClean {

    /** Default parallelism applied to the job; {@code null} leaves the environment's setting. */
    @Nullable protected final Integer parallelism;

    /**
     * Creates a cleaner for {@code table}.
     *
     * @param table table whose orphan files are cleaned
     * @param olderThanMillis age threshold forwarded unchanged to {@link OrphanFilesClean}
     *     (presumably an epoch-millis timestamp consulted by {@code oldEnough}; confirm against
     *     the superclass)
     * @param fileCleaner receives the path of every orphan file found by the job
     * @param parallelism optional default parallelism set by {@link #doOrphanClean}
     */
    public FlinkOrphanFilesClean(
            FileStoreTable table,
            long olderThanMillis,
            SerializableConsumer<Path> fileCleaner,
            @Nullable Integer parallelism) {
        super(table, olderThanMillis, fileCleaner);
        this.parallelism = parallelism;
    }

    /**
     * Configures {@code env} for bounded batch execution and builds the orphan-file pipeline.
     *
     * <p>The pipeline has five stages:
     *
     * <ol>
     *   <li>emit (branch, snapshot JSON) for every snapshot of every valid branch;
     *   <li>per snapshot, emit directly used file names and, on a side output, manifest names;
     *   <li>read each distinct manifest once and emit its entries' file names and extra files;
     *   <li>list the Paimon file directories and keep files accepted by {@code oldEnough};
     *   <li>join candidates against used names by file name; unmatched candidates go to
     *       {@code fileCleaner}.
     * </ol>
     *
     * <p>The snapshot directory is cleaned eagerly on the client via {@code cleanSnapshotDir}
     * while this method runs. When that removed anything, its counts are appended to the
     * returned stream as an extra element.
     *
     * @param env environment that is reconfigured and on which the pipeline is built
     * @return stream of partial deletion results to be summed by the caller; this
     *     implementation never returns {@code null}
     */
    @Nullable
    public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironment env) {
        Configuration flinkConf = new Configuration();
        flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
        flinkConf.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
        if (parallelism != null) {
            flinkConf.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        }
        // Set by key rather than through a ConfigOption constant, presumably so this compiles
        // against Flink versions that do not declare the option.
        flinkConf.setString("execution.batch.adaptive.auto-parallelism.enabled", "false");
        env.configure(flinkConf);

        List<String> branches = validBranches();

        // The snapshot directory is cleaned here on the client, not inside the job. Count what
        // it deletes so the totals can be appended to the job's results at the end.
        AtomicLong deletedInLocal = new AtomicLong(0L);
        AtomicLong deletedFilesLenInBytesInLocal = new AtomicLong(0L);
        cleanSnapshotDir(
                branches,
                path -> deletedInLocal.incrementAndGet(),
                deletedFilesLenInBytesInLocal::addAndGet);

        // Side output carrying (branch, manifest file name) pairs found while walking snapshots.
        final OutputTag<Tuple2<String, String>> manifestOutputTag =
                new OutputTag<Tuple2<String, String>>("manifest-output") {};

        // Stage 1: one (branch, snapshot JSON) record per snapshot of each valid branch.
        ProcessFunction<String, Tuple2<String, String>> snapshotReader =
                new ProcessFunction<String, Tuple2<String, String>>() {
                    @Override
                    public void processElement(
                            String branch,
                            ProcessFunction<String, Tuple2<String, String>>.Context ctx,
                            Collector<Tuple2<String, String>> out)
                            throws Exception {
                        for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
                            out.collect(new Tuple2<>(branch, snapshot.toJson()));
                        }
                    }
                };

        // Stage 2: used file names go to the main output, manifest names to the side output.
        ProcessFunction<Tuple2<String, String>, String> snapshotFilesCollector =
                new ProcessFunction<Tuple2<String, String>, String>() {
                    @Override
                    public void processElement(
                            Tuple2<String, String> branchAndSnapshot,
                            ProcessFunction<Tuple2<String, String>, String>.Context ctx,
                            Collector<String> out)
                            throws Exception {
                        String branch = branchAndSnapshot.f0;
                        Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1);
                        Consumer<String> manifestConsumer =
                                manifest ->
                                        ctx.output(
                                                manifestOutputTag, new Tuple2<>(branch, manifest));
                        collectWithoutDataFile(branch, snapshot, out::collect, manifestConsumer);
                    }
                };

        // rebalance() spreads the snapshot records round-robin over the collector's subtasks.
        // The result must stay a SingleOutputStreamOperator: only it exposes getSideOutput.
        SingleOutputStreamOperator<String> snapshotUsedFiles =
                env.fromCollection(branches)
                        .process(snapshotReader)
                        .rebalance()
                        .process(snapshotFilesCollector);

        // Stage 3: buffer the distinct manifests of this subtask, then read each once at end of
        // input and emit the file name and extra files of every entry.
        BoundedOneInputOperator<Tuple2<String, String>, String> manifestReader =
                new BoundedOneInputOperator<Tuple2<String, String>, String>() {

                    // A manifest referenced by many snapshots is only read once.
                    private final Set<Tuple2<String, String>> manifests = new HashSet<>();

                    @Override
                    public void processElement(StreamRecord<Tuple2<String, String>> element) {
                        manifests.add(element.getValue());
                    }

                    @Override
                    public void endInput() throws IOException {
                        Map<String, ManifestFile> manifestFileByBranch = new HashMap<>();
                        for (Tuple2<String, String> branchAndManifest : manifests) {
                            ManifestFile manifestFile =
                                    manifestFileByBranch.computeIfAbsent(
                                            branchAndManifest.f0,
                                            branch ->
                                                    FlinkOrphanFilesClean.this.table
                                                            .switchToBranch(branch)
                                                            .store()
                                                            .manifestFileFactory()
                                                            .create());
                            List<ManifestEntry> entries =
                                    retryReadingFiles(
                                            () ->
                                                    manifestFile.readWithIOException(
                                                            branchAndManifest.f1),
                                            Collections.<ManifestEntry>emptyList());
                            for (ManifestEntry entry : entries) {
                                output.collect(new StreamRecord<>(entry.fileName()));
                                for (String extraFile : entry.file().extraFiles()) {
                                    output.collect(new StreamRecord<>(extraFile));
                                }
                            }
                        }
                    }
                };

        // Keying by "branch:manifest" routes every occurrence of a manifest to the same subtask,
        // so the per-subtask Set above deduplicates globally.
        DataStream<String> usedFiles =
                snapshotUsedFiles
                        .getSideOutput(manifestOutputTag)
                        .keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
                        .transform(
                                "datafile-reader", BasicTypeInfo.STRING_TYPE_INFO, manifestReader)
                        .union(snapshotUsedFiles);

        // Stage 4: list every Paimon file directory and keep (file URI, length) of each file
        // that oldEnough accepts. Directories are shipped to the job as URI strings.
        List<String> fileDirs =
                listPaimonFileDirs().stream()
                        .map(Path::toUri)
                        .map(Object::toString)
                        .collect(Collectors.toList());
        ProcessFunction<String, Pair<String, Long>> candidateLister =
                new ProcessFunction<String, Pair<String, Long>>() {
                    @Override
                    public void processElement(
                            String dir,
                            ProcessFunction<String, Pair<String, Long>>.Context ctx,
                            Collector<Pair<String, Long>> out) {
                        for (FileStatus fileStatus : tryBestListingDirs(new Path(dir))) {
                            if (oldEnough(fileStatus)) {
                                out.collect(
                                        Pair.of(
                                                fileStatus.getPath().toUri().toString(),
                                                fileStatus.getLen()));
                            }
                        }
                    }
                };
        DataStream<Pair<String, Long>> candidates =
                env.fromCollection(fileDirs).process(candidateLister);

        // Stage 5: hash join on file name. Input 1 (used names) is the build side and is read
        // completely first; input 2 (candidates) is then probed. Every candidate whose name is
        // absent from the build side is handed to fileCleaner and counted.
        BoundedTwoInputOperator<String, Pair<String, Long>, CleanOrphanFilesResult> filesJoin =
                new BoundedTwoInputOperator<String, Pair<String, Long>, CleanOrphanFilesResult>() {

                    private boolean buildEnd;
                    private long emittedFilesCount;
                    private long emittedFilesLen;
                    private final Set<String> used = new HashSet<>();

                    @Override
                    public InputSelection nextSelection() {
                        // Read only the build side until it has ended, then only the probe side.
                        return buildEnd ? InputSelection.SECOND : InputSelection.FIRST;
                    }

                    @Override
                    public void endInput(int inputId) {
                        switch (inputId) {
                            case 1:
                                Preconditions.checkState(!buildEnd, "Should not build ended.");
                                LOG.info("Finish build phase.");
                                buildEnd = true;
                                break;
                            case 2:
                                Preconditions.checkState(buildEnd, "Should build ended.");
                                LOG.info("Finish probe phase.");
                                LOG.info("Clean files count : {}", emittedFilesCount);
                                LOG.info("Clean files size : {}", emittedFilesLen);
                                output.collect(
                                        new StreamRecord<>(
                                                new CleanOrphanFilesResult(
                                                        emittedFilesCount, emittedFilesLen)));
                                break;
                            default:
                                break;
                        }
                    }

                    @Override
                    public void processElement1(StreamRecord<String> element) {
                        used.add(element.getValue());
                    }

                    @Override
                    public void processElement2(StreamRecord<Pair<String, Long>> element) {
                        Preconditions.checkState(buildEnd, "Should build ended.");
                        Pair<String, Long> fileInfo = element.getValue();
                        Path path = new Path(fileInfo.getLeft());
                        if (used.contains(path.getName())) {
                            return;
                        }
                        emittedFilesCount++;
                        emittedFilesLen += fileInfo.getRight();
                        FlinkOrphanFilesClean.this.fileCleaner.accept(path);
                        LOG.info("Dry clean: {}", path);
                    }
                };

        // Both sides are keyed by file name; this assumes used-file strings are bare file names
        // so that a used file and its candidate land on the same subtask (TODO confirm).
        DataStream<CleanOrphanFilesResult> deleted =
                usedFiles
                        .keyBy(file -> file)
                        .connect(
                                candidates.keyBy(
                                        pathAndSize -> new Path(pathAndSize.getKey()).getName()))
                        .transform(
                                "files_join",
                                TypeInformation.of(CleanOrphanFilesResult.class),
                                filesJoin);

        if (deletedInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
            deleted =
                    deleted.union(
                            env.fromElements(
                                    new CleanOrphanFilesResult(
                                            deletedInLocal.get(),
                                            deletedFilesLenInBytesInLocal.get())));
        }
        return deleted;
    }

    /**
     * Cleans orphan files of one table, or of every table in a database, and returns the totals.
     *
     * <p>The per-table pipelines are unioned and executed as a single job.
     *
     * @param env environment the per-table pipelines are built on and executed with
     * @param catalog catalog used to list and load the tables
     * @param olderThanMillis age threshold forwarded to each table's {@link FlinkOrphanFilesClean}
     * @param fileCleaner receives the path of every orphan file found
     * @param parallelism optional default parallelism for the job
     * @param databaseName database whose tables are cleaned
     * @param tableName table to clean; {@code null} or {@code "*"} selects every table of the
     *     database
     * @return deleted file count and total size summed over all tables
     * @throws Catalog.DatabaseNotExistException propagated from {@link Catalog#listTables}
     * @throws Catalog.TableNotExistException propagated from {@link Catalog#getTable}
     * @throws IllegalArgumentException if a selected table is not a {@link FileStoreTable}
     */
    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
            StreamExecutionEnvironment env,
            Catalog catalog,
            long olderThanMillis,
            SerializableConsumer<Path> fileCleaner,
            @Nullable Integer parallelism,
            String databaseName,
            @Nullable String tableName)
            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
        List<String> tableNames =
                tableName == null || "*".equals(tableName)
                        ? catalog.listTables(databaseName)
                        : Collections.singletonList(tableName);

        DataStream<CleanOrphanFilesResult> combined = null;
        for (String name : tableNames) {
            Table table = catalog.getTable(new Identifier(databaseName, name));
            org.apache.paimon.utils.Preconditions.checkArgument(
                    table instanceof FileStoreTable,
                    "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
                    table.getClass().getName());
            DataStream<CleanOrphanFilesResult> clean =
                    new FlinkOrphanFilesClean(
                                    (FileStoreTable) table,
                                    olderThanMillis,
                                    fileCleaner,
                                    parallelism)
                            .doOrphanClean(env);
            if (clean != null) {
                combined = combined == null ? clean : combined.union(clean);
            }
        }
        return sum(combined);
    }

    /**
     * Executes {@code deleted} as one job and sums every result it emits.
     *
     * @param deleted stream to execute; {@code null} means there is nothing to run
     * @return the summed file count and size, or zeros when {@code deleted} is {@code null}
     * @throws RuntimeException wrapping any failure while executing, collecting or closing
     */
    private static CleanOrphanFilesResult sum(
            @Nullable DataStream<CleanOrphanFilesResult> deleted) {
        long deletedFilesCount = 0;
        long deletedFilesLenInBytes = 0;
        if (deleted != null) {
            // try-with-resources: the collect iterator is closed even if iteration throws.
            try (CloseableIterator<CleanOrphanFilesResult> iterator =
                    deleted.global().executeAndCollect("OrphanFilesClean")) {
                while (iterator.hasNext()) {
                    CleanOrphanFilesResult result = iterator.next();
                    deletedFilesCount += result.getDeletedFileCount();
                    deletedFilesLenInBytes += result.getDeletedFileTotalLenInBytes();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return new CleanOrphanFilesResult(deletedFilesCount, deletedFilesLenInBytes);
    }
}
