package org.apache.paimon.flink.sink.partition;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;

/* loaded from: input_file:org/apache/paimon/flink/sink/partition/ReportPartStatsListener.class */
public class ReportPartStatsListener implements PartitionListener {
    private static final ListStateDescriptor<Map<String, Long>> PENDING_REPORT_STATE_DESC = new ListStateDescriptor<>("pending-report-hms-partition", new MapSerializer(StringSerializer.INSTANCE, LongSerializer.INSTANCE));
    private final InternalRowPartitionComputer partitionComputer;
    private final PartitionStatisticsReporter partitionStatisticsReporter;
    private final ListState<Map<String, Long>> pendingPartitionsState;
    private final Map<String, Long> pendingPartitions = new HashMap();
    private final long idleTime;

    private ReportPartStatsListener(InternalRowPartitionComputer internalRowPartitionComputer, PartitionStatisticsReporter partitionStatisticsReporter, OperatorStateStore operatorStateStore, boolean z, long j) throws Exception {
        this.partitionComputer = internalRowPartitionComputer;
        this.partitionStatisticsReporter = partitionStatisticsReporter;
        this.pendingPartitionsState = operatorStateStore.getListState(PENDING_REPORT_STATE_DESC);
        if (z) {
            Iterator it = ((Iterable) this.pendingPartitionsState.get()).iterator();
            if (it.hasNext()) {
                this.pendingPartitions.putAll((Map) it.next());
            }
        }
        this.idleTime = j;
    }

    @Override // org.apache.paimon.flink.sink.partition.PartitionListener
    public void notifyCommittable(List<ManifestCommittable> list) {
        HashSet hashSet = new HashSet();
        boolean z = false;
        for (ManifestCommittable manifestCommittable : list) {
            Iterator<CommitMessage> it = manifestCommittable.fileCommittables().iterator();
            while (it.hasNext()) {
                CommitMessageImpl commitMessageImpl = (CommitMessageImpl) it.next();
                if (!commitMessageImpl.newFilesIncrement().isEmpty() || !commitMessageImpl.compactIncrement().isEmpty()) {
                    hashSet.add(PartitionPathUtils.generatePartitionPath(this.partitionComputer.generatePartValues(commitMessageImpl.partition())));
                }
            }
            if (manifestCommittable.identifier() == Long.MAX_VALUE) {
                z = true;
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        hashSet.forEach(str -> {
            this.pendingPartitions.put(str, Long.valueOf(currentTimeMillis));
        });
        try {
            for (Map.Entry<String, Long> entry : reportPartition(z).entrySet()) {
                this.partitionStatisticsReporter.report(entry.getKey(), entry.getValue().longValue());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Map<String, Long> reportPartition(boolean z) {
        if (z) {
            return this.pendingPartitions;
        }
        Iterator<Map.Entry<String, Long>> it = this.pendingPartitions.entrySet().iterator();
        HashMap hashMap = new HashMap();
        long currentTimeMillis = System.currentTimeMillis();
        while (it.hasNext()) {
            Map.Entry<String, Long> next = it.next();
            if (currentTimeMillis - next.getValue().longValue() > this.idleTime) {
                hashMap.put(next.getKey(), next.getValue());
                it.remove();
            }
        }
        return hashMap;
    }

    @Override // org.apache.paimon.flink.sink.partition.PartitionListener
    public void snapshotState() throws Exception {
        this.pendingPartitionsState.update(Collections.singletonList(this.pendingPartitions));
    }

    public static Optional<ReportPartStatsListener> create(boolean z, OperatorStateStore operatorStateStore, FileStoreTable fileStoreTable) throws Exception {
        CoreOptions coreOptions = fileStoreTable.coreOptions();
        Options configuration = coreOptions.toConfiguration();
        if (((Duration) configuration.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)).toMillis() > 0 && !fileStoreTable.partitionKeys().isEmpty() && coreOptions.partitionedTableInMetastore() && fileStoreTable.catalogEnvironment().metastoreClientFactory() != null) {
            return Optional.of(new ReportPartStatsListener(new InternalRowPartitionComputer(coreOptions.partitionDefaultName(), fileStoreTable.schema().logicalPartitionType(), (String[]) fileStoreTable.partitionKeys().toArray(new String[0]), coreOptions.legacyPartitionName()), new PartitionStatisticsReporter(fileStoreTable, fileStoreTable.catalogEnvironment().metastoreClientFactory().create()), operatorStateStore, z, ((Duration) configuration.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)).toMillis()));
        }
        return Optional.empty();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.partitionStatisticsReporter != null) {
            this.partitionStatisticsReporter.close();
        }
    }
}
