package org.apache.paimon.flink.source.operator;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/operator/MonitorSource.class */
/**
 * Flink source that repeatedly plans a Paimon {@link StreamTableScan} and emits the resulting
 * {@link Split}s downstream.
 *
 * <p>{@link #buildSource} wires one non-parallel instance of this source to a {@code ReadOperator}
 * that turns the emitted splits into {@link RowData}. Between the two, splits are either
 * rebalanced or routed by (partition, bucket), depending on the table's {@link BucketMode}.
 */
public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);

    private final ReadBuilder readBuilder;

    /** Pause, in milliseconds (it is handed to {@link Thread#sleep(long)}), after an empty plan. */
    private final long monitorInterval;

    /** Whether the watermark reported by the scan is forwarded to the reader output. */
    private final boolean emitSnapshotWatermark;

    /**
     * Reader that drives the stream scan and keeps its progress in split-based state.
     *
     * <p>For every Flink checkpoint it records the value returned by {@link
     * StreamTableScan#checkpoint()} (presumably the next snapshot to read, judging by the field
     * names). Once a checkpoint completes, the largest value recorded for that checkpoint or any
     * earlier one is reported back to the scan.
     */
    private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
        private static final String CHECKPOINT_STATE = "CS";
        private static final String NEXT_SNAPSHOT_STATE = "NSS";

        private final StreamTableScan scan = readBuilder.newStreamScan();

        /** Holds at most one element: the latest value of {@code scan.checkpoint()}. */
        private final SplitListState<Long> checkpointState =
                new SplitListState<>(
                        CHECKPOINT_STATE, value -> Long.toString(value), Long::parseLong);

        /** Serialized form of {@link #nextSnapshotPerCheckpoint}, one "checkpointId:value" each. */
        private final SplitListState<Tuple2<Long, Long>> nextSnapshotState =
                new SplitListState<>(
                        NEXT_SNAPSHOT_STATE,
                        entry -> entry.f0 + ":" + entry.f1,
                        text -> {
                            // Split once instead of once per field.
                            String[] parts = text.split(":");
                            return Tuple2.of(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
                        });

        /** Flink checkpoint id -> value of {@code scan.checkpoint()} taken at that checkpoint. */
        private final TreeMap<Long, Long> nextSnapshotPerCheckpoint = new TreeMap<>();

        /**
         * Reports the largest scan checkpoint recorded up to and including {@code checkpointId}
         * to the scan, then forgets those entries.
         *
         * @param checkpointId id of the Flink checkpoint that completed
         */
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // headMap is a live view: clearing it removes the entries from the backing TreeMap.
            NavigableMap<Long, Long> completed =
                    nextSnapshotPerCheckpoint.headMap(checkpointId, true);
            OptionalLong max = completed.values().stream().mapToLong(Long::longValue).max();
            max.ifPresent(scan::notifyCheckpointComplete);
            completed.clear();
        }

        /**
         * Captures the scan's current checkpoint value and all not-yet-completed per-checkpoint
         * values as splits.
         *
         * @param checkpointId id of the Flink checkpoint being taken
         * @return the serialized checkpoint state followed by the serialized per-checkpoint state
         */
        @Override
        public List<SimpleSourceSplit> snapshotState(long checkpointId) {
            checkpointState.clear();
            Long nextSnapshot = scan.checkpoint();
            if (nextSnapshot != null) {
                checkpointState.add(nextSnapshot);
                nextSnapshotPerCheckpoint.put(checkpointId, nextSnapshot);
            }

            List<Tuple2<Long, Long>> pending = new ArrayList<>(nextSnapshotPerCheckpoint.size());
            nextSnapshotPerCheckpoint.forEach((id, snapshot) -> pending.add(Tuple2.of(id, snapshot)));
            nextSnapshotState.update(pending);

            if (LOG.isDebugEnabled()) {
                LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot);
            }

            List<SimpleSourceSplit> results = new ArrayList<>();
            results.addAll(checkpointState.snapshotState());
            results.addAll(nextSnapshotState.snapshotState());
            return results;
        }

        /**
         * Restores both states from the given splits, rewinds the scan to the restored checkpoint
         * value (if any) and rebuilds the per-checkpoint map.
         *
         * @param list splits previously produced by {@link #snapshotState(long)}
         * @throws IllegalArgumentException if more than one checkpoint value was restored
         */
        @Override
        public void addSplits(List<SimpleSourceSplit> list) {
            LOG.info("Restoring state for the {}.", getClass().getSimpleName());
            checkpointState.restoreState(list);
            nextSnapshotState.restoreState(list);

            List<Long> restored = checkpointState.get();
            Preconditions.checkArgument(
                    restored.size() <= 1, getClass().getSimpleName() + " retrieved invalid state.");
            if (restored.size() == 1) {
                scan.restore(restored.get(0));
            }

            for (Tuple2<Long, Long> entry : nextSnapshotState.get()) {
                nextSnapshotPerCheckpoint.put(entry.f0, entry.f1);
            }
        }

        /**
         * Plans once, emits every planned split and, when enabled, the scan's watermark.
         *
         * <p>If the plan was empty the calling thread sleeps for {@code monitorInterval}
         * milliseconds before returning.
         *
         * @param readerOutput sink for the planned splits and the optional watermark
         * @return {@link InputStatus#END_OF_INPUT} when the scan throws {@link
         *     EndOfScanException}, otherwise {@link InputStatus#MORE_AVAILABLE}
         */
        @Override
        public InputStatus pollNext(ReaderOutput<Split> readerOutput) throws Exception {
            boolean nothingPlanned;
            try {
                List<Split> splits = scan.plan().splits();
                nothingPlanned = splits.isEmpty();
                splits.forEach(readerOutput::collect);

                if (emitSnapshotWatermark) {
                    Long watermark = scan.watermark();
                    if (watermark != null) {
                        readerOutput.emitWatermark(new Watermark(watermark));
                    }
                }
            } catch (EndOfScanException e) {
                LOG.info("Catching EndOfStreamException, the stream is finished.");
                return InputStatus.END_OF_INPUT;
            }

            if (nothingPlanned) {
                Thread.sleep(monitorInterval);
            }
            return InputStatus.MORE_AVAILABLE;
        }
    }

    /**
     * @param readBuilder builder used to create the stream scan
     * @param monitorInterval pause in milliseconds after a plan that produced no splits
     * @param emitSnapshotWatermark whether to forward the scan's watermark downstream
     */
    public MonitorSource(
            ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) {
        this.readBuilder = readBuilder;
        this.monitorInterval = monitorInterval;
        this.emitSnapshotWatermark = emitSnapshotWatermark;
    }

    /** Always {@link Boundedness#CONTINUOUS_UNBOUNDED}. */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /** Creates a fresh {@link Reader}; the supplied context is not used. */
    @Override
    public SourceReader<Split, SimpleSourceSplit> createReader(
            SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader();
    }

    /**
     * Builds the monitor-plus-reader pipeline: a non-parallel {@code MonitorSource} named {@code
     * name + "-Monitor"}, a shuffle chosen by {@code bucketMode}, and a {@code ReadOperator} named
     * {@code name + "-Reader"}.
     *
     * @param env environment the source is registered with
     * @param name prefix for the operator names
     * @param typeInfo type information of the rows produced by the read operator
     * @param readBuilder builder shared by the monitor and the read operator
     * @param monitorInterval pause in milliseconds after an empty plan
     * @param emitSnapshotWatermark whether the monitor forwards the scan's watermark
     * @param shuffleBucketWithPartition whether the partition takes part in channel selection;
     *     ignored for {@link BucketMode#BUCKET_UNAWARE}
     * @param bucketMode bucket mode of the table, selects the shuffle strategy
     * @param nestedProjectedRowData passed through to the {@code ReadOperator}
     * @return the stream of rows produced by the read operator
     */
    public static DataStream<RowData> buildSource(
            StreamExecutionEnvironment env,
            String name,
            TypeInformation<RowData> typeInfo,
            ReadBuilder readBuilder,
            long monitorInterval,
            boolean emitSnapshotWatermark,
            boolean shuffleBucketWithPartition,
            BucketMode bucketMode,
            NestedProjectedRowData nestedProjectedRowData) {
        SingleOutputStreamOperator<Split> monitor =
                env.fromSource(
                                new MonitorSource(
                                        readBuilder, monitorInterval, emitSnapshotWatermark),
                                WatermarkStrategy.noWatermarks(),
                                name + "-Monitor",
                                new JavaTypeInfo<>(Split.class))
                        .forceNonParallel();

        DataStream<Split> shuffled =
                bucketMode == BucketMode.BUCKET_UNAWARE
                        ? shuffleUnwareBucket(monitor)
                        : shuffleNonUnwareBucket(monitor, shuffleBucketWithPartition);

        return shuffled.transform(
                name + "-Reader", typeInfo, new ReadOperator(readBuilder, nestedProjectedRowData));
    }

    /** Distributes splits with {@code rebalance()}. */
    private static DataStream<Split> shuffleUnwareBucket(
            SingleOutputStreamOperator<Split> singleOutputStreamOperator) {
        return singleOutputStreamOperator.rebalance();
    }

    /**
     * Routes each split to a channel computed from its bucket and, when {@code
     * shuffleBucketWithPartition} is set, its partition as well.
     *
     * <p>Every split is cast to {@link DataSplit}, so any other {@link Split} implementation
     * fails with a {@link ClassCastException} at runtime.
     */
    private static DataStream<Split> shuffleNonUnwareBucket(
            SingleOutputStreamOperator<Split> singleOutputStreamOperator,
            boolean shuffleBucketWithPartition) {
        // Both lambdas target serializable functional interfaces; the compiler generates the
        // matching $deserializeLambda$ hook, so none is declared by hand.
        return singleOutputStreamOperator.partitionCustom(
                (key, numPartitions) -> {
                    if (shuffleBucketWithPartition) {
                        return ChannelComputer.select(key.f0, key.f1, numPartitions);
                    }
                    return ChannelComputer.select(key.f1, numPartitions);
                },
                split -> {
                    DataSplit dataSplit = (DataSplit) split;
                    return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
                });
    }
}
