package org.apache.paimon.flink.sink.partition;

import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;

/* loaded from: input_file:org/apache/paimon/flink/sink/partition/PartitionMarkDone.class */
public class PartitionMarkDone implements PartitionListener {
    private final InternalRowPartitionComputer partitionComputer;
    private final PartitionMarkDoneTrigger trigger;
    private final List<PartitionMarkDoneAction> actions;
    private final boolean waitCompaction;

    public static Optional<PartitionMarkDone> create(boolean z, boolean z2, OperatorStateStore operatorStateStore, FileStoreTable fileStoreTable) throws Exception {
        CoreOptions coreOptions = fileStoreTable.coreOptions();
        if (disablePartitionMarkDone(z, fileStoreTable, coreOptions.toConfiguration())) {
            return Optional.empty();
        }
        return Optional.of(new PartitionMarkDone(new InternalRowPartitionComputer(coreOptions.partitionDefaultName(), fileStoreTable.schema().logicalPartitionType(), (String[]) fileStoreTable.partitionKeys().toArray(new String[0]), coreOptions.legacyPartitionName()), PartitionMarkDoneTrigger.create(coreOptions, z2, operatorStateStore), PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions), !fileStoreTable.primaryKeys().isEmpty() && (coreOptions.deletionVectorsEnabled() || coreOptions.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW)));
    }

    private static boolean disablePartitionMarkDone(boolean z, FileStoreTable fileStoreTable, Options options) {
        boolean booleanValue = ((Boolean) options.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)).booleanValue();
        if (!z && !booleanValue) {
            return true;
        }
        Duration duration = (Duration) options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE);
        if (z && duration == null) {
            return true;
        }
        return fileStoreTable.partitionKeys().isEmpty();
    }

    public PartitionMarkDone(InternalRowPartitionComputer internalRowPartitionComputer, PartitionMarkDoneTrigger partitionMarkDoneTrigger, List<PartitionMarkDoneAction> list, boolean z) {
        this.partitionComputer = internalRowPartitionComputer;
        this.trigger = partitionMarkDoneTrigger;
        this.actions = list;
        this.waitCompaction = z;
    }

    @Override // org.apache.paimon.flink.sink.partition.PartitionListener
    public void notifyCommittable(List<ManifestCommittable> list) {
        HashSet hashSet = new HashSet();
        boolean z = false;
        for (ManifestCommittable manifestCommittable : list) {
            Iterator<CommitMessage> it = manifestCommittable.fileCommittables().iterator();
            while (it.hasNext()) {
                CommitMessageImpl commitMessageImpl = (CommitMessageImpl) it.next();
                if (this.waitCompaction || !commitMessageImpl.indexIncrement().isEmpty() || !commitMessageImpl.newFilesIncrement().isEmpty()) {
                    hashSet.add(commitMessageImpl.partition());
                }
            }
            if (manifestCommittable.identifier() == Long.MAX_VALUE) {
                z = true;
            }
        }
        Stream stream = hashSet.stream();
        InternalRowPartitionComputer internalRowPartitionComputer = this.partitionComputer;
        internalRowPartitionComputer.getClass();
        Stream map = stream.map((v1) -> {
            return r1.generatePartValues(v1);
        }).map(PartitionPathUtils::generatePartitionPath);
        PartitionMarkDoneTrigger partitionMarkDoneTrigger = this.trigger;
        partitionMarkDoneTrigger.getClass();
        map.forEach(partitionMarkDoneTrigger::notifyPartition);
        markDone(this.trigger.donePartitions(z), this.actions);
    }

    public static void markDone(List<String> list, List<PartitionMarkDoneAction> list2) {
        for (String str : list) {
            try {
                Iterator<PartitionMarkDoneAction> it = list2.iterator();
                while (it.hasNext()) {
                    it.next().markDone(str);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override // org.apache.paimon.flink.sink.partition.PartitionListener
    public void snapshotState() throws Exception {
        this.trigger.snapshotState();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOUtils.closeAllQuietly(this.actions);
    }
}
