package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Properties;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/sources/JsonKafkaSource.class */
/**
 * Kafka source that reads records for a set of offset ranges and emits each record's value
 * as a JSON string.
 *
 * <p>The Kafka key is always read with {@link StringDeserializer}. The value deserializer is
 * taken from {@link Config#KAFKA_JSON_VALUE_DESERIALIZER_CLASS}; values that were not read
 * with {@link StringDeserializer} are serialized to JSON with Jackson before being emitted.
 *
 * <p>All lambdas handed to Spark below only call {@code static} helpers and capture the
 * deserializer class name, so they do not capture {@code this}.
 */
public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
    private static final Logger LOG = LoggerFactory.getLogger(JsonKafkaSource.class);
    /** Single mapper instance reused for every parse/serialize call made by this class. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Configuration keys specific to {@link JsonKafkaSource}. */
    public static class Config {
        /** Fully-qualified class name of the Kafka value deserializer; defaults to {@link StringDeserializer}. */
        public static final ConfigProperty<String> KAFKA_JSON_VALUE_DESERIALIZER_CLASS = ConfigProperty
                .key("hoodie.deltastreamer.source.kafka.json.value.deserializer.class")
                .defaultValue(StringDeserializer.class.getName())
                .sinceVersion("1.1.0")
                .withDocumentation("Kafka Json Payload Deserializer Class");
    }

    /**
     * Creates the source from a schema provider, wrapping it in a {@link DefaultStreamContext}
     * with an empty second argument and delegating to the {@link StreamContext} constructor.
     *
     * @param typedProperties        source and Kafka consumer properties
     * @param javaSparkContext       Spark context used to create the Kafka RDD
     * @param sparkSession           Spark session passed to the parent source
     * @param schemaProvider         schema provider for the stream
     * @param hoodieIngestionMetrics ingestion metrics sink passed to the parent source
     */
    public JsonKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics hoodieIngestionMetrics) {
        this(typedProperties, javaSparkContext, sparkSession, hoodieIngestionMetrics, new DefaultStreamContext(schemaProvider, Option.empty()));
    }

    /**
     * Creates the source from a stream context.
     *
     * <p>The schema provider is re-resolved through
     * {@link UtilHelpers#getSchemaProviderForKafkaSource}, the key/value deserializer classes are
     * written into the consumer properties, and the offset generator is (re)built from those
     * properties.
     *
     * @param typedProperties        source and Kafka consumer properties
     * @param javaSparkContext       Spark context used to create the Kafka RDD
     * @param sparkSession           Spark session passed to the parent source
     * @param hoodieIngestionMetrics ingestion metrics sink passed to the parent source
     * @param streamContext          supplies the schema provider and source profile supplier
     */
    public JsonKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, HoodieIngestionMetrics hoodieIngestionMetrics, StreamContext streamContext) {
        super(typedProperties, javaSparkContext, sparkSession, Source.SourceType.JSON, hoodieIngestionMetrics,
                new DefaultStreamContext(
                        UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), typedProperties, javaSparkContext),
                        streamContext.getSourceProfileSupplier()));
        this.props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ConfigUtils.getStringWithAltKeys(this.props, Config.KAFKA_JSON_VALUE_DESERIALIZER_CLASS, true));
        // Built after the deserializer settings above so the generator is constructed from the updated properties.
        this.offsetGen = new KafkaOffsetGen(this.props);
    }

    /**
     * Reads the given offset ranges from Kafka, drops records rejected by
     * {@link #filterForNullValues}, converts the remaining values to JSON strings and runs the
     * configured post-processor, if any.
     *
     * @param offsetRangeArr Kafka offset ranges to read
     * @return the JSON payloads for the batch
     */
    @Override
    public JavaRDD<String> toBatch(OffsetRange[] offsetRangeArr) {
        String deserializerClass = this.props.getString(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        JavaRDD<ConsumerRecord<Object, Object>> records = KafkaUtils
                .<Object, Object>createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent())
                .filter(consumerRecord -> filterForNullValues(consumerRecord.value(), deserializerClass));
        return postProcess(maybeAppendKafkaOffsets(records, deserializerClass));
    }

    /**
     * Converts Kafka records to JSON strings. When {@code shouldAddOffsets} is set, the Kafka
     * offset, partition, timestamp and (if non-null) key are added as extra fields to each
     * payload that parses as a JSON object; otherwise only the value is emitted.
     *
     * @param javaRDD records read from Kafka
     * @param str     fully-qualified class name of the value deserializer in use
     * @return one JSON string per input record
     */
    protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> javaRDD, String str) {
        if (!this.shouldAddOffsets) {
            return javaRDD.map(consumerRecord -> toJsonString(consumerRecord, str));
        }
        return javaRDD.mapPartitions(partition -> {
            TaskContext taskContext = TaskContext.get();
            LOG.info("Converting Kafka source objects to strings with stageId : {}, stage attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
                    taskContext.stageId(), taskContext.stageAttemptNumber(), taskContext.partitionId(), taskContext.attemptNumber(), taskContext.taskAttemptId());
            return new CloseableMappingIterator<>(ClosableIterator.wrap(partition),
                    consumerRecord -> appendKafkaOffsets(consumerRecord, str));
        });
    }

    /**
     * Returns the record's value as a string, converting the checked Jackson exception into a
     * {@link HoodieException} so the method can be used from lambdas.
     */
    private static String toJsonString(ConsumerRecord<Object, Object> consumerRecord, String deserializerClass) {
        try {
            return getValueAsString(consumerRecord.value(), deserializerClass);
        } catch (JsonProcessingException e) {
            throw new HoodieException(e);
        }
    }

    /**
     * Adds the Kafka offset, partition, timestamp and key fields to a record's JSON payload.
     *
     * <p>This is best-effort: if the payload is not a JSON object, or parsing/serializing it
     * fails, the original value string is returned unchanged. JVM {@link Error}s are not
     * swallowed.
     */
    private static String appendKafkaOffsets(ConsumerRecord<Object, Object> consumerRecord, String deserializerClass) {
        String value = toJsonString(consumerRecord, deserializerClass);
        String key = StringUtils.objToString(consumerRecord.key());
        try {
            JsonNode parsed = OBJECT_MAPPER.readTree(value);
            if (!(parsed instanceof ObjectNode)) {
                // Arrays, scalars, JSON null or empty content have nowhere to put extra fields.
                return value;
            }
            ObjectNode json = (ObjectNode) parsed;
            json.put(KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
            json.put(KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
            json.put(KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
            if (key != null) {
                json.put(KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN, key);
            }
            return OBJECT_MAPPER.writeValueAsString(json);
        } catch (Exception e) {
            // Deliberate fallback: a malformed payload must not fail the batch here.
            LOG.debug("Could not append Kafka offsets to record at partition {} offset {}; emitting the value unchanged",
                    consumerRecord.partition(), consumerRecord.offset(), e);
            return value;
        }
    }

    /**
     * Applies the post-processor named by
     * {@link JsonKafkaPostProcessorConfig#JSON_KAFKA_PROCESSOR_CLASS}, or returns the input
     * untouched when none is configured.
     *
     * @throws HoodieSourcePostProcessException if an {@link IOException} is raised while
     *         creating or running the post-processor
     */
    private JavaRDD<String> postProcess(JavaRDD<String> jsonRecords) {
        String processorClass = ConfigUtils.getStringWithAltKeys(this.props, JsonKafkaPostProcessorConfig.JSON_KAFKA_PROCESSOR_CLASS, true);
        if (StringUtils.isNullOrEmpty(processorClass)) {
            return jsonRecords;
        }
        try {
            return UtilHelpers.createJsonKafkaSourcePostProcessor(processorClass, this.props).process(jsonRecords);
        } catch (IOException e) {
            throw new HoodieSourcePostProcessException("Could not init " + processorClass, e);
        }
    }

    /**
     * Decides whether a record value should be kept.
     *
     * @param obj the deserialized Kafka record value, possibly {@code null}
     * @param str fully-qualified class name of the value deserializer in use
     * @return {@code false} for {@code null} values; for {@link StringDeserializer} the result of
     *         {@code StringUtils.nonEmpty} on the value; {@code true} for any other deserializer
     */
    public static Boolean filterForNullValues(Object obj, String str) {
        if (obj == null) {
            return false;
        }
        // Constant-first comparison: tolerates a null class name, same as getValueAsString.
        if (StringDeserializer.class.getName().equals(str)) {
            return StringUtils.nonEmpty((String) obj);
        }
        return true;
    }

    /**
     * Returns the value as a string: cast directly when it was read with
     * {@link StringDeserializer}, otherwise serialized to JSON with Jackson.
     *
     * @throws JsonProcessingException if Jackson cannot serialize the value
     */
    private static String getValueAsString(Object value, String deserializerClass) throws JsonProcessingException {
        if (StringDeserializer.class.getName().equals(deserializerClass)) {
            return (String) value;
        }
        return OBJECT_MAPPER.writeValueAsString(value);
    }

    // No hand-written $deserializeLambda$: the compiler synthesizes it for the serializable
    // Spark lambdas in this class.
}
