package org.apache.paimon.flink.source;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/BucketUnawareCompactSource.class */
public class BucketUnawareCompactSource extends RichSourceFunction<UnawareAppendCompactionTask> {
    private static final Logger LOG = LoggerFactory.getLogger(BucketUnawareCompactSource.class);
    private static final String COMPACTION_COORDINATOR_NAME = "Compaction Coordinator";
    private final FileStoreTable table;
    private final boolean streaming;
    private final long scanInterval;
    private final Predicate filter;
    private transient UnawareAppendTableCompactionCoordinator compactionCoordinator;
    private transient SourceFunction.SourceContext<UnawareAppendCompactionTask> ctx;
    private volatile boolean isRunning = true;

    public BucketUnawareCompactSource(FileStoreTable fileStoreTable, boolean z, long j, @Nullable Predicate predicate) {
        this.table = fileStoreTable;
        this.streaming = z;
        this.scanInterval = j;
        this.filter = predicate;
    }

    public void open(Configuration configuration) throws Exception {
        this.compactionCoordinator = new UnawareAppendTableCompactionCoordinator(this.table, this.streaming, this.filter);
        Preconditions.checkArgument(getRuntimeContext().getNumberOfParallelSubtasks() == 1, "Compaction Operator parallelism in paimon MUST be one.");
    }

    public void run(SourceFunction.SourceContext<UnawareAppendCompactionTask> sourceContext) throws Exception {
        boolean isEmpty;
        this.ctx = sourceContext;
        while (this.isRunning) {
            synchronized (this.ctx.getCheckpointLock()) {
                if (!this.isRunning) {
                    return;
                }
                try {
                    List<UnawareAppendCompactionTask> run = this.compactionCoordinator.run();
                    isEmpty = run.isEmpty();
                    SourceFunction.SourceContext<UnawareAppendCompactionTask> sourceContext2 = this.ctx;
                    sourceContext2.getClass();
                    run.forEach((v1) -> {
                        r1.collect(v1);
                    });
                } catch (EndOfScanException e) {
                    LOG.info("Catching EndOfStreamException, the stream is finished.");
                    return;
                }
            }
            if (isEmpty) {
                Thread.sleep(this.scanInterval);
            }
        }
    }

    public void cancel() {
        if (this.ctx == null) {
            this.isRunning = false;
            return;
        }
        synchronized (this.ctx.getCheckpointLock()) {
            this.isRunning = false;
        }
    }

    public static DataStreamSource<UnawareAppendCompactionTask> buildSource(StreamExecutionEnvironment streamExecutionEnvironment, BucketUnawareCompactSource bucketUnawareCompactSource, boolean z, String str) {
        return new DataStreamSource(streamExecutionEnvironment, new CompactionTaskTypeInfo(), new StreamSource(bucketUnawareCompactSource), false, "Compaction Coordinator : " + str, z ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED).setParallelism(1).setMaxParallelism(1);
    }
}
