package org.apache.paimon.flink.source.operator;

import java.util.regex.Pattern;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;

/* loaded from: input_file:org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.class */
public class CombinedUnawareStreamingSource extends CombinedCompactorSource<MultiTableUnawareAppendCompactionTask> {
    private final long monitorInterval;

    /* loaded from: input_file:org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource$Reader.class */
    private class Reader extends AbstractNonCoordinatedSourceReader<MultiTableUnawareAppendCompactionTask> {
        private MultiTableScanBase<MultiTableUnawareAppendCompactionTask> tableScan;

        private Reader() {
        }

        @Override // org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader
        public void start() {
            super.start();
            this.tableScan = new MultiUnawareBucketTableScan(CombinedUnawareStreamingSource.this.catalogLoader, CombinedUnawareStreamingSource.this.includingPattern, CombinedUnawareStreamingSource.this.excludingPattern, CombinedUnawareStreamingSource.this.databasePattern, CombinedUnawareStreamingSource.this.isStreaming);
        }

        public InputStatus pollNext(ReaderOutput<MultiTableUnawareAppendCompactionTask> readerOutput) throws Exception {
            MultiTableScanBase.ScanResult scanTable = this.tableScan.scanTable(readerOutput);
            if (scanTable == MultiTableScanBase.ScanResult.FINISHED) {
                return InputStatus.END_OF_INPUT;
            }
            if (scanTable == MultiTableScanBase.ScanResult.IS_EMPTY) {
                Thread.sleep(CombinedUnawareStreamingSource.this.monitorInterval);
            }
            return InputStatus.MORE_AVAILABLE;
        }

        @Override // org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader
        public void close() throws Exception {
            super.close();
            if (this.tableScan != null) {
                this.tableScan.close();
            }
        }
    }

    public CombinedUnawareStreamingSource(CatalogLoader catalogLoader, Pattern pattern, Pattern pattern2, Pattern pattern3, long j) {
        super(catalogLoader, pattern, pattern2, pattern3, true);
        this.monitorInterval = j;
    }

    public SourceReader<MultiTableUnawareAppendCompactionTask, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader();
    }

    public static DataStream<MultiTableUnawareAppendCompactionTask> buildSource(StreamExecutionEnvironment streamExecutionEnvironment, String str, CatalogLoader catalogLoader, Pattern pattern, Pattern pattern2, Pattern pattern3, long j) {
        return streamExecutionEnvironment.fromSource(new CombinedUnawareStreamingSource(catalogLoader, pattern, pattern2, pattern3, j), WatermarkStrategy.noWatermarks(), str, new MultiTableCompactionTaskTypeInfo()).forceNonParallel();
    }
}
