package org.apache.beam.sdk.io.kafka;

import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.class */
public abstract class ReadFromKafkaDoFn<K, V> extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
    private static final AtomicLong FN_ID = new AtomicLong();
    private final long fnId;
    private final Map<String, Object> offsetConsumerConfig;
    private final CheckStopReadingFn checkStopReadingFn;
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
    private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn;
    private final SerializableFunction<Instant, WatermarkEstimator<Instant>> createWatermarkEstimatorFn;
    private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
    private final BadRecordRouter badRecordRouter;
    private final TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> recordTag;
    private transient Deserializer<K> keyDeserializerInstance;
    private transient Deserializer<V> valueDeserializerInstance;
    private transient LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache;
    private transient LoadingCache<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg> avgRecordSizeCache;
    private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2;

    @VisibleForTesting
    final long consumerPollingTimeout;

    @VisibleForTesting
    final DeserializerProvider<K> keyDeserializerProvider;

    @VisibleForTesting
    final DeserializerProvider<V> valueDeserializerProvider;

    @VisibleForTesting
    final Map<String, Object> consumerConfig;

    @VisibleForTesting
    static final String METRIC_NAMESPACE = "KafkaIOReader";

    @VisibleForTesting
    static final String RAW_SIZE_METRIC_PREFIX = "rawSize/";

    /* JADX INFO: Access modifiers changed from: private */
    @DoFn.BoundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$Bounded.class */
    public static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
        Bounded(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> tupleTag) {
            super(readSourceDescriptors, tupleTag);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$KafkaLatestOffsetEstimator.class */
    public static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator {
        private final Consumer<byte[], byte[]> offsetConsumer;
        private final TopicPartition topicPartition;
        private final Supplier<Long> memoizedBacklog;

        KafkaLatestOffsetEstimator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
            this.offsetConsumer = consumer;
            this.topicPartition = topicPartition;
            this.memoizedBacklog = Suppliers.memoizeWithExpiration(() -> {
                Long l;
                synchronized (consumer) {
                    l = (Long) Preconditions.checkStateNotNull((Long) consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition), "No end offset found for partition %s.", topicPartition);
                }
                return l;
            }, 1L, TimeUnit.SECONDS);
        }

        protected void finalize() {
            try {
                Closeables.close(this.offsetConsumer, true);
                ReadFromKafkaDoFn.LOG.info("Offset Estimator consumer was closed for {}", this.topicPartition);
            } catch (Exception e) {
                ReadFromKafkaDoFn.LOG.warn("Failed to close offset consumer for {}", this.topicPartition);
            }
        }

        public long estimate() {
            return ((Long) this.memoizedBacklog.get()).longValue();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$SharedStateHolder.class */
    private static final class SharedStateHolder {
        private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>> OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap();
        private static final Map<Long, LoadingCache<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>> AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap();

        private SharedStateHolder() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @DoFn.UnboundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$Unbounded.class */
    public static class Unbounded<K, V> extends ReadFromKafkaDoFn<K, V> {
        Unbounded(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> tupleTag) {
            super(readSourceDescriptors, tupleTag);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, V> ReadFromKafkaDoFn<K, V> create(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> tupleTag) {
        return readSourceDescriptors.isBounded() ? new Bounded(readSourceDescriptors, tupleTag) : new Unbounded(readSourceDescriptors, tupleTag);
    }

    private ReadFromKafkaDoFn(KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> tupleTag) {
        this.fnId = FN_ID.getAndIncrement();
        this.keyDeserializerInstance = null;
        this.valueDeserializerInstance = null;
        this.consumerConfig = readSourceDescriptors.getConsumerConfig();
        this.offsetConsumerConfig = readSourceDescriptors.getOffsetConsumerConfig();
        this.keyDeserializerProvider = (DeserializerProvider) Preconditions.checkArgumentNotNull(readSourceDescriptors.getKeyDeserializerProvider());
        this.valueDeserializerProvider = (DeserializerProvider) Preconditions.checkArgumentNotNull(readSourceDescriptors.getValueDeserializerProvider());
        this.consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
        this.extractOutputTimestampFn = readSourceDescriptors.getExtractOutputTimestampFn();
        this.createWatermarkEstimatorFn = readSourceDescriptors.getCreateWatermarkEstimatorFn();
        this.timestampPolicyFactory = readSourceDescriptors.getTimestampPolicyFactory();
        this.checkStopReadingFn = readSourceDescriptors.getCheckStopReadingFn();
        this.badRecordRouter = readSourceDescriptors.getBadRecordRouter();
        this.recordTag = tupleTag;
        if (readSourceDescriptors.getConsumerPollingTimeout() > 0) {
            this.consumerPollingTimeout = readSourceDescriptors.getConsumerPollingTimeout();
        } else {
            this.consumerPollingTimeout = DEFAULT_KAFKA_POLL_TIMEOUT;
        }
    }

    @DoFn.GetInitialRestriction
    public OffsetRange initialRestriction(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor) {
        Map<String, Object> overrideBootstrapServersConfig = overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor);
        Consumer consumer = (Consumer) this.consumerFactoryFn.apply(overrideBootstrapServersConfig);
        try {
            consumer.assign(ImmutableList.of(topicPartition));
            Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
            long longValue = kafkaSourceDescriptor.getStartReadOffset() != null ? kafkaSourceDescriptor.getStartReadOffset().longValue() : startReadTime != null ? ConsumerSpEL.offsetForTime(consumer, topicPartition, startReadTime) : consumer.position(topicPartition);
            long j = Long.MAX_VALUE;
            Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
            if (kafkaSourceDescriptor.getStopReadOffset() != null) {
                j = kafkaSourceDescriptor.getStopReadOffset().longValue();
            } else if (stopReadTime != null) {
                j = ConsumerSpEL.offsetForTime(consumer, topicPartition, stopReadTime);
            }
            new OffsetRange(longValue, j);
            Lineage.getSources().add("kafka", ImmutableList.of((String) overrideBootstrapServersConfig.get("bootstrap.servers"), (String) MoreObjects.firstNonNull(kafkaSourceDescriptor.getTopic(), topicPartition.topic())));
            OffsetRange offsetRange = new OffsetRange(longValue, j);
            if (consumer != null) {
                consumer.close();
            }
            return offsetRange;
        } catch (Throwable th) {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        return (WatermarkEstimator) ((SerializableFunction) Preconditions.checkStateNotNull(this.createWatermarkEstimatorFn)).apply(ensureTimestampWithinBounds(instant));
    }

    @DoFn.GetSize
    public double getSize(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) throws ExecutionException {
        KafkaIOUtils.MovingAvg movingAvg = (KafkaIOUtils.MovingAvg) ((LoadingCache) Preconditions.checkStateNotNull(this.avgRecordSizeCache)).getIfPresent(kafkaSourceDescriptor);
        double workRemaining = restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
        return movingAvg == null ? workRemaining : workRemaining * movingAvg.get();
    }

    @DoFn.NewTracker
    public OffsetRangeTracker restrictionTracker(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) throws ExecutionException {
        if (offsetRange.getTo() < Long.MAX_VALUE) {
            return new OffsetRangeTracker(offsetRange);
        }
        return new GrowableOffsetRangeTracker(offsetRange.getFrom(), (KafkaLatestOffsetEstimator) ((LoadingCache) Preconditions.checkStateNotNull(this.offsetEstimatorCache)).get(kafkaSourceDescriptor));
    }

    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, RestrictionTracker<OffsetRange, Long> restrictionTracker, WatermarkEstimator<Instant> watermarkEstimator, DoFn.MultiOutputReceiver multiOutputReceiver) throws Exception {
        Instant instant;
        LoadingCache loadingCache = (LoadingCache) Preconditions.checkStateNotNull(this.avgRecordSizeCache);
        LoadingCache loadingCache2 = (LoadingCache) Preconditions.checkStateNotNull(this.offsetEstimatorCache);
        Deserializer deserializer = (Deserializer) Preconditions.checkStateNotNull(this.keyDeserializerInstance);
        Deserializer deserializer2 = (Deserializer) Preconditions.checkStateNotNull(this.valueDeserializerInstance);
        TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
        Distribution distribution = Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
        Gauge gauge = Metrics.gauge(METRIC_NAMESPACE, "rawSize/backlogBytes_" + topicPartition.toString());
        if (this.checkStopReadingFn != null && ((Boolean) this.checkStopReadingFn.apply(kafkaSourceDescriptor.getTopicPartition())).booleanValue()) {
            restrictionTracker.tryClaim(Long.valueOf(((OffsetRange) restrictionTracker.currentRestriction()).getTo() - 1));
            return DoFn.ProcessContinuation.stop();
        }
        Map<String, Object> overrideBootstrapServersConfig = overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TimestampPolicy<K, V> timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }
        LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
        Consumer<byte[], byte[]> consumer = (Consumer) this.consumerFactoryFn.apply(overrideBootstrapServersConfig);
        try {
            consumer.assign(ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
            long from = ((OffsetRange) restrictionTracker.currentRestriction()).getFrom();
            long j = from;
            consumer.seek(kafkaSourceDescriptor.getTopicPartition(), from);
            ConsumerRecords.empty();
            long j2 = 0;
            Stopwatch createStarted = Stopwatch.createStarted();
            KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
            while (true) {
                try {
                    KafkaIOUtils.MovingAvg movingAvg = (KafkaIOUtils.MovingAvg) loadingCache.getUnchecked(kafkaSourceDescriptor);
                    ConsumerRecords<byte[], byte[]> poll = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
                    if (poll.isEmpty()) {
                        if (!topicPartitionExists(kafkaSourceDescriptor.getTopicPartition(), consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) {
                            DoFn.ProcessContinuation stop = DoFn.ProcessContinuation.stop();
                            kafkaMetrics.flushBufferedMetrics();
                            if (consumer != null) {
                                consumer.close();
                            }
                            return stop;
                        }
                        if (timestampPolicy != null) {
                            updateWatermarkManually(timestampPolicy, watermarkEstimator, restrictionTracker);
                        }
                        DoFn.ProcessContinuation resume = DoFn.ProcessContinuation.resume();
                        kafkaMetrics.flushBufferedMetrics();
                        if (consumer != null) {
                            consumer.close();
                        }
                        return resume;
                    }
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        if (consumerRecord.offset() >= from) {
                            if (j2 > 0) {
                                LOG.warn("{} records were skipped due to seek returning an earlier position than requested position of {}", Long.valueOf(j2), Long.valueOf(j));
                                j2 = 0;
                            }
                            if (!restrictionTracker.tryClaim(Long.valueOf(consumerRecord.offset()))) {
                                DoFn.ProcessContinuation stop2 = DoFn.ProcessContinuation.stop();
                                kafkaMetrics.flushBufferedMetrics();
                                if (consumer != null) {
                                    consumer.close();
                                }
                                return stop2;
                            }
                            try {
                                KafkaRecord<K, V> kafkaRecord = new KafkaRecord<>(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), ConsumerSpEL.getRecordTimestamp(consumerRecord), ConsumerSpEL.getRecordTimestampType(consumerRecord), ConsumerSpEL.hasHeaders() ? consumerRecord.headers() : null, ConsumerSpEL.deserializeKey(deserializer, consumerRecord), ConsumerSpEL.deserializeValue(deserializer2, consumerRecord));
                                int length = (consumerRecord.key() == null ? 0 : ((byte[]) consumerRecord.key()).length) + (consumerRecord.value() == null ? 0 : ((byte[]) consumerRecord.value()).length);
                                movingAvg.update(length);
                                distribution.update(length);
                                j = consumerRecord.offset() + 1;
                                if (timestampPolicy != null) {
                                    instant = timestampPolicy.getTimestampForRecord(updateWatermarkManually(timestampPolicy, watermarkEstimator, restrictionTracker), kafkaRecord);
                                } else {
                                    Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
                                    instant = (Instant) this.extractOutputTimestampFn.apply(kafkaRecord);
                                }
                                multiOutputReceiver.get(this.recordTag).outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), instant);
                            } catch (SerializationException e) {
                                this.badRecordRouter.route(multiOutputReceiver, consumerRecord, (Coder) null, e, "Failure deserializing Key or Value of Kakfa record reading from Kafka");
                                if (timestampPolicy != null) {
                                    updateWatermarkManually(timestampPolicy, watermarkEstimator, restrictionTracker);
                                }
                            }
                        } else {
                            if (createStarted.elapsed().getSeconds() > 10) {
                                LOG.error("The expected offset ({}) was not reached even after skipping consumed records for 10 seconds. The offset we could reach was {}. The processing of this bundle will be attempted at a later time.", Long.valueOf(j), Long.valueOf(consumerRecord.offset()));
                                DoFn.ProcessContinuation withResumeDelay = DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(10L));
                                kafkaMetrics.flushBufferedMetrics();
                                if (consumer != null) {
                                    consumer.close();
                                }
                                return withResumeDelay;
                            }
                            j2++;
                        }
                    }
                    gauge.set((long) (BigDecimal.valueOf(((Long) Preconditions.checkStateNotNull(Long.valueOf(((KafkaLatestOffsetEstimator) loadingCache2.get(kafkaSourceDescriptor)).estimate()))).longValue()).subtract(BigDecimal.valueOf(j), MathContext.DECIMAL128).doubleValue() * movingAvg.get()));
                    kafkaMetrics.updateBacklogBytes(kafkaSourceDescriptor.getTopic(), kafkaSourceDescriptor.getPartition().intValue(), (long) (BigDecimal.valueOf(((Long) Preconditions.checkStateNotNull(Long.valueOf(((KafkaLatestOffsetEstimator) loadingCache2.get(kafkaSourceDescriptor)).estimate()))).longValue()).subtract(BigDecimal.valueOf(j), MathContext.DECIMAL128).doubleValue() * movingAvg.get()));
                } catch (Throwable th) {
                    kafkaMetrics.flushBufferedMetrics();
                    throw th;
                }
            }
        } catch (Throwable th2) {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    private boolean topicPartitionExists(TopicPartition topicPartition, List<PartitionInfo> list) {
        return list.stream().anyMatch(partitionInfo -> {
            return partitionInfo.partition() == topicPartition.partition();
        });
    }

    private ConsumerRecords<byte[], byte[]> poll(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, KafkaMetrics kafkaMetrics) {
        ConsumerRecords<byte[], byte[]> poll;
        Stopwatch createStarted = Stopwatch.createStarted();
        long j = -1;
        java.time.Duration ofSeconds = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
        java.time.Duration duration = java.time.Duration.ZERO;
        do {
            poll = consumer.poll(ofSeconds.minus(duration));
            duration = createStarted.elapsed();
            kafkaMetrics.updateSuccessfulRpcMetrics(topicPartition.topic(), java.time.Duration.ofMillis(duration.toMillis()));
            if (!poll.isEmpty()) {
                return poll;
            }
            long j2 = j;
            j = j2;
            if (j2 == consumer.position(topicPartition)) {
                return poll;
            }
        } while (duration.toMillis() < ofSeconds.toMillis());
        LOG.warn("No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", Long.valueOf(this.consumerPollingTimeout));
        return poll;
    }

    private KafkaUnboundedReader.TimestampPolicyContext updateWatermarkManually(TimestampPolicy<K, V> timestampPolicy, WatermarkEstimator<Instant> watermarkEstimator, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(watermarkEstimator instanceof ManualWatermarkEstimator);
        KafkaUnboundedReader.TimestampPolicyContext timestampPolicyContext = new KafkaUnboundedReader.TimestampPolicyContext((long) ((RestrictionTracker.HasProgress) restrictionTracker).getProgress().getWorkRemaining(), Instant.now());
        ((ManualWatermarkEstimator) watermarkEstimator).setWatermark(ensureTimestampWithinBounds(timestampPolicy.getWatermark(timestampPolicyContext)));
        return timestampPolicyContext;
    }

    @DoFn.GetRestrictionCoder
    public Coder<OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    @DoFn.Setup
    public void setup() throws Exception {
        this.avgRecordSizeCache = (LoadingCache) SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent(Long.valueOf(this.fnId), l -> {
            return CacheBuilder.newBuilder().maximumSize(1000L).build(new CacheLoader<KafkaSourceDescriptor, KafkaIOUtils.MovingAvg>() { // from class: org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.1
                public KafkaIOUtils.MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
                    return new KafkaIOUtils.MovingAvg();
                }
            });
        });
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);
        this.offsetEstimatorCache = (LoadingCache) SharedStateHolder.OFFSET_ESTIMATOR_CACHE.computeIfAbsent(Long.valueOf(this.fnId), l2 -> {
            final ImmutableMap copyOf = ImmutableMap.copyOf(this.consumerConfig);
            final ImmutableMap copyOf2 = this.offsetConsumerConfig == null ? null : ImmutableMap.copyOf(this.offsetConsumerConfig);
            return CacheBuilder.newBuilder().weakValues().expireAfterAccess(1L, TimeUnit.MINUTES).build(new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() { // from class: org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.2
                public KafkaLatestOffsetEstimator load(KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
                    ReadFromKafkaDoFn.LOG.info("Creating Kafka consumer for offset estimation for {}", kafkaSourceDescriptor);
                    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
                    return new KafkaLatestOffsetEstimator((Consumer) ReadFromKafkaDoFn.this.consumerFactoryFn.apply(KafkaIOUtils.getOffsetConsumerConfig("tracker-" + topicPartition, copyOf2, ReadFromKafkaDoFn.this.overrideBootstrapServersConfig(copyOf, kafkaSourceDescriptor))), topicPartition);
                }
            });
        });
        if (this.checkStopReadingFn != null) {
            this.checkStopReadingFn.setup();
        }
    }

    @DoFn.Teardown
    public void teardown() throws Exception {
        LoadingCache loadingCache = (LoadingCache) Preconditions.checkStateNotNull(this.avgRecordSizeCache);
        LoadingCache loadingCache2 = (LoadingCache) Preconditions.checkStateNotNull(this.offsetEstimatorCache);
        try {
            if (this.valueDeserializerInstance != null) {
                Closeables.close(this.valueDeserializerInstance, true);
                this.valueDeserializerInstance = null;
            }
            if (this.keyDeserializerInstance != null) {
                Closeables.close(this.keyDeserializerInstance, true);
                this.keyDeserializerInstance = null;
            }
        } catch (Exception e) {
            LOG.warn("Fail to close resource during finishing bundle.", e);
        }
        if (this.checkStopReadingFn != null) {
            this.checkStopReadingFn.teardown();
        }
        loadingCache.cleanUp();
        loadingCache2.cleanUp();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, Object> overrideBootstrapServersConfig(Map<String, Object> map, KafkaSourceDescriptor kafkaSourceDescriptor) {
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(map.containsKey("bootstrap.servers") || kafkaSourceDescriptor.getBootStrapServers() != null);
        HashMap hashMap = new HashMap(map);
        if (kafkaSourceDescriptor.getBootStrapServers() != null && kafkaSourceDescriptor.getBootStrapServers().size() > 0) {
            hashMap.put("bootstrap.servers", String.join(",", kafkaSourceDescriptor.getBootStrapServers()));
        }
        return hashMap;
    }

    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
        } else if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            instant = BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }
}
