package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.cloud.bigtable.data.v2.models.Range;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.DetectNewPartitionsState;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.InitialPipelineState;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.OrphanedMetadataCleaner;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.PartitionReconciler;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Action behind the "detect new partitions" (DNP) step of the Bigtable change stream connector.
 *
 * <p>Each invocation of {@link #run} does one of two things, depending on the tracker position:
 *
 * <ul>
 *   <li>At offset {@code 0} it bootstraps the pipeline, either by resuming from a previous
 *       pipeline or by generating the initial partitions.
 *   <li>At any later offset it reads the new partitions recorded in the metadata table, hands
 *       them to {@link ProcessNewPartitionsAction}, and periodically recomputes the watermark,
 *       reconciles missing partitions and cleans up orphaned metadata.
 * </ul>
 */
@Internal
public class DetectNewPartitionsAction {
    private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class);

    /** A partition whose watermark lags "now" by more than this is logged as holding DNP back. */
    private static final Duration DEBUG_WATERMARK_DELAY = Duration.standardMinutes(5);

    /** Minimum age of the stored watermark before it is recomputed. */
    private static final Duration WATERMARK_UPDATE_INTERVAL = Duration.standardSeconds(10L);

    /** Reconciliation and orphan clean-up only run once the tracker offset exceeds this value. */
    private static final long RECONCILIATION_MIN_OFFSET = 50L;

    /** Delay requested before the runner resumes this action after a regular run. */
    private static final Duration RESUME_DELAY = Duration.millis(100L);

    private final ChangeStreamMetrics metrics;
    private final MetadataTableDao metadataTableDao;

    @Nullable
    private final Instant endTime;
    private final ProcessNewPartitionsAction processNewPartitionsAction;
    private final GenerateInitialPartitionsAction generateInitialPartitionsAction;
    private final ResumeFromPreviousPipelineAction resumeFromPreviousPipelineAction;

    // Both helpers are re-created on every non-initial run() so that no state leaks between runs.
    private transient PartitionReconciler partitionReconciler;
    private transient OrphanedMetadataCleaner orphanedMetadataCleaner;

    /**
     * Creates the action.
     *
     * @param changeStreamMetrics metrics sink used by this action and by the partition reconciler
     * @param metadataTableDao DAO used to read and update the metadata table
     * @param instant optional end time; when the watermark reaches it, {@link #run} stops
     * @param generateInitialPartitionsAction action used on the first run of a fresh pipeline
     * @param resumeFromPreviousPipelineAction action used on the first run of a resumed pipeline
     * @param processNewPartitionsAction action that tries to output each new partition
     */
    public DetectNewPartitionsAction(ChangeStreamMetrics changeStreamMetrics, MetadataTableDao metadataTableDao, @Nullable Instant instant, GenerateInitialPartitionsAction generateInitialPartitionsAction, ResumeFromPreviousPipelineAction resumeFromPreviousPipelineAction, ProcessNewPartitionsAction processNewPartitionsAction) {
        this.metrics = changeStreamMetrics;
        this.metadataTableDao = metadataTableDao;
        this.endTime = instant;
        this.generateInitialPartitionsAction = generateInitialPartitionsAction;
        this.resumeFromPreviousPipelineAction = resumeFromPreviousPipelineAction;
        this.processNewPartitionsAction = processNewPartitionsAction;
    }

    /**
     * Computes the lowest watermark across all streamed partitions and pending new partitions.
     *
     * <p>No watermark is returned when the streamed partitions overlap each other, or when the
     * streamed partitions plus the parents of the new partitions do not cover the entire key
     * space.
     *
     * @param streamPartitions partitions currently being streamed, with their watermarks
     * @param newPartitions new partitions read from the metadata table
     * @return the new low watermark, or empty if it cannot be determined
     */
    private Optional<Instant> getNewWatermark(List<StreamPartitionWithWatermark> streamPartitions, List<NewPartition> newPartitions) {
        List<StreamPartitionWithWatermark> slowPartitions = new ArrayList<>();
        List<Range.ByteStringRange> coveredPartitions = new ArrayList<>();
        Instant lowWatermark = Instant.ofEpochMilli(Long.MAX_VALUE);

        for (StreamPartitionWithWatermark streamPartition : streamPartitions) {
            Instant watermark = streamPartition.getWatermark();
            if (watermark.plus(DEBUG_WATERMARK_DELAY).isBeforeNow()) {
                slowPartitions.add(streamPartition);
            }
            if (watermark.isBefore(lowWatermark)) {
                lowWatermark = watermark;
            }
            coveredPartitions.add(streamPartition.getPartition());
        }

        // Purely diagnostic: lagging partitions do not prevent the watermark from being updated.
        if (!slowPartitions.isEmpty()) {
            String slowPartitionsDescription = slowPartitions.stream()
                    .map(slow -> ByteStringRangeHelper.formatByteStringRange(slow.getPartition()) + " => " + slow.getWatermark())
                    .collect(Collectors.joining(", ", "{", "}"));
            LOG.warn("DNP: Updating watermark is held back by {} partitions : {}", slowPartitions.size(), slowPartitionsDescription);
        }

        // Overlap is checked on the streamed partitions only, before new partitions are merged in.
        List<Range.ByteStringRange> overlappingPartitions = ByteStringRangeHelper.getOverlappingPartitions(coveredPartitions);
        if (!overlappingPartitions.isEmpty()) {
            LOG.warn("DNP: Updating watermark failed due to overlapping: {}", ByteStringRangeHelper.partitionsToString(overlappingPartitions));
            return Optional.empty();
        }

        for (NewPartition newPartition : newPartitions) {
            coveredPartitions.addAll(newPartition.getParentPartitions());
            Instant newPartitionLowWatermark = newPartition.getLowWatermark();
            if (newPartitionLowWatermark.isBefore(lowWatermark)) {
                lowWatermark = newPartitionLowWatermark;
            }
        }

        List<Range.ByteStringRange> missingPartitions = ByteStringRangeHelper.getMissingPartitionsFromEntireKeySpace(coveredPartitions);
        if (!missingPartitions.isEmpty()) {
            LOG.warn("DNP: Updating watermark failed due to missing {} partitions : {}.", missingPartitions.size(), ByteStringRangeHelper.partitionsToString(missingPartitions));
            return Optional.empty();
        }

        LOG.info("DNP: Updating watermark: {}", lowWatermark);
        return Optional.of(lowWatermark);
    }

    /**
     * Outputs every partition the reconciler wants restarted.
     *
     * <p>Each record gets a fresh UUID and this action's end time, its parent new partitions are
     * marked for deletion in the metadata table, and the record is emitted at
     * {@link Instant#EPOCH}.
     *
     * @param outputReceiver receiver the reconciled partition records are emitted to
     * @param manualWatermarkEstimator supplies the current watermark passed to the reconciler
     * @param startTime pipeline start time passed to the reconciler
     */
    private void processReconcilerPartitions(DoFn.OutputReceiver<PartitionRecord> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator, Instant startTime) {
        for (PartitionRecord partitionRecord : this.partitionReconciler.getPartitionsToReconcile(manualWatermarkEstimator.currentWatermark(), startTime)) {
            partitionRecord.setUuid(UniqueIdGenerator.getNextId());
            partitionRecord.setEndTime(this.endTime);
            for (NewPartition parentPartition : partitionRecord.getParentPartitions()) {
                this.metadataTableDao.markNewPartitionForDeletion(parentPartition);
            }
            outputReceiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);
            LOG.warn("DNP: Reconciling missing partition: {}", partitionRecord);
        }
    }

    /** Marks for deletion, then deletes, every new partition the cleaner reports as orphaned. */
    private void cleanUpOrphanedMetadata() {
        for (NewPartition orphanedPartition : this.orphanedMetadataCleaner.getOrphanedNewPartitions()) {
            this.metrics.incOrphanedNewPartitionCleanedCount();
            this.metadataTableDao.markNewPartitionForDeletion(orphanedPartition);
            this.metadataTableDao.deleteNewPartition(orphanedPartition);
        }
    }

    /**
     * Decides whether this run should recompute the watermark.
     *
     * @param trackerOffset current start offset of the restriction
     * @param state last stored DNP state, or {@code null} if none has been written yet
     * @return {@code true} only on even offsets, and only if there is no stored state or the
     *     stored watermark was last updated more than {@link #WATERMARK_UPDATE_INTERVAL} ago
     */
    private boolean shouldUpdateWatermark(long trackerOffset, @Nullable DetectNewPartitionsState state) {
        if (trackerOffset % 2 != 0) {
            return false;
        }
        return state == null || state.getWatermarkLastUpdated().plus(WATERMARK_UPDATE_INTERVAL).isBeforeNow();
    }

    /**
     * Performs one DNP iteration.
     *
     * <p>At offset {@code 0} the initial restriction is claimed, the watermark is set to the
     * pipeline start time and the initial partitions are emitted (resumed or generated). At later
     * offsets the stored watermark is restored, new partitions are processed and, on runs where
     * the watermark is recomputed, reconciliation and orphan clean-up may also run.
     *
     * @param restrictionTracker tracker over the offset range; exactly one offset is claimed
     * @param outputReceiver receiver for partition records
     * @param manualWatermarkEstimator estimator updated with the start time or stored watermark
     * @param initialPipelineState start time and resume flag for the pipeline
     * @return {@code stop()} if the claim fails or the end time has been reached, otherwise
     *     {@code resume()} (with a {@link #RESUME_DELAY} delay on non-initial runs)
     * @throws Exception if one of the delegated actions fails
     */
    @VisibleForTesting
    public DoFn.ProcessContinuation run(RestrictionTracker<OffsetRange, Long> restrictionTracker, DoFn.OutputReceiver<PartitionRecord> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator, InitialPipelineState initialPipelineState) throws Exception {
        LOG.debug("DNP: Watermark: {}", manualWatermarkEstimator.getState());
        LOG.debug("DNP: CurrentTracker: {}", restrictionTracker.currentRestriction().getFrom());

        // First run: bootstrap the pipeline and return immediately.
        if (restrictionTracker.currentRestriction().getFrom() == 0) {
            if (!restrictionTracker.tryClaim(0L)) {
                LOG.error("Could not claim initial DetectNewPartition restriction. No partitions are outputted.");
                return DoFn.ProcessContinuation.stop();
            }
            manualWatermarkEstimator.setWatermark(initialPipelineState.getStartTime());
            if (initialPipelineState.isResume()) {
                this.resumeFromPreviousPipelineAction.run(outputReceiver);
            } else {
                this.generateInitialPartitionsAction.run(outputReceiver, initialPipelineState.getStartTime());
            }
            return DoFn.ProcessContinuation.resume();
        }

        // Fresh helpers on every run so that state from a previous run is discarded.
        this.partitionReconciler = new PartitionReconciler(this.metadataTableDao, this.metrics);
        this.orphanedMetadataCleaner = new OrphanedMetadataCleaner();

        DetectNewPartitionsState storedState = this.metadataTableDao.readDetectNewPartitionsState();
        if (storedState != null) {
            manualWatermarkEstimator.setWatermark(storedState.getWatermark());
        }

        // The watermark has reached the end time: claim the end of the range and terminate.
        if (this.endTime != null && !manualWatermarkEstimator.currentWatermark().isBefore(this.endTime)) {
            restrictionTracker.tryClaim(restrictionTracker.currentRestriction().getTo());
            return DoFn.ProcessContinuation.stop();
        }

        if (!restrictionTracker.tryClaim(restrictionTracker.currentRestriction().getFrom())) {
            LOG.warn("DNP: Checkpointing, stopping this run: {}", restrictionTracker.currentRestriction());
            return DoFn.ProcessContinuation.stop();
        }

        // Stream partitions are only read on runs that recompute the watermark; null means
        // "not a watermark-update run" and also disables reconciliation bookkeeping below.
        List<StreamPartitionWithWatermark> streamPartitions = null;
        if (shouldUpdateWatermark(restrictionTracker.currentRestriction().getFrom(), storedState)) {
            streamPartitions = this.metadataTableDao.readStreamPartitionsWithWatermark();
        }

        List<NewPartition> newPartitions = this.metadataTableDao.readNewPartitions();
        List<Range.ByteStringRange> outputtedNewPartitions = new ArrayList<>();
        for (NewPartition newPartition : newPartitions) {
            if (this.processNewPartitionsAction.processNewPartition(newPartition, outputReceiver)) {
                outputtedNewPartitions.add(newPartition.getPartition());
            } else if (streamPartitions != null) {
                this.partitionReconciler.addIncompleteNewPartitions(newPartition);
                this.orphanedMetadataCleaner.addIncompleteNewPartitions(newPartition);
            }
        }

        if (streamPartitions != null) {
            getNewWatermark(streamPartitions, newPartitions)
                    .ifPresent(this.metadataTableDao::updateDetectNewPartitionWatermark);

            if (restrictionTracker.currentRestriction().getFrom() > RECONCILIATION_MIN_OFFSET) {
                List<Range.ByteStringRange> existingPartitions = streamPartitions.stream()
                        .map(StreamPartitionWithWatermark::getPartition)
                        .collect(Collectors.toList());
                existingPartitions.addAll(outputtedNewPartitions);
                List<Range.ByteStringRange> missingPartitions = ByteStringRangeHelper.getMissingPartitionsFromEntireKeySpace(existingPartitions);
                this.orphanedMetadataCleaner.addMissingPartitions(missingPartitions);
                this.partitionReconciler.addMissingPartitions(missingPartitions);
                processReconcilerPartitions(outputReceiver, manualWatermarkEstimator, initialPipelineState.getStartTime());
                cleanUpOrphanedMetadata();
            }
        }
        return DoFn.ProcessContinuation.resume().withResumeDelay(RESUME_DELAY);
    }
}
