package org.apache.beam.sdk.io.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.class */
public class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);
    private final KafkaIO.Read<K, V> spec;
    private final int id;

    public List<KafkaUnboundedSource<K, V>> split(int i, PipelineOptions pipelineOptions) throws Exception {
        int min;
        ArrayList<TopicPartition> arrayList = new ArrayList((Collection) Preconditions.checkStateNotNull(this.spec.getTopicPartitions()));
        String str = (String) Preconditions.checkArgumentNotNull(this.spec.getConsumerConfig().get("bootstrap.servers"));
        if (arrayList.isEmpty()) {
            Consumer consumer = (Consumer) this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig());
            try {
                List<String> list = (List) Preconditions.checkStateNotNull(this.spec.getTopics());
                if (list.isEmpty()) {
                    Pattern pattern = (Pattern) Preconditions.checkStateNotNull(this.spec.getTopicPattern());
                    for (Map.Entry<K, V> entry : consumer.listTopics().entrySet()) {
                        if (pattern.matcher((CharSequence) entry.getKey()).matches()) {
                            for (PartitionInfo partitionInfo : (List) entry.getValue()) {
                                arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                                Lineage.getSources().add("kafka", ImmutableList.of(str, partitionInfo.topic()));
                            }
                        }
                    }
                } else {
                    for (String str2 : list) {
                        List<PartitionInfo> partitionsFor = consumer.partitionsFor(str2);
                        if (this.spec.getLogTopicVerification() == null || !this.spec.getLogTopicVerification().booleanValue()) {
                            org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState((partitionsFor == null || partitionsFor.isEmpty()) ? false : true, "Could not find any partitions info. Please check Kafka configuration and make sure that provided topics exist.");
                        } else {
                            LOG.warn("Could not find any partitions info. Please check Kafka configuration and make sure that the provided topics exist.");
                        }
                        for (PartitionInfo partitionInfo2 : partitionsFor) {
                            arrayList.add(new TopicPartition(partitionInfo2.topic(), partitionInfo2.partition()));
                        }
                        Lineage.getSources().add("kafka", ImmutableList.of(str, str2));
                    }
                }
                if (consumer != null) {
                    consumer.close();
                }
            } catch (Throwable th) {
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } else {
            HashMap hashMap = new HashMap();
            for (TopicPartition topicPartition : arrayList) {
                ((List) hashMap.computeIfAbsent(topicPartition.topic(), str3 -> {
                    return new ArrayList();
                })).add(Integer.valueOf(topicPartition.partition()));
            }
            try {
                Consumer consumer2 = (Consumer) this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig());
                try {
                    for (Map.Entry<K, V> entry2 : hashMap.entrySet()) {
                        String str4 = (String) entry2.getKey();
                        List<Integer> list2 = (List) entry2.getValue();
                        try {
                            Set set = (Set) consumer2.partitionsFor(str4).stream().map((v0) -> {
                                return v0.partition();
                            }).collect(Collectors.toSet());
                            if (this.spec.getLogTopicVerification() == null || !this.spec.getLogTopicVerification().booleanValue()) {
                                for (Integer num : list2) {
                                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(set.contains(num), "Partition " + num + " does not exist for topic " + str4 + ". Please check Kafka configuration.");
                                }
                            } else {
                                for (Integer num2 : list2) {
                                    if (!set.contains(num2)) {
                                        LOG.warn("Partition {} does not exist for topic {}. Please check Kafka configuration.", num2, str4);
                                    }
                                }
                            }
                        } catch (KafkaException e) {
                            LOG.warn("Unable to access cluster. Skipping fail fast checks.");
                        }
                        Lineage.getSources().add("kafka", ImmutableList.of(str, str4));
                    }
                    if (consumer2 != null) {
                        consumer2.close();
                    }
                } catch (Throwable th3) {
                    if (consumer2 != null) {
                        try {
                            consumer2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } catch (KafkaException e2) {
                LOG.warn("WARN: Failed to connect to kafka for running pre-submit validation of kafka topic and partition configuration. This may be due to local permissions or connectivity to the kafka bootstrap server, or due to misconfiguration of KafkaIO. This validation is not required, and this warning may be ignored if the Beam job runs successfully.");
            }
        }
        arrayList.sort(Comparator.comparing((v0) -> {
            return v0.topic();
        }).thenComparingInt((v0) -> {
            return v0.partition();
        }));
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(i > 0);
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(arrayList.size() > 0, "Could not find any partitions. Please check Kafka configuration and topic names");
        if (offsetBasedDeduplicationSupported()) {
            min = arrayList.size();
            LOG.info("Offset-based deduplication is enabled for KafkaUnboundedSource. Forcing the number of splits to equal the number of total partitions: {}.", Integer.valueOf(min));
        } else {
            min = Math.min(i, arrayList.size());
            while (arrayList.size() % min > 0) {
                min++;
            }
        }
        ArrayList arrayList2 = new ArrayList(min);
        for (int i2 = 0; i2 < min; i2++) {
            arrayList2.add(new ArrayList());
        }
        for (int i3 = 0; i3 < arrayList.size(); i3++) {
            ((List) arrayList2.get(i3 % min)).add((TopicPartition) arrayList.get(i3));
        }
        ArrayList arrayList3 = new ArrayList(min);
        for (int i4 = 0; i4 < min; i4++) {
            List<TopicPartition> list3 = (List) arrayList2.get(i4);
            LOG.info("Partitions assigned to split {} (total {}): {}", new Object[]{Integer.valueOf(i4), Integer.valueOf(list3.size()), Joiner.on(",").join(list3)});
            arrayList3.add(new KafkaUnboundedSource(this.spec.toBuilder().setTopics(Collections.emptyList()).setTopicPartitions(list3).build(), i4));
        }
        return arrayList3;
    }

    public KafkaUnboundedReader<K, V> createReader(PipelineOptions pipelineOptions, KafkaCheckpointMark kafkaCheckpointMark) {
        Preconditions.checkStateNotNull(this.spec.getTopicPartitions());
        if (!this.spec.getTopicPartitions().isEmpty()) {
            return new KafkaUnboundedReader<>(this, kafkaCheckpointMark);
        }
        LOG.warn("Looks like generateSplits() is not called. Generate single split.");
        try {
            return new KafkaUnboundedReader<>(split(1, pipelineOptions).get(0), kafkaCheckpointMark);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
        return AvroCoder.of(KafkaCheckpointMark.class);
    }

    public boolean requiresDeduping() {
        return false;
    }

    public boolean offsetBasedDeduplicationSupported() {
        return this.spec.getOffsetDeduplication() != null && this.spec.getOffsetDeduplication().booleanValue();
    }

    public Coder<KafkaRecord<K, V>> getOutputCoder() {
        return KafkaRecordCoder.of((Coder) Preconditions.checkStateNotNull(this.spec.getKeyCoder()), (Coder) Preconditions.checkStateNotNull(this.spec.getValueCoder()));
    }

    public KafkaUnboundedSource(KafkaIO.Read<K, V> read, int i) {
        this.spec = read;
        this.id = i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaIO.Read<K, V> getSpec() {
        return this.spec;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getId() {
        return this.id;
    }
}
