package org.apache.paimon.flink.lookup;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.FunctionWithIOException;
import org.apache.paimon.utils.TypeUtils;

/* loaded from: input_file:org/apache/paimon/flink/lookup/LookupStreamingReader.class */
public class LookupStreamingReader {
    private final Table table;
    private final int[] projection;
    private final ReadBuilder readBuilder;

    @Nullable
    private final Predicate projectedPredicate;
    private final StreamTableScan scan;
    private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS = Arrays.asList(CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, CoreOptions.SCAN_SNAPSHOT_ID, CoreOptions.SCAN_TAG_NAME, CoreOptions.SCAN_VERSION);

    public LookupStreamingReader(Table table, int[] iArr, @Nullable Predicate predicate, Set<Integer> set) {
        Filter<Integer> filter;
        this.table = unsetTimeTravelOptions(table);
        this.projection = iArr;
        ReadBuilder withFilter = this.table.newReadBuilder().withProjection(iArr).withFilter(predicate);
        if (set == null) {
            filter = null;
        } else {
            set.getClass();
            filter = (v1) -> {
                return r2.contains(v1);
            };
        }
        this.readBuilder = withFilter.withBucketFilter(filter);
        this.scan = this.readBuilder.newStreamScan();
        if (predicate == null) {
            this.projectedPredicate = null;
            return;
        }
        List<String> fieldNames = table.rowType().getFieldNames();
        List<String> primaryKeys = table.primaryKeys();
        this.projectedPredicate = PredicateBuilder.transformFieldMapping(predicate, IntStream.range(0, table.rowType().getFieldCount()).map(i -> {
            int indexOf = Ints.indexOf(iArr, i);
            if (primaryKeys.isEmpty() || primaryKeys.contains(fieldNames.get(i))) {
                return indexOf;
            }
            return -1;
        }).toArray()).orElse(null);
    }

    private Table unsetTimeTravelOptions(Table table) {
        FileStoreTable fileStoreTable = (FileStoreTable) table;
        HashMap hashMap = new HashMap(fileStoreTable.options());
        Stream<R> map = TIME_TRAVEL_OPTIONS.stream().map((v0) -> {
            return v0.key();
        });
        hashMap.getClass();
        map.forEach((v1) -> {
            r1.remove(v1);
        });
        CoreOptions.StartupMode startupMode = CoreOptions.fromMap(hashMap).startupMode();
        if (startupMode != CoreOptions.StartupMode.COMPACTED_FULL) {
            startupMode = CoreOptions.StartupMode.LATEST_FULL;
        }
        hashMap.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
        return fileStoreTable.copy(fileStoreTable.schema().copy(hashMap));
    }

    public RecordReader<InternalRow> nextBatch(boolean z) throws Exception {
        RecordReader<InternalRow> create;
        List<Split> splits = this.scan.plan().splits();
        CoreOptions fromMap = CoreOptions.fromMap(this.table.options());
        FunctionWithIOException functionWithIOException = split -> {
            return this.readBuilder.newRead().createReader(split);
        };
        RowType project = TypeUtils.project(this.table.rowType(), this.projection);
        if (z) {
            create = SplitsParallelReadUtil.parallelExecute(project, functionWithIOException, splits, fromMap.pageSize(), ((Integer) new Options(this.table.options()).get(FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM)).intValue());
        } else {
            ArrayList arrayList = new ArrayList();
            for (Split split2 : splits) {
                arrayList.add(() -> {
                    return (RecordReader) functionWithIOException.apply(split2);
                });
            }
            create = ConcatRecordReader.create(arrayList);
        }
        if (this.projectedPredicate != null) {
            Predicate predicate = this.projectedPredicate;
            predicate.getClass();
            create = create.filter(predicate::test);
        }
        return create;
    }

    @Nullable
    public Long nextSnapshotId() {
        return this.scan.checkpoint();
    }
}
