package io.micronaut.configuration.kafka;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Any;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.ArgumentInjectionPoint;
import io.micronaut.inject.FieldInjectionPoint;
import io.micronaut.inject.InjectionPoint;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory and registry for Kafka {@link Producer} instances.
 *
 * <p>Producers requested through an injection point or through the
 * {@link ProducerRegistry} / {@link TransactionalProducerRegistry} methods are cached in
 * {@link #clients}, keyed by client id, key type, value type and whether the producer is
 * transactional. Producers created through direct instantiation (no injection point) are
 * built by the {@link ProducerFactory} and are not cached here.</p>
 */
@Factory
public class KafkaProducerFactory implements ProducerRegistry, TransactionalProducerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFactory.class);
    private static final String TRANSACTIONAL_ID = "transactionalId";

    /** Cache of the producers created by {@link #getKafkaProducer}. */
    private final Map<ClientKey, Producer> clients = new ConcurrentHashMap<>();
    private final BeanContext beanContext;
    private final SerdeRegistry serdeRegistry;
    private final ProducerFactory producerFactory;

    /**
     * Cache key identifying a producer by client id, key/value types and transactional flag.
     */
    public static final class ClientKey {
        private final String id;
        private final Class<?> keyType;
        private final Class<?> valueType;
        private final boolean transactional;

        ClientKey(String id, Class<?> keyType, Class<?> valueType, boolean transactional) {
            this.id = id;
            this.keyType = keyType;
            this.valueType = valueType;
            this.transactional = transactional;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ClientKey other = (ClientKey) obj;
            return this.transactional == other.transactional
                && Objects.equals(this.id, other.id)
                && Objects.equals(this.keyType, other.keyType)
                && Objects.equals(this.valueType, other.valueType);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.id, this.keyType, this.valueType, this.transactional);
        }
    }

    /**
     * @param beanContext     context used to look up producer configurations and create producers
     * @param serdeRegistry   registry used to pick serializers when the configuration names none
     * @param producerFactory factory used for directly instantiated producers
     */
    public KafkaProducerFactory(BeanContext beanContext, SerdeRegistry serdeRegistry, ProducerFactory producerFactory) {
        this.beanContext = beanContext;
        this.serdeRegistry = serdeRegistry;
        this.producerFactory = producerFactory;
    }

    /**
     * Creates or retrieves a producer.
     *
     * <p>Without an injection point the producer is built directly from the given configuration,
     * which must then be non-null and must define either both serializers or neither. With an
     * injection point the key and value types are taken from its {@code K} and {@code V} type
     * variables, the settings are read from its {@link KafkaClient} annotation, and the
     * producer is served from the cache.</p>
     *
     * @param injectionPoint                     the field or argument being injected, or {@code null}
     * @param abstractKafkaProducerConfiguration the configuration for direct instantiation, or {@code null}
     * @param <K>                                the key type
     * @param <V>                                the value type
     * @return the producer
     * @throws ConfigurationException if neither an injection point nor a configuration is given,
     *                                if only one of the two serializers is configured, if the
     *                                injection point is neither a field nor an argument, or if
     *                                the {@code K}/{@code V} type variables are missing
     */
    @Bean
    @Any
    public <K, V> Producer<K, V> getProducer(@Nullable InjectionPoint<KafkaProducer<K, V>> injectionPoint, @Nullable @Parameter AbstractKafkaProducerConfiguration<K, V> abstractKafkaProducerConfiguration) {
        if (injectionPoint == null) {
            return createDirectProducer(abstractKafkaProducerConfiguration);
        }
        Argument<?> argument = resolveInjectionArgument(injectionPoint);
        Argument<?> keyType = argument.getTypeVariable("K").orElse(null);
        Argument<?> valueType = argument.getTypeVariable("V").orElse(null);
        if (keyType == null || valueType == null) {
            throw new ConfigurationException("@KafkaClient used on type missing generic argument values for Key and Value: " + injectionPoint);
        }
        AnnotationMetadata annotationMetadata = injectionPoint.getAnnotationMetadata();
        String id = annotationMetadata.stringValue(KafkaClient.class).orElse(null);
        Map<String, String> properties = collectClientProperties(annotationMetadata);
        String transactionalId = annotationMetadata.stringValue(KafkaClient.class, TRANSACTIONAL_ID)
            .filter(StringUtils::isNotEmpty)
            .orElse(null);
        // A non-empty transactional id is what makes the injected producer transactional.
        return getKafkaProducer(id, transactionalId, keyType, valueType, transactionalId != null, properties);
    }

    /** Builds an uncached producer straight from an explicit configuration. */
    private <K, V> Producer<K, V> createDirectProducer(@Nullable AbstractKafkaProducerConfiguration<K, V> configuration) {
        if (configuration == null) {
            throw new ConfigurationException("No Kafka configuration specified when using direct instantiation");
        }
        Optional<Serializer<K>> keySerializer = configuration.getKeySerializer();
        Optional<Serializer<V>> valueSerializer = configuration.getValueSerializer();
        Properties config = configuration.getConfig();
        if (keySerializer.isPresent() && valueSerializer.isPresent()) {
            return this.producerFactory.createProducer(config, keySerializer.get(), valueSerializer.get());
        }
        if (keySerializer.isPresent() || valueSerializer.isPresent()) {
            throw new ConfigurationException("Both the [keySerializer] and [valueSerializer] must be set when setting either");
        }
        return this.producerFactory.createProducer(config, null, null);
    }

    /** Extracts the injected argument from a field or argument injection point. */
    @SuppressWarnings("rawtypes")
    private static Argument<?> resolveInjectionArgument(InjectionPoint<?> injectionPoint) {
        if (injectionPoint instanceof FieldInjectionPoint) {
            return ((FieldInjectionPoint) injectionPoint).asArgument();
        }
        if (injectionPoint instanceof ArgumentInjectionPoint) {
            return ((ArgumentInjectionPoint) injectionPoint).getArgument();
        }
        throw new ConfigurationException("Cannot directly retrieve KafkaProducer instances. Use @Inject or constructor injection");
    }

    /** Collects the producer properties declared on the {@link KafkaClient} annotation. */
    private static Map<String, String> collectClientProperties(AnnotationMetadata annotationMetadata) {
        Map<String, String> properties = new HashMap<>();
        annotationMetadata.findAnnotation(KafkaClient.class)
            .map(annotationValue -> annotationValue.getProperties("properties", "name"))
            .ifPresent(properties::putAll);
        annotationMetadata.getValue(KafkaClient.class, "maxBlock", Duration.class)
            .ifPresent(duration -> properties.put("max.block.ms", String.valueOf(duration.toMillis())));
        int acks = annotationMetadata.intValue(KafkaClient.class, "acks").orElse(KafkaClient.Acknowledge.DEFAULT);
        // The DEFAULT sentinel means "not specified": leave the acks setting untouched.
        if (acks != KafkaClient.Acknowledge.DEFAULT) {
            properties.put("acks", acks == -1 ? "all" : String.valueOf(acks));
        }
        return properties;
    }

    /**
     * Returns the cached producer for the given key, creating it on first use.
     *
     * @param id              the client id used to look up a named configuration, may be {@code null}
     * @param transactionalId the transactional id, may be {@code null}
     * @param keyType         the key type
     * @param valueType       the value type
     * @param transactional   whether {@code initTransactions()} is called on a newly created producer
     * @param properties      extra properties that override the resolved configuration, may be {@code null}
     * @param <T>             the producer type expected by the caller
     * @return the producer
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T> T getKafkaProducer(@Nullable String id, @Nullable String transactionalId, Argument<?> keyType, Argument<?> valueType, boolean transactional, @Nullable Map<String, String> properties) {
        ClientKey key = new ClientKey(id, keyType.getType(), valueType.getType(), transactional);
        return (T) this.clients.computeIfAbsent(key, ignored -> {
            Optional<String> clientId = Optional.ofNullable(id).filter(StringUtils::isNotEmpty);
            // Named configuration first, then its hyphenated variant, then the default bean.
            AbstractKafkaProducerConfiguration baseConfig = clientId.flatMap(this::findConfigBean)
                .or(() -> clientId.flatMap(this::findHyphenatedConfigBean))
                .orElseGet(this::getDefaultConfigBean);
            DefaultKafkaProducerConfiguration producerConfig = new DefaultKafkaProducerConfiguration(baseConfig);
            Properties config = producerConfig.getConfig();
            if (!config.containsKey("key.serializer")) {
                producerConfig.setKeySerializer(this.serdeRegistry.pickSerializer(keyType));
            }
            if (!config.containsKey("value.serializer")) {
                producerConfig.setValueSerializer(this.serdeRegistry.pickSerializer(valueType));
            }
            if (StringUtils.isNotEmpty(transactionalId)) {
                config.putIfAbsent("transactional.id", transactionalId);
                // Transactions require an idempotent producer. "enable.auto.commit" is a
                // consumer-only setting and has no effect on a producer.
                config.putIfAbsent("enable.idempotence", "true");
            }
            clientId.ifPresent(value -> config.putIfAbsent("client.id", value));
            // Explicit properties are applied last so that they override everything above.
            if (CollectionUtils.isNotEmpty(properties)) {
                config.putAll(properties);
            }
            Producer producer = this.beanContext.createBean(Producer.class, producerConfig);
            if (transactional) {
                producer.initTransactions();
            }
            return producer;
        });
    }

    /**
     * Closes every cached producer and empties the cache. A failure to close one producer is
     * logged and does not prevent the remaining producers from being closed.
     */
    @PreDestroy
    public void stop() {
        for (Producer producer : this.clients.values()) {
            try {
                producer.close();
            } catch (Exception e) {
                LOG.warn("Error shutting down Kafka producer: {}", e.getMessage(), e);
            }
        }
        this.clients.clear();
    }

    /**
     * Returns the cached non-transactional producer for the given id and types, creating it if needed.
     */
    @Override // io.micronaut.configuration.kafka.ProducerRegistry
    public <K, V> Producer<K, V> getProducer(String str, Argument<K> argument, Argument<V> argument2) {
        return getKafkaProducer(str, null, argument, argument2, false, null);
    }

    /**
     * Returns the cached transactional producer for the given id and types, creating it (and
     * initializing its transactions) if needed.
     */
    @Override // io.micronaut.configuration.kafka.TransactionalProducerRegistry
    public <K, V> Producer<K, V> getTransactionalProducer(String str, String str2, Argument<K> argument, Argument<V> argument2) {
        return getKafkaProducer(str, str2, argument, argument2, true, null);
    }

    /**
     * Evicts the given producer instance from the cache so that the next request creates a new
     * one. The match is by identity, and this method does not itself call
     * {@code producer.close()}.
     */
    @Override // io.micronaut.configuration.kafka.TransactionalProducerRegistry
    public void close(Producer<?, ?> producer) {
        for (Map.Entry<ClientKey, Producer> entry : this.clients.entrySet()) {
            if (entry.getValue() == producer) {
                this.clients.remove(entry.getKey());
                return;
            }
        }
    }

    /** Looks up the producer configuration bean qualified by the given name. */
    private Optional<AbstractKafkaProducerConfiguration> findConfigBean(String str) {
        return this.beanContext.findBean(AbstractKafkaProducerConfiguration.class, Qualifiers.byName(str));
    }

    /**
     * Looks up the configuration bean under the hyphenated form of the given name. Returns
     * empty when the name is already a valid hyphenated property name.
     */
    private Optional<AbstractKafkaProducerConfiguration> findHyphenatedConfigBean(String str) {
        if (NameUtils.isValidHyphenatedPropertyName(str)) {
            return Optional.empty();
        }
        return findConfigBean(NameUtils.hyphenate(str));
    }

    /** Returns the unqualified producer configuration bean. */
    private AbstractKafkaProducerConfiguration getDefaultConfigBean() {
        return this.beanContext.getBean(AbstractKafkaProducerConfiguration.class);
    }
}
