package org.apache.paimon.flink.compact;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.metrics.MetricGroup;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.MetricUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/compact/UnawareBucketCompactor.class */
public class UnawareBucketCompactor {
    private static final Logger LOG = LoggerFactory.getLogger(UnawareBucketCompactor.class);
    private final FileStoreTable table;
    private final String commitUser;
    private final transient AppendOnlyFileStoreWrite write;
    protected final transient Queue<Future<CommitMessage>> result = new LinkedList();
    private final transient Supplier<ExecutorService> compactExecutorsupplier;

    @Nullable
    private final transient CompactionMetrics compactionMetrics;

    @Nullable
    private final transient CompactionMetrics.Reporter metricsReporter;

    public UnawareBucketCompactor(FileStoreTable fileStoreTable, String str, Supplier<ExecutorService> supplier, @Nullable MetricGroup metricGroup) {
        this.table = fileStoreTable;
        this.commitUser = str;
        this.write = (AppendOnlyFileStoreWrite) fileStoreTable.store().newWrite(str);
        this.compactExecutorsupplier = supplier;
        this.compactionMetrics = metricGroup == null ? null : new CompactionMetrics(new FlinkMetricRegistry(metricGroup), fileStoreTable.name());
        this.metricsReporter = this.compactionMetrics == null ? null : this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
    }

    public void processElement(UnawareAppendCompactionTask unawareAppendCompactionTask) throws Exception {
        this.result.add(this.compactExecutorsupplier.get().submit(() -> {
            MetricUtils.safeCall(this::startTimer, LOG);
            try {
                long currentTimeMillis = System.currentTimeMillis();
                CommitMessage doCompact = unawareAppendCompactionTask.doCompact(this.table, this.write);
                MetricUtils.safeCall(() -> {
                    if (this.metricsReporter != null) {
                        this.metricsReporter.reportCompactionTime(System.currentTimeMillis() - currentTimeMillis);
                        this.metricsReporter.increaseCompactionsCompletedCount();
                    }
                }, LOG);
                MetricUtils.safeCall(this::stopTimer, LOG);
                MetricUtils.safeCall(this::decreaseCompactionsQueuedCount, LOG);
                return doCompact;
            } catch (Throwable th) {
                MetricUtils.safeCall(this::stopTimer, LOG);
                MetricUtils.safeCall(this::decreaseCompactionsQueuedCount, LOG);
                throw th;
            }
        }));
        recordCompactionsQueuedRequest();
    }

    private void recordCompactionsQueuedRequest() {
        if (this.metricsReporter != null) {
            this.metricsReporter.increaseCompactionsQueuedCount();
        }
    }

    private void decreaseCompactionsQueuedCount() {
        if (this.metricsReporter != null) {
            this.metricsReporter.decreaseCompactionsQueuedCount();
        }
    }

    private void startTimer() {
        if (this.metricsReporter != null) {
            this.metricsReporter.getCompactTimer().start();
        }
    }

    private void stopTimer() {
        if (this.metricsReporter != null) {
            this.metricsReporter.getCompactTimer().finish();
        }
    }

    public void close() throws Exception {
        shutdown();
        if (this.metricsReporter != null) {
            CompactionMetrics.Reporter reporter = this.metricsReporter;
            reporter.getClass();
            MetricUtils.safeCall(reporter::unregister, LOG);
        }
        if (this.compactionMetrics != null) {
            CompactionMetrics compactionMetrics = this.compactionMetrics;
            compactionMetrics.getClass();
            MetricUtils.safeCall(compactionMetrics::close, LOG);
        }
    }

    @VisibleForTesting
    void shutdown() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (Future<CommitMessage> future : this.result) {
            if (!future.isDone()) {
                break;
            } else {
                try {
                    arrayList.add(future.get());
                } catch (Exception e) {
                }
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        TableCommitImpl newCommit = this.table.newCommit(this.commitUser);
        Throwable th = null;
        try {
            try {
                newCommit.abort(arrayList);
                if (newCommit != null) {
                    if (0 == 0) {
                        newCommit.close();
                        return;
                    }
                    try {
                        newCommit.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (newCommit != null) {
                if (th != null) {
                    try {
                        newCommit.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    newCommit.close();
                }
            }
            throw th4;
        }
    }

    public List<Committable> prepareCommit(boolean z, long j) throws IOException {
        ArrayList arrayList = new ArrayList();
        while (!this.result.isEmpty()) {
            try {
                Future<CommitMessage> peek = this.result.peek();
                if (!peek.isDone() && !z) {
                    break;
                }
                this.result.poll();
                arrayList.add(peek.get());
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting tasks done.", e);
            } catch (Exception e2) {
                throw new RuntimeException("Encountered an error while do compaction", e2);
            }
        }
        return (List) arrayList.stream().map(commitMessage -> {
            return new Committable(j, Committable.Kind.FILE, commitMessage);
        }).collect(Collectors.toList());
    }

    public Iterable<Future<CommitMessage>> result() {
        return this.result;
    }
}
