package org.apache.paimon.flink.source;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/BucketUnawareCompactSource.class */
/**
 * Flink source that emits {@link UnawareAppendCompactionTask}s produced by an
 * {@link UnawareAppendTableCompactionCoordinator}.
 *
 * <p>The source must run with a parallelism of exactly one; {@link #createReader} rejects any
 * other parallelism and {@link #buildSource} pins both parallelism and max parallelism to one.
 */
public class BucketUnawareCompactSource extends AbstractNonCoordinatedSource<UnawareAppendCompactionTask> {
    private static final Logger LOG = LoggerFactory.getLogger(BucketUnawareCompactSource.class);

    /** Prefix of the operator name assigned in {@link #buildSource}. */
    private static final String COMPACTION_COORDINATOR_NAME = "Compaction Coordinator";

    private final FileStoreTable table;
    private final boolean streaming;
    /** Pause between two coordinator runs when a run yields no tasks, in milliseconds. */
    private final long scanInterval;
    @Nullable private final Predicate filter;

    /**
     * Reader that repeatedly runs the compaction coordinator and forwards every task it returns
     * to the reader output.
     */
    public static class BucketUnawareCompactSourceReader extends AbstractNonCoordinatedSourceReader<UnawareAppendCompactionTask> {
        private final UnawareAppendTableCompactionCoordinator compactionCoordinator;
        private final long scanInterval;

        /**
         * Creates the reader and its coordinator.
         *
         * @param table table handed to the compaction coordinator
         * @param streaming streaming flag handed to the compaction coordinator
         * @param filter predicate handed to the compaction coordinator; may be {@code null}
         *     because the enclosing source accepts a nullable filter
         * @param scanInterval milliseconds to sleep after a coordinator run that returned no tasks
         */
        public BucketUnawareCompactSourceReader(
                FileStoreTable table, boolean streaming, Predicate filter, long scanInterval) {
            this.scanInterval = scanInterval;
            this.compactionCoordinator =
                    new UnawareAppendTableCompactionCoordinator(table, streaming, filter);
        }

        /**
         * Runs the coordinator once and emits all tasks it produced.
         *
         * @param readerOutput output that receives each compaction task
         * @return {@link InputStatus#END_OF_INPUT} when the coordinator signals the end of the
         *     scan via {@link EndOfScanException}; otherwise {@link InputStatus#MORE_AVAILABLE}
         * @throws Exception if the coordinator fails or the sleep is interrupted
         */
        @Override
        public InputStatus pollNext(ReaderOutput<UnawareAppendCompactionTask> readerOutput) throws Exception {
            boolean noTasks;
            try {
                List<UnawareAppendCompactionTask> tasks = this.compactionCoordinator.run();
                noTasks = tasks.isEmpty();
                tasks.forEach(readerOutput::collect);
            } catch (EndOfScanException e) {
                LOG.info("Catching EndOfScanException, the stream is finished.");
                return InputStatus.END_OF_INPUT;
            }

            // Nothing to compact right now: back off before the next run instead of spinning.
            if (noTasks) {
                Thread.sleep(this.scanInterval);
            }
            return InputStatus.MORE_AVAILABLE;
        }
    }

    /**
     * Creates the source.
     *
     * @param table table whose compaction tasks are emitted
     * @param streaming {@code true} to report the source as unbounded, {@code false} as bounded
     * @param scanInterval milliseconds the reader sleeps after a run that produced no tasks
     * @param filter optional predicate forwarded to the compaction coordinator
     */
    public BucketUnawareCompactSource(
            FileStoreTable table, boolean streaming, long scanInterval, @Nullable Predicate filter) {
        this.table = table;
        this.streaming = streaming;
        this.scanInterval = scanInterval;
        this.filter = filter;
    }

    /**
     * @return {@link Boundedness#CONTINUOUS_UNBOUNDED} in streaming mode, otherwise
     *     {@link Boundedness#BOUNDED}
     */
    @Override
    public Boundedness getBoundedness() {
        return this.streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
    }

    /**
     * Creates the single reader of this source.
     *
     * @param sourceReaderContext Flink reader context, used to verify the parallelism
     * @return a new {@link BucketUnawareCompactSourceReader}
     * @throws IllegalArgumentException if the current parallelism is not one
     */
    @Override
    public SourceReader<UnawareAppendCompactionTask, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        Preconditions.checkArgument(
                sourceReaderContext.currentParallelism() == 1,
                "Compaction Operator parallelism in paimon MUST be one.");
        return new BucketUnawareCompactSourceReader(this.table, this.streaming, this.filter, this.scanInterval);
    }

    /**
     * Registers the given source on the environment without watermarks, with parallelism and
     * max parallelism both fixed to one.
     *
     * @param streamExecutionEnvironment environment the source is added to
     * @param bucketUnawareCompactSource source to register
     * @param str suffix appended to the operator name after {@code "Compaction Coordinator : "}
     * @return the resulting data stream source of compaction tasks
     */
    public static DataStreamSource<UnawareAppendCompactionTask> buildSource(StreamExecutionEnvironment streamExecutionEnvironment, BucketUnawareCompactSource bucketUnawareCompactSource, String str) {
        String operatorName = COMPACTION_COORDINATOR_NAME + " : " + str;
        return streamExecutionEnvironment
                .fromSource(
                        bucketUnawareCompactSource,
                        WatermarkStrategy.noWatermarks(),
                        operatorName,
                        new CompactionTaskTypeInfo())
                .setParallelism(1)
                .setMaxParallelism(1);
    }
}
