package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.class */
/**
 * Splittable {@link DoFn} that turns one {@link SubscriptionPartition} element into a stream of
 * {@link SequencedMessage}s.
 *
 * <p>The restriction is an {@link OffsetByteRange}. The initial restriction starts at the offset
 * returned by the configured {@link InitialOffsetReader} and uses {@link Long#MAX_VALUE} as its
 * end; {@link #getSize} treats that end value as "not yet bounded" and asks a tracker for the
 * remaining work instead of reading the byte count from the restriction.
 *
 * <p>All collaborators are injected through the constructor. The two {@link ManagedFactory}
 * instances are closed in {@link #teardown()}.
 */
public class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(PerSubscriptionPartitionSdf.class);
    private final ManagedFactory<TopicBacklogReader> backlogReaderFactory;
    private final ManagedFactory<BlockingCommitter> committerFactory;
    private final SubscriptionPartitionProcessorFactory processorFactory;
    private final SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory;
    private final SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory;

    /**
     * Creates the DoFn from its collaborators. Arguments are stored as given; no validation is
     * performed here.
     *
     * @param backlogReaderFactory supplies the {@link TopicBacklogReader} handed to new trackers
     * @param committerFactory supplies the {@link BlockingCommitter} used to commit offsets
     * @param offsetReaderFactory supplies the reader used to find the first offset of a partition
     * @param trackerFactory builds a tracker from a backlog reader and a restriction
     * @param processorFactory builds the processor that is run by {@link #processElement}
     */
    public PerSubscriptionPartitionSdf(
            ManagedFactory<TopicBacklogReader> backlogReaderFactory,
            ManagedFactory<BlockingCommitter> committerFactory,
            SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
            SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress> trackerFactory,
            SubscriptionPartitionProcessorFactory processorFactory) {
        this.backlogReaderFactory = backlogReaderFactory;
        this.committerFactory = committerFactory;
        this.processorFactory = processorFactory;
        this.offsetReaderFactory = offsetReaderFactory;
        this.trackerFactory = trackerFactory;
    }

    /**
     * Closes both managed factories.
     *
     * <p>The backlog reader factory is closed first and the committer factory second. The
     * committer factory is still closed when closing the backlog reader factory fails; a failure
     * of that second close is then attached to the first one as a suppressed exception. Each
     * factory is closed at most once, and a {@code null} factory is skipped.
     *
     * @throws Exception if closing either factory fails
     */
    @DoFn.Teardown
    public void teardown() throws Exception {
        // Resources are closed in reverse declaration order, so the backlog reader factory goes
        // first. The body is intentionally empty: only the close-on-exit behavior is wanted.
        try (AutoCloseable committers = this.committerFactory;
                AutoCloseable backlogReaders = this.backlogReaderFactory) {
            // Nothing to do besides closing.
        }
    }

    /**
     * Uses the timestamp of the input element as the initial watermark estimator state.
     *
     * @param instant timestamp of the element being processed
     * @return {@code instant}, unchanged
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    /**
     * Creates a monotonically increasing watermark estimator seeded with the given state.
     *
     * @param instant the watermark estimator state to start from
     * @return a new estimator initialized with {@code instant}
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        return new WatermarkEstimators.MonotonicallyIncreasing(instant);
    }

    /**
     * Runs a processor for the partition and then commits progress.
     *
     * <p>After the processor returns, and only if it reports a last claimed offset, the offset
     * directly after it ({@code lastClaimed + 1}) is committed through a committer obtained from
     * the committer factory. Any exception raised while committing is passed through
     * {@link ExtractStatus#toCanonical} and the resulting {@code underlying} exception is thrown.
     *
     * @param restrictionTracker tracker for the restriction being processed
     * @param subscriptionPartition the element being processed
     * @param outputReceiver receiver handed to the processor for emitting messages
     * @return the continuation returned by the processor's {@code run()}
     * @throws Exception if creating or running the processor fails, or if the commit fails
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(RestrictionTracker<OffsetByteRange, OffsetByteProgress> restrictionTracker, @DoFn.Element SubscriptionPartition subscriptionPartition, DoFn.OutputReceiver<SequencedMessage> outputReceiver) throws Exception {
        LOG.debug("Starting process for {} at {}", subscriptionPartition, Instant.now());
        SubscriptionPartitionProcessor processor =
                this.processorFactory.newProcessor(subscriptionPartition, restrictionTracker, outputReceiver);
        DoFn.ProcessContinuation continuation = processor.run();
        LOG.debug("Starting commit for {} at {}", subscriptionPartition, Instant.now());
        processor.lastClaimed().ifPresent(lastClaimed -> {
            // The committed value is the offset after the last one claimed.
            Offset commitOffset = Offset.of(lastClaimed.value() + 1);
            try {
                this.committerFactory.create(subscriptionPartition).commitOffset(commitOffset);
            } catch (Exception e) {
                throw ExtractStatus.toCanonical(e).underlying;
            }
        });
        LOG.debug("Finishing process for {} at {}", subscriptionPartition, Instant.now());
        return continuation;
    }

    /**
     * Builds the initial restriction for a partition.
     *
     * @param subscriptionPartition the element to build a restriction for
     * @return a restriction starting at the offset read from the configured
     *     {@link InitialOffsetReader} and ending at {@link Long#MAX_VALUE}
     */
    @DoFn.GetInitialRestriction
    public OffsetByteRange getInitialRestriction(@DoFn.Element SubscriptionPartition subscriptionPartition) {
        long startOffset = this.offsetReaderFactory.apply(subscriptionPartition).read().value();
        // Long.MAX_VALUE is the end marker that getSize checks for.
        return OffsetByteRange.of(new OffsetRange(startOffset, Long.MAX_VALUE));
    }

    /**
     * Creates a tracker for the given restriction, backed by a backlog reader obtained from the
     * backlog reader factory for this partition.
     *
     * @param subscriptionPartition the element the restriction belongs to
     * @param offsetByteRange the restriction to track
     * @return the tracker produced by the configured tracker factory
     */
    @DoFn.NewTracker
    public TrackerWithProgress newTracker(@DoFn.Element SubscriptionPartition subscriptionPartition, @DoFn.Restriction OffsetByteRange offsetByteRange) {
        TopicBacklogReader backlogReader = this.backlogReaderFactory.create(subscriptionPartition);
        return this.trackerFactory.apply(backlogReader, offsetByteRange);
    }

    /**
     * Reports the size of a restriction.
     *
     * @param subscriptionPartition the element the restriction belongs to
     * @param offsetByteRange the restriction to size
     * @return the restriction's byte count when its end is not {@link Long#MAX_VALUE}; otherwise
     *     the remaining work reported by a tracker newly created for the restriction
     */
    @DoFn.GetSize
    public double getSize(@DoFn.Element SubscriptionPartition subscriptionPartition, @DoFn.Restriction OffsetByteRange offsetByteRange) {
        if (offsetByteRange.getRange().getTo() != Long.MAX_VALUE) {
            // End is set to something other than the marker: use the recorded byte count.
            return offsetByteRange.getByteCount();
        }
        return newTracker(subscriptionPartition, offsetByteRange).getProgress().getWorkRemaining();
    }
}
