package org.apache.beam.sdk.io.kafka;

import java.io.Closeable;
import java.lang.invoke.SerializedLambda;
import java.math.BigDecimal;
import java.math.MathContext;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.MemoizingPerInstantiationSerializableSupplier;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalCause;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.class */
public abstract class ReadFromKafkaDoFn<K, V> extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
    // Joins the bootstrap server list with ',' when reporting lineage in initialRestriction.
    private static final Joiner COMMA_JOINER = Joiner.on(',');
    // Optional hook (null-checked wherever it is used) consulted per TopicPartition in processElement
    // to decide whether reading should stop.
    private final CheckStopReadingFn checkStopReadingFn;
    // Computes the output timestamp of a record when no timestamp policy is in use.
    private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn;
    // Builds the watermark estimator from the (bounds-clamped) estimator state.
    private final SerializableFunction<Instant, WatermarkEstimator<Instant>> createWatermarkEstimatorFn;
    // Optional; when non-null, processElement derives both watermark and record timestamps from it.
    private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
    // Receives records whose key or value fails deserialization with a SerializationException.
    private final BadRecordRouter badRecordRouter;
    // Output tag used for successfully deserialized records.
    private final TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> recordTag;
    // The three suppliers below wrap Guava caches (weak values) that are built in the constructor.
    // Keyed by source descriptor: moving average of raw record size in bytes (key + value length).
    private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>> avgRecordSizeCacheSupplier;
    // Keyed by source descriptor: estimator of the partition's latest end offset.
    private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>> latestOffsetEstimatorCacheSupplier;
    // Keyed by source descriptor: consumer assigned to the descriptor's partition, used for polling.
    private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>>> pollConsumerCacheSupplier;
    // Created in setup() and closed/nulled in teardown(); transient, so not serialized with the DoFn.
    private transient Deserializer<K> keyDeserializerInstance;
    private transient Deserializer<V> valueDeserializerInstance;
    // Fallback poll timeout in seconds (the constructor passes it to Duration.ofSeconds) used when
    // the configured timeout is not positive.
    private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2;

    // Timeout handed to Consumer.poll in processElement.
    @VisibleForTesting
    final Duration consumerPollingTimeout;

    // Supplies the key deserializer instantiated in setup().
    @VisibleForTesting
    final DeserializerProvider<K> keyDeserializerProvider;

    // Supplies the value deserializer instantiated in setup().
    @VisibleForTesting
    final DeserializerProvider<V> valueDeserializerProvider;

    // Base Kafka consumer configuration; bootstrap servers may be overridden per source descriptor.
    @VisibleForTesting
    final Map<String, Object> consumerConfig;

    // Namespace of the distribution and gauge metrics created in processElement.
    @VisibleForTesting
    static final String METRIC_NAMESPACE = "KafkaIOReader";

    // Prefix of the per-partition raw record size distribution metric name.
    @VisibleForTesting
    static final String RAW_SIZE_METRIC_PREFIX = "rawSize/";

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * {@link ReadFromKafkaDoFn} variant carrying the {@code @DoFn.BoundedPerElement} annotation.
     *
     * <p>Adds no behavior of its own; it only forwards its constructor arguments to the parent.
     */
    @DoFn.BoundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$Bounded.class */
    public static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
        /**
         * @param transform the read transform whose settings configure this DoFn
         * @param outputTag tag under which successfully read records are emitted
         */
        Bounded(
                KafkaIO.ReadSourceDescriptors<K, V> transform,
                TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputTag) {
            super(transform, outputTag);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$KafkaLatestOffsetEstimator.class */
    public static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable {
        private static final AtomicReferenceFieldUpdater<KafkaLatestOffsetEstimator, Runnable> CURRENT_REFRESH_TASK = AtomicReferenceFieldUpdater.newUpdater(KafkaLatestOffsetEstimator.class, Runnable.class, "currentRefreshTask");
        private final Consumer<byte[], byte[]> offsetConsumer;
        private final TopicPartition topicPartition;
        private final Executor executor = Executors.newSingleThreadExecutor();
        private long lastRefreshEndOffset = -1;
        private long nextRefreshNanos = Long.MIN_VALUE;
        private volatile Runnable currentRefreshTask = null;

        KafkaLatestOffsetEstimator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
            this.offsetConsumer = consumer;
            this.topicPartition = topicPartition;
        }

        public long estimate() {
            if (this.currentRefreshTask == null) {
                long j = this.nextRefreshNanos;
                long nanoTime = System.nanoTime();
                if (j < nanoTime && CURRENT_REFRESH_TASK.compareAndSet(this, null, this::refresh)) {
                    try {
                        this.executor.execute(this::refresh);
                    } catch (RejectedExecutionException e) {
                        ReadFromKafkaDoFn.LOG.error("Execution of end offset refresh rejected for {}", this.topicPartition, e);
                        this.nextRefreshNanos = nanoTime + TimeUnit.SECONDS.toNanos(1L);
                        CURRENT_REFRESH_TASK.lazySet(this, null);
                    }
                }
            }
            return this.lastRefreshEndOffset;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.offsetConsumer.close();
        }

        private void refresh() {
            try {
                Long l = (Long) this.offsetConsumer.endOffsets(Collections.singleton(this.topicPartition)).get(this.topicPartition);
                if (l == null) {
                    ReadFromKafkaDoFn.LOG.warn("No end offset found for partition {}.", this.topicPartition);
                } else {
                    this.lastRefreshEndOffset = l.longValue();
                }
                this.nextRefreshNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L);
                CURRENT_REFRESH_TASK.lazySet(this, null);
            } catch (Throwable th) {
                CURRENT_REFRESH_TASK.lazySet(this, null);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * {@link ReadFromKafkaDoFn} variant carrying the {@code @DoFn.UnboundedPerElement} annotation.
     *
     * <p>Adds no behavior of its own; it only forwards its constructor arguments to the parent.
     */
    @DoFn.UnboundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$Unbounded.class */
    public static class Unbounded<K, V> extends ReadFromKafkaDoFn<K, V> {
        /**
         * @param transform the read transform whose settings configure this DoFn
         * @param outputTag tag under which successfully read records are emitted
         */
        Unbounded(
                KafkaIO.ReadSourceDescriptors<K, V> transform,
                TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputTag) {
            super(transform, outputTag);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the DoFn variant matching the boundedness of the given transform.
     *
     * @param readSourceDescriptors the read transform supplying all configuration
     * @param tupleTag tag under which successfully read records are emitted
     * @return a {@link Bounded} instance when {@code readSourceDescriptors.isBounded()} is true,
     *     otherwise an {@link Unbounded} instance
     */
    public static <K, V> ReadFromKafkaDoFn<K, V> create(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> tupleTag) {
        if (readSourceDescriptors.isBounded()) {
            return new Bounded<>(readSourceDescriptors, tupleTag);
        }
        return new Unbounded<>(readSourceDescriptors, tupleTag);
    }

    /**
     * Copies the configuration out of the read transform and prepares the three lazily built
     * per-descriptor caches (average record size, latest-offset estimator, poll consumer).
     *
     * <p>The caches themselves are not created here; each supplier builds its cache on demand.
     *
     * @param readSourceDescriptors the read transform supplying consumer config, deserializer
     *     providers, timestamp/watermark functions, stop-reading hook and bad-record router
     * @param tupleTag tag under which successfully read records are emitted
     */
    private ReadFromKafkaDoFn(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> tupleTag) {
        // Deserializer instances are only created in setup().
        this.keyDeserializerInstance = null;
        this.valueDeserializerInstance = null;
        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
        this.consumerConfig = readSourceDescriptors.getConsumerConfig();
        // Both deserializer providers are mandatory.
        this.keyDeserializerProvider = (DeserializerProvider) Preconditions.checkArgumentNotNull(readSourceDescriptors.getKeyDeserializerProvider());
        this.valueDeserializerProvider = (DeserializerProvider) Preconditions.checkArgumentNotNull(readSourceDescriptors.getValueDeserializerProvider());
        this.extractOutputTimestampFn = readSourceDescriptors.getExtractOutputTimestampFn();
        this.createWatermarkEstimatorFn = readSourceDescriptors.getCreateWatermarkEstimatorFn();
        this.timestampPolicyFactory = readSourceDescriptors.getTimestampPolicyFactory();
        this.checkStopReadingFn = readSourceDescriptors.getCheckStopReadingFn();
        this.badRecordRouter = readSourceDescriptors.getBadRecordRouter();
        this.recordTag = tupleTag;
        // Cache of moving averages of record size; entries are weakly referenced and start out as a
        // fresh MovingAvg.
        this.avgRecordSizeCacheSupplier = new MemoizingPerInstantiationSerializableSupplier(() -> {
            return CacheBuilder.newBuilder().concurrencyLevel(Runtime.getRuntime().availableProcessors()).weakValues().build(new CacheLoader<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>() { // from class: org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.1
                public KafkaIOUtils.MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
                    return new KafkaIOUtils.MovingAvg();
                }
            });
        });
        // Cache of end-offset estimators. Each entry gets its own consumer, created from the factory
        // with the descriptor's bootstrap servers applied to the base config.
        this.latestOffsetEstimatorCacheSupplier = new MemoizingPerInstantiationSerializableSupplier(() -> {
            return CacheBuilder.newBuilder().concurrencyLevel(Runtime.getRuntime().availableProcessors()).weakValues().removalListener(removalNotification -> {
                KafkaLatestOffsetEstimator kafkaLatestOffsetEstimator;
                // Only closes on COLLECTED removals that still carry a value.
                // NOTE(review): with weakValues() a COLLECTED notification presumably has a null
                // value, in which case this close never runs — confirm against the Guava docs.
                if (removalNotification.getCause() != RemovalCause.COLLECTED || (kafkaLatestOffsetEstimator = (KafkaLatestOffsetEstimator) removalNotification.getValue()) == null) {
                    return;
                }
                kafkaLatestOffsetEstimator.close();
            }).build(new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() { // from class: org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.2
                public KafkaLatestOffsetEstimator load(KafkaSourceDescriptor kafkaSourceDescriptor) {
                    ReadFromKafkaDoFn.LOG.info("Creating Kafka consumer for offset estimation for {}", kafkaSourceDescriptor);
                    return new KafkaLatestOffsetEstimator((Consumer) consumerFactoryFn.apply(KafkaIOUtils.overrideBootstrapServersConfig(ReadFromKafkaDoFn.this.consumerConfig, kafkaSourceDescriptor)), kafkaSourceDescriptor.getTopicPartition());
                }
            });
        });
        // Cache of consumers used for polling records. Each consumer is assigned exactly the
        // descriptor's topic partition on creation.
        this.pollConsumerCacheSupplier = new MemoizingPerInstantiationSerializableSupplier(() -> {
            return CacheBuilder.newBuilder().concurrencyLevel(Runtime.getRuntime().availableProcessors()).weakValues().removalListener(removalNotification -> {
                Consumer consumer;
                // Same COLLECTED-with-value condition as the estimator cache above; the same
                // NOTE(review) applies.
                if (removalNotification.getCause() != RemovalCause.COLLECTED || (consumer = (Consumer) removalNotification.getValue()) == null) {
                    return;
                }
                consumer.close();
            }).build(new CacheLoader<KafkaSourceDescriptor, Consumer<byte[], byte[]>>() { // from class: org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.3
                public Consumer<byte[], byte[]> load(KafkaSourceDescriptor kafkaSourceDescriptor) {
                    ReadFromKafkaDoFn.LOG.info("Creating Kafka consumer for restriction processing for {}", kafkaSourceDescriptor);
                    Consumer<byte[], byte[]> consumer = (Consumer) consumerFactoryFn.apply(KafkaIOUtils.overrideBootstrapServersConfig(ReadFromKafkaDoFn.this.consumerConfig, kafkaSourceDescriptor));
                    consumer.assign(Collections.singleton(kafkaSourceDescriptor.getTopicPartition()));
                    return consumer;
                }
            });
        });
        // A non-positive configured timeout falls back to DEFAULT_KAFKA_POLL_TIMEOUT; the value is
        // interpreted as seconds.
        this.consumerPollingTimeout = Duration.ofSeconds(readSourceDescriptors.getConsumerPollingTimeout() > 0 ? readSourceDescriptors.getConsumerPollingTimeout() : DEFAULT_KAFKA_POLL_TIMEOUT);
    }

    /**
     * Computes the initial offset range to read for a source descriptor and records the source in
     * the lineage metrics.
     *
     * <p>Start of the range, in order of precedence: the descriptor's explicit start offset, the
     * offset Kafka reports for the descriptor's start read time, or the consumer's current position.
     * End of the range: the explicit stop offset, the offset Kafka reports for the stop read time,
     * or {@code Long.MAX_VALUE} when neither is set.
     *
     * @param kafkaSourceDescriptor the topic partition (plus optional start/stop bounds) to read
     * @return the offset range covering everything this descriptor should read
     * @throws IllegalStateException (from {@code checkStateNotNull}) if Kafka returns no offset for
     *     a requested start or stop time
     * @throws NoSuchElementException if the effective consumer config has no
     *     {@code bootstrap.servers} value
     */
    @DoFn.GetInitialRestriction
    public OffsetRange initialRestriction(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor) {
        Consumer<byte[], byte[]> consumer = this.pollConsumerCacheSupplier.get().getUnchecked(kafkaSourceDescriptor);

        Long startReadOffset = kafkaSourceDescriptor.getStartReadOffset();
        Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
        long startOffset;
        if (startReadOffset != null) {
            startOffset = startReadOffset.longValue();
        } else if (startReadTime != null) {
            startOffset = offsetForTime(consumer, kafkaSourceDescriptor, startReadTime);
        } else {
            startOffset = consumer.position(kafkaSourceDescriptor.getTopicPartition());
        }

        Long stopReadOffset = kafkaSourceDescriptor.getStopReadOffset();
        Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
        long endOffset;
        if (stopReadOffset != null) {
            endOffset = stopReadOffset.longValue();
        } else if (stopReadTime != null) {
            endOffset = offsetForTime(consumer, kafkaSourceDescriptor, stopReadTime);
        } else {
            // No upper bound configured: the range is open-ended.
            endOffset = Long.MAX_VALUE;
        }
        OffsetRange offsetRange = new OffsetRange(startOffset, endOffset);

        // Lineage entry: comma-joined, de-duplicated bootstrap servers followed by the topic name.
        Map<String, Object> effectiveConfig = KafkaIOUtils.overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        String bootstrapServers = Optional.ofNullable(effectiveConfig.get("bootstrap.servers"))
                .map(raw -> (List<?>) ConfigDef.parseType("bootstrap.servers", raw, ConfigDef.Type.LIST))
                .map(servers -> COMMA_JOINER.join(ImmutableSet.copyOf(servers)))
                .orElseThrow(() -> new NoSuchElementException("No bootstrap.servers configured for " + kafkaSourceDescriptor));
        String topic = MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), kafkaSourceDescriptor.getTopicPartition().topic());
        Lineage.getSources().add("kafka", ImmutableList.of(bootstrapServers, topic));
        return offsetRange;
    }

    /**
     * Looks up the offset Kafka associates with {@code time} for the descriptor's partition.
     *
     * @throws IllegalStateException (from {@code checkStateNotNull}) if Kafka returns no entry
     */
    private static long offsetForTime(Consumer<byte[], byte[]> consumer, KafkaSourceDescriptor descriptor, Instant time) {
        OffsetAndTimestamp match = consumer
                .offsetsForTimes(Collections.singletonMap(descriptor.getTopicPartition(), Long.valueOf(time.getMillis())))
                .get(descriptor.getTopicPartition());
        return Preconditions.checkStateNotNull(match).offset();
    }

    /**
     * Uses the element's timestamp, unchanged, as the initial watermark estimator state.
     *
     * @param instant the timestamp injected through {@code @DoFn.Timestamp}
     * @return {@code instant} itself
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp final Instant instant) {
        return instant;
    }

    /**
     * Builds a watermark estimator from the given state using the configured factory function.
     *
     * @param instant the estimator state; clamped into the valid window timestamp range before use
     * @return the estimator produced by {@code createWatermarkEstimatorFn}
     * @throws IllegalStateException (from {@code checkStateNotNull}) if no factory was configured
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        SerializableFunction<Instant, WatermarkEstimator<Instant>> estimatorFactory =
                Preconditions.checkStateNotNull(this.createWatermarkEstimatorFn);
        Instant boundedState = ensureTimestampWithinBounds(instant);
        return estimatorFactory.apply(boundedState);
    }

    /**
     * Estimates the size of the remaining work for a restriction.
     *
     * <p>The remaining work reported by a fresh tracker is scaled by the cached moving average of
     * record size for this descriptor. When no average is cached yet, the remaining work is
     * returned unscaled.
     *
     * @param kafkaSourceDescriptor the source the restriction belongs to
     * @param offsetRange the restriction to size
     * @return the estimated size
     */
    @DoFn.GetSize
    public double getSize(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) {
        // getIfPresent: sizing must not create a cache entry as a side effect.
        KafkaIOUtils.MovingAvg avgRecordSize = this.avgRecordSizeCacheSupplier.get().getIfPresent(kafkaSourceDescriptor);
        double remainingWork = restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
        if (avgRecordSize != null) {
            return remainingWork * avgRecordSize.get();
        }
        return remainingWork;
    }

    /**
     * Creates the tracker for a restriction.
     *
     * <p>A range with a finite end gets a plain {@link OffsetRangeTracker}. A range ending at
     * {@code Long.MAX_VALUE} gets a {@link GrowableOffsetRangeTracker} backed by the cached
     * latest-offset estimator for the descriptor.
     *
     * @param kafkaSourceDescriptor the source the restriction belongs to
     * @param offsetRange the restriction to track
     * @return the tracker for {@code offsetRange}
     */
    @DoFn.NewTracker
    public OffsetRangeTracker restrictionTracker(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) {
        if (offsetRange.getTo() < Long.MAX_VALUE) {
            return new OffsetRangeTracker(offsetRange);
        }
        long rangeStart = offsetRange.getFrom();
        GrowableOffsetRangeTracker.RangeEndEstimator endEstimator =
                this.latestOffsetEstimatorCacheSupplier.get().getUnchecked(kafkaSourceDescriptor);
        return new GrowableOffsetRangeTracker(rangeStart, endEstimator);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v46, types: [org.apache.beam.sdk.io.kafka.KafkaMetrics] */
    /* JADX WARN: Type inference failed for: r3v11 */
    /* JADX WARN: Type inference failed for: r3v12 */
    /* JADX WARN: Type inference failed for: r3v14 */
    /* JADX WARN: Type inference failed for: r3v16, types: [int] */
    /* JADX WARN: Type inference failed for: r3v23 */
    /* JADX WARN: Type inference failed for: r3v24 */
    /* JADX WARN: Type inference failed for: r3v4 */
    /* JADX WARN: Type inference failed for: r3v5, types: [long] */
    /**
     * Reads records for one source descriptor starting at the restriction's first offset, emitting
     * each successfully deserialized record and routing deserialization failures to the bad-record
     * router.
     *
     * <p>The method returns when a poll yields no records (resume, or stop if the partition is no
     * longer listed for the topic), when the tracker refuses a claim (stop), when the stop-reading
     * hook fires (stop), or when the requested start offset cannot be reached within 10 seconds
     * (resume with a 10 second delay). Buffered Kafka metrics are flushed on every exit path,
     * including exceptional ones.
     *
     * <p>NOTE(review): this body is decompiler output. The {@code ??}-typed locals and the
     * assignments involving {@code j}, {@code partition} and {@code restrictionTracker2} are
     * decompiler artifacts (see the JADX WARN comments preceding this method) and do not type-check
     * as written; treat the comments on those statements as best-effort.
     *
     * @param kafkaSourceDescriptor the topic partition to read
     * @param restrictionTracker tracker used to claim offsets within the restriction
     * @param watermarkEstimator estimator updated manually when a timestamp policy is configured
     * @param multiOutputReceiver receiver for records and for bad-record output
     * @return whether processing of this restriction should stop or be resumed later
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, RestrictionTracker<OffsetRange, Long> restrictionTracker, WatermarkEstimator<Instant> watermarkEstimator, DoFn.MultiOutputReceiver multiOutputReceiver) throws Exception {
        long j;
        Instant instant;
        RestrictionTracker<OffsetRange, Long> restrictionTracker2;
        // Per-descriptor state: record-size average, end-offset estimator and polling consumer.
        KafkaIOUtils.MovingAvg movingAvg = (KafkaIOUtils.MovingAvg) ((LoadingCache) this.avgRecordSizeCacheSupplier.get()).get(kafkaSourceDescriptor);
        KafkaLatestOffsetEstimator kafkaLatestOffsetEstimator = (KafkaLatestOffsetEstimator) ((LoadingCache) this.latestOffsetEstimatorCacheSupplier.get()).get(kafkaSourceDescriptor);
        Consumer consumer = (Consumer) ((LoadingCache) this.pollConsumerCacheSupplier.get()).get(kafkaSourceDescriptor);
        // The deserializers are created in setup(); fail fast if it has not run.
        Deserializer deserializer = (Deserializer) Preconditions.checkStateNotNull(this.keyDeserializerInstance);
        Deserializer deserializer2 = (Deserializer) Preconditions.checkStateNotNull(this.valueDeserializerInstance);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        // Per-partition metrics: raw record size distribution and backlog gauge.
        Distribution distribution = Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
        Gauge gauge = Metrics.gauge(METRIC_NAMESPACE, "rawSize/backlogBytes_" + topicPartition.toString());
        // Stop-reading hook: claim the last offset of the restriction and stop.
        if (this.checkStopReadingFn != null && ((Boolean) this.checkStopReadingFn.apply(topicPartition)).booleanValue()) {
            restrictionTracker.tryClaim(Long.valueOf(((OffsetRange) restrictionTracker.currentRestriction()).getTo() - 1));
            return DoFn.ProcessContinuation.stop();
        }
        // Optional timestamp policy, seeded with the estimator's current watermark.
        TimestampPolicy<K, V> timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }
        // `from` is the first offset to read; j2 tracks the next offset expected.
        long from = ((OffsetRange) restrictionTracker.currentRestriction()).getFrom();
        long j2 = from;
        // The consumer is paused on every exit below, so resume it before seeking to the start.
        consumer.resume(Collections.singleton(topicPartition));
        consumer.seek(topicPartition, from);
        // j3 counts records received with an offset below `from`.
        long j3 = 0;
        // Measures how long we have been skipping records below the requested offset.
        Stopwatch createStarted = Stopwatch.createStarted();
        ?? kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
        while (true) {
            try {
                // Time the poll call and report it as a successful RPC for the topic.
                Stopwatch createStarted2 = Stopwatch.createStarted();
                ConsumerRecords poll = consumer.poll(this.consumerPollingTimeout);
                kafkaMetrics.updateSuccessfulRpcMetrics(topicPartition.topic(), createStarted2.elapsed());
                // Identity comparison: only the exact instance returned by ConsumerRecords.empty()
                // counts as "no records" here.
                // NOTE(review): presumably that is what poll returns on timeout — confirm.
                if (poll == ConsumerRecords.empty()) {
                    consumer.pause(Collections.singleton(topicPartition));
                    // Stop for good when the partition is no longer listed for the topic.
                    if (!topicPartitionExists(kafkaSourceDescriptor.getTopicPartition(), consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
                        DoFn.ProcessContinuation stop = DoFn.ProcessContinuation.stop();
                        kafkaMetrics.flushBufferedMetrics();
                        return stop;
                    }
                    // Nothing to read right now: advance the watermark and ask to be resumed.
                    if (timestampPolicy != null) {
                        updateWatermarkManually(timestampPolicy, watermarkEstimator, restrictionTracker);
                    }
                    DoFn.ProcessContinuation resume = DoFn.ProcessContinuation.resume();
                    kafkaMetrics.flushBufferedMetrics();
                    return resume;
                }
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    if (consumerRecord.offset() >= from) {
                        // First usable record after a run of skipped ones: report how many were skipped.
                        if (j3 > 0) {
                            LOG.warn("{} records were skipped due to seek returning an earlier position than requested position of {}", Long.valueOf(j3), Long.valueOf(j2));
                            j3 = 0;
                        }
                        // Claim refused: rewind to this record so it is not lost, pause and stop.
                        if (!restrictionTracker.tryClaim(Long.valueOf(consumerRecord.offset()))) {
                            consumer.seek(topicPartition, consumerRecord.offset());
                            consumer.pause(Collections.singleton(topicPartition));
                            DoFn.ProcessContinuation stop2 = DoFn.ProcessContinuation.stop();
                            kafkaMetrics.flushBufferedMetrics();
                            return stop2;
                        }
                        j2 = consumerRecord.offset() + 1;
                        try {
                            String str = consumerRecord.topic();
                            ?? partition = consumerRecord.partition();
                            // Deserializing key or value may throw SerializationException (handled below).
                            KafkaRecord<K, V> kafkaRecord = new KafkaRecord<>(str, partition, consumerRecord.offset(), ConsumerSpEL.getRecordTimestamp(consumerRecord), ConsumerSpEL.getRecordTimestampType(consumerRecord), ConsumerSpEL.hasHeaders() ? consumerRecord.headers() : null, ConsumerSpEL.deserializeKey(deserializer, consumerRecord), ConsumerSpEL.deserializeValue(deserializer2, consumerRecord));
                            // Raw size = key bytes + value bytes, with null counted as 0.
                            int length = (consumerRecord.key() == null ? 0 : ((byte[]) consumerRecord.key()).length) + (consumerRecord.value() == null ? 0 : ((byte[]) consumerRecord.value()).length);
                            movingAvg.update(length);
                            distribution.update(length);
                            // Output timestamp: from the timestamp policy (which also advances the
                            // watermark) when present, otherwise from extractOutputTimestampFn.
                            if (timestampPolicy != null) {
                                RestrictionTracker<OffsetRange, Long> restrictionTracker3 = restrictionTracker;
                                instant = timestampPolicy.getTimestampForRecord(updateWatermarkManually(timestampPolicy, watermarkEstimator, restrictionTracker3), kafkaRecord);
                                restrictionTracker2 = restrictionTracker3;
                            } else {
                                Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
                                instant = (Instant) this.extractOutputTimestampFn.apply(kafkaRecord);
                                restrictionTracker2 = partition;
                            }
                            multiOutputReceiver.get(this.recordTag).outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), instant);
                            j = restrictionTracker2;
                        } catch (SerializationException e) {
                            // Undeserializable record: hand the raw record to the bad-record router
                            // and still advance the watermark when a policy is configured.
                            j = 0;
                            this.badRecordRouter.route(multiOutputReceiver, consumerRecord, (Coder) null, e, "Failure deserializing Key or Value of Kakfa record reading from Kafka");
                            if (timestampPolicy != null) {
                                RestrictionTracker<OffsetRange, Long> restrictionTracker4 = restrictionTracker;
                                updateWatermarkManually(timestampPolicy, watermarkEstimator, restrictionTracker4);
                                j = restrictionTracker4;
                            }
                        }
                    } else {
                        // Record lies before the requested start offset. Give up after more than
                        // 10 seconds of skipping and retry the bundle later.
                        if (createStarted.elapsed().getSeconds() > 10) {
                            LOG.error("The expected offset ({}) was not reached even after skipping consumed records for 10 seconds. The offset we could reach was {}. The processing of this bundle will be attempted at a later time.", Long.valueOf(j2), Long.valueOf(consumerRecord.offset()));
                            consumer.pause(Collections.singleton(topicPartition));
                            DoFn.ProcessContinuation withResumeDelay = DoFn.ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
                            kafkaMetrics.flushBufferedMetrics();
                            return withResumeDelay;
                        }
                        j3++;
                    }
                }
                // NOTE(review): the j4/j2/j shuffle below is a decompiler artifact. The visible
                // effect: when the consumer's position is beyond the expected offset, the offset
                // j2 - 1 is claimed, and a refused claim rewinds, pauses and stops.
                long j4 = j2;
                j2 = j;
                if (j4 < consumer.position(topicPartition)) {
                    if (!restrictionTracker.tryClaim(Long.valueOf(j2 - 1))) {
                        consumer.seek(topicPartition, j2 - 1);
                        consumer.pause(Collections.singleton(topicPartition));
                        DoFn.ProcessContinuation stop3 = DoFn.ProcessContinuation.stop();
                        kafkaMetrics.flushBufferedMetrics();
                        return stop3;
                    }
                    if (timestampPolicy != null) {
                        updateWatermarkManually(timestampPolicy, watermarkEstimator, restrictionTracker);
                    }
                }
                // Backlog in bytes = (estimated end offset - j2) * average record size; published
                // to both the gauge and the Kafka metrics container.
                long doubleValue = (long) (BigDecimal.valueOf(kafkaLatestOffsetEstimator.estimate()).subtract(BigDecimal.valueOf(j2), MathContext.DECIMAL128).doubleValue() * movingAvg.get());
                gauge.set(doubleValue);
                j = doubleValue;
                kafkaMetrics.updateBacklogBytes(kafkaSourceDescriptor.getTopic(), kafkaSourceDescriptor.getPartition().intValue(), j);
            } catch (Throwable th) {
                // Flush metrics on failure too, then propagate unchanged.
                kafkaMetrics.flushBufferedMetrics();
                throw th;
            }
        }
    }

    /**
     * Reports whether the given partition number appears in a topic's partition list.
     *
     * <p>Only partition numbers are compared; the topic name of {@code topicPartition} is not
     * checked against the list entries.
     *
     * @param topicPartition the partition to look for
     * @param list partition metadata to search
     * @return {@code true} if any entry has the same partition number
     */
    private boolean topicPartitionExists(TopicPartition topicPartition, List<PartitionInfo> list) {
        for (PartitionInfo candidate : list) {
            if (candidate.partition() == topicPartition.partition()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Advances a manual watermark estimator to the watermark reported by the timestamp policy.
     *
     * <p>The policy is queried with a context holding the tracker's remaining work (truncated to
     * {@code long}) and the current wall-clock time. The resulting watermark is clamped into the
     * valid window timestamp range before it is applied.
     *
     * @param timestampPolicy policy that supplies the watermark
     * @param watermarkEstimator estimator to update; must be a {@link ManualWatermarkEstimator}
     * @param restrictionTracker tracker whose progress supplies the remaining work; it is cast to
     *     {@code RestrictionTracker.HasProgress}
     * @return the context that was passed to the policy
     * @throws IllegalStateException if {@code watermarkEstimator} is not a manual estimator
     */
    private KafkaUnboundedReader.TimestampPolicyContext updateWatermarkManually(TimestampPolicy<K, V> timestampPolicy, WatermarkEstimator<Instant> watermarkEstimator, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(watermarkEstimator instanceof ManualWatermarkEstimator);

        RestrictionTracker.HasProgress progressSource = (RestrictionTracker.HasProgress) restrictionTracker;
        long remainingWork = (long) progressSource.getProgress().getWorkRemaining();
        Instant observedAt = Instant.now();
        KafkaUnboundedReader.TimestampPolicyContext policyContext =
                new KafkaUnboundedReader.TimestampPolicyContext(remainingWork, observedAt);

        Instant boundedWatermark = ensureTimestampWithinBounds(timestampPolicy.getWatermark(policyContext));
        ((ManualWatermarkEstimator) watermarkEstimator).setWatermark(boundedWatermark);
        return policyContext;
    }

    /**
     * Supplies the coder used to encode this DoFn's {@link OffsetRange} restrictions.
     *
     * @return a new {@code OffsetRange.Coder}
     */
    @DoFn.GetRestrictionCoder
    public Coder<OffsetRange> restrictionCoder() {
        final Coder<OffsetRange> offsetRangeCoder = new OffsetRange.Coder();
        return offsetRangeCoder;
    }

    /**
     * Instantiates the key and value deserializers and initializes the optional stop-reading hook.
     *
     * @throws Exception if the stop-reading hook's setup fails
     */
    @DoFn.Setup
    public void setup() throws Exception {
        // The boolean flag is true for the key and false for the value deserializer.
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);

        if (this.checkStopReadingFn == null) {
            return;
        }
        this.checkStopReadingFn.setup();
    }

    /**
     * Releases the resources held by this DoFn instance: both deserializers, the optional
     * stop-reading hook, and pending maintenance work of the three caches.
     *
     * <p>Each deserializer is closed independently, so a failure closing one does not prevent the
     * other from being closed; failures are logged rather than thrown. The caches are cleaned up
     * even when the stop-reading hook's teardown throws.
     *
     * @throws Exception if the stop-reading hook's teardown fails
     */
    @DoFn.Teardown
    public void teardown() throws Exception {
        closeDeserializerQuietly(this.valueDeserializerInstance);
        this.valueDeserializerInstance = null;
        closeDeserializerQuietly(this.keyDeserializerInstance);
        this.keyDeserializerInstance = null;

        try {
            if (this.checkStopReadingFn != null) {
                this.checkStopReadingFn.teardown();
            }
        } finally {
            // Let the caches run their pending maintenance before this instance goes away.
            this.avgRecordSizeCacheSupplier.get().cleanUp();
            this.latestOffsetEstimatorCacheSupplier.get().cleanUp();
            this.pollConsumerCacheSupplier.get().cleanUp();
        }
    }

    /** Closes one deserializer if present, logging (not propagating) any failure. */
    private static void closeDeserializerQuietly(Closeable deserializer) {
        if (deserializer == null) {
            return;
        }
        try {
            Closeables.close(deserializer, true);
        } catch (Exception e) {
            LOG.warn("Fail to close resource during finishing bundle.", e);
        }
    }

    /**
     * Clamps a timestamp into the range
     * [{@code BoundedWindow.TIMESTAMP_MIN_VALUE}, {@code BoundedWindow.TIMESTAMP_MAX_VALUE}].
     *
     * @param instant the timestamp to clamp
     * @return the nearest bound when {@code instant} lies outside the range, otherwise
     *     {@code instant} itself
     */
    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }

    /**
     * Rebuilds one of this class's serializable cache-supplier lambdas from its serialized
     * description.
     *
     * <p>The lambda is selected by its implementation method name. Before a supplier is returned,
     * the description is validated against the expected functional interface
     * ({@code SerializableSupplier.get()}), the implementing class and the implementation method
     * signature. Three lambdas are recognised:
     *
     * <ul>
     *   <li>{@code lambda$new$31b90dd8$1} - cache of {@code KafkaIOUtils.MovingAvg} values;
     *   <li>{@code lambda$new$3d9faa22$2} - cache of Kafka consumers used for restriction
     *       processing;
     *   <li>{@code lambda$new$3d9faa22$1} - cache of {@code KafkaLatestOffsetEstimator} values.
     * </ul>
     *
     * <p>NOTE(review): a method of this name and shape is normally synthesized by the compiler;
     * this presumably originates from decompiled bytecode - confirm whether it should be kept in
     * source at all.
     *
     * @param serializedLambda the serialized form of the lambda to rebuild
     * @return a supplier that builds the corresponding cache
     * @throws IllegalArgumentException if the description matches none of the known lambdas
     */
    // Raw types are kept where the generic type of a captured argument is not recoverable here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        switch (serializedLambda.getImplMethodName()) {
            case "lambda$new$31b90dd8$1": {
                if (matchesSupplierLambda(serializedLambda, "()Lorg/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/cache/LoadingCache;")) {
                    // The enclosing instance is captured but not used by this lambda; the cast
                    // still rejects a description carrying the wrong captured argument.
                    ReadFromKafkaDoFn enclosing = (ReadFromKafkaDoFn) serializedLambda.getCapturedArg(0);
                    return (SerializableSupplier<Object>) () ->
                            CacheBuilder.newBuilder()
                                    .concurrencyLevel(Runtime.getRuntime().availableProcessors())
                                    .weakValues()
                                    .build(new CacheLoader<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>() {
                                        @Override
                                        public KafkaIOUtils.MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
                                            return new KafkaIOUtils.MovingAvg();
                                        }
                                    });
                }
                break;
            }
            case "lambda$new$3d9faa22$2": {
                if (matchesSupplierLambda(serializedLambda, "(Lorg/apache/beam/sdk/transforms/SerializableFunction;)Lorg/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/cache/LoadingCache;")) {
                    ReadFromKafkaDoFn enclosing = (ReadFromKafkaDoFn) serializedLambda.getCapturedArg(0);
                    SerializableFunction consumerFactory = (SerializableFunction) serializedLambda.getCapturedArg(1);
                    return (SerializableSupplier<Object>) () ->
                            CacheBuilder.newBuilder()
                                    .concurrencyLevel(Runtime.getRuntime().availableProcessors())
                                    .weakValues()
                                    .removalListener(removalNotification -> {
                                        // Only entries dropped by garbage collection are closed here.
                                        if (removalNotification.getCause() == RemovalCause.COLLECTED) {
                                            Consumer<?, ?> collected = (Consumer<?, ?>) removalNotification.getValue();
                                            if (collected != null) {
                                                collected.close();
                                            }
                                        }
                                    })
                                    .build(new CacheLoader<KafkaSourceDescriptor, Consumer<byte[], byte[]>>() {
                                        @Override
                                        public Consumer<byte[], byte[]> load(KafkaSourceDescriptor kafkaSourceDescriptor) {
                                            ReadFromKafkaDoFn.LOG.info("Creating Kafka consumer for restriction processing for {}", kafkaSourceDescriptor);
                                            // The consumer config is read from the captured instance: there
                                            // is no enclosing "this" inside a static method.
                                            Consumer<byte[], byte[]> consumer = (Consumer) consumerFactory.apply(KafkaIOUtils.overrideBootstrapServersConfig(enclosing.consumerConfig, kafkaSourceDescriptor));
                                            consumer.assign(Collections.singleton(kafkaSourceDescriptor.getTopicPartition()));
                                            return consumer;
                                        }
                                    });
                }
                break;
            }
            case "lambda$new$3d9faa22$1": {
                if (matchesSupplierLambda(serializedLambda, "(Lorg/apache/beam/sdk/transforms/SerializableFunction;)Lorg/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/cache/LoadingCache;")) {
                    ReadFromKafkaDoFn enclosing = (ReadFromKafkaDoFn) serializedLambda.getCapturedArg(0);
                    SerializableFunction consumerFactory = (SerializableFunction) serializedLambda.getCapturedArg(1);
                    return (SerializableSupplier<Object>) () ->
                            CacheBuilder.newBuilder()
                                    .concurrencyLevel(Runtime.getRuntime().availableProcessors())
                                    .weakValues()
                                    .removalListener(removalNotification -> {
                                        // Only entries dropped by garbage collection are closed here.
                                        if (removalNotification.getCause() == RemovalCause.COLLECTED) {
                                            KafkaLatestOffsetEstimator collected = (KafkaLatestOffsetEstimator) removalNotification.getValue();
                                            if (collected != null) {
                                                collected.close();
                                            }
                                        }
                                    })
                                    .build(new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
                                        @Override
                                        public KafkaLatestOffsetEstimator load(KafkaSourceDescriptor kafkaSourceDescriptor) {
                                            ReadFromKafkaDoFn.LOG.info("Creating Kafka consumer for offset estimation for {}", kafkaSourceDescriptor);
                                            return new KafkaLatestOffsetEstimator((Consumer) consumerFactory.apply(KafkaIOUtils.overrideBootstrapServersConfig(enclosing.consumerConfig, kafkaSourceDescriptor)), kafkaSourceDescriptor.getTopicPartition());
                                        }
                                    });
                }
                break;
            }
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /**
     * Checks that a serialized lambda describes a {@code SerializableSupplier.get()} implementation
     * declared in {@code ReadFromKafkaDoFn} with the given implementation method signature.
     *
     * @param serializedLambda the serialized lambda description to validate
     * @param implMethodSignature the expected JVM descriptor of the implementation method
     * @return {@code true} if every attribute matches
     */
    private static boolean matchesSupplierLambda(SerializedLambda serializedLambda, String implMethodSignature) {
        // Implementation method kind 7 is the only kind accepted for these lambdas.
        return serializedLambda.getImplMethodKind() == 7
                && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/util/SerializableSupplier")
                && serializedLambda.getFunctionalInterfaceMethodName().equals("get")
                && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;")
                && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn")
                && serializedLambda.getImplMethodSignature().equals(implMethodSignature);
    }
}
