package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_ReadSourceDescriptors;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_WriteRecords;
import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO.class */
public class KafkaIO {
    /** Logger for {@code KafkaIO}; nested classes in this file log through it as well. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Field-based holder for a single Kafka record whose key and value are kept as raw byte arrays.
     *
     * <p>The schema is derived from the fields below ({@link JavaFieldSchema}) and instances are
     * created through the {@link SchemaCreate} constructor. The constructor parameters are named
     * after the fields they populate because Beam's schema code matches creator parameters to
     * fields by name (when parameter names are retained at compile time).
     */
    @DefaultSchema(JavaFieldSchema.class)
    @SuppressFBWarnings({"URF_UNREAD_FIELD"})
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$ByteArrayKafkaRecord.class */
    public static class ByteArrayKafkaRecord {
        String topic;
        int partition;
        long offset;
        // NOTE(review): unit of the timestamp is not visible here - presumably epoch millis; confirm.
        long timestamp;
        byte[] key;
        byte[] value;
        List<KafkaHeader> headers;
        int timestampTypeId;
        String timestampTypeName;

        /**
         * Creates a record; every argument is stored as-is in the field of the same name (no
         * defensive copies are made of the arrays or the header list).
         */
        @SchemaCreate
        public ByteArrayKafkaRecord(String topic, int partition, long offset, long timestamp, byte[] key, byte[] value, List<KafkaHeader> headers, int timestampTypeId, String timestampTypeName) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
            this.headers = headers;
            this.timestampTypeId = timestampTypeId;
            this.timestampTypeName = timestampTypeName;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Field-based holder for one Kafka record header: a string key and a raw byte-array value.
     *
     * <p>The schema is derived from the fields ({@link JavaFieldSchema}); the {@link SchemaCreate}
     * constructor's parameters are named after those fields because Beam's schema code matches
     * creator parameters to fields by name (when parameter names are retained at compile time).
     */
    @DefaultSchema(JavaFieldSchema.class)
    @SuppressFBWarnings({"URF_UNREAD_FIELD"})
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$KafkaHeader.class */
    public static class KafkaHeader {
        String key;
        byte[] value;

        /** Creates a header; both arguments are stored as-is (the array is not copied). */
        @SchemaCreate
        public KafkaHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$KafkaValueWrite.class */
    private static class KafkaValueWrite<V> extends PTransform<PCollection<V>, PDone> {
        private final Write<?, V> kvWriteTransform;

        private KafkaValueWrite(Write<?, V> write) {
            this.kvWriteTransform = write;
        }

        public PDone expand(PCollection<V> pCollection) {
            return pCollection.apply("Kafka values with default key", MapElements.via(new SimpleFunction<V, KV<Object, V>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.KafkaValueWrite.1
                public KV<Object, V> apply(V v) {
                    return KV.of((Object) null, v);
                }

                /* renamed from: apply, reason: collision with other method in class */
                public /* bridge */ /* synthetic */ Object m7apply(Object obj) {
                    return apply((AnonymousClass1) obj);
                }
            })).setCoder(KvCoder.of(new NullOnlyCoder(), pCollection.getCoder())).apply(this.kvWriteTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.kvWriteTransform.populateDisplayData(builder);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$NullOnlyCoder.class */
    public static class NullOnlyCoder<T> extends AtomicCoder<T> {
        private NullOnlyCoder() {
        }

        public void encode(T t, OutputStream outputStream) {
            Preconditions.checkArgument(t == null, "Can only encode nulls");
        }

        public T decode(InputStream inputStream) {
            return null;
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read.class */
    public static abstract class Read<K, V> extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
        /** Class object of {@code AutoValue_KafkaIO_Read}, the generated implementation of this AutoValue type. */
        public static final Class<AutoValue_KafkaIO_Read> AUTOVALUE_CLASS = AutoValue_KafkaIO_Read.class;

        /**
         * Transform override that matches transforms whose class is exactly
         * {@link ReadFromKafkaViaSDF} and replaces them through a {@link KafkaReadOverrideFactory}.
         */
        @Internal
        public static final PTransformOverride KAFKA_READ_OVERRIDE = PTransformOverride.of(PTransformMatchers.classEqualTo(ReadFromKafkaViaSDF.class), new KafkaReadOverrideFactory());

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$AbstractReadFromKafka.class */
        /**
         * Shared base of the concrete Kafka read expansions: keeps the configured {@link Read}
         * together with the key and value coders the expansion should use.
         */
        public static abstract class AbstractReadFromKafka<K, V> extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
            Read<K, V> kafkaRead;
            Coder<K> keyCoder;
            Coder<V> valueCoder;

            /**
             * Checks that the given read configuration is supported by {@code implementation} and
             * then stores the configuration and coders.
             */
            AbstractReadFromKafka(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation implementation) {
                // The support check runs before any field is assigned.
                // NOTE(review): presumably throws KafkaIOReadImplementationCompatibilityException on conflict - confirm.
                KafkaIOReadImplementationCompatibility.getCompatibility(kafkaRead).checkSupport(implementation);
                this.kafkaRead = kafkaRead;
                this.keyCoder = keyCoder;
                this.valueCoder = valueCoder;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * AutoValue builder for {@link Read}. Besides the generated property setters it offers
         * {@link #setupExternalBuilder} to populate a builder from an {@link External.Configuration}.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$Builder.class */
        public static abstract class Builder<K, V> {
            // ---- Abstract property setters implemented by the AutoValue-generated builder. ----

            abstract Builder<K, V> setConsumerConfig(Map<String, Object> map);

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<K, V> setTopics(List<String> list);

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<K, V> setTopicPartitions(List<TopicPartition> list);

            abstract Builder<K, V> setTopicPattern(Pattern pattern);

            abstract Builder<K, V> setKeyCoder(Coder<K> coder);

            abstract Builder<K, V> setValueCoder(Coder<V> coder);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction);

            abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction);

            abstract Builder<K, V> setMaxNumRecords(long j);

            abstract Builder<K, V> setMaxReadTime(Duration duration);

            abstract Builder<K, V> setStartReadTime(Instant instant);

            abstract Builder<K, V> setStopReadTime(Instant instant);

            abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean z);

            abstract Builder<K, V> setDynamicRead(boolean z);

            abstract Builder<K, V> setWatchTopicPartitionDuration(Duration duration);

            abstract Builder<K, V> setRedistributed(boolean z);

            abstract Builder<K, V> setAllowDuplicates(boolean z);

            abstract Builder<K, V> setRedistributeNumKeys(int i);

            abstract Builder<K, V> setOffsetDeduplication(Boolean bool);

            abstract Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory);

            abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> map);

            abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider<K> deserializerProvider);

            abstract Builder<K, V> setValueDeserializerProvider(DeserializerProvider<V> deserializerProvider);

            abstract Builder<K, V> setCheckStopReadingFn(CheckStopReadingFn checkStopReadingFn);

            abstract Builder<K, V> setBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler);

            /** Convenience overload: wraps the function in a {@link CheckStopReadingFnWrapper} first. */
            Builder<K, V> setCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> serializableFunction) {
                return setCheckStopReadingFn((CheckStopReadingFn) CheckStopReadingFnWrapper.of(serializableFunction));
            }

            abstract Builder<K, V> setConsumerPollingTimeout(long j);

            abstract Builder<K, V> setLogTopicVerification(Boolean bool);

            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Read<K, V> build();

            /**
             * Copies an external (cross-language) configuration onto {@code builder}.
             *
             * <p>Topics, deserializers, consumer config, {@code commitOffsetInFinalize} and
             * {@code timestampPolicy} are dereferenced unconditionally, so they must be non-null;
             * the remaining settings are optional and fall back to the defaults applied below.
             *
             * @throws IllegalArgumentException if {@code timestampPolicy} is not one of
             *     {@code ProcessingTime}, {@code CreateTime}, {@code LogAppendTime}, or if
             *     {@code consumerPollingTimeout} is set but not positive
             */
            static <K, V> void setupExternalBuilder(Builder<K, V> builder, External.Configuration configuration) {
                builder.setTopics(ImmutableList.copyOf(configuration.topics));

                // Key side: deserializer provider plus the coder inferred from the deserializer class.
                Class keyDeserializerClass = KafkaIO.resolveClass(configuration.keyDeserializer);
                builder.setKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializerClass));
                builder.setKeyCoder(resolveCoder(keyDeserializerClass));

                // Value side, handled the same way.
                Class valueDeserializerClass = KafkaIO.resolveClass(configuration.valueDeserializer);
                builder.setValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializerClass));
                builder.setValueCoder(resolveCoder(valueDeserializerClass));

                // The resolved deserializer class names always win over whatever the caller supplied.
                Map<String, Object> consumerConfig = new HashMap<>(configuration.consumerConfig);
                consumerConfig.put("key.deserializer", keyDeserializerClass.getName());
                consumerConfig.put("value.deserializer", valueDeserializerClass.getName());
                builder.setConsumerConfig(consumerConfig);

                builder.setTopicPartitions(Collections.emptyList());
                builder.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN);

                if (configuration.maxReadTime != null) {
                    // The external value is interpreted as whole seconds.
                    builder.setMaxReadTime(Duration.standardSeconds(configuration.maxReadTime));
                }

                long maxNumRecords = Long.MAX_VALUE;
                if (configuration.maxNumRecords != null) {
                    maxNumRecords = configuration.maxNumRecords;
                }
                builder.setMaxNumRecords(maxNumRecords);

                builder.setCommitOffsetsInFinalizeEnabled(configuration.commitOffsetInFinalize);

                switch (configuration.timestampPolicy) {
                    case "ProcessingTime":
                        builder.setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
                        break;
                    case "CreateTime":
                        builder.setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO));
                        break;
                    case "LogAppendTime":
                        builder.setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
                        break;
                    default:
                        throw new IllegalArgumentException("timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)");
                }

                // Start/stop times arrive as epoch milliseconds.
                if (configuration.startReadTime != null) {
                    builder.setStartReadTime(Instant.ofEpochMilli(configuration.startReadTime));
                }
                if (configuration.stopReadTime != null) {
                    builder.setStopReadTime(Instant.ofEpochMilli(configuration.stopReadTime));
                }

                builder.setDynamicRead(false);

                Long pollingTimeout = configuration.consumerPollingTimeout;
                if (pollingTimeout == null) {
                    // NOTE(review): default of 2 - unit is not visible here (presumably seconds); confirm.
                    builder.setConsumerPollingTimeout(2L);
                } else if (pollingTimeout <= 0) {
                    throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
                } else {
                    builder.setConsumerPollingTimeout(pollingTimeout);
                }

                Boolean redistribute = configuration.redistribute;
                if (redistribute == null) {
                    // Redistribution not requested: every related knob gets its neutral value.
                    builder.setRedistributed(false);
                    builder.setRedistributeNumKeys(0);
                    builder.setAllowDuplicates(false);
                    builder.setOffsetDeduplication(false);
                } else {
                    builder.setRedistributed(redistribute);
                    if (configuration.redistributeNumKeys != null) {
                        builder.setRedistributeNumKeys(configuration.redistributeNumKeys);
                    }
                    Boolean allowDuplicates = configuration.allowDuplicates;
                    if (allowDuplicates != null) {
                        builder.setAllowDuplicates(allowDuplicates);
                    }
                    // Offset deduplication is forwarded only when redistributing without duplicates.
                    boolean duplicatesAllowed = allowDuplicates != null && allowDuplicates;
                    if (redistribute && !duplicatesAllowed && configuration.offsetDeduplication != null) {
                        builder.setOffsetDeduplication(configuration.offsetDeduplication);
                    }
                }
            }

            /**
             * Infers a nullable coder from the return type of the first {@code deserialize} method
             * declared on {@code cls} whose return type is not {@code Object}. Supported return
             * types are {@code byte[]}, {@code Integer} and {@code Long}.
             *
             * @throws RuntimeException if that method returns an unsupported type, or if no such
             *     method is declared on the class
             */
            /* JADX INFO: Access modifiers changed from: private */
            @SuppressWarnings("unchecked")
            public static <T> Coder<T> resolveCoder(Class<Deserializer<T>> cls) {
                for (Method candidate : cls.getDeclaredMethods()) {
                    if (!candidate.getName().equals("deserialize")) {
                        continue;
                    }
                    Class<?> returnType = candidate.getReturnType();
                    if (returnType.equals(Object.class)) {
                        // Skip methods that only expose Object as their return type.
                        continue;
                    }
                    if (returnType.equals(byte[].class)) {
                        return (Coder<T>) NullableCoder.of(ByteArrayCoder.of());
                    }
                    if (returnType.equals(Integer.class)) {
                        return (Coder<T>) NullableCoder.of(VarIntCoder.of());
                    }
                    if (returnType.equals(Long.class)) {
                        return (Coder<T>) NullableCoder.of(VarLongCoder.of());
                    }
                    throw new RuntimeException("Couldn't infer Coder from " + cls);
                }
                throw new RuntimeException("Couldn't resolve coder for Deserializer: " + cls);
            }
        }

        @AutoService({ExternalTransformRegistrar.class})
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$External.class */
        public static class External implements ExternalTransformRegistrar {
            public static final String URN_WITH_METADATA = "beam:transform:org.apache.beam:kafka_read_with_metadata:v1";
            public static final String URN_WITHOUT_METADATA = "beam:transform:org.apache.beam:kafka_read_without_metadata:v1";

            /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$External$Configuration.class */
            public static class Configuration {
                private Map<String, String> consumerConfig;
                private List<String> topics;
                private String keyDeserializer;
                private String valueDeserializer;
                private Long startReadTime;
                private Long stopReadTime;
                private Long maxNumRecords;
                private Long maxReadTime;
                private Boolean commitOffsetInFinalize;
                private Long consumerPollingTimeout;
                private String timestampPolicy;
                private Integer redistributeNumKeys;
                private Boolean redistribute;
                private Boolean allowDuplicates;
                private Boolean offsetDeduplication;

                public void setConsumerConfig(Map<String, String> map) {
                    this.consumerConfig = map;
                }

                public void setTopics(List<String> list) {
                    this.topics = list;
                }

                public void setKeyDeserializer(String str) {
                    this.keyDeserializer = str;
                }

                public void setValueDeserializer(String str) {
                    this.valueDeserializer = str;
                }

                public void setStartReadTime(Long l) {
                    this.startReadTime = l;
                }

                public void setStopReadTime(Long l) {
                    this.stopReadTime = l;
                }

                public void setMaxNumRecords(Long l) {
                    this.maxNumRecords = l;
                }

                public void setMaxReadTime(Long l) {
                    this.maxReadTime = l;
                }

                public void setCommitOffsetInFinalize(Boolean bool) {
                    this.commitOffsetInFinalize = bool;
                }

                public void setTimestampPolicy(String str) {
                    this.timestampPolicy = str;
                }

                public void setConsumerPollingTimeout(Long l) {
                    this.consumerPollingTimeout = l;
                }

                public void setRedistributeNumKeys(Integer num) {
                    this.redistributeNumKeys = num;
                }

                public void setRedistribute(Boolean bool) {
                    this.redistribute = bool;
                }

                public void setAllowDuplicates(Boolean bool) {
                    this.allowDuplicates = bool;
                }

                public void setOffsetDeduplication(Boolean bool) {
                    this.offsetDeduplication = bool;
                }
            }

            public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
                return ImmutableMap.of(URN_WITH_METADATA, RowsWithMetadata.Builder.class, URN_WITHOUT_METADATA, TypedWithoutMetadata.Builder.class);
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$FakeFlinkPipelineOptions.class */
        /**
         * Pipeline-options view exposing a single {@code checkpointingInterval} option.
         *
         * <p>NOTE(review): the name suggests it mirrors the Flink runner's option of the same name
         * so it can be read without depending on that runner - confirm against the usages.
         */
        public interface FakeFlinkPipelineOptions extends PipelineOptions {
            /** Returns the checkpointing interval; the declared default is {@code -1}. */
            @Default.Long(-1)
            Long getCheckpointingInterval();

            /** Sets the checkpointing interval. */
            void setCheckpointingInterval(Long l);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Emits one {@link KafkaSourceDescriptor} per topic partition that a {@link Read} should
         * consume.
         *
         * <p>Explicitly configured partitions are used as-is. Otherwise a consumer is opened and
         * partitions are discovered either from the configured topic names or, when no topics are
         * configured, from all topics whose name matches the configured pattern.
         */
        @VisibleForTesting
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$GenerateKafkaSourceDescriptor.class */
        public static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescriptor> {
            private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
            private final List<TopicPartition> topicPartitions;
            private final Instant startReadTime;
            private final Instant stopReadTime;

            @VisibleForTesting
            final Map<String, Object> consumerConfig;

            @VisibleForTesting
            final List<String> topics;
            private final Pattern topicPattern;
            // When TRUE, a topic without partition info is logged and skipped instead of failing.
            private final Boolean logTopicVerification;

            /** Captures the settings of {@code read} that partition discovery depends on. */
            GenerateKafkaSourceDescriptor(Read<?, ?> read) {
                this.consumerConfig = read.getConsumerConfig();
                this.consumerFactoryFn = read.getConsumerFactoryFn();
                this.topics = read.getTopics();
                this.topicPartitions = read.getTopicPartitions();
                this.topicPattern = read.getTopicPattern();
                this.startReadTime = read.getStartReadTime();
                this.stopReadTime = read.getStopReadTime();
                this.logTopicVerification = read.getLogTopicVerification();
            }

            /**
             * Resolves the partitions to read and outputs a descriptor for each, carrying the
             * configured start and stop read times.
             *
             * @throws IllegalStateException if a configured topic has no partition info and
             *     log-only verification is not enabled, or if a required setting is null
             */
            @DoFn.ProcessElement
            public void processElement(DoFn.OutputReceiver<KafkaSourceDescriptor> outputReceiver) {
                List<TopicPartition> configured = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(this.topicPartitions);
                List<TopicPartition> partitions = new ArrayList<>(configured);
                if (partitions.isEmpty()) {
                    // try-with-resources closes the consumer on every path, including failures.
                    try (Consumer<byte[], byte[]> consumer = this.consumerFactoryFn.apply(this.consumerConfig)) {
                        List<String> topicNames = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(this.topics);
                        if (topicNames.isEmpty()) {
                            Pattern pattern = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(this.topicPattern);
                            for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
                                if (pattern.matcher(entry.getKey()).matches()) {
                                    for (PartitionInfo partitionInfo : entry.getValue()) {
                                        partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                                    }
                                }
                            }
                        } else {
                            for (String topic : topicNames) {
                                List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
                                boolean found = partitionsFor != null && !partitionsFor.isEmpty();
                                if (Boolean.TRUE.equals(this.logTopicVerification)) {
                                    if (!found) {
                                        // Log-only mode: warn solely when the lookup came back empty, and
                                        // skip the topic so a null result is never iterated.
                                        KafkaIO.LOG.warn("Could not find any partitions info for topic {}. Please check Kafka configuration and make sure that the provided topics exist.", topic);
                                        continue;
                                    }
                                } else {
                                    Preconditions.checkState(found, "Could not find any partitions info for topic " + topic + ". Please check Kafka configuration and make sure that provided topics exist.");
                                }
                                for (PartitionInfo partitionInfo : partitionsFor) {
                                    partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                                }
                            }
                        }
                    }
                }
                for (TopicPartition topicPartition : partitions) {
                    outputReceiver.output(KafkaSourceDescriptor.of(topicPartition, null, this.startReadTime, null, this.stopReadTime, null));
                }
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$KafkaReadOverrideFactory.class */
        /**
         * Override factory that swaps a {@link ReadFromKafkaViaSDF} for a
         * {@link ReadFromKafkaViaUnbounded} built from the same read configuration and coders.
         */
        private static class KafkaReadOverrideFactory<K, V> implements PTransformOverrideFactory<PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> {
            private KafkaReadOverrideFactory() {
            }

            /**
             * Builds the replacement transform, rooted at the pipeline's begin.
             *
             * @throws IllegalStateException if the replacement implementation does not support some
             *     properties of the read; the compatibility exception is attached as the cause
             */
            @Override
            public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> appliedPTransform) {
                try {
                    ReadFromKafkaViaSDF<K, V> sdfRead = appliedPTransform.getTransform();
                    return PTransformOverrideFactory.PTransformReplacement.of(appliedPTransform.getPipeline().begin(), new ReadFromKafkaViaUnbounded<>(sdfRead.kafkaRead, sdfRead.keyCoder, sdfRead.valueCoder));
                } catch (KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityException e) {
                    // Keep the original exception as the cause so its stack trace is not lost.
                    throw new IllegalStateException("The current runner does not support SDF-based Kafka read properly and the replacement runner lacks the support for the following properties: " + e.getConflictingProperties() + ". For example if you are using Dataflow then consider using Dataflow Runner v2.", e);
                }
            }

            /** Maps the single original output onto the single replacement output. */
            @Override
            public Map<PCollection<?>, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PCollection<?>> map, PCollection<KafkaRecord<K, V>> pCollection) {
                return ReplacementOutputs.singleton(map, pCollection);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$ReadFromKafkaViaSDF.class */
        public static class ReadFromKafkaViaSDF<K, V> extends AbstractReadFromKafka<K, V> {
            ReadFromKafkaViaSDF(Read<K, V> read, Coder<K> coder, Coder<V> coder2) {
                super(read, coder, coder2, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.SDF);
            }

            public PCollection<KafkaRecord<K, V>> expand(PBegin pBegin) {
                PCollection apply;
                ReadSourceDescriptors<K, V> withConsumerPollingTimeout = ReadSourceDescriptors.read().withConsumerConfigOverrides(this.kafkaRead.getConsumerConfig()).withOffsetConsumerConfigOverrides(this.kafkaRead.getOffsetConsumerConfig()).withConsumerFactoryFn(this.kafkaRead.getConsumerFactoryFn()).withKeyDeserializerProviderAndCoder(this.kafkaRead.getKeyDeserializerProvider(), this.keyCoder).withValueDeserializerProviderAndCoder(this.kafkaRead.getValueDeserializerProvider(), this.valueCoder).withManualWatermarkEstimator().withTimestampPolicyFactory(this.kafkaRead.getTimestampPolicyFactory()).withCheckStopReadingFn(this.kafkaRead.getCheckStopReadingFn()).withConsumerPollingTimeout(this.kafkaRead.getConsumerPollingTimeout());
                if (this.kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
                    withConsumerPollingTimeout = withConsumerPollingTimeout.commitOffsets();
                }
                if (this.kafkaRead.getStopReadTime() != null) {
                    withConsumerPollingTimeout = withConsumerPollingTimeout.withBounded();
                }
                if (this.kafkaRead.getBadRecordErrorHandler() != null) {
                    withConsumerPollingTimeout = withConsumerPollingTimeout.withBadRecordErrorHandler(this.kafkaRead.getBadRecordErrorHandler());
                }
                if (this.kafkaRead.isRedistributed()) {
                    withConsumerPollingTimeout = withConsumerPollingTimeout.withRedistribute();
                }
                if (this.kafkaRead.isAllowDuplicates()) {
                    withConsumerPollingTimeout = withConsumerPollingTimeout.withAllowDuplicates();
                }
                if (this.kafkaRead.getRedistributeNumKeys() > 0) {
                    withConsumerPollingTimeout = withConsumerPollingTimeout.withRedistributeNumKeys(this.kafkaRead.getRedistributeNumKeys());
                }
                if (this.kafkaRead.isDynamicRead()) {
                    HashSet hashSet = new HashSet();
                    if (this.kafkaRead.getTopics() != null && this.kafkaRead.getTopics().size() > 0) {
                        hashSet.addAll(this.kafkaRead.getTopics());
                    }
                    if (this.kafkaRead.getTopicPartitions() != null && this.kafkaRead.getTopicPartitions().size() > 0) {
                        Iterator<TopicPartition> it = this.kafkaRead.getTopicPartitions().iterator();
                        while (it.hasNext()) {
                            hashSet.add(it.next().topic());
                        }
                    }
                    apply = (PCollection) pBegin.apply(new WatchForKafkaTopicPartitions(this.kafkaRead.getWatchTopicPartitionDuration(), this.kafkaRead.getConsumerFactoryFn(), this.kafkaRead.getConsumerConfig(), this.kafkaRead.getCheckStopReadingFn(), hashSet, this.kafkaRead.getTopicPattern(), this.kafkaRead.getStartReadTime(), this.kafkaRead.getStopReadTime()));
                } else {
                    apply = pBegin.getPipeline().apply(Impulse.create()).apply(ParDo.of(new GenerateKafkaSourceDescriptor(this.kafkaRead)));
                }
                if (!this.kafkaRead.isRedistributed()) {
                    return apply.apply(withConsumerPollingTimeout).setCoder(KafkaRecordCoder.of(this.keyCoder, this.valueCoder));
                }
                PCollection coder = apply.apply(withConsumerPollingTimeout).setCoder(KafkaRecordCoder.of(this.keyCoder, this.valueCoder));
                return this.kafkaRead.getRedistributeNumKeys() == 0 ? coder.apply("Insert Redistribute", Redistribute.arbitrarily().withAllowDuplicates(this.kafkaRead.isAllowDuplicates())) : coder.apply("Insert Redistribute with Shards", Redistribute.arbitrarily().withAllowDuplicates(this.kafkaRead.isAllowDuplicates()).withNumBuckets(Integer.valueOf(this.kafkaRead.getRedistributeNumKeys())));
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$ReadFromKafkaViaUnbounded.class */
        public static class ReadFromKafkaViaUnbounded<K, V> extends AbstractReadFromKafka<K, V> {
            ReadFromKafkaViaUnbounded(Read<K, V> read, Coder<K> coder, Coder<V> coder2) {
                super(read, coder, coder2, KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY);
            }

            public PCollection<KafkaRecord<K, V>> expand(PBegin pBegin) {
                if (this.kafkaRead.getBadRecordErrorHandler() != null) {
                    KafkaIO.LOG.warn("The Legacy implementation of Kafka Read does not support writing malformed messages to an error handler. Use the SDF implementation instead.");
                }
                PTransform from = org.apache.beam.sdk.io.Read.from(this.kafkaRead.toBuilder().setKeyCoder(this.keyCoder).setValueCoder(this.valueCoder).build().makeSource());
                PTransform pTransform = from;
                if (this.kafkaRead.getMaxNumRecords() < Long.MAX_VALUE || this.kafkaRead.getMaxReadTime() != null) {
                    pTransform = from.withMaxReadTime(this.kafkaRead.getMaxReadTime()).withMaxNumRecords(this.kafkaRead.getMaxNumRecords());
                }
                if (!this.kafkaRead.isRedistributed()) {
                    return pBegin.getPipeline().apply(pTransform);
                }
                if (this.kafkaRead.isCommitOffsetsInFinalizeEnabled() && this.kafkaRead.isAllowDuplicates()) {
                    KafkaIO.LOG.warn("Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
                }
                PCollection apply = pBegin.getPipeline().apply(pTransform);
                return this.kafkaRead.getRedistributeNumKeys() == 0 ? apply.apply("Insert Redistribute", Redistribute.arbitrarily().withAllowDuplicates(this.kafkaRead.isAllowDuplicates())) : apply.apply("Insert Redistribute with Shards", Redistribute.arbitrarily().withAllowDuplicates(this.kafkaRead.isAllowDuplicates()).withNumBuckets(Integer.valueOf(this.kafkaRead.getRedistributeNumKeys())));
            }
        }

        /** Consumer configuration; {@code expand} requires it to contain {@code bootstrap.servers}. */
        @Pure
        public abstract Map<String, Object> getConsumerConfig();

        /** Topics set through {@code withTopics}; callers in this class treat null and empty alike. */
        @Pure
        public abstract List<String> getTopics();

        /** Topic partitions set through {@code withTopicPartitions}; null and empty are treated alike. */
        @Pure
        public abstract List<TopicPartition> getTopicPartitions();

        /** Compiled pattern set through {@code withTopicPattern}, or null when no pattern was set. */
        @Pure
        public abstract Pattern getTopicPattern();

        /** Explicit key coder; when null the coder is obtained from the key deserializer provider. */
        @Pure
        public abstract Coder<K> getKeyCoder();

        /** Explicit value coder; when null the coder is obtained from the value deserializer provider. */
        @Pure
        public abstract Coder<V> getValueCoder();

        /** Function set through {@code withConsumerFactoryFn} that maps a config map to a consumer. */
        @Pure
        public abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        /** Function set through the deprecated {@code withWatermarkFn2}. */
        @Pure
        public abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();

        /** Record limit; the legacy read bounds the source when this is below {@code Long.MAX_VALUE}. */
        @Pure
        public abstract long getMaxNumRecords();

        /** Read-time limit; the legacy read bounds the source when this is non-null. */
        @Pure
        public abstract Duration getMaxReadTime();

        /** Value set through {@code withStartReadTime}; when non-null {@code expand} requires offsetsForTimes support. */
        @Pure
        public abstract Instant getStartReadTime();

        /** Value set through {@code withStopReadTime}; when non-null {@code expand} requires offsetsForTimes support. */
        @Pure
        public abstract Instant getStopReadTime();

        /** True once {@code commitOffsetsInFinalize()} was called; {@code expand} then requires {@code group.id}. */
        @Pure
        public abstract boolean isCommitOffsetsInFinalizeEnabled();

        /** True once {@code withDynamicRead} was called. */
        @Pure
        public abstract boolean isDynamicRead();

        /** True once {@code withRedistribute()} was called. */
        @Pure
        public abstract boolean isRedistributed();

        /** Flag set through {@code withAllowDuplicates}; forwarded to the redistribute step. */
        @Pure
        public abstract boolean isAllowDuplicates();

        /** Value set through {@code withRedistributeNumKeys}; 0 selects the redistribute step without buckets. */
        @Pure
        public abstract int getRedistributeNumKeys();

        /** Value set through {@code withOffsetDeduplication}; may be null (checked in checkRedistributeConfiguration). */
        @Pure
        public abstract Boolean getOffsetDeduplication();

        /** Duration set through {@code withDynamicRead}. */
        @Pure
        public abstract Duration getWatchTopicPartitionDuration();

        /** Factory set through {@code withTimestampPolicyFactory} and the convenience methods built on it. */
        @Pure
        public abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        /** Map set through {@code withOffsetConsumerConfigOverrides}. */
        @Pure
        public abstract Map<String, Object> getOffsetConsumerConfig();

        /** Key deserializer provider; {@code expand} rejects a null value. */
        @Pure
        public abstract DeserializerProvider<K> getKeyDeserializerProvider();

        /** Value deserializer provider; {@code expand} rejects a null value. */
        @Pure
        public abstract DeserializerProvider<V> getValueDeserializerProvider();

        /** Function set through either {@code withCheckStopReadingFn} overload. */
        @Pure
        public abstract CheckStopReadingFn getCheckStopReadingFn();

        /** Handler set through {@code withBadRecordErrorHandler}; the legacy read only logs a warning when it is set. */
        @Pure
        public abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

        /** Value set through {@code withConsumerPollingTimeout}, which only accepts values greater than 0. */
        @Pure
        public abstract long getConsumerPollingTimeout();

        /** Value set through {@code withTopicVerificationLogging}. */
        @Pure
        public abstract Boolean getLogTopicVerification();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns a builder from which every {@code withX} method in this class derives its result. */
        public abstract Builder<K, V> toBuilder();

        /**
         * Stores {@code str} under the {@code bootstrap.servers} consumer property.
         *
         * @param str value for the {@code bootstrap.servers} key
         * @return the result of {@link #withConsumerConfigUpdates(Map)} for that single entry
         */
        public Read<K, V> withBootstrapServers(String str) {
            Map<String, Object> bootstrapUpdate = ImmutableMap.of("bootstrap.servers", str);
            return withConsumerConfigUpdates(bootstrapUpdate);
        }

        /**
         * Single-topic convenience wrapper around {@link #withTopics(List)}.
         *
         * @param str the topic name
         * @return the result of {@code withTopics} for a one-element list
         */
        public Read<K, V> withTopic(String str) {
            List<String> singleTopic = ImmutableList.of(str);
            return withTopics(singleTopic);
        }

        /**
         * Sets the topics to an immutable copy of {@code list}.
         *
         * @param list topic names
         * @return a new {@code Read} built from this one with the topics replaced
         * @throws IllegalStateException if topic partitions or a topic pattern are already set
         */
        public Read<K, V> withTopics(List<String> list) {
            boolean noPartitions = getTopicPartitions() == null || getTopicPartitions().isEmpty();
            boolean noPattern = getTopicPattern() == null;
            Preconditions.checkState(noPartitions && noPattern, "Only one of topics, topicPartitions or topicPattern can be set");
            Builder<K, V> updated = toBuilder().setTopics(ImmutableList.copyOf(list));
            return updated.build();
        }

        /**
         * Sets the topic partitions to an immutable copy of {@code list}.
         *
         * @param list topic partitions
         * @return a new {@code Read} built from this one with the topic partitions replaced
         * @throws IllegalStateException if topics or a topic pattern are already set
         */
        public Read<K, V> withTopicPartitions(List<TopicPartition> list) {
            boolean noTopics = getTopics() == null || getTopics().isEmpty();
            boolean noPattern = getTopicPattern() == null;
            Preconditions.checkState(noTopics && noPattern, "Only one of topics, topicPartitions or topicPattern can be set");
            Builder<K, V> updated = toBuilder().setTopicPartitions(ImmutableList.copyOf(list));
            return updated.build();
        }

        /**
         * Turns on the redistributed flag.
         *
         * @return a new {@code Read} built from this one with redistributed set to true
         */
        public Read<K, V> withRedistribute() {
            Builder<K, V> updated = toBuilder().setRedistributed(true);
            return updated.build();
        }

        /**
         * Sets the allow-duplicates flag.
         *
         * @param bool the flag value; it is unboxed, so null raises a {@code NullPointerException}
         * @return a new {@code Read} built from this one with the flag replaced
         */
        public Read<K, V> withAllowDuplicates(Boolean bool) {
            boolean allowDuplicates = bool.booleanValue();
            Builder<K, V> updated = toBuilder().setAllowDuplicates(allowDuplicates);
            return updated.build();
        }

        /**
         * Sets the redistribute key count.
         *
         * @param i the number of keys
         * @return a new {@code Read} built from this one with the key count replaced
         */
        public Read<K, V> withRedistributeNumKeys(int i) {
            Builder<K, V> updated = toBuilder().setRedistributeNumKeys(i);
            return updated.build();
        }

        /**
         * Sets the offset-deduplication value.
         *
         * @param bool the value, passed to the builder unchanged
         * @return a new {@code Read} built from this one with the value replaced
         */
        public Read<K, V> withOffsetDeduplication(Boolean bool) {
            Builder<K, V> updated = toBuilder().setOffsetDeduplication(bool);
            return updated.build();
        }

        /**
         * Compiles {@code str} as a regular expression and stores it as the topic pattern.
         *
         * @param str the regular expression source
         * @return a new {@code Read} built from this one with the topic pattern replaced
         * @throws IllegalStateException if topics or topic partitions are already set
         */
        public Read<K, V> withTopicPattern(String str) {
            boolean noTopics = getTopics() == null || getTopics().isEmpty();
            boolean noPartitions = getTopicPartitions() == null || getTopicPartitions().isEmpty();
            Preconditions.checkState(noTopics && noPartitions, "Only one of topics, topicPartitions or topicPattern can be set");
            Pattern compiled = Pattern.compile(str);
            return toBuilder().setTopicPattern(compiled).build();
        }

        /**
         * Wraps {@code cls} with {@code LocalDeserializerProvider.of} and delegates to the
         * provider-based overload.
         *
         * @param cls the key deserializer class
         * @return the result of the provider-based {@code withKeyDeserializer}
         */
        public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> cls) {
            DeserializerProvider<K> provider = LocalDeserializerProvider.of(cls);
            return withKeyDeserializer(provider);
        }

        /**
         * Sets the key deserializer class and then the key coder.
         *
         * @param cls the key deserializer class
         * @param coder the key coder
         * @return a new {@code Read} carrying both settings
         */
        public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> cls, Coder<K> coder) {
            Read<K, V> withDeserializer = withKeyDeserializer(cls);
            return withDeserializer.toBuilder().setKeyCoder(coder).build();
        }

        /**
         * Sets the key deserializer provider.
         *
         * @param deserializerProvider the provider to store
         * @return a new {@code Read} built from this one with the provider replaced
         */
        public Read<K, V> withKeyDeserializer(DeserializerProvider<K> deserializerProvider) {
            Builder<K, V> updated = toBuilder().setKeyDeserializerProvider(deserializerProvider);
            return updated.build();
        }

        /**
         * Sets the key deserializer provider and the key coder in one step.
         *
         * @param deserializerProvider the provider to store
         * @param coder the key coder
         * @return a new {@code Read} carrying both settings
         */
        public Read<K, V> withKeyDeserializerProviderAndCoder(DeserializerProvider<K> deserializerProvider, Coder<K> coder) {
            Builder<K, V> withProvider = toBuilder().setKeyDeserializerProvider(deserializerProvider);
            return withProvider.setKeyCoder(coder).build();
        }

        /**
         * Wraps {@code cls} with {@code LocalDeserializerProvider.of} and delegates to the
         * provider-based overload.
         *
         * @param cls the value deserializer class
         * @return the result of the provider-based {@code withValueDeserializer}
         */
        public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> cls) {
            DeserializerProvider<V> provider = LocalDeserializerProvider.of(cls);
            return withValueDeserializer(provider);
        }

        /**
         * Sets the value deserializer class and then the value coder.
         *
         * @param cls the value deserializer class
         * @param coder the value coder
         * @return a new {@code Read} carrying both settings
         */
        public Read<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> cls, Coder<V> coder) {
            Read<K, V> withDeserializer = withValueDeserializer(cls);
            return withDeserializer.toBuilder().setValueCoder(coder).build();
        }

        /**
         * Sets the value deserializer provider.
         *
         * @param deserializerProvider the provider to store
         * @return a new {@code Read} built from this one with the provider replaced
         */
        public Read<K, V> withValueDeserializer(DeserializerProvider<V> deserializerProvider) {
            Builder<K, V> updated = toBuilder().setValueDeserializerProvider(deserializerProvider);
            return updated.build();
        }

        /**
         * Sets the value deserializer provider and the value coder in one step.
         *
         * @param deserializerProvider the provider to store
         * @param coder the value coder
         * @return a new {@code Read} carrying both settings
         */
        public Read<K, V> withValueDeserializerProviderAndCoder(DeserializerProvider<V> deserializerProvider, Coder<V> coder) {
            Builder<K, V> withProvider = toBuilder().setValueDeserializerProvider(deserializerProvider);
            return withProvider.setValueCoder(coder).build();
        }

        /**
         * Sets the function that turns a config map into a consumer.
         *
         * @param serializableFunction the factory function to store
         * @return a new {@code Read} built from this one with the factory replaced
         */
        public Read<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction) {
            Builder<K, V> updated = toBuilder().setConsumerFactoryFn(serializableFunction);
            return updated.build();
        }

        /**
         * Combines {@code map} with the current consumer config via
         * {@code KafkaIOUtils.updateKafkaProperties}.
         *
         * @param map the properties to apply
         * @return a new {@code Read} built from this one with the combined consumer config
         * @deprecated {@link #withConsumerConfigUpdates(Map)} has the same body.
         */
        @Deprecated
        public Read<K, V> updateConsumerProperties(Map<String, Object> map) {
            Map<String, Object> merged = KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), map);
            return toBuilder().setConsumerConfig(merged).build();
        }

        /**
         * Sets the maximum number of records.
         *
         * @param j the record limit
         * @return a new {@code Read} built from this one with the limit replaced
         */
        public Read<K, V> withMaxNumRecords(long j) {
            Builder<K, V> updated = toBuilder().setMaxNumRecords(j);
            return updated.build();
        }

        /**
         * Sets the start read time.
         *
         * @param instant the start read time
         * @return a new {@code Read} built from this one with the start read time replaced
         */
        public Read<K, V> withStartReadTime(Instant instant) {
            Builder<K, V> updated = toBuilder().setStartReadTime(instant);
            return updated.build();
        }

        /**
         * Sets the stop read time.
         *
         * @param instant the stop read time
         * @return a new {@code Read} built from this one with the stop read time replaced
         */
        public Read<K, V> withStopReadTime(Instant instant) {
            Builder<K, V> updated = toBuilder().setStopReadTime(instant);
            return updated.build();
        }

        /**
         * Sets the maximum read time.
         *
         * @param duration the read-time limit
         * @return a new {@code Read} built from this one with the limit replaced
         */
        public Read<K, V> withMaxReadTime(Duration duration) {
            Builder<K, V> updated = toBuilder().setMaxReadTime(duration);
            return updated.build();
        }

        /**
         * Installs the policy factory returned by {@code TimestampPolicyFactory.withLogAppendTime()}.
         *
         * @return the result of {@code withTimestampPolicyFactory} for that factory
         */
        public Read<K, V> withLogAppendTime() {
            TimestampPolicyFactory<K, V> factory = TimestampPolicyFactory.withLogAppendTime();
            return withTimestampPolicyFactory(factory);
        }

        /**
         * Installs the policy factory returned by {@code TimestampPolicyFactory.withProcessingTime()}.
         *
         * @return the result of {@code withTimestampPolicyFactory} for that factory
         */
        public Read<K, V> withProcessingTime() {
            TimestampPolicyFactory<K, V> factory = TimestampPolicyFactory.withProcessingTime();
            return withTimestampPolicyFactory(factory);
        }

        /**
         * Installs the policy factory returned by {@code TimestampPolicyFactory.withCreateTime(duration)}.
         *
         * @param duration argument forwarded to {@code TimestampPolicyFactory.withCreateTime}
         * @return the result of {@code withTimestampPolicyFactory} for that factory
         */
        public Read<K, V> withCreateTime(Duration duration) {
            TimestampPolicyFactory<K, V> factory = TimestampPolicyFactory.withCreateTime(duration);
            return withTimestampPolicyFactory(factory);
        }

        /**
         * Sets the timestamp policy factory.
         *
         * @param timestampPolicyFactory the factory to store
         * @return a new {@code Read} built from this one with the factory replaced
         */
        public Read<K, V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory) {
            Builder<K, V> updated = toBuilder().setTimestampPolicyFactory(timestampPolicyFactory);
            return updated.build();
        }

        /**
         * Installs a timestamp policy factory built from a per-record timestamp function.
         *
         * @param serializableFunction maps a record to its timestamp; must not be null
         * @return a new {@code Read} using {@code TimestampPolicyFactory.withTimestampFn}
         * @throws IllegalArgumentException if {@code serializableFunction} is null
         */
        @Deprecated
        public Read<K, V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction) {
            boolean present = serializableFunction != null;
            Preconditions.checkArgument(present, "timestampFn can not be null");
            TimestampPolicyFactory<K, V> factory = TimestampPolicyFactory.withTimestampFn(serializableFunction);
            return toBuilder().setTimestampPolicyFactory(factory).build();
        }

        /**
         * Sets the per-record watermark function.
         *
         * @param serializableFunction maps a record to a watermark instant; must not be null
         * @return a new {@code Read} built from this one with the watermark function replaced
         * @throws IllegalArgumentException if {@code serializableFunction} is null
         */
        @Deprecated
        public Read<K, V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction) {
            boolean present = serializableFunction != null;
            Preconditions.checkArgument(present, "watermarkFn can not be null");
            Builder<K, V> updated = toBuilder().setWatermarkFn(serializableFunction);
            return updated.build();
        }

        /**
         * KV-based variant of {@code withTimestampFn2}: the function is adapted with
         * {@code unwrapKafkaAndThen} so it receives the record's {@code getKV()} value.
         *
         * @param serializableFunction maps a key/value pair to a timestamp; must not be null
         * @return the result of {@code withTimestampFn2} for the adapted function
         * @throws IllegalArgumentException if {@code serializableFunction} is null
         */
        @Deprecated
        public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> serializableFunction) {
            boolean present = serializableFunction != null;
            Preconditions.checkArgument(present, "timestampFn can not be null");
            SerializableFunction<KafkaRecord<K, V>, Instant> recordFn = unwrapKafkaAndThen(serializableFunction);
            return withTimestampFn2(recordFn);
        }

        /**
         * KV-based variant of {@code withWatermarkFn2}: the function is adapted with
         * {@code unwrapKafkaAndThen} so it receives the record's {@code getKV()} value.
         *
         * @param serializableFunction maps a key/value pair to a watermark; must not be null
         * @return the result of {@code withWatermarkFn2} for the adapted function
         * @throws IllegalArgumentException if {@code serializableFunction} is null
         */
        @Deprecated
        public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> serializableFunction) {
            boolean present = serializableFunction != null;
            Preconditions.checkArgument(present, "watermarkFn can not be null");
            SerializableFunction<KafkaRecord<K, V>, Instant> recordFn = unwrapKafkaAndThen(serializableFunction);
            return withWatermarkFn2(recordFn);
        }

        /**
         * Sets the {@code isolation.level} consumer property to {@code read_committed}.
         *
         * @return the result of {@link #withConsumerConfigUpdates(Map)} for that single entry
         */
        public Read<K, V> withReadCommitted() {
            Map<String, Object> isolationUpdate = ImmutableMap.of("isolation.level", "read_committed");
            return withConsumerConfigUpdates(isolationUpdate);
        }

        /**
         * Turns on the commit-offsets-in-finalize flag.
         *
         * @return a new {@code Read} built from this one with the flag set to true
         */
        public Read<K, V> commitOffsetsInFinalize() {
            Builder<K, V> updated = toBuilder().setCommitOffsetsInFinalizeEnabled(true);
            return updated.build();
        }

        /**
         * Turns on dynamic read and stores the watch duration.
         *
         * @param duration value stored as the watch-topic-partition duration
         * @return a new {@code Read} built from this one with both settings applied
         */
        public Read<K, V> withDynamicRead(Duration duration) {
            Builder<K, V> dynamic = toBuilder().setDynamicRead(true);
            return dynamic.setWatchTopicPartitionDuration(duration).build();
        }

        /**
         * Sets the offset consumer config; the map is passed to the builder as given.
         *
         * @param map the offset consumer configuration
         * @return a new {@code Read} built from this one with the offset consumer config replaced
         */
        public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> map) {
            Builder<K, V> updated = toBuilder().setOffsetConsumerConfig(map);
            return updated.build();
        }

        /**
         * Combines {@code map} with the current consumer config via
         * {@code KafkaIOUtils.updateKafkaProperties}.
         *
         * @param map the properties to apply
         * @return a new {@code Read} built from this one with the combined consumer config
         */
        public Read<K, V> withConsumerConfigUpdates(Map<String, Object> map) {
            Map<String, Object> merged = KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), map);
            return toBuilder().setConsumerConfig(merged).build();
        }

        /**
         * Sets the check-stop-reading function.
         *
         * @param checkStopReadingFn the function to store
         * @return a new {@code Read} built from this one with the function replaced
         */
        public Read<K, V> withCheckStopReadingFn(CheckStopReadingFn checkStopReadingFn) {
            Builder<K, V> updated = toBuilder().setCheckStopReadingFn(checkStopReadingFn);
            return updated.build();
        }

        /**
         * Wraps {@code serializableFunction} with {@code CheckStopReadingFnWrapper.of} and stores
         * the result as the check-stop-reading function.
         *
         * @param serializableFunction function from a topic partition to a Boolean
         * @return a new {@code Read} built from this one with the wrapped function stored
         */
        public Read<K, V> withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> serializableFunction) {
            // The cast keeps the builder call bound to the CheckStopReadingFn setter.
            CheckStopReadingFn wrapped = (CheckStopReadingFn) CheckStopReadingFnWrapper.of(serializableFunction);
            return toBuilder().setCheckStopReadingFn(wrapped).build();
        }

        /**
         * Sets the bad-record error handler.
         *
         * @param errorHandler the handler to store
         * @return a new {@code Read} built from this one with the handler replaced
         */
        public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler) {
            Builder<K, V> updated = toBuilder().setBadRecordErrorHandler(errorHandler);
            return updated.build();
        }

        /**
         * Sets the consumer polling timeout.
         *
         * @param j the timeout; must be greater than 0
         * @return a new {@code Read} built from this one with the timeout replaced
         * @throws IllegalStateException if {@code j} is not greater than 0
         */
        public Read<K, V> withConsumerPollingTimeout(long j) {
            boolean positive = j > 0;
            Preconditions.checkState(positive, "Consumer polling timeout must be greater than 0.");
            Builder<K, V> updated = toBuilder().setConsumerPollingTimeout(j);
            return updated.build();
        }

        /**
         * Applies four consumer properties: {@code security.protocol}, {@code sasl.mechanism},
         * {@code sasl.login.callback.handler.class} and {@code sasl.jaas.config}.
         *
         * @return the result of {@link #withConsumerConfigUpdates(Map)} for those entries
         */
        public Read<K, V> withGCPApplicationDefaultCredentials() {
            Map<String, Object> saslSettings = ImmutableMap.of(
                    "security.protocol", "SASL_SSL",
                    "sasl.mechanism", "OAUTHBEARER",
                    "sasl.login.callback.handler.class", "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
                    "sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
            return withConsumerConfigUpdates(saslSettings);
        }

        /**
         * Sets the log-topic-verification value.
         *
         * @param z the flag, boxed before it is stored
         * @return a new {@code Read} built from this one with the value replaced
         */
        public Read<K, V> withTopicVerificationLogging(boolean z) {
            Boolean boxed = Boolean.valueOf(z);
            return toBuilder().setLogTopicVerification(boxed).build();
        }

        /**
         * Wraps this read in a {@code TypedWithoutMetadata} transform whose output elements are
         * {@code KV<K, V>}.
         *
         * @return the wrapping transform
         */
        public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
            PTransform<PBegin, PCollection<KV<K, V>>> keyValueRead = new TypedWithoutMetadata(this);
            return keyValueRead;
        }

        /**
         * Wraps this read in a {@code RowsWithMetadata} transform whose output elements are
         * {@code Row}.
         *
         * @return the wrapping transform
         */
        public PTransform<PBegin, PCollection<Row>> externalWithMetadata() {
            PTransform<PBegin, PCollection<Row>> rowRead = new RowsWithMetadata(this);
            return rowRead;
        }

        /**
         * Validates the configuration, resolves the key and value coders, and applies either the
         * legacy ({@code ReadFromKafkaViaUnbounded}) or the SDF ({@code ReadFromKafkaViaSDF})
         * implementation.
         *
         * @param pBegin the pipeline start the read is applied to
         * @return the records produced by the selected implementation
         * @throws IllegalArgumentException if a required setting is missing
         */
        public PCollection<KafkaRecord<K, V>> expand(PBegin pBegin) {
            Preconditions.checkArgument(getConsumerConfig().get("bootstrap.servers") != null, "withBootstrapServers() is required");
            if (isDynamicRead()) {
                Preconditions.checkArgument(ExperimentalOptions.hasExperiment(pBegin.getPipeline().getOptions(), "beam_fn_api"), "Kafka Dynamic Read requires enabling experiment beam_fn_api.");
            } else {
                boolean hasTopics = getTopics() != null && getTopics().size() > 0;
                boolean hasSource = hasTopics || (getTopicPartitions() != null && getTopicPartitions().size() > 0) || getTopicPattern() != null;
                Preconditions.checkArgument(hasSource, "Either withTopic(), withTopics(), withTopicPartitions() or withTopicPattern() is required");
            }
            Preconditions.checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
            Preconditions.checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");

            if (!ConsumerSpEL.hasOffsetsForTimes()) {
                KafkaIO.LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", AppInfoParser.getVersion());
            }
            // Start and stop read times carry the same client-version requirement.
            if (getStartReadTime() != null || getStopReadTime() != null) {
                Preconditions.checkArgument(ConsumerSpEL.hasOffsetsForTimes(), "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer.");
            }

            if (isCommitOffsetsInFinalizeEnabled()) {
                Preconditions.checkArgument(getConsumerConfig().get("group.id") != null, "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config is not set. Offset management requires group.id.");
                boolean autoCommit = Boolean.TRUE.equals(getConsumerConfig().get("enable.auto.commit"));
                if (autoCommit) {
                    KafkaIO.LOG.warn("'{}' in consumer config is enabled even though commitOffsetsInFinalize() is set. You need only one of them.", "enable.auto.commit");
                }
            }
            checkRedistributeConfiguration();
            warnAboutUnsafeConfigurations(pBegin);

            CoderRegistry registry = pBegin.getPipeline().getCoderRegistry();
            Coder<K> resolvedKeyCoder = getKeyCoder(registry);
            Coder<V> resolvedValueCoder = getValueCoder(registry);
            KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityResult compatibility = KafkaIOReadImplementationCompatibility.getCompatibility(this);

            // Short-circuit order matches the experiment / compatibility precedence.
            boolean useLegacy = ExperimentalOptions.hasExperiment(pBegin.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
                    || ExperimentalOptions.hasExperiment(pBegin.getPipeline().getOptions(), "use_deprecated_read")
                    || ExperimentalOptions.hasExperiment(pBegin.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
                    || compatibility.supportsOnly(KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY)
                    || (compatibility.supports(KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation.LEGACY) && runnerPrefersLegacyRead(pBegin.getPipeline().getOptions()));
            if (useLegacy) {
                return pBegin.apply(new ReadFromKafkaViaUnbounded<>(this, resolvedKeyCoder, resolvedValueCoder));
            }
            return pBegin.apply(new ReadFromKafkaViaSDF<>(this, resolvedKeyCoder, resolvedValueCoder));
        }

        /**
         * Checks that the redistribute-related settings are consistent with each other.
         *
         * <p>Logs a warning for combinations that are legal but probably unintended, and fails for
         * combinations that cannot work.
         *
         * @throws IllegalStateException if a key count is set without redistribute, or if offset
         *     deduplication is enabled without redistribute or together with allow-duplicates
         */
        private void checkRedistributeConfiguration() {
            if (getRedistributeNumKeys() == 0 && isRedistributed()) {
                KafkaIO.LOG.warn("withRedistribute without withRedistributeNumKeys will create a key per record, which is sub-optimal for most use cases.");
            }
            // Warn only when the flag really is ineffective; when redistribute is enabled the
            // flag is forwarded to the redistribute step, so the message would be misleading.
            if (isAllowDuplicates() && !isRedistributed()) {
                KafkaIO.LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
            }
            if (getRedistributeNumKeys() > 0) {
                Preconditions.checkState(isRedistributed(), "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
            }
            // getOffsetDeduplication() is a boxed Boolean; null counts as "not enabled".
            if (Boolean.TRUE.equals(getOffsetDeduplication())) {
                Preconditions.checkState(isRedistributed() && !isAllowDuplicates(), "withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
            }
        }

        /**
         * Logs a warning when a checkpointing interval is set (non-null and not -1),
         * {@code enable.auto.commit} is true, commit-offsets-in-finalize is off, and
         * {@code auto.offset.reset} is unset or {@code latest}.
         *
         * @param pBegin supplies the pipeline options that are inspected
         */
        private void warnAboutUnsafeConfigurations(PBegin pBegin) {
            FakeFlinkPipelineOptions flinkOptions = pBegin.getPipeline().getOptions().as(FakeFlinkPipelineOptions.class);
            Long checkpointingInterval = flinkOptions.getCheckpointingInterval();
            String offsetReset = (String) getConsumerConfig().get("auto.offset.reset");

            boolean checkpointingEnabled = checkpointingInterval != null && checkpointingInterval.longValue() != -1;
            boolean autoCommitEnabled = Boolean.TRUE.equals(getConsumerConfig().get("enable.auto.commit"));
            boolean resetsToLatest = offsetReset == null || "latest".equals(offsetReset);

            if (checkpointingEnabled && autoCommitEnabled && !isCommitOffsetsInFinalizeEnabled() && resetsToLatest) {
                KafkaIO.LOG.warn("When using the Flink runner with checkpointingInterval enabled, Kafka enable.auto.commit enabled, and Kafka auto.offset.reset set to latest or unset, there is a chance for every checkpoint to time out, which will cause data loss. We recommend setting commitOffsetInFinalize to true in ReadFromKafka, enable.auto.commit to false, and auto.offset.reset to none");
            }
        }

        /**
         * Decides whether the legacy read should be preferred for the given options.
         *
         * @param pipelineOptions the options to inspect
         * @return false when the {@code use_sdf_read} experiment is set, the runner class name
         *     starts with {@code org.apache.beam.runners.dataflow.}, or the {@code beam_fn_api}
         *     experiment is set; true otherwise
         */
        private boolean runnerPrefersLegacyRead(PipelineOptions pipelineOptions) {
            if (ExperimentalOptions.hasExperiment(pipelineOptions, "use_sdf_read")) {
                return false;
            }
            if (pipelineOptions.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
                return false;
            }
            return !ExperimentalOptions.hasExperiment(pipelineOptions, "beam_fn_api");
        }

        /**
         * Resolves the key coder: the explicit coder when set, otherwise the coder obtained
         * from the key deserializer provider.
         *
         * @param coderRegistry registry handed to the provider's {@code getCoder}
         * @return the resolved key coder
         * @throws IllegalStateException if no explicit coder and no provider are set
         */
        private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
            Coder<K> explicit = getKeyCoder();
            if (explicit != null) {
                return explicit;
            }
            DeserializerProvider<K> provider = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(getKeyDeserializerProvider());
            return provider.getCoder(coderRegistry);
        }

        /**
         * Resolves the value coder: the explicit coder when set, otherwise the coder obtained
         * from the value deserializer provider.
         *
         * @param coderRegistry registry handed to the provider's {@code getCoder}
         * @return the resolved value coder
         * @throws IllegalStateException if no explicit coder and no provider are set
         */
        private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
            Coder<V> explicit = getValueCoder();
            if (explicit != null) {
                return explicit;
            }
            DeserializerProvider<V> provider = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(getValueDeserializerProvider());
            return provider.getCoder(coderRegistry);
        }

        /**
         * Creates the {@code KafkaUnboundedSource} for this read, passing -1 as its second
         * constructor argument.
         *
         * @return the new source
         */
        @VisibleForTesting
        UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
            UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> source = new KafkaUnboundedSource<>(this, -1);
            return source;
        }

        /**
         * Adapts a function over {@code KV} pairs into a function over {@code KafkaRecord}s by
         * applying it to the record's {@code getKV()} value.
         *
         * @param serializableFunction the function over key/value pairs
         * @return a function over records that delegates to {@code serializableFunction}
         */
        private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(SerializableFunction<KV<KeyT, ValueT>, OutT> serializableFunction) {
            return record -> serializableFunction.apply(record.getKV());
        }

        /**
         * Adds display data: the first non-empty of topics, topic partitions and topic pattern,
         * followed by every consumer property whose key is not listed in
         * {@code KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES}.
         *
         * @param builder the display data builder to add items to
         * @throws IllegalStateException if topics or topic partitions are null
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            List<String> topics = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(getTopics());
            List<TopicPartition> partitions = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(getTopicPartitions());
            Pattern pattern = getTopicPattern();
            if (topics.size() > 0) {
                String joinedTopics = Joiner.on(",").join(topics);
                builder.add(DisplayData.item("topics", joinedTopics).withLabel("Topic/s"));
            } else if (partitions.size() > 0) {
                String joinedPartitions = Joiner.on(",").join(partitions);
                builder.add(DisplayData.item("topicPartitions", joinedPartitions).withLabel("Topic Partition/s"));
            } else if (pattern != null) {
                builder.add(DisplayData.item("topicPattern", pattern.pattern()).withLabel("Topic Pattern"));
            }

            Set<String> disallowed = KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> property : getConsumerConfig().entrySet()) {
                String key = property.getKey();
                if (disallowed.contains(key)) {
                    continue;
                }
                Object raw = property.getValue();
                // Values whose display type cannot be inferred are shown as strings.
                Object shown = DisplayData.inferType(raw) != null ? raw : String.valueOf(raw);
                builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(shown)));
            }
        }

        // Synthetic method, shown here as decompiler output: it matches the serialized lambda's
        // implementation method name and signatures and, on a match, rebuilds the lambda created
        // in unwrapKafkaAndThen. Any other input ends in IllegalArgumentException.
        // NOTE(review): "boolean z = -1" and the boolean switch are decompiler artifacts and are
        // not valid Java as written — presumably this method is normally emitted by the compiler.
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -1979494231:
                    if (implMethodName.equals("lambda$unwrapKafkaAndThen$d0142e15$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$Read") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/SerializableFunction;Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Ljava/lang/Object;")) {
                        // The single captured argument is the KV-based function being adapted.
                        SerializableFunction serializableFunction = (SerializableFunction) serializedLambda.getCapturedArg(0);
                        return kafkaRecord -> {
                            return serializableFunction.apply(kafkaRecord.getKV());
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors.class */
    public static abstract class ReadSourceDescriptors<K, V> extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
        private final TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> records = new TupleTag<>();
        private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * AutoValue builder for {@link ReadSourceDescriptors}. Every setter is abstract except the
         * {@code SerializableFunction} overload of {@code setCheckStopReadingFn}.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors$Builder.class */
        public static abstract class Builder<K, V> {
            abstract Builder<K, V> setConsumerConfig(Map<String, Object> map);

            abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> map);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction);

            abstract Builder<K, V> setCheckStopReadingFn(CheckStopReadingFn checkStopReadingFn);

            /**
             * Convenience overload: wraps the function with {@code CheckStopReadingFnWrapper.of}
             * and delegates to the abstract setter above.
             */
            Builder<K, V> setCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> serializableFunction) {
                // The cast selects the CheckStopReadingFn overload rather than this one.
                return setCheckStopReadingFn((CheckStopReadingFn) CheckStopReadingFnWrapper.of(serializableFunction));
            }

            abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider<K> deserializerProvider);

            abstract Builder<K, V> setValueDeserializerProvider(DeserializerProvider<V> deserializerProvider);

            abstract Builder<K, V> setKeyCoder(Coder<K> coder);

            abstract Builder<K, V> setValueCoder(Coder<V> coder);

            abstract Builder<K, V> setExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction);

            abstract Builder<K, V> setCreateWatermarkEstimatorFn(SerializableFunction<Instant, WatermarkEstimator<Instant>> serializableFunction);

            abstract Builder<K, V> setCommitOffsetEnabled(boolean z);

            abstract Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory);

            abstract Builder<K, V> setBadRecordRouter(BadRecordRouter badRecordRouter);

            abstract Builder<K, V> setBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler);

            abstract Builder<K, V> setConsumerPollingTimeout(long j);

            abstract Builder<K, V> setBounded(boolean z);

            abstract Builder<K, V> setRedistribute(boolean z);

            abstract Builder<K, V> setAllowDuplicates(boolean z);

            abstract Builder<K, V> setRedistributeNumKeys(int i);

            /** Builds the {@code ReadSourceDescriptors} from the values set so far. */
            abstract ReadSourceDescriptors<K, V> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors$ExtractOutputTimestampFns.class */
        public static class ExtractOutputTimestampFns<K, V> {
            /** Package-private no-argument constructor with an empty body. */
            ExtractOutputTimestampFns() {
            }

            /**
             * Returns a function that ignores the record and yields {@code Instant.now()}.
             *
             * @return a function from a record to the instant at which the function is applied
             */
            public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useProcessingTime() {
                return ignoredRecord -> Instant.now();
            }

            /**
             * Returns a function that yields the record's timestamp as an {@code Instant}, after
             * checking that the record's timestamp type is {@code CREATE_TIME}.
             *
             * @return a function from a record to its timestamp; the function throws
             *     {@code IllegalArgumentException} for any other timestamp type
             */
            public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useCreateTime() {
                return record -> {
                    KafkaTimestampType timestampType = record.getTimestampType();
                    boolean isCreateTime = timestampType == KafkaTimestampType.CREATE_TIME;
                    Preconditions.checkArgument(isCreateTime, "Kafka record's timestamp is not 'CREATE_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", record.getTopic(), Integer.valueOf(record.getPartition()), Long.valueOf(record.getOffset()), timestampType);
                    return new Instant(record.getTimestamp());
                };
            }

            /**
             * Returns a function that yields the record's timestamp as an {@code Instant}, after
             * checking that the record's timestamp type is {@code LOG_APPEND_TIME}.
             *
             * @return a function from a record to its timestamp; the function throws
             *     {@code IllegalArgumentException} for any other timestamp type
             */
            public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useLogAppendTime() {
                return record -> {
                    KafkaTimestampType timestampType = record.getTimestampType();
                    boolean isLogAppendTime = timestampType == KafkaTimestampType.LOG_APPEND_TIME;
                    Preconditions.checkArgument(isLogAppendTime, "Kafka record's timestamp is not 'LOG_APPEND_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", record.getTopic(), Integer.valueOf(record.getPartition()), Long.valueOf(record.getOffset()), timestampType);
                    return new Instant(record.getTimestamp());
                };
            }

            /**
             * Synthetic hook that rebuilds one of this class's serializable lambdas from its
             * {@link SerializedLambda} description.
             *
             * <p>The implementation-method name selects one of three lambdas
             * ({@code useProcessingTime}, {@code useCreateTime}, {@code useLogAppendTime}); the
             * remaining fields of {@code serializedLambda} are then checked before the lambda is
             * returned.
             *
             * <p>NOTE(review): {@code boolean z = -1}, {@code z = 2} and the repeated
             * {@code case true} labels are not valid Java. This looks like decompiler output where
             * the selector is presumably an {@code int} (0, 1, 2) — confirm against the original
             * source before editing.
             *
             * @param serializedLambda description of the lambda being deserialized
             * @return the matching timestamp-extraction lambda
             * @throws IllegalArgumentException if no known lambda matches the description
             */
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                // First stage: map the implementation-method name to a selector value. The hash
                // code narrows the candidates and equals() confirms the match.
                switch (implMethodName.hashCode()) {
                    case -1358628209:
                        if (implMethodName.equals("lambda$useProcessingTime$9480aca4$1")) {
                            z = false;
                            break;
                        }
                        break;
                    case -671849448:
                        if (implMethodName.equals("lambda$useLogAppendTime$9480aca4$1")) {
                            z = 2;
                            break;
                        }
                        break;
                    case 841663622:
                        if (implMethodName.equals("lambda$useCreateTime$9480aca4$1")) {
                            z = true;
                            break;
                        }
                        break;
                }
                // Second stage: verify the rest of the serialized description and return the lambda.
                switch (z) {
                    case false:
                        // useProcessingTime: ignores the record and returns the current instant.
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors$ExtractOutputTimestampFns") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Lorg/joda/time/Instant;")) {
                            return kafkaRecord -> {
                                return Instant.now();
                            };
                        }
                        break;
                    case true:
                        // useCreateTime: requires a CREATE_TIME record and returns its timestamp.
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors$ExtractOutputTimestampFns") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Lorg/joda/time/Instant;")) {
                            return kafkaRecord2 -> {
                                Preconditions.checkArgument(kafkaRecord2.getTimestampType() == KafkaTimestampType.CREATE_TIME, "Kafka record's timestamp is not 'CREATE_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", kafkaRecord2.getTopic(), Integer.valueOf(kafkaRecord2.getPartition()), Long.valueOf(kafkaRecord2.getOffset()), kafkaRecord2.getTimestampType());
                                return new Instant(kafkaRecord2.getTimestamp());
                            };
                        }
                        break;
                    case true:
                        // useLogAppendTime (selector value 2 above): requires a LOG_APPEND_TIME
                        // record and returns its timestamp.
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors$ExtractOutputTimestampFns") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Lorg/joda/time/Instant;")) {
                            return kafkaRecord3 -> {
                                Preconditions.checkArgument(kafkaRecord3.getTimestampType() == KafkaTimestampType.LOG_APPEND_TIME, "Kafka record's timestamp is not 'LOG_APPEND_TIME' (topic: %s, partition %s, offset %s, timestamp type '%s')", kafkaRecord3.getTopic(), Integer.valueOf(kafkaRecord3.getPartition()), Long.valueOf(kafkaRecord3.getOffset()), kafkaRecord3.getTimestampType());
                                return new Instant(kafkaRecord3.getTimestamp());
                            };
                        }
                        break;
                }
                // Either the name was unknown or the description did not match what is expected.
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors$ReadAllFromRow.class */
        /**
         * Adapter that reads from {@link Row}-encoded source descriptors: rows are converted to
         * {@link KafkaSourceDescriptor}s, read through the wrapped {@link ReadSourceDescriptors},
         * and each resulting record is reduced to its key/value pair.
         */
        private static class ReadAllFromRow<K, V> extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> {
            private final ReadSourceDescriptors<K, V> readViaSDF;

            ReadAllFromRow(ReadSourceDescriptors<K, V> delegate) {
                this.readViaSDF = delegate;
            }

            /**
             * Builds the row-to-KV pipeline described on the class.
             *
             * @param rows source descriptors encoded as rows
             * @return key/value pairs of the records read, coded with the wrapped transform's
             *     explicitly configured key and value coders
             * @throws IllegalStateException if the wrapped transform has no key or value coder set
             */
            public PCollection<KV<K, V>> expand(PCollection<Row> rows) {
                PCollection<KafkaSourceDescriptor> descriptors = rows.apply(Convert.fromRows(KafkaSourceDescriptor.class));
                PCollection<KafkaRecord<K, V>> kafkaRecords = descriptors.apply(this.readViaSDF);
                PCollection<KV<K, V>> keyValues = kafkaRecords.apply(ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
                    @DoFn.ProcessElement
                    public void processElement(@DoFn.Element KafkaRecord<K, V> kafkaRecord, DoFn.OutputReceiver<KV<K, V>> outputReceiver) {
                        outputReceiver.output(kafkaRecord.getKV());
                    }
                }));
                // The coders are looked up only after the transforms are applied, key first.
                Coder<K> keyCoder = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(this.readViaSDF.getKeyCoder());
                Coder<V> valueCoder = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(this.readViaSDF.getValueCoder());
                return keyValues.setCoder(KvCoder.of(keyCoder, valueCoder));
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Kafka consumer properties. {@code read()} seeds this with
         * {@code KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES}; {@code expand} reads the
         * {@code "bootstrap.servers"} and {@code "enable.auto.commit"} entries from it.
         */
        @Pure
        public abstract Map<String, Object> getConsumerConfig();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Value supplied through {@code withOffsetConsumerConfigOverrides}. {@code read()} sets no
         * default, so this is presumably null unless configured — confirm against the builder.
         */
        @Pure
        public abstract Map<String, Object> getOffsetConsumerConfig();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Provider for the key deserializer; {@code expand} rejects the transform when it is null. */
        @Pure
        public abstract DeserializerProvider<K> getKeyDeserializerProvider();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Provider for the value deserializer; {@code expand} rejects the transform when it is null. */
        @Pure
        public abstract DeserializerProvider<V> getValueDeserializerProvider();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Explicitly configured key coder. May be null, in which case
         * {@code getKeyCoder(CoderRegistry)} asks the key deserializer provider for a coder.
         */
        @Pure
        public abstract Coder<K> getKeyCoder();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Explicitly configured value coder. May be null, in which case
         * {@code getValueCoder(CoderRegistry)} asks the value deserializer provider for a coder.
         */
        @Pure
        public abstract Coder<V> getValueCoder();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Function that creates a consumer from a configuration map; {@code read()} defaults it to
         * {@code KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN}.
         */
        @Pure
        public abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Value supplied through {@code withCheckStopReadingFn}. {@code read()} sets no default, so
         * this is presumably null unless configured — confirm against the builder.
         */
        @Pure
        public abstract CheckStopReadingFn getCheckStopReadingFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Function that derives an {@code Instant} from a record; {@code read()} selects the
         * processing-time variant via {@code withProcessingTime()}.
         */
        @Pure
        public abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Factory for watermark estimators; {@code read()} selects the monotonically increasing
         * variant via {@code withMonotonicallyIncreasingWatermarkEstimator()}.
         */
        @Pure
        public abstract SerializableFunction<Instant, WatermarkEstimator<Instant>> getCreateWatermarkEstimatorFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Defaults to false in {@code read()}; set to true by {@code commitOffsets()}. */
        @Pure
        public abstract boolean isCommitOffsetEnabled();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Defaults to false in {@code read()}; set to true by {@code withRedistribute()}. */
        @Pure
        public abstract boolean isRedistribute();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Defaults to false in {@code read()}; set to true by {@code withAllowDuplicates()}. */
        @Pure
        public abstract boolean isAllowDuplicates();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Defaults to 0 in {@code read()}. When redistribution is enabled, {@code expand} warns
         * that a value of 0 creates a key per record.
         */
        @Pure
        public abstract int getRedistributeNumKeys();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Value supplied through {@code withTimestampPolicyFactory}; {@code read()} sets no default. */
        @Pure
        public abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Defaults to {@code BadRecordRouter.THROWING_ROUTER} in {@code read()};
         * {@code withBadRecordErrorHandler} switches it to {@code BadRecordRouter.RECORDING_ROUTER}.
         */
        @Pure
        public abstract BadRecordRouter getBadRecordRouter();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Handler that {@code expand} registers the bad-record output with; defaults to a new
         * {@code ErrorHandler.DefaultErrorHandler} in {@code read()}.
         */
        @Pure
        public abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Defaults to {@code 2L} in {@code read()}. NOTE(review): the time unit is not visible
         * here — confirm where the value is consumed.
         */
        @Pure
        public abstract long getConsumerPollingTimeout();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Defaults to false in {@code read()}; set to true by {@code withBounded()}.
         * NOTE(review): unlike the sibling accessors this one carries no {@code @Pure} annotation.
         */
        public abstract boolean isBounded();

        /** Returns a builder pre-populated from this instance; used by every {@code with*} method. */
        abstract Builder<K, V> toBuilder();

        /**
         * Creates a {@link ReadSourceDescriptors} with the defaults listed below, then selects
         * processing-time output timestamps and the monotonically increasing watermark estimator.
         */
        public static <K, V> ReadSourceDescriptors<K, V> read() {
            return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder()
                    .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
                    .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
                    .setCommitOffsetEnabled(false)
                    .setBounded(false)
                    .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
                    .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler())
                    .setConsumerPollingTimeout(2L)
                    .setRedistribute(false)
                    .setAllowDuplicates(false)
                    .setRedistributeNumKeys(0)
                    .build()
                    .withProcessingTime()
                    .withMonotonicallyIncreasingWatermarkEstimator();
        }

        /** Sets the {@code "bootstrap.servers"} consumer property through {@link #withConsumerConfigUpdates}. */
        public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers) {
            Map<String, Object> update = ImmutableMap.of("bootstrap.servers", bootstrapServers);
            return withConsumerConfigUpdates(update);
        }

        /** Wraps the deserializer class in a {@code LocalDeserializerProvider} and uses it for keys. */
        public ReadSourceDescriptors<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
            DeserializerProvider<K> provider = LocalDeserializerProvider.of(keyDeserializer);
            return withKeyDeserializerProvider(provider);
        }

        /** Applies {@link #withKeyDeserializer} and additionally sets an explicit key coder. */
        public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
            ReadSourceDescriptors<K, V> withDeserializer = withKeyDeserializer(keyDeserializer);
            return withDeserializer.toBuilder().setKeyCoder(keyCoder).build();
        }

        /** Returns a transform rebuilt from this one with the given key deserializer provider. */
        public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(DeserializerProvider<K> keyDeserializerProvider) {
            Builder<K, V> builder = toBuilder().setKeyDeserializerProvider(keyDeserializerProvider);
            return builder.build();
        }

        /** Returns a transform rebuilt from this one with the given key deserializer provider and key coder. */
        public ReadSourceDescriptors<K, V> withKeyDeserializerProviderAndCoder(DeserializerProvider<K> keyDeserializerProvider, Coder<K> keyCoder) {
            Builder<K, V> builder = toBuilder()
                    .setKeyDeserializerProvider(keyDeserializerProvider)
                    .setKeyCoder(keyCoder);
            return builder.build();
        }

        /** Wraps the deserializer class in a {@code LocalDeserializerProvider} and uses it for values. */
        public ReadSourceDescriptors<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
            DeserializerProvider<V> provider = LocalDeserializerProvider.of(valueDeserializer);
            return withValueDeserializerProvider(provider);
        }

        /** Applies {@link #withValueDeserializer} and additionally sets an explicit value coder. */
        public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
            ReadSourceDescriptors<K, V> withDeserializer = withValueDeserializer(valueDeserializer);
            return withDeserializer.toBuilder().setValueCoder(valueCoder).build();
        }

        /** Returns a transform rebuilt from this one with the given value deserializer provider. */
        public ReadSourceDescriptors<K, V> withValueDeserializerProvider(DeserializerProvider<V> valueDeserializerProvider) {
            Builder<K, V> builder = toBuilder().setValueDeserializerProvider(valueDeserializerProvider);
            return builder.build();
        }

        /** Returns a transform rebuilt from this one with the given value deserializer provider and value coder. */
        public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(DeserializerProvider<V> valueDeserializerProvider, Coder<V> valueCoder) {
            Builder<K, V> builder = toBuilder()
                    .setValueDeserializerProvider(valueDeserializerProvider)
                    .setValueCoder(valueCoder);
            return builder.build();
        }

        /** Returns a transform rebuilt from this one with the given consumer factory function. */
        public ReadSourceDescriptors<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
            Builder<K, V> builder = toBuilder().setConsumerFactoryFn(consumerFactoryFn);
            return builder.build();
        }

        /** Returns a transform rebuilt from this one with the given stop-reading check. */
        public ReadSourceDescriptors<K, V> withCheckStopReadingFn(CheckStopReadingFn stopReadingFn) {
            Builder<K, V> builder = toBuilder().setCheckStopReadingFn(stopReadingFn);
            return builder.build();
        }

        /**
         * Variant of {@code withCheckStopReadingFn} that accepts a plain serializable function and
         * adapts it with {@code CheckStopReadingFnWrapper.of}.
         */
        public ReadSourceDescriptors<K, V> withCheckStopReadingFn(SerializableFunction<TopicPartition, Boolean> stopReadingFn) {
            CheckStopReadingFn wrapped = (CheckStopReadingFn) CheckStopReadingFnWrapper.of(stopReadingFn);
            return toBuilder().setCheckStopReadingFn(wrapped).build();
        }

        /**
         * Combines the current consumer configuration with {@code configUpdates} using
         * {@code KafkaIOUtils.updateKafkaProperties} and stores the result.
         */
        public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
            Map<String, Object> updatedConfig = KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
            return toBuilder().setConsumerConfig(updatedConfig).build();
        }

        /** Returns a transform rebuilt from this one with the given output-timestamp function. */
        public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
            Builder<K, V> builder = toBuilder().setExtractOutputTimestampFn(timestampFn);
            return builder.build();
        }

        /**
         * Returns a transform rebuilt from this one with the given watermark-estimator factory.
         * The method name ("Creat") is misspelled but is kept because it is the public name.
         */
        public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(SerializableFunction<Instant, WatermarkEstimator<Instant>> estimatorFactory) {
            Builder<K, V> builder = toBuilder().setCreateWatermarkEstimatorFn(estimatorFactory);
            return builder.build();
        }

        /** Uses {@code ExtractOutputTimestampFns.useLogAppendTime()} as the output-timestamp function. */
        public ReadSourceDescriptors<K, V> withLogAppendTime() {
            SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn = ExtractOutputTimestampFns.useLogAppendTime();
            return withExtractOutputTimestampFn(timestampFn);
        }

        /** Uses {@code ExtractOutputTimestampFns.useProcessingTime()} as the output-timestamp function. */
        public ReadSourceDescriptors<K, V> withProcessingTime() {
            SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn = ExtractOutputTimestampFns.useProcessingTime();
            return withExtractOutputTimestampFn(timestampFn);
        }

        /** Returns a transform rebuilt from this one with the redistribute flag set to true. */
        public ReadSourceDescriptors<K, V> withRedistribute() {
            Builder<K, V> builder = toBuilder().setRedistribute(true);
            return builder.build();
        }

        /** Returns a transform rebuilt from this one with the allow-duplicates flag set to true. */
        public ReadSourceDescriptors<K, V> withAllowDuplicates() {
            Builder<K, V> builder = toBuilder().setAllowDuplicates(true);
            return builder.build();
        }

        /** Returns a transform rebuilt from this one with the given number of redistribute keys. */
        public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
            Builder<K, V> builder = toBuilder().setRedistributeNumKeys(redistributeNumKeys);
            return builder.build();
        }

        /** Uses {@code ExtractOutputTimestampFns.useCreateTime()} as the output-timestamp function. */
        public ReadSourceDescriptors<K, V> withCreateTime() {
            SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn = ExtractOutputTimestampFns.useCreateTime();
            return withExtractOutputTimestampFn(timestampFn);
        }

        /** Configures the estimator factory to create a {@code WatermarkEstimators.WallTime} from the given instant. */
        public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
            return withCreatWatermarkEstimatorFn(initialState -> new WatermarkEstimators.WallTime(initialState));
        }

        /** Configures the estimator factory to create a {@code WatermarkEstimators.MonotonicallyIncreasing} from the given instant. */
        public ReadSourceDescriptors<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
            return withCreatWatermarkEstimatorFn(initialState -> new WatermarkEstimators.MonotonicallyIncreasing(initialState));
        }

        /** Configures the estimator factory to create a {@code WatermarkEstimators.Manual} from the given instant. */
        public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
            return withCreatWatermarkEstimatorFn(initialState -> new WatermarkEstimators.Manual(initialState));
        }

        /** Sets the {@code "isolation.level"} consumer property to {@code "read_committed"}. */
        public ReadSourceDescriptors<K, V> withReadCommitted() {
            Map<String, Object> update = ImmutableMap.of("isolation.level", "read_committed");
            return withConsumerConfigUpdates(update);
        }

        /** Returns a transform rebuilt from this one with the commit-offset flag set to true. */
        public ReadSourceDescriptors<K, V> commitOffsets() {
            Builder<K, V> builder = toBuilder().setCommitOffsetEnabled(true);
            return builder.build();
        }

        /** Stores the given map as the offset-consumer configuration; the map is passed to the builder as is. */
        public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
            Builder<K, V> builder = toBuilder().setOffsetConsumerConfig(offsetConsumerConfig);
            return builder.build();
        }

        /**
         * Replaces the consumer configuration with the given map. Unlike
         * {@link #withConsumerConfigUpdates}, nothing is merged with the current configuration.
         */
        public ReadSourceDescriptors<K, V> withConsumerConfigOverrides(Map<String, Object> consumerConfig) {
            Builder<K, V> builder = toBuilder().setConsumerConfig(consumerConfig);
            return builder.build();
        }

        /**
         * Installs the given bad-record error handler and switches the bad-record router to
         * {@code BadRecordRouter.RECORDING_ROUTER}.
         */
        public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
            Builder<K, V> builder = toBuilder()
                    .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
                    .setBadRecordErrorHandler(badRecordErrorHandler);
            return builder.build();
        }

        /** Returns a transform rebuilt from this one with the given consumer polling timeout. */
        public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(long pollingTimeout) {
            Builder<K, V> builder = toBuilder().setConsumerPollingTimeout(pollingTimeout);
            return builder.build();
        }

        /** Wraps this transform in a {@link ReadAllFromRow} adapter. */
        ReadAllFromRow<K, V> forExternalBuild() {
            ReadAllFromRow<K, V> rowAdapter = new ReadAllFromRow<>(this);
            return rowAdapter;
        }

        /** Sets the timestamp policy factory and then switches to the manual watermark estimator. */
        ReadSourceDescriptors<K, V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> policyFactory) {
            ReadSourceDescriptors<K, V> withPolicy = toBuilder().setTimestampPolicyFactory(policyFactory).build();
            return withPolicy.withManualWatermarkEstimator();
        }

        /** Returns a transform rebuilt from this one with the bounded flag set to true. */
        ReadSourceDescriptors<K, V> withBounded() {
            Builder<K, V> builder = toBuilder().setBounded(true);
            return builder.build();
        }

        /**
         * Expands this transform: validates the configuration, reads records for each incoming
         * {@link KafkaSourceDescriptor}, registers the bad-record output with the configured error
         * handler, and optionally adds an offset-committing branch.
         *
         * <p>The offset-committing branch is added only when {@code isCommitOffsetEnabled()} is
         * true and Kafka auto-commit is not configured. When the pipeline's update-compatibility
         * version is older than 2.60.0, the pre-2.60 expansion ({@code expand259Commits}) is used
         * instead, and redistribution is rejected.
         *
         * @param pCollection the source descriptors to read from
         * @return the records read, coded with a {@code KafkaRecordCoder}
         * @throws IllegalArgumentException if a key or value deserializer provider is missing, or
         *     if redistribute is enabled while committing offsets under a pre-2.60.0 compatibility
         *     version
         * @throws RuntimeException wrapping a {@code NoSuchSchemaException} raised while resolving
         *     schema coders; the original exception is kept as the cause
         */
        public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> pCollection) {
            Preconditions.checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
            Preconditions.checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");
            if (!ConsumerSpEL.hasOffsetsForTimes()) {
                LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", AppInfoParser.getVersion());
            }
            if (isCommitOffsetEnabled() && configuredKafkaCommit()) {
                LOG.info("auto_commit is set together with commitOffsetEnabled but you only need one of them. The commitOffsetEnabled is going to be ignored");
            }
            if (isRedistribute()) {
                if (getRedistributeNumKeys() == 0) {
                    LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
                }
                if ((isCommitOffsetEnabled() || configuredKafkaCommit()) && isAllowDuplicates()) {
                    LOG.warn("Either auto_commit is set, or commitOffsetEnabled is enabled (or both), but since withRestribute() is enabled with allow duplicates, the runner may have additional work processed that is ahead of the current checkpoint");
                }
            }
            if (getConsumerConfig().get("bootstrap.servers") == null) {
                LOG.warn("The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime otherwise the pipeline will fail.");
            }
            CoderRegistry coderRegistry = pCollection.getPipeline().getCoderRegistry();
            Coder<KafkaRecord<K, V>> recordCoder = KafkaRecordCoder.of(getKeyCoder(coderRegistry), getValueCoder(coderRegistry));
            try {
                PCollectionTuple outputs = pCollection.apply(ParDo.of(ReadFromKafkaDoFn.create(this, this.records)).withOutputTags(this.records, TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
                getBadRecordErrorHandler().addErrorCollection(outputs.get(BadRecordRouter.BAD_RECORD_TAG).setCoder(BadRecord.getCoder(pCollection.getPipeline())));
                PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> recordsWithDescriptor = outputs.get(this.records).setCoder(KvCoder.of(pCollection.getPipeline().getSchemaRegistry().getSchemaCoder(KafkaSourceDescriptor.class), recordCoder));

                // Offsets are committed by this transform only when requested and when Kafka's
                // own auto-commit is not already doing it.
                boolean commitViaTransform = isCommitOffsetEnabled() && !configuredKafkaCommit();
                if (!commitViaTransform) {
                    return recordsWithDescriptor.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {
                    }).via((v0) -> {
                        return v0.getValue();
                    })).setCoder(recordCoder);
                }

                String updateCompatibilityVersion = pCollection.getPipeline().getOptions().as(StreamingOptions.class).getUpdateCompatibilityVersion();
                if (updateCompatibilityVersion != null) {
                    List<String> firstVersionWithNewCommits = Arrays.asList("2", "60", "0");
                    // Components are compared numerically so that, for example, 2.100.0 is not
                    // treated as older than 2.60.0 (a plain string comparison would say it is).
                    if (isVersionBefore(updateCompatibilityVersion, firstVersionWithNewCommits)) {
                        Preconditions.checkArgument(!isRedistribute(), "Can not enable isRedistribute() while committing offsets prior to " + String.join(".", firstVersionWithNewCommits));
                        return expand259Commits(recordsWithDescriptor, recordCoder, pCollection.getPipeline().getSchemaRegistry());
                    }
                }
                recordsWithDescriptor.apply(new KafkaCommitOffset(this, false)).setCoder(VoidCoder.of());
                return recordsWithDescriptor.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {
                }).via((v0) -> {
                    return v0.getValue();
                })).setCoder(recordCoder);
            } catch (NoSuchSchemaException e) {
                // Keep the original exception as the cause so its stack trace is not lost.
                throw new RuntimeException(e.getMessage(), e);
            }
        }

        /**
         * Returns whether the dot-separated {@code version} sorts strictly before
         * {@code reference}. Components are compared left to right; if one version is a prefix of
         * the other, the shorter one sorts first.
         */
        private static boolean isVersionBefore(String version, List<String> reference) {
            String[] components = version.split("\\.");
            int shared = Math.min(components.length, reference.size());
            for (int i = 0; i < shared; i++) {
                int order = compareVersionComponent(components[i], reference.get(i));
                if (order != 0) {
                    return order < 0;
                }
            }
            return components.length < reference.size();
        }

        /**
         * Compares two version components numerically when both parse as integers, and as plain
         * strings otherwise (for example a component such as {@code "0-SNAPSHOT"}).
         */
        private static int compareVersionComponent(String left, String right) {
            try {
                return Integer.compare(Integer.parseInt(left), Integer.parseInt(right));
            } catch (NumberFormatException notNumeric) {
                return left.compareTo(right);
            }
        }

        /**
         * Expansion used for update-compatibility versions before 2.60.0: the records are
         * reshuffled with {@code Reshuffle.viaRandomKey()} before the offset-commit branch and the
         * value-extraction step are applied.
         *
         * @param pCollection records keyed by their source descriptor
         * @param coder coder for the output records
         * @param schemaRegistry registry used to look up the {@link KafkaSourceDescriptor} coder
         * @return the record values, coded with {@code coder}
         * @throws NoSuchSchemaException if no schema is registered for {@link KafkaSourceDescriptor}
         */
        private PCollection<KafkaRecord<K, V>> expand259Commits(PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> pCollection, Coder<KafkaRecord<K, V>> coder, SchemaRegistry schemaRegistry) throws NoSuchSchemaException {
            PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> reshuffled = pCollection
                    .apply(Reshuffle.viaRandomKey())
                    .setCoder(KvCoder.of(schemaRegistry.getSchemaCoder(KafkaSourceDescriptor.class), coder));
            // Side branch: commit offsets from the reshuffled records.
            reshuffled.apply(new KafkaCommitOffset(this, true)).setCoder(VoidCoder.of());
            // Main branch: drop the descriptor key and keep only the record.
            return reshuffled.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {
            }).via((v0) -> {
                return v0.getValue();
            })).setCoder(coder);
        }

        /**
         * Resolves the key coder: the explicitly configured one if present, otherwise the coder
         * supplied by the key deserializer provider.
         *
         * @throws IllegalStateException if neither a key coder nor a key deserializer provider is set
         */
        private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
            Coder<K> configured = getKeyCoder();
            if (configured != null) {
                return configured;
            }
            DeserializerProvider<K> provider = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(getKeyDeserializerProvider());
            return provider.getCoder(coderRegistry);
        }

        /**
         * Resolves the value coder: the explicitly configured one if present, otherwise the coder
         * supplied by the value deserializer provider.
         *
         * @throws IllegalStateException if neither a value coder nor a value deserializer provider is set
         */
        private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
            Coder<V> configured = getValueCoder();
            if (configured != null) {
                return configured;
            }
            DeserializerProvider<V> provider = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(getValueDeserializerProvider());
            return provider.getCoder(coderRegistry);
        }

        /**
         * Reports whether the consumer configuration maps {@code "enable.auto.commit"} to
         * {@code Boolean.TRUE}. Any other value, including the string {@code "true"} or a missing
         * entry, yields false.
         */
        private boolean configuredKafkaCommit() {
            Object autoCommit = getConsumerConfig().get("enable.auto.commit");
            return Boolean.TRUE.equals(autoCommit);
        }

        /**
         * Synthetic hook that rebuilds one of this class's serializable lambdas from its
         * {@link SerializedLambda} description.
         *
         * <p>The implementation-method name selects between the {@code getValue} function and the
         * three watermark-estimator factories; the remaining fields of {@code serializedLambda}
         * are then checked before the lambda is returned.
         *
         * <p>NOTE(review): {@code boolean z = -1}, {@code z = 2}, {@code z = 3} and the repeated
         * {@code case true} labels are not valid Java. This looks like decompiler output where the
         * selector is presumably an {@code int} (0 to 3) — confirm against the original source
         * before editing.
         *
         * @param serializedLambda description of the lambda being deserialized
         * @return the matching lambda
         * @throws IllegalArgumentException if no known lambda matches the description
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            // First stage: map the implementation-method name to a selector value. The hash code
            // narrows the candidates and equals() confirms the match.
            switch (implMethodName.hashCode()) {
                case -1942724550:
                    if (implMethodName.equals("lambda$withWallTimeWatermarkEstimator$6028e89e$1")) {
                        z = 3;
                        break;
                    }
                    break;
                case -1630415413:
                    if (implMethodName.equals("lambda$withManualWatermarkEstimator$6028e89e$1")) {
                        z = 2;
                        break;
                    }
                    break;
                case -1038162090:
                    if (implMethodName.equals("lambda$withMonotonicallyIncreasingWatermarkEstimator$6028e89e$1")) {
                        z = true;
                        break;
                    }
                    break;
                case 1967798203:
                    if (implMethodName.equals("getValue")) {
                        z = false;
                        break;
                    }
                    break;
            }
            // Second stage: verify the rest of the serialized description and return the lambda.
            switch (z) {
                case false:
                    // getValue on KV. The three checks below are textually identical, so only the
                    // first can ever return. Presumably one was emitted per use of the function in
                    // this class — confirm against the original source.
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                        return (v0) -> {
                            return v0.getValue();
                        };
                    }
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                        return (v0) -> {
                            return v0.getValue();
                        };
                    }
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                        return (v0) -> {
                            return v0.getValue();
                        };
                    }
                    break;
                case true:
                    // withMonotonicallyIncreasingWatermarkEstimator factory.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Lorg/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator;")) {
                        return instant -> {
                            return new WatermarkEstimators.MonotonicallyIncreasing(instant);
                        };
                    }
                    break;
                case true:
                    // withManualWatermarkEstimator factory (selector value 2 above).
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Lorg/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator;")) {
                        return instant2 -> {
                            return new WatermarkEstimators.Manual(instant2);
                        };
                    }
                    break;
                case true:
                    // withWallTimeWatermarkEstimator factory (selector value 3 above).
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$ReadSourceDescriptors") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Lorg/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator;")) {
                        return instant3 -> {
                            return new WatermarkEstimators.WallTime(instant3);
                        };
                    }
                    break;
            }
            // Either the name was unknown or the description did not match what is expected.
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$RowsWithMetadata.class */
    /**
     * Transform that applies a {@link Read}, converts every record to a
     * {@link ByteArrayKafkaRecord} and emits the result as {@link Row}s.
     */
    public static class RowsWithMetadata<K, V> extends PTransform<PBegin, PCollection<Row>> {
        private final Read<K, V> read;

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$RowsWithMetadata$Builder.class */
        /** External builder; accepts only configurations whose key and value coders are nullable byte arrays. */
        static class Builder<K, V> implements ExternalTransformBuilder<Read.External.Configuration, PBegin, PCollection<Row>> {
            Builder() {
            }

            /**
             * Builds the transform from an external configuration.
             *
             * @throws RuntimeException if the resolved key or value coder is not a
             *     {@code NullableCoder} whose first argument is a {@code ByteArrayCoder}
             */
            public PTransform<PBegin, PCollection<Row>> buildExternal(Read.External.Configuration configuration) {
                AutoValue_KafkaIO_Read.Builder readBuilder = new AutoValue_KafkaIO_Read.Builder();
                Read.Builder.setupExternalBuilder(readBuilder, configuration);

                Coder keyCoder = Read.Builder.resolveCoder(KafkaIO.resolveClass(configuration.keyDeserializer));
                if (!isNullableByteArrayCoder(keyCoder)) {
                    throw new RuntimeException("ExternalWithMetadata transform only supports keys of type nullable(byte[])");
                }

                Coder valueCoder = Read.Builder.resolveCoder(KafkaIO.resolveClass(configuration.valueDeserializer));
                if (!isNullableByteArrayCoder(valueCoder)) {
                    throw new RuntimeException("ExternalWithMetadata transform only supports values of type nullable(byte[])");
                }
                return readBuilder.build().externalWithMetadata();
            }

            /** Returns true for a {@code NullableCoder} whose first coder argument is a {@code ByteArrayCoder}. */
            private static boolean isNullableByteArrayCoder(Coder<?> coder) {
                return coder instanceof NullableCoder && coder.getCoderArguments().get(0) instanceof ByteArrayCoder;
            }
        }

        RowsWithMetadata(Read<K, V> read) {
            super("KafkaIO.RowsWithMetadata");
            this.read = read;
        }

        /**
         * Copies a record's fields into a {@link ByteArrayKafkaRecord}. The key and value are cast
         * to {@code byte[]}; a null header collection is passed through as null.
         */
        public static <K, V> ByteArrayKafkaRecord toExternalKafkaRecord(KafkaRecord<K, V> kafkaRecord) {
            // Fields are read in the same order as the constructor arguments.
            String topic = kafkaRecord.getTopic();
            int partition = kafkaRecord.getPartition();
            long offset = kafkaRecord.getOffset();
            long timestamp = kafkaRecord.getTimestamp();
            byte[] key = (byte[]) kafkaRecord.getKV().getKey();
            byte[] value = (byte[]) kafkaRecord.getKV().getValue();
            List<KafkaHeader> headers = null;
            if (kafkaRecord.getHeaders() != null) {
                headers = Arrays.stream(kafkaRecord.getHeaders().toArray())
                        .map(header -> new KafkaHeader(header.key(), header.value()))
                        .collect(Collectors.toList());
            }
            return new ByteArrayKafkaRecord(topic, partition, offset, timestamp, key, value, headers, kafkaRecord.getTimestampType().id, kafkaRecord.getTimestampType().name);
        }

        /** Applies the wrapped read, converts each record, then converts the records to rows. */
        public PCollection<Row> expand(PBegin pBegin) {
            return pBegin.apply(this.read).apply("Convert to ExternalKafkaRecord", ParDo.of(new DoFn<KafkaRecord<K, V>, ByteArrayKafkaRecord>() {
                @DoFn.ProcessElement
                public void processElement(DoFn<KafkaRecord<K, V>, ByteArrayKafkaRecord>.ProcessContext processContext) {
                    KafkaRecord<K, V> kafkaRecord = processContext.element();
                    processContext.output(RowsWithMetadata.toExternalKafkaRecord(kafkaRecord));
                }
            })).apply(Convert.toRows());
        }

        /** Adds this transform's display data followed by the wrapped read's. */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$TypedWithoutMetadata.class */
    /** Transform that applies a {@link Read} and keeps only the key/value pair of each record. */
    public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
        private final Read<K, V> read;

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$TypedWithoutMetadata$Builder.class */
        /** External builder that configures a {@link Read} and strips its metadata. */
        static class Builder<K, V> implements ExternalTransformBuilder<Read.External.Configuration, PBegin, PCollection<KV<K, V>>> {
            Builder() {
            }

            /** Builds a read from the external configuration and returns its metadata-free form. */
            public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(Read.External.Configuration configuration) {
                AutoValue_KafkaIO_Read.Builder readBuilder = new AutoValue_KafkaIO_Read.Builder();
                Read.Builder.setupExternalBuilder(readBuilder, configuration);
                return readBuilder.build().withoutMetadata();
            }
        }

        TypedWithoutMetadata(Read<K, V> read) {
            super("KafkaIO.Read");
            this.read = read;
        }

        /** Applies the wrapped read and maps every record to its key/value pair. */
        public PCollection<KV<K, V>> expand(PBegin pBegin) {
            return pBegin.apply(this.read).apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
                @DoFn.ProcessElement
                public void processElement(DoFn<KafkaRecord<K, V>, KV<K, V>>.ProcessContext processContext) {
                    KafkaRecord<K, V> kafkaRecord = processContext.element();
                    processContext.output(kafkaRecord.getKV());
                }
            }));
        }

        /** Adds this transform's display data followed by the wrapped read's. */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write.class */
    public static abstract class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
        public static final Class<AutoValue_KafkaIO_Write> AUTOVALUE_CLASS = AutoValue_KafkaIO_Write.class;

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$Builder.class */
        /** AutoValue builder for {@link Write}; also serves as its external transform builder. */
        public static abstract class Builder<K, V> implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {
            abstract Builder<K, V> setTopic(String str);

            abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> writeRecords);

            abstract Write<K, V> build();

            /**
             * Builds a {@link Write} from an external configuration: sets the topic, then installs
             * a {@code WriteRecords} transform configured with the producer properties, the
             * resolved key and value serializer classes, and the same topic.
             */
            public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(External.Configuration configuration) {
                setTopic(configuration.topic);
                Map<String, Object> producerConfig = new HashMap<>(configuration.producerConfig);
                Class<? extends Serializer<K>> keySerializer = KafkaIO.resolveClass(configuration.keySerializer);
                WriteRecords<K, V> writeRecords = KafkaIO.<K, V>writeRecords()
                        .withProducerConfigUpdates(producerConfig)
                        .withKeySerializer(keySerializer)
                        .withValueSerializer(KafkaIO.resolveClass(configuration.valueSerializer))
                        .withTopic(configuration.topic);
                setWriteRecordsTransform(writeRecords);
                return build();
            }
        }

        @AutoService({ExternalTransformRegistrar.class})
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$External.class */
        public static class External implements ExternalTransformRegistrar {
            public static final String URN = "beam:transform:org.apache.beam:kafka_write:v1";

            /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$External$Configuration.class */
            public static class Configuration {
                private Map<String, String> producerConfig;
                private String topic;
                private String keySerializer;
                private String valueSerializer;

                public void setProducerConfig(Map<String, String> map) {
                    this.producerConfig = map;
                }

                public void setTopic(String str) {
                    this.topic = str;
                }

                public void setKeySerializer(String str) {
                    this.keySerializer = str;
                }

                public void setValueSerializer(String str) {
                    this.valueSerializer = str;
                }
            }

            public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
                return ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$PublishTimestampFunctionKV.class */
        private static class PublishTimestampFunctionKV<K, V> implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
            private KafkaPublishTimestampFunction<KV<K, V>> fn;

            public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, V>> kafkaPublishTimestampFunction) {
                this.fn = kafkaPublishTimestampFunction;
            }

            @Override // org.apache.beam.sdk.io.kafka.KafkaPublishTimestampFunction
            public Instant getTimestamp(ProducerRecord<K, V> producerRecord, Instant instant) {
                return this.fn.getTimestamp(KV.of(producerRecord.key(), producerRecord.value()), instant);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getTopic();

        public abstract WriteRecords<K, V> getWriteRecordsTransform();

        abstract Builder<K, V> toBuilder();

        private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> writeRecords) {
            return toBuilder().setWriteRecordsTransform(writeRecords).build();
        }

        public Write<K, V> withBootstrapServers(String str) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withBootstrapServers(str));
        }

        public Write<K, V> withTopic(String str) {
            return toBuilder().setTopic(str).setWriteRecordsTransform(getWriteRecordsTransform().withTopic(str)).build();
        }

        public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> cls) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withKeySerializer(cls));
        }

        public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> cls) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withValueSerializer(cls));
        }

        public Write<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> serializableFunction) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withProducerFactoryFn(serializableFunction));
        }

        public Write<K, V> withInputTimestamp() {
            return withWriteRecordsTransform(getWriteRecordsTransform().withInputTimestamp());
        }

        @Deprecated
        public Write<K, V> withPublishTimestampFunction(KafkaPublishTimestampFunction<KV<K, V>> kafkaPublishTimestampFunction) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withPublishTimestampFunction(new PublishTimestampFunctionKV(kafkaPublishTimestampFunction)));
        }

        public Write<K, V> withEOS(int i, String str) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withEOS(i, str));
        }

        public Write<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> serializableFunction) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withConsumerFactoryFn(serializableFunction));
        }

        @Deprecated
        public Write<K, V> updateProducerProperties(Map<String, Object> map) {
            return withWriteRecordsTransform(getWriteRecordsTransform().updateProducerProperties(map));
        }

        public Write<K, V> withProducerConfigUpdates(Map<String, Object> map) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withProducerConfigUpdates(map));
        }

        public Write<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler) {
            return withWriteRecordsTransform(getWriteRecordsTransform().withBadRecordErrorHandler(errorHandler));
        }

        public Write<K, V> withGCPApplicationDefaultCredentials() {
            return withProducerConfigUpdates(ImmutableMap.of("security.protocol", "SASL_SSL", "sasl.mechanism", "OAUTHBEARER", "sasl.login.callback.handler.class", "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler", "sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"));
        }

        public PDone expand(PCollection<KV<K, V>> pCollection) {
            final String str = (String) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(getTopic(), "withTopic() is required");
            KvCoder coder = pCollection.getCoder();
            return pCollection.apply("Kafka ProducerRecord", MapElements.via(new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.Write.1
                public ProducerRecord<K, V> apply(KV<K, V> kv) {
                    return new ProducerRecord<>(str, kv.getKey(), kv.getValue());
                }
            })).setCoder(ProducerRecordCoder.of(coder.getKeyCoder(), coder.getValueCoder())).apply(getWriteRecordsTransform());
        }

        public void validate(PipelineOptions pipelineOptions) {
            getWriteRecordsTransform().validate(pipelineOptions);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            getWriteRecordsTransform().populateDisplayData(builder);
        }

        public PTransform<PCollection<V>, PDone> values() {
            return new KafkaValueWrite(withKeySerializer(StringSerializer.class));
        }
    }

    /**
     * Writes a {@code PCollection<ProducerRecord<K, V>>} to Kafka.
     *
     * <p>Every {@code with*} method returns a new instance built through {@link #toBuilder()}.
     * {@link #expand} chooses between {@code KafkaExactlyOnceSink} and {@code KafkaWriter}
     * depending on {@link #isEOS()}.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$WriteRecords.class */
    public static abstract class WriteRecords<K, V> extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
        /** Producer config that {@code KafkaIO.writeRecords()} starts from. */
        private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of("retries", 3);
        /** Producer config keys left out of the display data, mapped to a hint naming the method to use instead. */
        private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of("key.serializer", "Use withKeySerializer instead", "value.serializer", "Use withValueSerializer instead");

        /** Builder for {@link WriteRecords}. */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$WriteRecords$Builder.class */
        public static abstract class Builder<K, V> {
            abstract Builder<K, V> setTopic(String str);

            abstract Builder<K, V> setProducerConfig(Map<String, Object> map);

            abstract Builder<K, V> setProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> serializableFunction);

            abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> cls);

            abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> cls);

            abstract Builder<K, V> setPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> kafkaPublishTimestampFunction);

            abstract Builder<K, V> setEOS(boolean z);

            abstract Builder<K, V> setSinkGroupId(String str);

            abstract Builder<K, V> setNumShards(int i);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> serializableFunction);

            abstract Builder<K, V> setBadRecordRouter(BadRecordRouter badRecordRouter);

            abstract Builder<K, V> setBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler);

            abstract WriteRecords<K, V> build();
        }

        @Pure
        public abstract String getTopic();

        @Pure
        public abstract Map<String, Object> getProducerConfig();

        @Pure
        public abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();

        @Pure
        public abstract Class<? extends Serializer<K>> getKeySerializer();

        @Pure
        public abstract Class<? extends Serializer<V>> getValueSerializer();

        @Pure
        public abstract KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction();

        @Pure
        public abstract boolean isEOS();

        @Pure
        public abstract String getSinkGroupId();

        @Pure
        public abstract int getNumShards();

        @Pure
        public abstract SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn();

        @Pure
        public abstract BadRecordRouter getBadRecordRouter();

        @Pure
        public abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

        abstract Builder<K, V> toBuilder();

        /** Stores the given servers under the {@code "bootstrap.servers"} producer config key. */
        public WriteRecords<K, V> withBootstrapServers(String str) {
            return withProducerConfigUpdates(ImmutableMap.of("bootstrap.servers", str));
        }

        /** Returns a copy with the given topic. */
        public WriteRecords<K, V> withTopic(String str) {
            return toBuilder().setTopic(str).build();
        }

        /** Returns a copy with the given key serializer class. */
        public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> cls) {
            return toBuilder().setKeySerializer(cls).build();
        }

        /** Returns a copy with the given value serializer class. */
        public WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> cls) {
            return toBuilder().setValueSerializer(cls).build();
        }

        /**
         * Deprecated alias of {@link #withProducerConfigUpdates(Map)}; it delegates so that both
         * entry points always behave identically.
         */
        @Deprecated
        public WriteRecords<K, V> updateProducerProperties(Map<String, Object> map) {
            return withProducerConfigUpdates(map);
        }

        /**
         * Returns a copy whose producer config is the result of
         * {@code KafkaIOUtils.updateKafkaProperties} applied to the current config and the given
         * updates.
         */
        public WriteRecords<K, V> withProducerConfigUpdates(Map<String, Object> map) {
            return toBuilder().setProducerConfig(KafkaIOUtils.updateKafkaProperties(getProducerConfig(), map)).build();
        }

        /** Returns a copy with the given producer factory function. */
        public WriteRecords<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> serializableFunction) {
            return toBuilder().setProducerFactoryFn(serializableFunction).build();
        }

        /** Installs {@code KafkaPublishTimestampFunction.withElementTimestamp()} as the timestamp function. */
        public WriteRecords<K, V> withInputTimestamp() {
            return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
        }

        /** Returns a copy with the given publish timestamp function. */
        @Deprecated
        public WriteRecords<K, V> withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> kafkaPublishTimestampFunction) {
            return toBuilder().setPublishTimestampFunction(kafkaPublishTimestampFunction).build();
        }

        /**
         * Enables exactly-once writing.
         *
         * <p>{@code KafkaExactlyOnceSink.ensureEOSSupport()} is called first, then both arguments
         * are checked.
         *
         * @param i the number of shards; must be at least 1
         * @param str the sink group id; must not be null
         * @return a copy with EOS enabled and the shard count and sink group id set
         */
        public WriteRecords<K, V> withEOS(int i, String str) {
            KafkaExactlyOnceSink.ensureEOSSupport();
            Preconditions.checkArgument(i >= 1, "numShards should be >= 1");
            Preconditions.checkArgument(str != null, "sinkGroupId is required for exactly-once sink");
            return toBuilder().setEOS(true).setNumShards(i).setSinkGroupId(str).build();
        }

        /** Returns a copy with the given consumer factory function. */
        public WriteRecords<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> serializableFunction) {
            return toBuilder().setConsumerFactoryFn(serializableFunction).build();
        }

        /**
         * Returns a copy that uses {@code BadRecordRouter.RECORDING_ROUTER} together with the
         * given error handler.
         */
        public WriteRecords<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> errorHandler) {
            return toBuilder().setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER).setBadRecordErrorHandler(errorHandler).build();
        }

        /**
         * Writes the records to Kafka.
         *
         * <p>Bootstrap servers and both serializers must be configured. With EOS enabled a topic
         * is also required, the error handler must be a {@code DefaultErrorHandler}, and the
         * records go through {@code KafkaExactlyOnceSink}. Otherwise they go through
         * {@code KafkaWriter}, and its bad-record output is registered with the configured error
         * handler.
         *
         * @param pCollection the producer records to write
         * @return a {@link PDone} in the input's pipeline
         */
        public PDone expand(PCollection<ProducerRecord<K, V>> pCollection) {
            Preconditions.checkArgument(getProducerConfig().get("bootstrap.servers") != null, "withBootstrapServers() is required");
            Preconditions.checkArgument(getKeySerializer() != null, "withKeySerializer() is required");
            Preconditions.checkArgument(getValueSerializer() != null, "withValueSerializer() is required");
            if (isEOS()) {
                Preconditions.checkArgument(getTopic() != null, "withTopic() is required when isEOS() is true");
                Preconditions.checkArgument(getBadRecordErrorHandler() instanceof ErrorHandler.DefaultErrorHandler, "BadRecordErrorHandling isn't supported with Kafka Exactly Once writing");
                KafkaExactlyOnceSink.ensureEOSSupport();
                pCollection.apply(new KafkaExactlyOnceSink<>(this));
            } else {
                // The untagged TupleTag<Void> is the main output; only the bad-record output is consumed.
                getBadRecordErrorHandler().addErrorCollection(
                        pCollection
                                .apply(ParDo.of(new KafkaWriter<>(this)).withOutputTags(new TupleTag<Void>(), TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)))
                                .get(BadRecordRouter.BAD_RECORD_TAG)
                                .setCoder(BadRecord.getCoder(pCollection.getPipeline())));
            }
            return PDone.in(pCollection.getPipeline());
        }

        /**
         * Rejects runners that are not on the exactly-once allow-list when EOS is enabled.
         *
         * <p>Accepted are the direct runner (exact class name) and any runner class whose name
         * starts with the dataflow, spark or flink runner package prefix.
         *
         * @param pipelineOptions the pipeline options; must not be null
         * @throws UnsupportedOperationException if EOS is enabled and the runner is not accepted
         */
        public void validate(PipelineOptions pipelineOptions) {
            org.apache.beam.sdk.util.Preconditions.checkStateNotNull(pipelineOptions);
            if (isEOS()) {
                String name = pipelineOptions.getRunner().getName();
                if (!"org.apache.beam.runners.direct.DirectRunner".equals(name) && !name.startsWith("org.apache.beam.runners.dataflow.") && !name.startsWith("org.apache.beam.runners.spark.") && !name.startsWith("org.apache.beam.runners.flink.")) {
                    throw new UnsupportedOperationException(name + " is not a runner known to be compatible with Kafka exactly-once sink. This implementation of exactly-once sink relies on specific checkpoint guarantees. Only the runners with known to have compatible checkpoint semantics are allowed.");
                }
            }
        }

        /**
         * Adds the topic (when present) and every producer config entry whose key is not in
         * {@code IGNORED_PRODUCER_PROPERTIES}. Values for which {@code DisplayData.inferType}
         * returns null are shown as their string form.
         *
         * @param builder the display-data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
            Set<String> ignoredKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> entry : getProducerConfig().entrySet()) {
                String key = entry.getKey();
                if (!ignoredKeys.contains(key)) {
                    Object value = DisplayData.inferType(entry.getValue()) != null ? entry.getValue() : String.valueOf(entry.getValue());
                    builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
                }
            }
        }
    }

    /**
     * Creates a {@link Read} whose keys and values are raw byte arrays, using
     * {@code ByteArrayDeserializer} for both.
     *
     * <p>The explicit type witness on {@code read()} is required: as the receiver of a method
     * chain the call would otherwise be inferred as {@code Read<Object, Object>}, which does not
     * accept {@code ByteArrayDeserializer.class}.
     *
     * @return a read transform configured with byte-array deserializers
     */
    public static Read<byte[], byte[]> readBytes() {
        return KafkaIO.<byte[], byte[]>read().withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(ByteArrayDeserializer.class);
    }

    /**
     * Creates a {@link Read} with the defaults set below: empty topic and partition lists, the
     * shared consumer factory and default consumer properties from {@code KafkaIOUtils}, no
     * record limit ({@code Long.MAX_VALUE}), a processing-time timestamp policy, a consumer
     * polling timeout of 2, and offset commit on finalize, dynamic read, redistribution and
     * duplicates all switched off.
     *
     * <p>The builder and the lists are parameterized instead of raw so that the result is a
     * checked {@code Read<K, V>}.
     *
     * @return the default read transform
     */
    public static <K, V> Read<K, V> read() {
        return new AutoValue_KafkaIO_Read.Builder<K, V>()
                .setTopics(new ArrayList<>())
                .setTopicPartitions(new ArrayList<>())
                .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
                .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
                .setMaxNumRecords(Long.MAX_VALUE)
                .setCommitOffsetsInFinalizeEnabled(false)
                .setDynamicRead(false)
                .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
                .setConsumerPollingTimeout(2L)
                .setRedistributed(false)
                .setAllowDuplicates(false)
                .setRedistributeNumKeys(0)
                .build();
    }

    /**
     * Creates a {@link ReadSourceDescriptors} transform by delegating to
     * {@code ReadSourceDescriptors.read()}.
     *
     * @return the transform returned by {@code ReadSourceDescriptors.read()}
     */
    public static <K, V> ReadSourceDescriptors<K, V> readSourceDescriptors() {
        ReadSourceDescriptors<K, V> descriptorsRead = ReadSourceDescriptors.read();
        return descriptorsRead;
    }

    /**
     * Creates a {@link Write} that wraps the default {@link WriteRecords} from
     * {@link #writeRecords()}. No topic is set.
     *
     * <p>The generated builder is parameterized instead of raw so that the result is a checked
     * {@code Write<K, V>}.
     *
     * @return the default write transform
     */
    public static <K, V> Write<K, V> write() {
        return new AutoValue_KafkaIO_Write.Builder<K, V>()
                .setWriteRecordsTransform(KafkaIO.<K, V>writeRecords())
                .build();
    }

    /**
     * Creates a {@link WriteRecords} with the defaults set below: the default producer
     * properties, EOS disabled, zero shards, the shared consumer factory from
     * {@code KafkaIOUtils}, the throwing bad-record router and a default error handler.
     *
     * <p>The generated builder and the error handler are parameterized instead of raw so that
     * the result is a checked {@code WriteRecords<K, V>}.
     *
     * @return the default record-level write transform
     */
    public static <K, V> WriteRecords<K, V> writeRecords() {
        return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
                .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
                .setEOS(false)
                .setNumShards(0)
                .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
                .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
                .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
                .build();
    }

    /** Private no-op constructor; it cannot be called from outside this class. */
    private KafkaIO() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Loads a class by its fully qualified name.
     *
     * @param str the fully qualified class name
     * @return the class returned by {@link Class#forName(String)}
     * @throws RuntimeException if the class cannot be found; the original
     *     {@link ClassNotFoundException} is attached as the cause so the failure stays traceable
     */
    public static Class<?> resolveClass(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            // Keep the cause: dropping it hides the underlying class-loading failure.
            throw new RuntimeException("Could not find class: " + str, e);
        }
    }
}
