package io.micronaut.pulsar.intercept;

import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.MutableArgumentValue;
import io.micronaut.core.type.ReturnType;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.pulsar.PulsarProducerRegistry;
import io.micronaut.pulsar.annotation.MessageKey;
import io.micronaut.pulsar.annotation.MessageProperties;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import io.micronaut.pulsar.events.ProducerSubscriptionFailedEvent;
import io.micronaut.pulsar.processor.DefaultSchemaHandler;
import jakarta.annotation.PreDestroy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.schema.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterceptorBean({PulsarProducerClient.class})
/* loaded from: input_file:io/micronaut/pulsar/intercept/PulsarProducerAdvice.class */
public class PulsarProducerAdvice implements MethodInterceptor<Object, Object>, AutoCloseable, PulsarProducerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarProducerAdvice.class);
    protected final Map<String, Producer<?>> producers = new ConcurrentHashMap();
    protected final PulsarClient pulsarClient;
    protected final DefaultSchemaHandler simpleSchemaResolver;
    protected final BeanContext beanContext;
    protected final ApplicationEventPublisher<ProducerSubscriptionFailedEvent> applicationEventPublisher;
    protected final ConversionService conversionService;

    public PulsarProducerAdvice(PulsarClient pulsarClient, DefaultSchemaHandler defaultSchemaHandler, BeanContext beanContext, ApplicationEventPublisher<ProducerSubscriptionFailedEvent> applicationEventPublisher, ConversionService conversionService) {
        this.pulsarClient = pulsarClient;
        this.simpleSchemaResolver = defaultSchemaHandler;
        this.beanContext = beanContext;
        this.applicationEventPublisher = applicationEventPublisher;
        this.conversionService = conversionService;
    }

    public Object intercept(MethodInvocationContext<Object, Object> methodInvocationContext) {
        if (!methodInvocationContext.hasAnnotation(PulsarProducer.class)) {
            return methodInvocationContext.proceed();
        }
        AnnotationValue<PulsarProducer> annotationValue = (AnnotationValue) methodInvocationContext.findAnnotation(PulsarProducer.class).orElseThrow(() -> {
            return new IllegalStateException("No @PulsarProducer on method: " + methodInvocationContext);
        });
        boolean booleanValue = ((Boolean) annotationValue.booleanValue("sendBefore").orElse(false)).booleanValue();
        boolean isAbstract = methodInvocationContext.isAbstract();
        Object proceed = (isAbstract || booleanValue) ? null : methodInvocationContext.proceed();
        Object valueFromContext = getValueFromContext(methodInvocationContext);
        Object keyFromContext = getKeyFromContext(methodInvocationContext);
        Map<String, String> collectHeaders = collectHeaders(methodInvocationContext);
        ExecutableMethod<?, ?> executableMethod = methodInvocationContext.getExecutableMethod();
        Producer<?> orCreateProducer = getOrCreateProducer(executableMethod, annotationValue);
        ReturnType<?> returnType = executableMethod.getReturnType();
        if (returnType.isAsyncOrReactive()) {
            return isAbstract ? sendAsync(valueFromContext, orCreateProducer, returnType, keyFromContext, collectHeaders) : !booleanValue ? proceed : methodInvocationContext.proceed();
        }
        try {
            if (isAbstract) {
                return sendBlocking(valueFromContext, orCreateProducer, returnType, keyFromContext, collectHeaders);
            }
            sendBlocking(valueFromContext, orCreateProducer, ReturnType.of(Void.TYPE, new Argument[0]), keyFromContext, collectHeaders);
            return proceed;
        } catch (PulsarClientException e) {
            String producerName = orCreateProducer.getProducerName();
            LOG.error("Failed to produce message on producer {}", producerName, e);
            throw new RuntimeException("Failed to produce a message on " + producerName, e);
        }
    }

    @NonNull
    private static Object getValueFromContext(MethodInvocationContext<Object, Object> methodInvocationContext) {
        return methodInvocationContext.getParameters().size() == 1 ? methodInvocationContext.getParameterValues()[0] : methodInvocationContext.getParameters().values().stream().filter(mutableArgumentValue -> {
            return mutableArgumentValue.isAnnotationPresent(MessageBody.class);
        }).map((v0) -> {
            return v0.getValue();
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException("Producers with multiple values must have one argument annotated with @MessageBody");
        });
    }

    @Nullable
    private static Object getKeyFromContext(MethodInvocationContext<Object, Object> methodInvocationContext) {
        if (methodInvocationContext.getParameters().size() == 1) {
            return null;
        }
        return methodInvocationContext.getParameters().values().stream().filter(mutableArgumentValue -> {
            return mutableArgumentValue.isAnnotationPresent(MessageKey.class);
        }).map((v0) -> {
            return v0.getValue();
        }).findFirst().orElse(null);
    }

    private <T, V> Object sendAsync(V v, Producer<T> producer, ReturnType<?> returnType, @Nullable Object obj, Map<String, String> map) {
        CompletableFuture sendAsync = buildMessage(producer, v, obj, map).sendAsync();
        return CompletableFuture.class == returnType.getType() ? sendAsync : Publishers.convertPublisher(this.conversionService, sendAsync, returnType.getType());
    }

    private static <T, V> Object sendBlocking(V v, Producer<T> producer, ReturnType<?> returnType, @Nullable Object obj, Map<String, String> map) throws PulsarClientException {
        MessageId send = buildMessage(producer, v, obj, map).send();
        if (returnType.isVoid()) {
            return Void.TYPE;
        }
        if (returnType.getType() == MessageId.class) {
            return send;
        }
        if (returnType.getType() == v.getClass()) {
            return v;
        }
        throw new IllegalArgumentException("Pulsar abstract producers can only return MessageId or body being sent.");
    }

    private static <T, V> TypedMessageBuilder<?> buildMessage(Producer<T> producer, V v, @Nullable Object obj, Map<String, String> map) {
        TypedMessageBuilder<?> newMessage = producer.newMessage();
        if (null == obj) {
            newMessage.value(v);
        } else {
            newMessage.value(new KeyValue(obj, v));
        }
        if (!map.isEmpty()) {
            newMessage.properties(map);
        }
        return newMessage;
    }

    private static Map<String, String> collectHeaders(MethodInvocationContext<Object, Object> methodInvocationContext) {
        if (methodInvocationContext.getParameters().size() == 1) {
            return Collections.emptyMap();
        }
        List list = methodInvocationContext.getParameters().values().stream().filter(mutableArgumentValue -> {
            return mutableArgumentValue.isAnnotationPresent(MessageProperties.class) || mutableArgumentValue.isAnnotationPresent(MessageHeader.class);
        }).toList();
        return (list.size() == 1 && ((MutableArgumentValue) list.get(0)).isAnnotationPresent(MessageProperties.class)) ? (Map) ((MutableArgumentValue) list.get(0)).getValue() : (Map) list.stream().collect(Collectors.toMap(mutableArgumentValue2 -> {
            return (String) ((AnnotationValue) Objects.requireNonNull(mutableArgumentValue2.getAnnotation(MessageHeader.class))).stringValue().orElse(mutableArgumentValue2.getName());
        }, mutableArgumentValue3 -> {
            return (String) mutableArgumentValue3.getValue();
        }));
    }

    protected Producer<?> getOrCreateProducer(ExecutableMethod<?, ?> executableMethod, AnnotationValue<PulsarProducer> annotationValue) {
        String str = (String) annotationValue.stringValue("producerName").orElse(executableMethod.getMethodName());
        Producer<?> producer = this.producers.get(str);
        if (null == producer) {
            try {
                producer = (Producer) this.beanContext.createBean(Producer.class, new Object[]{this.pulsarClient, annotationValue, executableMethod.getArguments(), this.simpleSchemaResolver, executableMethod.getDescription(true)});
                this.producers.put(str, producer);
            } catch (Exception e) {
                if (MessageListenerException.class == e.getClass() && e.getMessage().startsWith("Topic")) {
                    LOG.error("Topic missing for producer {} {}", str, executableMethod.getDescription(false));
                } else {
                    LOG.error("Failed to create producer {} with reason: ", str, e);
                }
                this.applicationEventPublisher.publishEventAsync(new ProducerSubscriptionFailedEvent(str, e));
            }
        }
        return producer;
    }

    @Override // java.lang.AutoCloseable
    @PreDestroy
    public void close() {
        for (Producer<?> producer : this.producers.values()) {
            if (producer.isConnected()) {
                try {
                    producer.flush();
                    producer.close();
                } catch (Exception e) {
                    LOG.warn("Error shutting down Pulsar producer: {}", e.getMessage(), e);
                }
            }
        }
    }

    @Override // io.micronaut.pulsar.PulsarProducerRegistry
    public Map<String, Producer<?>> getProducers() {
        return this.producers;
    }

    @Override // io.micronaut.pulsar.PulsarProducerRegistry
    public Producer<?> getProducer(@NonNull String str) {
        return this.producers.get(str);
    }

    @Override // io.micronaut.pulsar.PulsarProducerRegistry
    public Set<String> getProducerIds() {
        return this.producers.keySet();
    }
}
