package org.apache.paimon.flink.service;

import java.time.Duration;
import java.util.ArrayList;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.FileMonitorTable;
import org.apache.paimon.utils.SerializationUtils;

/* loaded from: input_file:org/apache/paimon/flink/service/QueryFileMonitor.class */
public class QueryFileMonitor extends AbstractNonCoordinatedSource<InternalRow> {
    private static final long serialVersionUID = 1;
    private final Table table;
    private final long monitorInterval;

    /* loaded from: input_file:org/apache/paimon/flink/service/QueryFileMonitor$FileMonitorChannelComputer.class */
    private static class FileMonitorChannelComputer implements ChannelComputer<InternalRow> {
        private int numChannels;

        private FileMonitorChannelComputer() {
        }

        @Override // org.apache.paimon.table.sink.ChannelComputer
        public void setup(int i) {
            this.numChannels = i;
        }

        @Override // org.apache.paimon.table.sink.ChannelComputer
        public int channel(InternalRow internalRow) {
            return ChannelComputer.select(SerializationUtils.deserializeBinaryRow(internalRow.getBinary(1)), internalRow.getInt(2), this.numChannels);
        }

        public String toString() {
            return "FileMonitorChannelComputer{numChannels=" + this.numChannels + '}';
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/service/QueryFileMonitor$Reader.class */
    private class Reader extends AbstractNonCoordinatedSourceReader<InternalRow> {
        private transient StreamTableScan scan;
        private transient TableRead read;

        private Reader() {
        }

        @Override // org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader
        public void start() {
            ReadBuilder dropStats = new FileMonitorTable((FileStoreTable) QueryFileMonitor.this.table).newReadBuilder().dropStats();
            this.scan = dropStats.newStreamScan();
            this.read = dropStats.newRead();
        }

        public InputStatus pollNext(ReaderOutput<InternalRow> readerOutput) throws Exception {
            if (doScan(readerOutput)) {
                Thread.sleep(QueryFileMonitor.this.monitorInterval);
            }
            return InputStatus.MORE_AVAILABLE;
        }

        private boolean doScan(ReaderOutput<InternalRow> readerOutput) throws Exception {
            ArrayList arrayList = new ArrayList();
            RecordReader<InternalRow> createReader = this.read.createReader(this.scan.plan());
            arrayList.getClass();
            createReader.forEachRemaining((v1) -> {
                r1.add(v1);
            });
            readerOutput.getClass();
            arrayList.forEach((v1) -> {
                r1.collect(v1);
            });
            return arrayList.isEmpty();
        }
    }

    public QueryFileMonitor(Table table) {
        this.table = table;
        this.monitorInterval = ((Duration) Options.fromMap(table.options()).get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)).toMillis();
    }

    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public SourceReader<InternalRow, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader();
    }

    public static DataStream<InternalRow> build(StreamExecutionEnvironment streamExecutionEnvironment, Table table) {
        return streamExecutionEnvironment.fromSource(new QueryFileMonitor(table), WatermarkStrategy.noWatermarks(), "FileMonitor-" + table.name(), InternalTypeInfo.fromRowType(FileMonitorTable.getRowType())).setParallelism(1);
    }

    public static ChannelComputer<InternalRow> createChannelComputer() {
        return new FileMonitorChannelComputer();
    }
}
