package org.apache.paimon.flink.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.class */
/**
 * Split enumerator that repeatedly plans a {@link StreamTableScan} and hands the resulting
 * {@link FileStoreSourceSplit}s to the readers that asked for work.
 *
 * <p>Splits are buffered in a {@link SplitAssigner}; readers that requested a split but could not
 * be served yet are remembered in {@link #readersAwaitingSplit} and served once new splits are
 * discovered. {@link #scanNextSnapshot()} and {@link #assignSplits()} are {@code synchronized} on
 * the enumerator instance.
 */
public class ContinuousFileSplitEnumerator implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);

    protected final SplitEnumeratorContext<FileStoreSourceSplit> context;
    /** Period handed to {@code context.callAsync} for the recurring scan started in {@link #start()}. */
    protected final long discoveryInterval;
    /** Subtask ids that requested a split and have not been served yet, in request order. */
    protected final Set<Integer> readersAwaitingSplit;
    protected final FileStoreSourceSplitGenerator splitGenerator;
    protected final StreamTableScan scan;
    protected final SplitAssigner splitAssigner;
    protected final ConsumerProgressCalculator consumerProgressCalculator;
    /** Scanning is skipped while the assigner still holds at least this many splits. */
    private final int splitMaxNum;
    private final boolean shuffleBucketWithPartition;

    @Nullable
    protected Long nextSnapshotId;
    /** Set once the scan reported {@link EndOfScanException}; drives {@link #noMoreSplits()}. */
    protected boolean finished = false;
    /** While {@code true}, {@link #handleSplitRequest} does not schedule an extra one-off scan. */
    private boolean stopTriggerScan = false;

    /** Pairs a scan plan with the scan checkpoint value captured right after planning. */
    protected static class PlanWithNextSnapshotId {
        private final TableScan.Plan plan;
        private final Long nextSnapshotId;

        public PlanWithNextSnapshotId(TableScan.Plan plan, Long nextSnapshotId) {
            this.plan = plan;
            this.nextSnapshotId = nextSnapshotId;
        }

        public TableScan.Plan plan() {
            return plan;
        }

        public Long nextSnapshotId() {
            return nextSnapshotId;
        }
    }

    /**
     * Creates the enumerator and registers the given splits with the split assigner.
     *
     * @param context enumerator context; must not be null
     * @param remainSplits splits to add to the assigner right away
     * @param nextSnapshotId initial value of {@link #nextSnapshotId}, may be null
     * @param discoveryInterval period of the recurring scan; must be greater than zero
     * @param scan table scan used for planning and checkpoint notifications
     * @param bucketMode selects the {@link SplitAssigner} implementation, see
     *     {@link #createSplitAssigner(BucketMode)}
     * @param splitMaxPerTask multiplied by the current parallelism to obtain the buffered-split
     *     threshold above which scanning is skipped
     * @param shuffleBucketWithPartition whether the suggested task of a split is computed from
     *     partition and bucket instead of the bucket alone
     */
    public ContinuousFileSplitEnumerator(
            SplitEnumeratorContext<FileStoreSourceSplit> context,
            Collection<FileStoreSourceSplit> remainSplits,
            @Nullable Long nextSnapshotId,
            long discoveryInterval,
            StreamTableScan scan,
            BucketMode bucketMode,
            int splitMaxPerTask,
            boolean shuffleBucketWithPartition) {
        Preconditions.checkArgument(discoveryInterval > 0);
        this.context = Preconditions.checkNotNull(context);
        this.nextSnapshotId = nextSnapshotId;
        this.discoveryInterval = discoveryInterval;
        this.readersAwaitingSplit = new LinkedHashSet<>();
        this.splitGenerator = new FileStoreSourceSplitGenerator();
        this.scan = scan;
        // createSplitAssigner and addSplits read this.context / shuffleBucketWithPartition,
        // so the assignment order below must be kept.
        this.splitAssigner = createSplitAssigner(bucketMode);
        this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
        this.shuffleBucketWithPartition = shuffleBucketWithPartition;
        addSplits(remainSplits);
        this.consumerProgressCalculator = new ConsumerProgressCalculator(context.currentParallelism());
    }

    /** Clears the flag that suppresses the extra scan triggered from {@link #handleSplitRequest}. */
    @VisibleForTesting
    void enableTriggerScan() {
        stopTriggerScan = false;
    }

    /** Adds every given split to the split assigner under its suggested task. */
    protected void addSplits(Collection<FileStoreSourceSplit> collection) {
        collection.forEach(this::addSplit);
    }

    private void addSplit(FileStoreSourceSplit split) {
        splitAssigner.addSplit(assignSuggestedTask(split), split);
    }

    /**
     * Schedules the recurring scan: {@link #scanNextSnapshot()} is the async callable and
     * {@link #processDiscoveredSplits} its handler, with an initial delay of {@code 0} and a
     * period of {@link #discoveryInterval}.
     */
    @Override
    public void start() {
        context.callAsync(this::scanNextSnapshot, this::processDiscoveredSplits, 0L, discoveryInterval);
    }

    /** No resources to release. */
    @Override
    public void close() throws IOException {
    }

    /** Nothing to do on reader registration; readers are served when they request a split. */
    @Override
    public void addReader(int subtaskId) {
    }

    /**
     * Records the requesting reader and tries to assign splits. If the reader is still waiting
     * afterwards and no extra scan is pending, a one-off scan is triggered.
     *
     * @param subtaskId id of the requesting reader
     * @param requesterHostname unused
     */
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        readersAwaitingSplit.add(subtaskId);
        assignSplits();
        boolean stillWaiting = readersAwaitingSplit.contains(subtaskId);
        if (!stillWaiting || stopTriggerScan) {
            return;
        }
        // Block further one-off scans until processDiscoveredSplits resets the flag.
        stopTriggerScan = true;
        context.callAsync(this::scanNextSnapshot, this::processDiscoveredSplits);
    }

    /**
     * Forwards {@link ReaderConsumeProgressEvent}s to the progress calculator; any other event is
     * logged as an error.
     */
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof ReaderConsumeProgressEvent) {
            consumerProgressCalculator.updateConsumeProgress(subtaskId, (ReaderConsumeProgressEvent) sourceEvent);
        } else {
            LOG.error("Received unrecognized event: {}", sourceEvent);
        }
    }

    /** Returns the given splits of the given subtask to the split assigner. */
    @Override
    public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
        LOG.debug("File Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplitsBack(subtaskId, splits);
    }

    /**
     * Builds a checkpoint from the splits still held by the assigner and the current
     * {@link #nextSnapshotId}, and informs the progress calculator about the checkpoint.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the pending splits together with the next snapshot id
     */
    @Override
    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
        List<FileStoreSourceSplit> remaining = new ArrayList<>(splitAssigner.remainingSplits());
        PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(remaining, nextSnapshotId);
        consumerProgressCalculator.notifySnapshotState(
                checkpointId,
                readersAwaitingSplit,
                // Fall back to the enumerator-wide snapshot id when the assigner has none for the subtask.
                subtask -> splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId),
                context.currentParallelism());
        LOG.debug("Source Checkpoint is {}", checkpoint);
        return checkpoint;
    }

    /**
     * Alias under the decompiler-generated name (the tool renamed {@code snapshotState} while
     * merging its bridge method); kept so code referring to that name keeps compiling.
     *
     * @deprecated use {@link #snapshotState(long)}.
     */
    @Deprecated
    public PendingSplitsCheckpoint mo290snapshotState(long checkpointId) throws Exception {
        return snapshotState(checkpointId);
    }

    /**
     * Asks the progress calculator for the value to report for the completed checkpoint and, when
     * one is present, forwards it to the scan.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        OptionalLong progress = consumerProgressCalculator.notifyCheckpointComplete(checkpointId);
        progress.ifPresent(scan::notifyCheckpointComplete);
    }

    /**
     * Plans the next scan unless the assigner already buffers {@code splitMaxNum} or more splits.
     * (Visibility was widened from {@code protected} by the decompiler.)
     *
     * @return the plan with the scan's checkpoint value, or empty when scanning was skipped
     */
    public synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
        if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) {
            return Optional.empty();
        }
        // plan() must run before checkpoint(): constructor arguments are evaluated left to right
        // in the original expression as well.
        TableScan.Plan plan = scan.plan();
        return Optional.of(new PlanWithNextSnapshotId(plan, scan.checkpoint()));
    }

    /**
     * Handler for the result of {@link #scanNextSnapshot()}.
     *
     * <p>An {@link EndOfScanException} marks the enumerator as finished and triggers a final
     * assignment round; any other failure is logged and rethrown wrapped in a
     * {@link RuntimeException}. On success the next snapshot id is updated and newly planned
     * splits are added and assigned.
     *
     * @param optional scan result; empty when the scan was skipped
     * @param th failure raised by the scan, or null
     */
    protected void processDiscoveredSplits(Optional<PlanWithNextSnapshotId> optional, Throwable th) {
        if (th != null) {
            if (!(th instanceof EndOfScanException)) {
                LOG.error("Failed to enumerate files", th);
                throw new RuntimeException(th);
            }
            LOG.debug("Catching EndOfScanException, the stream is finished.");
            finished = true;
            assignSplits();
            return;
        }
        if (!optional.isPresent()) {
            return;
        }
        PlanWithNextSnapshotId scanned = optional.get();
        nextSnapshotId = scanned.nextSnapshotId;
        TableScan.Plan plan = scanned.plan;
        if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
            // Keep split requests from scheduling further one-off scans while the snapshot is missing.
            stopTriggerScan = true;
            return;
        }
        stopTriggerScan = false;
        if (plan.splits().isEmpty()) {
            return;
        }
        addSplits(splitGenerator.createSplits(plan));
        assignSplits();
    }

    /**
     * Serves waiting readers from the split assigner.
     *
     * <p>Waiting readers that are no longer registered are dropped. When {@link #noMoreSplits()}
     * holds, waiting readers that received nothing in this round are signalled that no more
     * splits will come. Readers that received splits stop waiting. The assignment is always sent
     * to the context, even when it is empty. (Visibility was widened from {@code protected} by
     * the decompiler.)
     */
    public synchronized void assignSplits() {
        HashMap<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
        Set<Integer> registered = context.registeredReaders().keySet();

        Iterator<Integer> waiting = readersAwaitingSplit.iterator();
        while (waiting.hasNext()) {
            Integer reader = waiting.next();
            if (!registered.contains(reader)) {
                waiting.remove();
                continue;
            }
            List<FileStoreSourceSplit> splits = splitAssigner.getNext(reader, null);
            if (!splits.isEmpty()) {
                assignment.put(reader, splits);
                consumerProgressCalculator.updateAssignInformation(reader, splits.get(0));
            }
        }

        if (noMoreSplits()) {
            Iterator<Integer> idle = readersAwaitingSplit.iterator();
            while (idle.hasNext()) {
                Integer reader = idle.next();
                if (!assignment.containsKey(reader)) {
                    context.signalNoMoreSplits(reader);
                    idle.remove();
                }
            }
        }

        assignment.keySet().forEach(readersAwaitingSplit::remove);
        context.assignSplits(new SplitsAssignment<>(assignment));
    }

    /**
     * Computes the task a split is suggested for, from its bucket (and its partition when
     * {@code shuffleBucketWithPartition} is set) and the current parallelism. The split's payload
     * is cast to {@link DataSplit}. (Visibility was widened from {@code protected} by the
     * decompiler.)
     */
    public int assignSuggestedTask(FileStoreSourceSplit fileStoreSourceSplit) {
        DataSplit dataSplit = (DataSplit) fileStoreSourceSplit.split();
        int parallelism = context.currentParallelism();
        if (shuffleBucketWithPartition) {
            return ChannelComputer.select(dataSplit.partition(), dataSplit.bucket(), parallelism);
        }
        return ChannelComputer.select(dataSplit.bucket(), parallelism);
    }

    /**
     * Chooses the assigner: {@link FIFOSplitAssigner} for {@link BucketMode#BUCKET_UNAWARE},
     * otherwise {@link PreAssignSplitAssigner}. Called from the constructor after
     * {@link #context} has been set.
     */
    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
            return new FIFOSplitAssigner(Collections.emptyList());
        }
        return new PreAssignSplitAssigner(1, context, Collections.emptyList());
    }

    /**
     * Whether the scan has ended, i.e. {@link #finished} is set. (Visibility was widened from
     * {@code protected} by the decompiler.)
     */
    public boolean noMoreSplits() {
        return finished;
    }
}
