package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;

import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.math.BigDecimal;
import java.math.RoundingMode;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ReadChangeStreamPartitionAction;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.NullSizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@DoFn.UnboundedPerElement
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.class */
public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionRecord, KV<ByteString, ChangeStreamRecord>> {
    private static final long serialVersionUID = 4418739381635104479L;
    private static final BigDecimal MAX_DOUBLE = BigDecimal.valueOf(Double.MAX_VALUE);
    private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
    private static final Duration HEARTBEAT_DURATION = Duration.standardSeconds(1);
    private final DaoFactory daoFactory;
    private final ChangeStreamMetrics metrics;
    private final ActionFactory actionFactory;
    private final Duration backlogReplicationAdjustment;
    private SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;
    private ReadChangeStreamPartitionAction readChangeStreamPartitionAction;
    private final SerializableSupplier<Instant> clock;

    public ReadChangeStreamPartitionDoFn(DaoFactory daoFactory, ActionFactory actionFactory, ChangeStreamMetrics changeStreamMetrics, Duration duration) {
        this(daoFactory, actionFactory, changeStreamMetrics, duration, Instant::now);
    }

    @VisibleForTesting
    ReadChangeStreamPartitionDoFn(DaoFactory daoFactory, ActionFactory actionFactory, ChangeStreamMetrics changeStreamMetrics, Duration duration, SerializableSupplier<Instant> serializableSupplier) {
        this.daoFactory = daoFactory;
        this.metrics = changeStreamMetrics;
        this.actionFactory = actionFactory;
        this.backlogReplicationAdjustment = duration;
        this.sizeEstimator = new NullSizeEstimator();
        this.clock = serializableSupplier;
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Element PartitionRecord partitionRecord) {
        return partitionRecord.getParentLowWatermark();
    }

    @DoFn.NewWatermarkEstimator
    public ManualWatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        return new WatermarkEstimators.Manual(instant);
    }

    @DoFn.GetInitialRestriction
    public StreamProgress initialRestriction() {
        this.metrics.incPartitionStreamCount();
        return new StreamProgress();
    }

    @DoFn.NewTracker
    public ReadChangeStreamPartitionProgressTracker restrictionTracker(@DoFn.Restriction StreamProgress streamProgress) {
        return new ReadChangeStreamPartitionProgressTracker(streamProgress);
    }

    @DoFn.GetSize
    public double getSize(@DoFn.Restriction StreamProgress streamProgress) {
        if (streamProgress == null) {
            return 0.0d;
        }
        Instant estimatedLowWatermark = streamProgress.getEstimatedLowWatermark();
        BigDecimal throughputEstimate = streamProgress.getThroughputEstimate();
        Instant lastRunTimestamp = streamProgress.getLastRunTimestamp();
        if (estimatedLowWatermark == null || throughputEstimate == null || lastRunTimestamp == null) {
            return 0.0d;
        }
        String str = ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
        if (streamProgress.getCurrentToken() != null) {
            str = ByteStringRangeHelper.formatByteStringRange(((ChangeStreamContinuationToken) Preconditions.checkNotNull(streamProgress.getCurrentToken())).getPartition());
        }
        long millis = streamProgress.isHeartbeat() ? ((Instant) this.clock.get()).getMillis() - streamProgress.getLastRunTimestamp().getMillis() : Math.max(0L, Duration.millis(((Instant) this.clock.get()).getMillis() - estimatedLowWatermark.getMillis()).minus(this.backlogReplicationAdjustment).getMillis());
        double doubleValue = throughputEstimate.multiply(BigDecimal.valueOf(millis)).divide(BigDecimal.valueOf(1000L), 3, RoundingMode.DOWN).min(MAX_DOUBLE).max(BigDecimal.ZERO).doubleValue();
        LOG.debug("Estimated size (per second): partition: {}, isHeartbeat: {}, throughputBytes: {} x watermarkLagMillis {} = {}, lastRun = {}", new Object[]{str, Boolean.valueOf(streamProgress.isHeartbeat()), throughputEstimate, Long.valueOf(millis), Double.valueOf(doubleValue), streamProgress.getLastRunTimestamp()});
        return doubleValue;
    }

    @DoFn.Setup
    public void setup() throws IOException {
        this.readChangeStreamPartitionAction = this.actionFactory.readChangeStreamPartitionAction(this.daoFactory.getMetadataTableDao(), this.daoFactory.getChangeStreamDao(), this.metrics, this.actionFactory.changeStreamAction(this.metrics), HEARTBEAT_DURATION, this.sizeEstimator);
    }

    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element PartitionRecord partitionRecord, RestrictionTracker<StreamProgress, StreamProgress> restrictionTracker, DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) throws InterruptedException, IOException {
        return this.readChangeStreamPartitionAction.run(partitionRecord, restrictionTracker, outputReceiver, manualWatermarkEstimator);
    }

    public void setSizeEstimator(CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> coderSizeEstimator) {
        this.sizeEstimator = coderSizeEstimator;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 109270:
                if (implMethodName.equals("now")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/util/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/joda/time/Instant") && serializedLambda.getImplMethodSignature().equals("()Lorg/joda/time/Instant;")) {
                    return Instant::now;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
