package io.micronaut.pulsar.intercept;

import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.pulsar.annotation.PulsarReader;
import io.micronaut.pulsar.annotation.PulsarReaderClient;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;

@InterceptorBean({PulsarReaderClient.class})
/* loaded from: input_file:io/micronaut/pulsar/intercept/PulsarReaderAdvice.class */
public class PulsarReaderAdvice implements MethodInterceptor<Object, Object> {
    protected final BeanContext beanContext;
    protected final ConversionService conversionService;

    public PulsarReaderAdvice(BeanContext beanContext, ConversionService conversionService) {
        this.beanContext = beanContext;
        this.conversionService = conversionService;
    }

    public Object intercept(MethodInvocationContext<Object, Object> methodInvocationContext) {
        if (!methodInvocationContext.hasAnnotation(PulsarReader.class)) {
            return methodInvocationContext.proceed();
        }
        if (!methodInvocationContext.getExecutableMethod().isAbstract()) {
            throw new IllegalArgumentException(String.format("Non abstract method cannot be annotated as Readers: %s", methodInvocationContext.getExecutableMethod().getDescription(false)));
        }
        ReturnType<?> returnType = methodInvocationContext.getExecutableMethod().getReturnType();
        Argument asArgument = returnType.isAsyncOrReactive() ? (Argument) returnType.getFirstTypeVariable().orElseThrow(() -> {
            return new IllegalArgumentException("Could not extract return type for %s. Async / reactive ");
        }) : returnType.asArgument();
        AnnotationValue<PulsarReader> annotation = methodInvocationContext.getAnnotation(PulsarReader.class);
        Reader<?> reader = (Reader) this.beanContext.createBean(Reader.class, new Object[]{annotation, asArgument, methodInvocationContext});
        try {
            return read(reader, returnType, annotation);
        } catch (PulsarClientException e) {
            throw new MessageListenerException(String.format("Failed to read message on topic %s", reader.getTopic()), e);
        }
    }

    private Object read(Reader<?> reader, ReturnType<?> returnType, AnnotationValue<PulsarReader> annotationValue) throws PulsarClientException {
        return returnType.isAsyncOrReactive() ? Message.class.isAssignableFrom(((Argument) returnType.getFirstTypeVariable().orElseThrow(() -> {
            return new IllegalStateException("Missing inner type for async reader.");
        })).getType()) ? readAsync(returnType, reader.readNextAsync()) : readAsync(returnType, reader.readNextAsync().thenApply((v0) -> {
            return v0.getValue();
        })) : Message.class.isAssignableFrom(returnType.getType()) ? readBlocking(reader, annotationValue) : readBlocking(reader, annotationValue).getValue();
    }

    private static Message<?> readBlocking(Reader<?> reader, AnnotationValue<PulsarReader> annotationValue) throws PulsarClientException {
        int orElse = ((AnnotationValue) Objects.requireNonNull(annotationValue)).intValue("readTimeout").orElse(0);
        return orElse > 0 ? reader.readNext(orElse, (TimeUnit) annotationValue.get("timeoutUnit", TimeUnit.class).orElse(TimeUnit.SECONDS)) : reader.readNext();
    }

    private Object readAsync(ReturnType<?> returnType, CompletableFuture<?> completableFuture) {
        return CompletableFuture.class == returnType.getType() ? completableFuture : Publishers.convertPublisher(this.conversionService, completableFuture, returnType.getType());
    }
}
