package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
import io.micronaut.configuration.kafka.ConsumerSeekAware;
import io.micronaut.configuration.kafka.ProducerRegistry;
import io.micronaut.configuration.kafka.TransactionalProducerRegistry;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry;
import io.micronaut.configuration.kafka.bind.batch.BatchConsumerRecordsBinderRegistry;
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.event.KafkaConsumerStartedPollingEvent;
import io.micronaut.configuration.kafka.event.KafkaConsumerSubscribedEvent;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.retry.ConditionalRetryBehaviourHandler;
import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.exceptions.MessagingSystemException;
import io.micronaut.runtime.ApplicationConfiguration;
import io.micronaut.scheduling.ScheduledExecutorTaskScheduler;
import io.micronaut.scheduling.TaskScheduler;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
@Singleton
@Requires(beans = {KafkaDefaultConfiguration.class})
@Internal
/* loaded from: input_file:io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.class */
public class KafkaConsumerProcessor implements ExecutableMethodProcessor<Topic>, AutoCloseable, ConsumerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    private static final ByteArrayDeserializer DEFAULT_KEY_DESERIALIZER = new ByteArrayDeserializer();
    private static final StringDeserializer DEFAULT_VALUE_DESERIALIZER = new StringDeserializer();
    private final ExecutorService executorService;
    private final ApplicationConfiguration applicationConfiguration;
    private final BeanContext beanContext;
    private final AbstractKafkaConsumerConfiguration defaultConsumerConfiguration;
    private final ConsumerRecordBinderRegistry binderRegistry;
    private final SerdeRegistry serdeRegistry;
    private final KafkaListenerExceptionHandler exceptionHandler;
    private final TaskScheduler taskScheduler;
    private final ProducerRegistry producerRegistry;
    private final TransactionalProducerRegistry transactionalProducerRegistry;
    private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
    private final ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> kafkaConsumerStartedPollingEventPublisher;
    private final ApplicationEventPublisher<KafkaConsumerSubscribedEvent> kafkaConsumerSubscribedEventPublisher;
    private final ConditionalRetryBehaviourHandler conditionalRetryBehaviourHandler;
    private final Map<String, ConsumerState> consumers = new ConcurrentHashMap();
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators and then requests every singleton bean whose definition
     * is qualified by {@link KafkaListener} from the bean context.
     *
     * @param executorService the executor on which consumer poll loops are submitted
     * @param applicationConfiguration the application configuration (source of the application name)
     * @param beanContext the bean context
     * @param abstractKafkaConsumerConfiguration the fallback consumer configuration
     * @param consumerRecordBinderRegistry the registry of single-record binders
     * @param batchConsumerRecordsBinderRegistry the registry of batch binders
     * @param serdeRegistry the registry used to pick deserializers
     * @param producerRegistry the producer registry
     * @param kafkaListenerExceptionHandler the default listener exception handler
     * @param executorService2 the executor wrapped in a {@link ScheduledExecutorTaskScheduler}
     * @param transactionalProducerRegistry the transactional producer registry
     * @param applicationEventPublisher publisher for {@link KafkaConsumerStartedPollingEvent}
     * @param applicationEventPublisher2 publisher for {@link KafkaConsumerSubscribedEvent}
     * @param conditionalRetryBehaviourHandler the default conditional-retry handler
     * @throws MessagingSystemException if a singleton listener bean cannot be obtained
     */
    public KafkaConsumerProcessor(
            @Named("consumer") ExecutorService executorService,
            ApplicationConfiguration applicationConfiguration,
            BeanContext beanContext,
            AbstractKafkaConsumerConfiguration abstractKafkaConsumerConfiguration,
            ConsumerRecordBinderRegistry consumerRecordBinderRegistry,
            BatchConsumerRecordsBinderRegistry batchConsumerRecordsBinderRegistry,
            SerdeRegistry serdeRegistry,
            ProducerRegistry producerRegistry,
            KafkaListenerExceptionHandler kafkaListenerExceptionHandler,
            @Named("scheduled") ExecutorService executorService2,
            TransactionalProducerRegistry transactionalProducerRegistry,
            ApplicationEventPublisher<KafkaConsumerStartedPollingEvent> applicationEventPublisher,
            ApplicationEventPublisher<KafkaConsumerSubscribedEvent> applicationEventPublisher2,
            ConditionalRetryBehaviourHandler conditionalRetryBehaviourHandler) {
        // Infrastructure.
        this.executorService = executorService;
        this.taskScheduler = new ScheduledExecutorTaskScheduler(executorService2);
        this.applicationConfiguration = applicationConfiguration;
        this.beanContext = beanContext;
        this.defaultConsumerConfiguration = abstractKafkaConsumerConfiguration;

        // Binding and (de)serialization.
        this.binderRegistry = consumerRecordBinderRegistry;
        this.batchBinderRegistry = batchConsumerRecordsBinderRegistry;
        this.serdeRegistry = serdeRegistry;

        // Producers.
        this.producerRegistry = producerRegistry;
        this.transactionalProducerRegistry = transactionalProducerRegistry;

        // Error handling and events.
        this.exceptionHandler = kafkaListenerExceptionHandler;
        this.conditionalRetryBehaviourHandler = conditionalRetryBehaviourHandler;
        this.kafkaConsumerStartedPollingEventPublisher = applicationEventPublisher;
        this.kafkaConsumerSubscribedEventPublisher = applicationEventPublisher2;

        // Request each singleton @KafkaListener bean so that creation failures surface here.
        this.beanContext.getBeanDefinitions(Qualifiers.byType(new Class[]{KafkaListener.class})).forEach(listenerDefinition -> {
            if (!listenerDefinition.isSingleton()) {
                return;
            }
            try {
                beanContext.getBean(listenerDefinition.getBeanType());
            } catch (Exception e) {
                throw new MessagingSystemException("Error creating bean for @KafkaListener of type [" + listenerDefinition.getBeanType() + "]: " + e.getMessage(), e);
            }
        });
    }

    /**
     * Looks up the state registered under the given consumer ID.
     *
     * @param str the consumer ID
     * @return the registered state, never {@code null}
     * @throws IllegalArgumentException if no consumer is registered under that ID
     */
    @NonNull
    private ConsumerState getConsumerState(@NonNull String str) {
        final ConsumerState registered = this.consumers.get(str);
        if (registered != null) {
            return registered;
        }
        throw new IllegalArgumentException("No consumer found for ID: " + str);
    }

    /**
     * Returns the Kafka consumer registered under the given ID.
     *
     * @param str the consumer ID
     * @param <K> the key type the caller expects
     * @param <V> the value type the caller expects
     * @return the consumer, never {@code null}
     * @throws IllegalArgumentException if no consumer exists for the ID
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    @NonNull
    public <K, V> Consumer<K, V> getConsumer(@NonNull String str) {
        ArgumentUtils.requireNonNull("id", str);
        final ConsumerState state = getConsumerState(str);
        final Consumer<K, V> kafkaConsumer = (Consumer<K, V>) state.kafkaConsumer;
        if (kafkaConsumer != null) {
            return kafkaConsumer;
        }
        throw new IllegalArgumentException("No consumer found for ID: " + str);
    }

    /**
     * Returns the subscriptions recorded for the consumer with the given ID.
     *
     * @param str the consumer ID
     * @return the non-empty set of subscriptions
     * @throws IllegalArgumentException if the consumer is unknown or has no recorded subscription
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    @NonNull
    public Set<String> getConsumerSubscription(@NonNull String str) {
        ArgumentUtils.requireNonNull("id", str);
        final Set<String> subscriptions = getConsumerState(str).subscriptions;
        final boolean hasSubscriptions = subscriptions != null && !subscriptions.isEmpty();
        if (hasSubscriptions) {
            return subscriptions;
        }
        throw new IllegalArgumentException("No consumer subscription found for ID: " + str);
    }

    /**
     * Returns the partition assignments recorded for the consumer with the given ID.
     *
     * @param str the consumer ID
     * @return the non-empty set of assigned partitions
     * @throws IllegalArgumentException if the consumer is unknown or has no recorded assignment
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    @NonNull
    public Set<TopicPartition> getConsumerAssignment(@NonNull String str) {
        ArgumentUtils.requireNonNull("id", str);
        final Set<TopicPartition> assignments = getConsumerState(str).assignments;
        final boolean hasAssignments = assignments != null && !assignments.isEmpty();
        if (hasAssignments) {
            return assignments;
        }
        throw new IllegalArgumentException("No consumer assignment found for ID: " + str);
    }

    /**
     * Returns an unmodifiable view over the IDs of all registered consumers.
     *
     * @return the consumer IDs
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    @NonNull
    public Set<String> getConsumerIds() {
        final Set<String> registeredIds = this.consumers.keySet();
        return Collections.unmodifiableSet(registeredIds);
    }

    /**
     * Checks the paused status of a consumer against its currently recorded assignments.
     *
     * @param str the consumer ID
     * @return the result of {@link #isPaused(String, Collection)} for the recorded assignments
     * @throws IllegalArgumentException if no consumer is registered under that ID
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public boolean isPaused(@NonNull String str) {
        final Set<TopicPartition> currentAssignments = getConsumerState(str).assignments;
        return isPaused(str, currentAssignments);
    }

    /**
     * Delegates the paused check for the given partitions to the consumer's state object.
     *
     * @param str the consumer ID
     * @param collection the partitions to check
     * @return whatever the consumer state reports for those partitions
     * @throws IllegalArgumentException if no consumer is registered under that ID
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public boolean isPaused(@NonNull String str, @NonNull Collection<TopicPartition> collection) {
        final ConsumerState state = getConsumerState(str);
        return state.isPaused(collection);
    }

    /**
     * Asks the state of the identified consumer to pause.
     *
     * @param str the consumer ID
     * @throws IllegalArgumentException if no consumer is registered under that ID
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public void pause(@NonNull String str) {
        final ConsumerState state = getConsumerState(str);
        state.pause();
    }

    /**
     * Asks the state of the identified consumer to pause the given partitions.
     *
     * @param str the consumer ID
     * @param collection the partitions to pause
     * @throws IllegalArgumentException if no consumer is registered under that ID
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public void pause(@NonNull String str, @NonNull Collection<TopicPartition> collection) {
        final ConsumerState state = getConsumerState(str);
        state.pause(collection);
    }

    /**
     * Asks the state of the identified consumer to resume.
     *
     * @param str the consumer ID
     * @throws IllegalArgumentException if no consumer is registered under that ID
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public void resume(@NonNull String str) {
        final ConsumerState state = getConsumerState(str);
        state.resume();
    }

    /**
     * Asks the state of the identified consumer to resume the given partitions.
     *
     * @param str the consumer ID
     * @param collection the partitions to resume
     * @throws IllegalArgumentException if no consumer is registered under that ID
     */
    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public void resume(@NonNull String str, @NonNull Collection<TopicPartition> collection) {
        final ConsumerState state = getConsumerState(str);
        state.resume(collection);
    }

    /**
     * Processes one listener method: resolves its {@link Topic} declarations (falling back to
     * those on the bean definition), derives group ID, client ID and offset strategy from the
     * {@link KafkaListener} annotation, builds the consumer configuration and submits the
     * consumer threads. Does nothing when the method has no {@link KafkaListener} annotation
     * or no topics are declared.
     *
     * @param beanDefinition the definition of the bean declaring the method
     * @param executableMethod the listener method
     */
    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> executableMethod) {
        List<AnnotationValue<Topic>> topics = executableMethod.getDeclaredAnnotationValuesByType(Topic.class);
        final AnnotationValue<KafkaListener> listener = executableMethod.getAnnotation(KafkaListener.class);
        if (CollectionUtils.isEmpty(topics)) {
            // No @Topic on the method itself: use the ones declared on the bean.
            topics = beanDefinition.getDeclaredAnnotationValuesByType(Topic.class);
        }
        if (listener == null || CollectionUtils.isEmpty(topics)) {
            return;
        }

        final Class<?> beanType = beanDefinition.getBeanType();

        // Group ID: annotation value, else application name, else bean class name.
        final String configuredGroupId = listener.stringValue("groupId")
            .filter(StringUtils::isNotEmpty)
            .orElseGet(() -> this.applicationConfiguration.getName().orElse(beanType.getName()));

        // Client ID: annotation value, else "<application name>-<hyphenated bean name>", else null.
        final String clientId = listener.stringValue("clientId")
            .filter(StringUtils::isNotEmpty)
            .orElseGet(() -> this.applicationConfiguration.getName()
                .map(applicationName -> applicationName + "-" + NameUtils.hyphenate(beanType.getSimpleName()))
                .orElse(null));

        final OffsetStrategy offsetStrategy = listener.enumValue("offsetStrategy", OffsetStrategy.class)
            .orElse(OffsetStrategy.AUTO);

        // Configuration defaults are looked up with the group ID as configured,
        // before any unique suffix is appended.
        final AbstractKafkaConsumerConfiguration configurationDefaults = getConsumerConfigurationDefaults(configuredGroupId);
        final String groupId = listener.isTrue("uniqueGroupId")
            ? configuredGroupId + "_" + UUID.randomUUID()
            : configuredGroupId;

        final DefaultKafkaConsumerConfiguration consumerConfiguration = new DefaultKafkaConsumerConfiguration(configurationDefaults);
        final Properties consumerProperties = createConsumerProperties(listener, consumerConfiguration, clientId, groupId, offsetStrategy);
        configureDeserializers(executableMethod, consumerConfiguration);
        submitConsumerThreads(executableMethod, clientId, groupId, offsetStrategy, topics, listener, consumerConfiguration, consumerProperties, beanType);
    }

    /**
     * Wakes up every registered consumer state, then closes every one of them,
     * and finally empties the registry.
     */
    @Override // java.lang.AutoCloseable
    @PreDestroy
    public void close() {
        final Collection<ConsumerState> states = this.consumers.values();
        // First pass: wake up all consumers before any of them is closed.
        states.forEach(ConsumerState::wakeUp);
        // Second pass: close them.
        states.forEach(ConsumerState::close);
        this.consumers.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Publishes a {@link KafkaConsumerStartedPollingEvent} for the given consumer.
     *
     * @param consumer the consumer the event is about
     */
    public void publishStartedPollingEvent(Consumer<?, ?> consumer) {
        final KafkaConsumerStartedPollingEvent event = new KafkaConsumerStartedPollingEvent(consumer);
        this.kafkaConsumerStartedPollingEventPublisher.publishEvent(event);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Hands a listener exception to an exception handler. When the consumer bean itself
     * implements {@link KafkaListenerExceptionHandler} it is used; otherwise the handler
     * supplied at construction time is used. Any exception thrown by the handler is
     * logged and not propagated.
     *
     * @param obj the consumer bean
     * @param kafkaListenerException the exception to handle
     */
    public void handleException(Object obj, KafkaListenerException kafkaListenerException) {
        try {
            if (obj instanceof KafkaListenerExceptionHandler) {
                ((KafkaListenerExceptionHandler) obj).handle(kafkaListenerException);
            } else {
                this.exceptionHandler.handle(kafkaListenerException);
            }
        } catch (Exception e) {
            // A handler may rethrow the very exception it was given. Throwable#addSuppressed
            // rejects self-suppression with an IllegalArgumentException, which would escape
            // this catch block, so only attach the original when it is a different object.
            if (e != kafkaListenerException) {
                e.addSuppressed(kafkaListenerException);
            }
            LOG.error("Unexpected error while handling the kafka listener exception", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Decides whether the message that caused the given exception should be retried.
     * When the consumer bean itself implements {@link ConditionalRetryBehaviourHandler} it is
     * consulted; otherwise the handler supplied at construction time is consulted.
     *
     * @param obj the consumer bean
     * @param kafkaListenerException the exception raised while processing the message
     * @return {@code true} only if the handler answers
     *         {@link ConditionalRetryBehaviourHandler.ConditionalRetryBehaviour#RETRY};
     *         {@code false} otherwise, including when the handler itself throws
     */
    public boolean shouldRetryMessage(Object obj, KafkaListenerException kafkaListenerException) {
        try {
            final ConditionalRetryBehaviourHandler handler = obj instanceof ConditionalRetryBehaviourHandler
                ? (ConditionalRetryBehaviourHandler) obj
                : this.conditionalRetryBehaviourHandler;
            return handler.conditionalRetryBehaviour(kafkaListenerException) == ConditionalRetryBehaviourHandler.ConditionalRetryBehaviour.RETRY;
        } catch (Exception e) {
            // A handler may rethrow the very exception it was given. Throwable#addSuppressed
            // rejects self-suppression with an IllegalArgumentException, which would escape
            // this catch block, so only attach the original when it is a different object.
            if (e != kafkaListenerException) {
                e.addSuppressed(kafkaListenerException);
            }
            LOG.error("Unexpected error while determining how to handle the kafka listener exception", e);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules a task on this processor's task scheduler.
     *
     * @param duration the duration passed to the scheduler
     * @param runnable the task to schedule
     */
    public void scheduleTask(Duration duration, Runnable runnable) {
        final TaskScheduler scheduler = this.taskScheduler;
        scheduler.schedule(duration, runnable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Obtains a producer from the producer registry.
     *
     * @param str the producer identifier passed to the registry
     * @param cls the key class
     * @param cls2 the value class
     * @param <K> the key type
     * @param <V> the value type
     * @return the producer returned by the registry
     */
    public <K, V> Producer<K, V> getProducer(String str, Class<K> cls, Class<V> cls2) {
        final Argument<K> keyType = Argument.of(cls);
        final Argument<V> valueType = Argument.of(cls2);
        return this.producerRegistry.getProducer(str, keyType, valueType);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Obtains a producer from the transactional producer registry.
     *
     * @param str the first identifier passed to the registry, may be {@code null}
     * @param str2 the second identifier passed to the registry, may be {@code null}
     * @param cls the key class
     * @param cls2 the value class
     * @param <K> the key type
     * @param <V> the value type
     * @return the producer returned by the registry
     */
    public <K, V> Producer<K, V> getTransactionalProducer(@Nullable String str, @Nullable String str2, Class<K> cls, Class<V> cls2) {
        final Argument<K> keyType = Argument.of(cls);
        final Argument<V> valueType = Argument.of(cls2);
        return this.transactionalProducerRegistry.getTransactionalProducer(str, str2, keyType, valueType);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Logs a {@link ProducerFencedException} and then closes the affected producer through
     * the transactional producer registry.
     *
     * @param producer the producer that was fenced
     * @param producerFencedException the exception that was raised
     */
    public void handleProducerFencedException(Producer<?, ?> producer, ProducerFencedException producerFencedException) {
        LOG.error(
            "Failed accessing the producer: {}",
            producer,
            producerFencedException
        );
        final TransactionalProducerRegistry registry = this.transactionalProducerRegistry;
        registry.close(producer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Converts the given object to a {@link Publisher} using the bean context's conversion
     * service and wraps the result in a {@link Flux}.
     *
     * @param t the object to convert
     * @param <T> the type of the object
     * @return a flux created from the converted publisher
     */
    public <T> Flux<T> convertPublisher(T t) {
        final Publisher converted = (Publisher) Publishers.convertPublisher(
            this.beanContext.getConversionService(),
            t,
            Publisher.class
        );
        return Flux.from(converted);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Exposes the single-record binder registry supplied at construction time.
     *
     * @return the consumer record binder registry
     */
    public ConsumerRecordBinderRegistry getBinderRegistry() {
        final ConsumerRecordBinderRegistry registry = this.binderRegistry;
        return registry;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Exposes the batch binder registry supplied at construction time.
     *
     * @return the batch consumer records binder registry
     */
    public BatchConsumerRecordsBinderRegistry getBatchBinderRegistry() {
        final BatchConsumerRecordsBinderRegistry registry = this.batchBinderRegistry;
        return registry;
    }

    /**
     * Resolves the configuration to use as defaults for a consumer: a configuration bean
     * named exactly as given, else one found by the hyphenated lookup, else the default
     * consumer configuration supplied at construction time.
     *
     * @param str the name to look up (the caller passes the group ID)
     * @return the resolved configuration
     */
    private AbstractKafkaConsumerConfiguration getConsumerConfigurationDefaults(String str) {
        final Optional<AbstractKafkaConsumerConfiguration> exactMatch = findConfigurationBean(str);
        if (exactMatch.isPresent()) {
            return exactMatch.get();
        }
        final Optional<AbstractKafkaConsumerConfiguration> hyphenatedMatch = findHyphenatedConsumerConfigurationBean(str);
        return hyphenatedMatch.orElse(this.defaultConsumerConfiguration);
    }

    /**
     * Looks for a consumer configuration bean qualified by the given name.
     *
     * @param str the bean name qualifier
     * @return the configuration bean, if the bean context finds one
     */
    private Optional<AbstractKafkaConsumerConfiguration> findConfigurationBean(String str) {
        final Class<AbstractKafkaConsumerConfiguration> configurationType = AbstractKafkaConsumerConfiguration.class;
        return this.beanContext.findBean(configurationType, Qualifiers.byName(str));
    }

    /**
     * Looks for a consumer configuration bean under the hyphenated form of the given name.
     * The lookup is skipped when {@link NameUtils#isValidHyphenatedPropertyName(String)}
     * already accepts the name as given.
     *
     * @param str the name to hyphenate and look up
     * @return the configuration bean, or empty when the lookup is skipped or finds nothing
     */
    private Optional<AbstractKafkaConsumerConfiguration> findHyphenatedConsumerConfigurationBean(String str) {
        if (NameUtils.isValidHyphenatedPropertyName(str)) {
            return Optional.empty();
        }
        final String hyphenatedName = NameUtils.hyphenate(str);
        return findConfigurationBean(hyphenatedName);
    }

    /**
     * Populates the consumer configuration's properties from the {@link KafkaListener}
     * annotation and returns them.
     *
     * <p>Offset reset, auto commit, heartbeat interval, session timeout and isolation level
     * are only written when the key is absent, so values already in the configuration win.
     * The group ID, the client ID (when non-null) and the entries of the annotation's
     * {@code properties} member are written unconditionally, the latter last.
     *
     * @param annotationValue the {@link KafkaListener} annotation
     * @param defaultKafkaConsumerConfiguration the configuration whose properties are populated
     * @param str the client ID, may be {@code null}
     * @param str2 the group ID
     * @param offsetStrategy the offset strategy; {@code enable.auto.commit} defaults to
     *        {@code true} only for {@link OffsetStrategy#AUTO}
     * @return the properties object obtained from the configuration
     */
    private Properties createConsumerProperties(AnnotationValue<KafkaListener> annotationValue, DefaultKafkaConsumerConfiguration defaultKafkaConsumerConfiguration, String str, String str2, OffsetStrategy offsetStrategy) {
        final Properties config = defaultKafkaConsumerConfiguration.getConfig();

        if (annotationValue.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
            // Locale.ROOT keeps the value "earliest" regardless of the JVM default locale
            // (a Turkish locale would otherwise lower-case 'I' to a dotless 'ı').
            config.putIfAbsent("auto.offset.reset", OffsetReset.EARLIEST.name().toLowerCase(Locale.ROOT));
        }
        config.putIfAbsent("enable.auto.commit", String.valueOf(offsetStrategy == OffsetStrategy.AUTO));

        annotationValue.get("heartbeatInterval", Duration.class)
            .map(Duration::toMillis)
            .map(String::valueOf)
            .ifPresent(millis -> config.putIfAbsent("heartbeat.interval.ms", millis));

        annotationValue.get("sessionTimeout", Duration.class)
            .map(Duration::toMillis)
            .map(String::valueOf)
            .ifPresent(millis -> config.putIfAbsent("session.timeout.ms", millis));

        annotationValue.enumValue("isolation", IsolationLevel.class)
            .ifPresent(level -> config.putIfAbsent("isolation.level", level.toString().toLowerCase(Locale.ROOT)));

        config.put("group.id", str2);
        if (str != null) {
            config.put("client.id", str);
        }

        // Explicit annotation properties are applied last and overwrite anything set above.
        config.putAll(annotationValue.getProperties("properties", "name"));
        return config;
    }

    /**
     * Logs, at debug level, which key and value deserializers a listener method will use.
     * For each, the deserializer instance set on the configuration is reported if present,
     * otherwise the corresponding {@code key.deserializer} / {@code value.deserializer}
     * property.
     *
     * @param executableMethod the listener method
     * @param defaultKafkaConsumerConfiguration the consumer configuration to inspect
     */
    private void debugDeserializationConfiguration(ExecutableMethod<?, ?> executableMethod, DefaultKafkaConsumerConfiguration<?, ?> defaultKafkaConsumerConfiguration) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        final Properties config = defaultKafkaConsumerConfiguration.getConfig();
        final String listenerName = logMethod(executableMethod);
        final String keyDeserializer = defaultKafkaConsumerConfiguration.getKeyDeserializer()
            .map(Object::toString)
            .orElseGet(() -> config.getProperty("key.deserializer"));
        final String valueDeserializer = defaultKafkaConsumerConfiguration.getValueDeserializer()
            .map(Object::toString)
            .orElseGet(() -> config.getProperty("value.deserializer"));
        LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer, listenerName);
        LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer, listenerName);
    }

    /**
     * Creates one Kafka consumer per configured thread, subscribes it, registers its state
     * under a consumer ID and submits its poll loop to the consumer executor.
     *
     * <p>The consumer ID is the client ID when one is given, suffixed with a generated
     * number when more than one thread is requested or the ID is already registered;
     * without a client ID it is {@code kafka-consumer-<n>}.
     *
     * @param executableMethod the listener method
     * @param str the client ID, may be {@code null}
     * @param str2 the group ID
     * @param offsetStrategy the offset strategy
     * @param list the topic annotations to subscribe to
     * @param annotationValue the {@link KafkaListener} annotation
     * @param defaultKafkaConsumerConfiguration the consumer configuration used to create consumers
     * @param properties the consumer properties; {@code client.id} is updated per thread
     * @param cls the type of the listener bean
     */
    private void submitConsumerThreads(ExecutableMethod<?, ?> executableMethod, String str, String str2, OffsetStrategy offsetStrategy, List<AnnotationValue<Topic>> list, AnnotationValue<KafkaListener> annotationValue, DefaultKafkaConsumerConfiguration<?, ?> defaultKafkaConsumerConfiguration, Properties properties, Class<?> cls) {
        final int threadCount = annotationValue.intValue("threads").orElse(1);
        for (int thread = 0; thread < threadCount; thread++) {
            final String consumerId;
            if (str == null) {
                consumerId = "kafka-consumer-" + this.clientIdGenerator.incrementAndGet();
            } else {
                final boolean needsSuffix = threadCount > 1 || this.consumers.containsKey(str);
                consumerId = needsSuffix ? str + "-" + this.clientIdGenerator.incrementAndGet() : str;
                properties.put("client.id", consumerId);
            }

            final Consumer consumer = (Consumer) this.beanContext.createBean(Consumer.class, new Object[]{defaultKafkaConsumerConfiguration});
            final Object listenerBean = this.beanContext.getBean(cls);
            if (listenerBean instanceof ConsumerAware) {
                ((ConsumerAware) listenerBean).setKafkaConsumer(consumer);
            }

            for (AnnotationValue<Topic> topic : list) {
                setupConsumerSubscription(executableMethod, topic, listenerBean, consumer);
            }
            this.kafkaConsumerSubscribedEventPublisher.publishEvent(new KafkaConsumerSubscribedEvent(consumer));

            final ConsumerInfo info = new ConsumerInfo(consumerId, str2, offsetStrategy, annotationValue, executableMethod);
            final ConsumerState state;
            if (info.isBatch) {
                state = new ConsumerStateBatch(this, info, consumer, listenerBean);
            } else {
                state = new ConsumerStateSingle(this, info, consumer, listenerBean);
            }
            this.consumers.put(consumerId, state);
            this.executorService.submit(state::threadPollLoop);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Subscribes a consumer to the topics and/or topic patterns declared by a {@link Topic}
     * annotation. When a rebalance listener can be derived from the consumer bean it is
     * passed along with each subscription.
     *
     * @param executableMethod the listener method (used for log and error messages)
     * @param annotationValue the {@link Topic} annotation
     * @param obj the consumer bean
     * @param consumer the Kafka consumer to subscribe
     * @throws MessagingSystemException if the annotation declares neither topics nor
     *         patterns, or if a declared pattern is not a valid regular expression
     */
    public static void setupConsumerSubscription(ExecutableMethod<?, ?> executableMethod, AnnotationValue<Topic> annotationValue, Object obj, Consumer<?, ?> consumer) {
        final String[] topicNames = annotationValue.stringValues();
        final String[] topicPatterns = annotationValue.stringValues("patterns");
        final boolean hasTopics = ArrayUtils.isNotEmpty(topicNames);
        final boolean hasPatterns = ArrayUtils.isNotEmpty(topicPatterns);
        // Only compute the display name when it will actually be logged.
        final String listenerName = LOG.isInfoEnabled() ? logMethod(executableMethod) : null;
        if (!hasTopics && !hasPatterns) {
            throw new MessagingSystemException("Either a topic or a topic pattern must be specified for method: " + executableMethod);
        }
        final Optional<ConsumerRebalanceListener> rebalanceListener = getConsumerRebalanceListener(obj, consumer);
        if (hasTopics) {
            final List<String> topics = Arrays.asList(topicNames);
            rebalanceListener.ifPresentOrElse(
                listener -> consumer.subscribe(topics, listener),
                () -> consumer.subscribe(topics));
            LOG.info("Kafka listener [{}] subscribed to topics: {}", listenerName, topics);
        }
        if (hasPatterns) {
            // NOTE(review): the Kafka consumer Javadoc describes subscriptions as not
            // incremental, so with several patterns presumably only the last subscribe call
            // stays in effect. Confirm whether that is intended.
            try {
                for (String topicPattern : topicPatterns) {
                    final Pattern compiled = Pattern.compile(topicPattern);
                    rebalanceListener.ifPresentOrElse(
                        listener -> consumer.subscribe(compiled, listener),
                        () -> consumer.subscribe(compiled));
                    LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", listenerName, topicPattern);
                }
            } catch (PatternSyntaxException e) {
                throw new MessagingSystemException("Invalid topic pattern [" + e.getPattern() + "] for method [" + executableMethod + "]: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Derives a rebalance listener from the consumer bean. A {@link ConsumerSeekAware} bean
     * is wrapped in a {@code ConsumerSeekAwareAdapter}; otherwise a bean that is itself a
     * {@link ConsumerRebalanceListener} is used directly.
     *
     * @param obj the consumer bean
     * @param consumer the Kafka consumer, used to create the seeker for the adapter
     * @return the rebalance listener, or empty when the bean implements neither interface
     */
    private static Optional<ConsumerRebalanceListener> getConsumerRebalanceListener(Object obj, Consumer<?, ?> consumer) {
        // ConsumerSeekAware is checked first, so it wins when a bean implements both.
        if (obj instanceof ConsumerSeekAware) {
            return Optional.of(new ConsumerSeekAwareAdapter(KafkaSeeker.newInstance(consumer), (ConsumerSeekAware) obj));
        }
        if (obj instanceof ConsumerRebalanceListener) {
            return Optional.of((ConsumerRebalanceListener) obj);
        }
        return Optional.empty();
    }

    /**
     * Finds the argument of a listener method that receives the message body.
     *
     * <p>The first argument that is a consumer record type or carries {@link MessageBody}
     * wins. Failing that, the first argument that neither has the {@link Bindable}
     * stereotype nor is the last argument of a suspended method is chosen.
     *
     * @param executableMethod the listener method
     * @return the body argument, or {@code null} if none qualifies
     */
    private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> executableMethod) {
        for (Argument<?> candidate : executableMethod.getArguments()) {
            if (isConsumerRecord(candidate) || candidate.getAnnotationMetadata().hasAnnotation(MessageBody.class)) {
                return candidate;
            }
        }
        for (Argument<?> candidate : executableMethod.getArguments()) {
            final boolean bindable = candidate.getAnnotationMetadata().hasStereotype(Bindable.class);
            if (!bindable && !isLastArgumentOfSuspendedMethod(candidate, executableMethod)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Finds the body argument of a listener method, unwrapping it to its component type
     * when the flag is set and the argument is not a consumer record type.
     *
     * @param z whether to unwrap (the caller passes the listener's {@code batch} flag)
     * @param executableMethod the listener method
     * @return the body argument or its component type, or {@code null} if there is none
     */
    private static Argument<?> findBodyArgument(boolean z, ExecutableMethod<?, ?> executableMethod) {
        final Argument<?> body = findBodyArgument(executableMethod);
        if (!z || body == null || isConsumerRecord(body)) {
            return body;
        }
        return getComponentType(body);
    }

    /**
     * Tells whether the argument equals the last argument of a method that reports itself
     * as suspended.
     *
     * @param argument the argument to test
     * @param executableMethod the method owning the argument
     * @return {@code true} if the method is suspended and the argument equals its last one
     */
    private static boolean isLastArgumentOfSuspendedMethod(Argument<?> argument, ExecutableMethod<?, ?> executableMethod) {
        if (!executableMethod.isSuspend()) {
            return false;
        }
        final int lastIndex = executableMethod.getArguments().length - 1;
        return argument.equals(executableMethod.getArguments()[lastIndex]);
    }

    /**
     * Configures the key and value deserializers for a listener method, based on its body
     * argument, and then logs the resulting configuration at debug level.
     *
     * @param executableMethod the listener method
     * @param defaultKafkaConsumerConfiguration the consumer configuration to update
     */
    private void configureDeserializers(ExecutableMethod<?, ?> executableMethod, DefaultKafkaConsumerConfiguration<?, ?> defaultKafkaConsumerConfiguration) {
        final boolean batch = executableMethod.isTrue(KafkaListener.class, "batch");
        final Argument<?> bodyArgument = findBodyArgument(batch, executableMethod);
        configureKeyDeserializer(bodyArgument, executableMethod, defaultKafkaConsumerConfiguration);
        configureValueDeserializer(bodyArgument, defaultKafkaConsumerConfiguration);
        debugDeserializationConfiguration(executableMethod, defaultKafkaConsumerConfiguration);
    }

    /**
     * Sets the key deserializer on the configuration unless one is already configured, either
     * as a {@code key.deserializer} property or as a deserializer instance.
     *
     * <p>The deserializer is picked for the first method argument annotated with
     * {@link KafkaKey}, or else for the {@code K} type variable of the body argument when
     * that is a consumer record type. If neither yields a deserializer, the default byte
     * array deserializer is used.
     *
     * @param argument the body argument, may be {@code null}
     * @param executableMethod the listener method
     * @param defaultKafkaConsumerConfiguration the consumer configuration to update
     */
    private void configureKeyDeserializer(Argument<?> argument, ExecutableMethod<?, ?> executableMethod, DefaultKafkaConsumerConfiguration defaultKafkaConsumerConfiguration) {
        final boolean configuredByProperty = defaultKafkaConsumerConfiguration.getConfig().containsKey("key.deserializer");
        if (configuredByProperty || defaultKafkaConsumerConfiguration.getKeyDeserializer().isPresent()) {
            return;
        }
        Arrays.stream(executableMethod.getArguments())
            .filter(candidate -> candidate.isAnnotationPresent(KafkaKey.class))
            .findFirst()
            .or(() -> Optional.ofNullable(argument)
                .filter(KafkaConsumerProcessor::isConsumerRecord)
                .flatMap(body -> body.getTypeVariable("K")))
            .map(this.serdeRegistry::pickDeserializer)
            .ifPresentOrElse(
                defaultKafkaConsumerConfiguration::setKeyDeserializer,
                () -> defaultKafkaConsumerConfiguration.setKeyDeserializer(DEFAULT_KEY_DESERIALIZER));
    }

    /**
     * Sets the value deserializer on the configuration unless one is already configured,
     * either as a {@code value.deserializer} property or as a deserializer instance.
     *
     * <p>The deserializer is picked for the {@code V} type variable of the body argument
     * when that is a consumer record type, or else for the body argument itself. If that
     * yields no deserializer, the default string deserializer is used.
     *
     * @param argument the body argument, may be {@code null}
     * @param defaultKafkaConsumerConfiguration the consumer configuration to update
     */
    private void configureValueDeserializer(Argument<?> argument, DefaultKafkaConsumerConfiguration defaultKafkaConsumerConfiguration) {
        final boolean configuredByProperty = defaultKafkaConsumerConfiguration.getConfig().containsKey("value.deserializer");
        if (configuredByProperty || defaultKafkaConsumerConfiguration.getValueDeserializer().isPresent()) {
            return;
        }
        final Optional<Argument<?>> bodyArgument = Optional.ofNullable(argument);
        bodyArgument
            .filter(KafkaConsumerProcessor::isConsumerRecord)
            .flatMap(body -> body.getTypeVariable("V"))
            .or(() -> bodyArgument)
            .map(this.serdeRegistry::pickDeserializer)
            .ifPresentOrElse(
                defaultKafkaConsumerConfiguration::setValueDeserializer,
                () -> defaultKafkaConsumerConfiguration.setValueDeserializer(DEFAULT_VALUE_DESERIALIZER));
    }

    /**
     * Tells whether the argument's type is a {@link ConsumerRecord} or
     * {@link ConsumerRecords} (or a subtype of either).
     *
     * @param argument the argument to inspect
     * @return {@code true} if the argument is one of the consumer record types
     */
    private static boolean isConsumerRecord(@NonNull Argument<?> argument) {
        if (ConsumerRecord.class.isAssignableFrom(argument.getType())) {
            return true;
        }
        return ConsumerRecords.class.isAssignableFrom(argument.getType());
    }

    /**
     * Resolves the element type of a container argument: the component type for arrays,
     * otherwise the first type variable, falling back to the argument itself when it has
     * no type variables.
     *
     * @param argument the container argument
     * @return the argument describing the element type
     */
    private static Argument<?> getComponentType(Argument<?> argument) {
        final Class type = argument.getType();
        if (type.isArray()) {
            return Argument.of(type.getComponentType());
        }
        return (Argument) argument.getFirstTypeVariable().orElse(argument);
    }

    /**
     * Builds the {@code DeclaringType#methodName} label used for a method in log messages.
     *
     * @param executableMethod the method to describe
     * @return the simple name of the declaring type, a {@code #}, and the method name
     */
    private static String logMethod(ExecutableMethod<?, ?> executableMethod) {
        final String declaringTypeName = executableMethod.getDeclaringType().getSimpleName();
        final String methodName = executableMethod.getName();
        return declaringTypeName + "#" + methodName;
    }

    /**
     * Erased-signature overload that casts its arguments and forwards to
     * {@link #process(BeanDefinition, ExecutableMethod)}.
     *
     * <p>NOTE(review): the bridge/synthetic markers suggest this method was emitted by the
     * decompiler from a compiler-generated bridge rather than written by hand. It is
     * presumably redundant in maintained source; confirm before removing.
     *
     * @param beanDefinition the bean definition, forwarded after a cast
     * @param obj the executable method, cast to {@link ExecutableMethod} before forwarding
     */
    public /* bridge */ /* synthetic */ void process(BeanDefinition beanDefinition, Object obj) {
        process((BeanDefinition<?>) beanDefinition, (ExecutableMethod<?, ?>) obj);
    }
}
