package io.micronaut.pulsar.processor;

import io.micronaut.context.BeanContext;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.messaging.exceptions.MessageListenerException;
import io.micronaut.pulsar.PulsarConsumerRegistry;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import io.micronaut.pulsar.config.DefaultPulsarClientConfiguration;
import io.micronaut.pulsar.events.ConsumerSubscribedEvent;
import io.micronaut.pulsar.events.ConsumerSubscriptionFailedEvent;
import io.micronaut.pulsar.processor.TopicResolver;
import jakarta.inject.Singleton;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Internal
/* loaded from: input_file:io/micronaut/pulsar/processor/PulsarConsumerProcessor.class */
public class PulsarConsumerProcessor implements ExecutableMethodProcessor<PulsarConsumer>, AutoCloseable, PulsarConsumerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarConsumerProcessor.class);
    protected final TopicResolver topicResolver;
    protected final DefaultPulsarClientConfiguration pulsarClientConfiguration;
    private final ApplicationEventPublisher<Object> applicationEventPublisher;
    private final BeanContext beanContext;
    private final PulsarClient pulsarClient;
    private final DefaultSchemaHandler simpleSchemaResolver;
    private final Map<String, Consumer<?>> consumers = new ConcurrentHashMap();
    private final Map<String, Consumer<?>> paused = new ConcurrentHashMap();
    private final AtomicInteger consumerCounter = new AtomicInteger(10);

    public PulsarConsumerProcessor(ApplicationEventPublisher<Object> applicationEventPublisher, BeanContext beanContext, PulsarClient pulsarClient, DefaultSchemaHandler defaultSchemaHandler, DefaultPulsarClientConfiguration defaultPulsarClientConfiguration, TopicResolver topicResolver) {
        this.applicationEventPublisher = applicationEventPublisher;
        this.beanContext = beanContext;
        this.pulsarClient = pulsarClient;
        this.simpleSchemaResolver = defaultSchemaHandler;
        this.pulsarClientConfiguration = defaultPulsarClientConfiguration;
        this.topicResolver = topicResolver;
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> executableMethod) {
        AnnotationValue<PulsarConsumer> declaredAnnotation = executableMethod.getDeclaredAnnotation(PulsarConsumer.class);
        if (null == declaredAnnotation) {
            return;
        }
        String consumerName = getConsumerName(declaredAnnotation);
        TopicResolver.TopicResolved extractTopic = TopicResolver.extractTopic(declaredAnnotation, consumerName);
        String generateIdFromMessagingClientName = this.topicResolver.generateIdFromMessagingClientName(consumerName, extractTopic);
        if (this.consumers.containsKey(generateIdFromMessagingClientName)) {
            throw new MessageListenerException(String.format("Consumer %s already exists", generateIdFromMessagingClientName));
        }
        AnnotationValue<PulsarSubscription> annotation = executableMethod.getAnnotation(PulsarSubscription.class);
        if (ArrayUtils.isEmpty(executableMethod.getArguments())) {
            throw new MessageListenerException("Method annotated with PulsarConsumer must accept at least 1 parameter");
        }
        ConsumerBuilder<?> processConsumerAnnotation = processConsumerAnnotation(declaredAnnotation, annotation, executableMethod, this.beanContext.getBean(beanDefinition.getBeanType()), extractTopic);
        boolean booleanValue = ((Boolean) declaredAnnotation.getRequiredValue("subscribeAsync", Boolean.class)).booleanValue();
        processConsumerAnnotation.consumerName(consumerName);
        if (booleanValue) {
            processConsumerAnnotation.subscribeAsync().handle((consumer, th) -> {
                if (null != th) {
                    LOG.error("Failed subscribing Pulsar consumer {} {}", new Object[]{executableMethod.getDescription(false), generateIdFromMessagingClientName, th});
                    this.applicationEventPublisher.publishEventAsync(new ConsumerSubscriptionFailedEvent(th, generateIdFromMessagingClientName));
                    return new MessageListenerException("Failed to subscribe", th);
                }
                this.consumers.put(generateIdFromMessagingClientName, consumer);
                this.applicationEventPublisher.publishEventAsync(new ConsumerSubscribedEvent(consumer));
                return consumer;
            });
            return;
        }
        try {
            Consumer<?> subscribe = processConsumerAnnotation.subscribe();
            this.consumers.put(generateIdFromMessagingClientName, subscribe);
            this.applicationEventPublisher.publishEvent(new ConsumerSubscribedEvent(subscribe));
        } catch (Exception e) {
            LOG.error("Failed subscribing Pulsar consumer {} {}", new Object[]{executableMethod.getDescription(false), generateIdFromMessagingClientName, e});
            this.applicationEventPublisher.publishEvent(new ConsumerSubscriptionFailedEvent(e, generateIdFromMessagingClientName));
            throw new MessageListenerException("Failed to subscribe %s".formatted(generateIdFromMessagingClientName), e);
        }
    }

    @NotNull
    protected String getConsumerName(AnnotationValue<PulsarConsumer> annotationValue) {
        return (String) annotationValue.stringValue("consumerName").orElse("pulsar-consumer-" + this.consumerCounter.getAndIncrement());
    }

    private ConsumerBuilder<?> processConsumerAnnotation(AnnotationValue<PulsarConsumer> annotationValue, AnnotationValue<PulsarSubscription> annotationValue2, ExecutableMethod<Object, ?> executableMethod, Object obj, TopicResolver.TopicResolved topicResolved) {
        PulsarArgumentHandler pulsarArgumentHandler = new PulsarArgumentHandler(executableMethod.getArguments(), executableMethod.getDescription(false));
        ConsumerBuilderImpl consumerBuilderImpl = new ConsumerBuilderImpl(this.pulsarClient, this.simpleSchemaResolver.decideSchema(pulsarArgumentHandler.getBodyArgument(), pulsarArgumentHandler.getKeyArgument(), annotationValue, executableMethod.getDescription(false)));
        Optional stringValue = annotationValue.stringValue("consumerName");
        Objects.requireNonNull(consumerBuilderImpl);
        stringValue.ifPresent(consumerBuilderImpl::consumerName);
        resolveTopic(annotationValue, consumerBuilderImpl, topicResolved);
        resolveDeadLetter(annotationValue, consumerBuilderImpl);
        if (null != annotationValue2) {
            subscriptionValues(annotationValue2, consumerBuilderImpl);
        } else {
            consumerValues(annotationValue, consumerBuilderImpl);
        }
        annotationValue.stringValue("ackTimeout").map((v0) -> {
            return Duration.parse(v0);
        }).ifPresent(duration -> {
            long millis = duration.toMillis();
            if (1000 >= millis) {
                throw new MessageListenerException("Acknowledge timeout must be greater than 1 second");
            }
            consumerBuilderImpl.ackTimeout(millis, TimeUnit.MILLISECONDS);
        });
        consumerBuilderImpl.messageListener(new DefaultListener(executableMethod, pulsarArgumentHandler.isMessageWrapper(), obj, pulsarArgumentHandler));
        return consumerBuilderImpl;
    }

    private void resolveDeadLetter(AnnotationValue<PulsarConsumer> annotationValue, ConsumerBuilder<?> consumerBuilder) {
        if (this.pulsarClientConfiguration.getUseDeadLetterQueue().booleanValue()) {
            DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
            Optional stringValue = annotationValue.stringValue("deadLetterTopic");
            if (stringValue.isPresent()) {
                builder.deadLetterTopic(this.topicResolver.resolve((String) stringValue.get()));
            }
            int orElse = annotationValue.intValue("maxRetriesBeforeDlq").orElse(this.pulsarClientConfiguration.getDefaultMaxRetryDlq());
            if (0 < orElse) {
                builder.maxRedeliverCount(orElse);
                consumerBuilder.deadLetterPolicy(builder.build());
            }
        }
    }

    private void resolveTopic(AnnotationValue<PulsarConsumer> annotationValue, ConsumerBuilder<?> consumerBuilder, TopicResolver.TopicResolved topicResolved) {
        if (topicResolved.isPattern()) {
            resolveTopicsPattern(annotationValue, consumerBuilder, this.topicResolver.resolve(topicResolved.getTopic()));
            return;
        }
        if (!topicResolved.isArray()) {
            consumerBuilder.topic(new String[]{this.topicResolver.resolve(topicResolved.getTopic())});
            return;
        }
        Stream stream = Arrays.stream(topicResolved.getTopics());
        TopicResolver topicResolver = this.topicResolver;
        Objects.requireNonNull(topicResolver);
        consumerBuilder.topic((String[]) stream.map(topicResolver::resolve).toArray(i -> {
            return new String[i];
        }));
    }

    private void resolveTopicsPattern(AnnotationValue<PulsarConsumer> annotationValue, ConsumerBuilder<?> consumerBuilder, String str) {
        consumerBuilder.topicsPattern(str);
        consumerBuilder.subscriptionTopicsMode((RegexSubscriptionMode) annotationValue.getRequiredValue("subscriptionTopicsMode", RegexSubscriptionMode.class));
        OptionalInt intValue = annotationValue.intValue("patternAutoDiscoveryPeriod");
        if (intValue.isPresent()) {
            if (intValue.getAsInt() < 1) {
                throw new MessageListenerException("Topic " + str + " refresh time cannot be below 1 second.");
            }
            consumerBuilder.patternAutoDiscoveryPeriod(intValue.getAsInt(), TimeUnit.SECONDS);
        }
    }

    private void consumerValues(AnnotationValue<PulsarConsumer> annotationValue, ConsumerBuilder<?> consumerBuilder) {
        String str = (String) annotationValue.stringValue("subscription").orElseGet(() -> {
            return "pulsar-subscription-" + this.consumerCounter.incrementAndGet();
        });
        consumerBuilder.subscriptionName(str).subscriptionType((SubscriptionType) annotationValue.getRequiredValue("subscriptionType", SubscriptionType.class));
    }

    private void subscriptionValues(AnnotationValue<PulsarSubscription> annotationValue, ConsumerBuilder<?> consumerBuilder) {
        consumerBuilder.subscriptionName((String) annotationValue.stringValue("subscriptionName").orElse("pulsar-subscription-" + this.consumerCounter.incrementAndGet()));
        Optional enumValue = annotationValue.enumValue("subscriptionType", SubscriptionType.class);
        Objects.requireNonNull(consumerBuilder);
        enumValue.ifPresent(consumerBuilder::subscriptionType);
        Optional stringValue = annotationValue.stringValue("ackGroupTimeout");
        if (stringValue.isPresent()) {
            consumerBuilder.acknowledgmentGroupTime(Duration.parse((CharSequence) stringValue.get()).toNanos(), TimeUnit.NANOSECONDS);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        for (Consumer<?> consumer : getConsumers().values()) {
            try {
                consumer.unsubscribe();
                consumer.close();
            } catch (Exception e) {
                LOG.warn("Error shutting down Pulsar consumer: {}", e.getMessage(), e);
            }
        }
    }

    @Override // io.micronaut.pulsar.PulsarConsumerRegistry
    public Map<String, Consumer<?>> getConsumers() {
        return Collections.unmodifiableMap(this.consumers);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.micronaut.pulsar.PulsarConsumerRegistry
    @NonNull
    public <T> Consumer<T> getConsumer(@NonNull String str) {
        ArgumentUtils.requireNonNull("id", str);
        Consumer<?> consumer = this.consumers.get(str);
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + str);
        }
        return consumer;
    }

    @Override // io.micronaut.pulsar.PulsarConsumerRegistry
    public boolean consumerExists(@NonNull String str) {
        return this.consumers.containsKey(str);
    }

    @Override // io.micronaut.pulsar.PulsarConsumerRegistry
    @NonNull
    public Set<String> getConsumerIds() {
        return Collections.unmodifiableSet(this.consumers.keySet());
    }

    @Override // io.micronaut.pulsar.PulsarConsumerRegistry
    public boolean isPaused(@NonNull String str) {
        if (StringUtils.isEmpty(str) || !this.consumers.containsKey(str)) {
            throw new IllegalArgumentException("No consumer found for ID: " + str);
        }
        return this.paused.containsKey(str);
    }

    @Override // io.micronaut.pulsar.PulsarConsumerRegistry
    public void pause(@NonNull String str) {
        if (StringUtils.isEmpty(str) || !this.consumers.containsKey(str)) {
            throw new IllegalArgumentException("No consumer found for ID: " + str);
        }
        Consumer<?> consumer = this.consumers.get(str);
        consumer.pause();
        this.paused.put(str, consumer);
    }

    @Override // io.micronaut.pulsar.PulsarConsumerRegistry
    public void resume(@NonNull String str) {
        if (StringUtils.isEmpty(str) || !this.paused.containsKey(str)) {
            throw new IllegalArgumentException("No paused consumer found for ID: " + str);
        }
        this.paused.remove(str).resume();
    }

    public /* bridge */ /* synthetic */ void process(BeanDefinition beanDefinition, Object obj) {
        process((BeanDefinition<?>) beanDefinition, (ExecutableMethod<?, ?>) obj);
    }
}
