package io.micronaut.pulsar;

import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.BeanResolutionContext;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Prototype;
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import io.micronaut.inject.ArgumentInjectionPoint;
import io.micronaut.inject.ConstructorInjectionPoint;
import io.micronaut.inject.FieldInjectionPoint;
import io.micronaut.inject.InjectionPoint;
import io.micronaut.pulsar.annotation.PulsarReader;
import io.micronaut.pulsar.processor.DefaultSchemaHandler;
import io.micronaut.pulsar.processor.TopicResolver;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that creates Pulsar {@link Reader} instances for {@link PulsarReader} annotated
 * injection points and methods, and keeps them in a registry keyed by a generated reader id.
 *
 * <p>Readers are cached: asking for a reader whose generated id is already registered returns
 * the registered instance instead of creating a new one. All registered readers are closed
 * when {@link #close()} is called.
 */
@Factory
public class PulsarReaderFactory implements AutoCloseable, PulsarReaderRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarReaderFactory.class);

    /** Registered readers, keyed by the id produced by the {@link TopicResolver}. */
    private final Map<String, Reader<?>> readers = new ConcurrentHashMap<>();
    private final PulsarClient pulsarClient;
    private final DefaultSchemaHandler simpleSchemaResolver;
    private final TopicResolver topicResolver;

    /**
     * @param pulsarClient         client used to build new readers
     * @param defaultSchemaHandler resolver used to pick the schema of a new reader
     * @param topicResolver        resolver used for topic names and reader ids
     */
    public PulsarReaderFactory(PulsarClient pulsarClient, DefaultSchemaHandler defaultSchemaHandler, TopicResolver topicResolver) {
        this.pulsarClient = pulsarClient;
        this.simpleSchemaResolver = defaultSchemaHandler;
        this.topicResolver = topicResolver;
    }

    /**
     * Resolves a reader either from the current injection point or, when the resolution path
     * has no current segment, from the explicitly supplied annotation, argument and invocation
     * context.
     *
     * @param beanResolutionContext   resolution context whose path is inspected
     * @param annotationValue         reader annotation; required when there is no current segment
     * @param argument                value type of the reader; required when there is no current segment
     * @param methodInvocationContext invoked method; required when there is no current segment
     * @return an existing or newly created reader
     * @throws PulsarClientException if the reader cannot be created
     * @throws NullPointerException  if there is no current segment and any of the nullable
     *                               parameters is {@code null}
     */
    @Prototype
    public Reader<?> getReaderByInjectionPoint(BeanResolutionContext beanResolutionContext, @Nullable @Parameter AnnotationValue<PulsarReader> annotationValue, @Nullable @Parameter Argument<?> argument, @Nullable @Parameter MethodInvocationContext<?, ?> methodInvocationContext) throws PulsarClientException {
        if (beanResolutionContext.getPath().currentSegment().isPresent()) {
            return getReaderByInjectionPoint(beanResolutionContext);
        }
        return getReaderForAnnotation(
            Objects.requireNonNull(annotationValue, "annotationValue"),
            Objects.requireNonNull(argument, "argument"),
            Objects.requireNonNull(methodInvocationContext, "methodInvocationContext"));
    }

    /**
     * Resolves a reader for the injection point of the current resolution path segment.
     * Topics with a dynamic tenant are rejected for field, constructor-argument and any
     * other non-argument injection points.
     */
    private Reader<?> getReaderByInjectionPoint(BeanResolutionContext beanResolutionContext) throws PulsarClientException {
        // Declared with the general type: the concrete kind is only known after the instanceof checks.
        InjectionPoint<?> injectionPoint = beanResolutionContext.getPath().currentSegment()
            .orElseThrow(() -> new IllegalStateException("Could not resolve current injection context while creating a reader"))
            .getInjectionPoint();
        AnnotationValue<PulsarReader> annotation = injectionPoint.getAnnotation(PulsarReader.class);
        if (annotation == null) {
            throw new IllegalStateException("Failed to get value for bean annotated with PulsarReader");
        }
        String topic = annotation.getRequiredValue(String.class);

        Argument<?> messageType;
        String declaredName;
        String target;
        if (injectionPoint instanceof ArgumentInjectionPoint) {
            ArgumentInjectionPoint<?, ?> argumentPoint = (ArgumentInjectionPoint<?, ?>) injectionPoint;
            // Fall back to byte[] when the injected Reader has no type variable.
            messageType = argumentPoint.getArgument().getFirstTypeVariable().orElse(Argument.of(byte[].class));
            declaredName = argumentPoint.getArgument().getName();
            target = argumentPoint.getDeclaringBean().getName() + " " + declaredName;
            if ((argumentPoint.getOuterInjectionPoint() instanceof ConstructorInjectionPoint) && TopicResolver.isDynamicTenantInTopic(topic)) {
                throw new ConfigurationException(String.format("Cannot use dynamic tenant in topics for constructor injected Readers in %s", target));
            }
        } else if (injectionPoint instanceof FieldInjectionPoint) {
            FieldInjectionPoint<?, ?> fieldPoint = (FieldInjectionPoint<?, ?>) injectionPoint;
            messageType = fieldPoint.asArgument().getFirstTypeVariable().orElse(Argument.of(byte[].class));
            declaredName = fieldPoint.getName();
            target = fieldPoint.getDeclaringBean().getName() + "::" + declaredName;
            if (TopicResolver.isDynamicTenantInTopic(topic)) {
                throw new ConfigurationException(String.format("Cannot use dynamic tenant in topics for field injected Readers in %s", target));
            }
        } else {
            messageType = Argument.of(byte[].class);
            declaredName = injectionPoint.getDeclaringBean().getName();
            target = declaredName;
            if (TopicResolver.isDynamicTenantInTopic(topic)) {
                throw new ConfigurationException(String.format("Cannot use dynamic tenant in topics for field injected Readers in %s", target));
            }
        }
        return getOrCreateReader(annotation, messageType, declaredName, target);
    }

    /**
     * Resolves a reader for an annotated method, using the method name as the default reader
     * name and the method description as the target passed to schema resolution.
     */
    private Reader<?> getReaderForAnnotation(AnnotationValue<PulsarReader> annotationValue, Argument<?> argument, MethodInvocationContext<?, ?> methodInvocationContext) throws PulsarClientException {
        return getOrCreateReader(annotationValue, argument, methodInvocationContext.getExecutableMethod().getName(), methodInvocationContext.getExecutableMethod().getDescription(false));
    }

    /**
     * Returns the registered reader for the id derived from the annotation, or creates and
     * registers a new one.
     *
     * @param annotationValue reader annotation supplying name, topic, start position and subscription
     * @param argument        declared type; {@link KeyValue} and {@link Message} are unwrapped
     * @param declaredName    reader name used when the annotation has no {@code readerName}
     * @param target          description of the declaring element, passed to schema resolution
     */
    private Reader<?> getOrCreateReader(AnnotationValue<PulsarReader> annotationValue, Argument<?> argument, String declaredName, String target) throws PulsarClientException {
        Argument<?> bodyType;
        Argument<?> keyType;
        if (KeyValue.class.isAssignableFrom(argument.getType())) {
            keyType = argument.getTypeParameters()[0];
            bodyType = argument.getTypeParameters()[1];
        } else {
            keyType = null;
            if (Message.class.isAssignableFrom(argument.getType())) {
                bodyType = argument.getFirstTypeVariable()
                    .orElseThrow(() -> new ConfigurationException("Reader methods must return non-raw Message"));
            } else {
                bodyType = argument;
            }
        }

        String readerName = annotationValue.stringValue("readerName").orElse(declaredName);
        TopicResolver.TopicResolved topicResolved = TopicResolver.extractTopic(annotationValue, readerName);
        String readerId = this.topicResolver.generateIdFromMessagingClientName(readerName, topicResolved);

        // Single lookup: a containsKey/get pair could observe the entry disappearing in between.
        Reader<?> registered = this.readers.get(readerId);
        if (registered != null) {
            return registered;
        }

        Schema<?> schema = this.simpleSchemaResolver.decideSchema(bodyType, keyType, annotationValue, target);
        String topic = this.topicResolver.resolve(topicResolved.getTopic());
        boolean startAtLatest = annotationValue.getRequiredValue("startMessageLatest", Boolean.TYPE);
        MessageId startMessageId = startAtLatest ? MessageId.latest : MessageId.earliest;
        Optional<String> subscriptionName = annotationValue.stringValue("subscriptionName");

        ReaderBuilder<?> readerBuilder = this.pulsarClient.newReader(schema)
            .startMessageId(startMessageId)
            .readerName(readerId)
            .topic(topic);
        subscriptionName.ifPresent(readerBuilder::subscriptionName);
        Reader<?> created = readerBuilder.create();

        // Another caller may have registered a reader for the same id while this one was being
        // created; keep the registered one and release the duplicate instead of leaking it.
        Reader<?> raced = this.readers.putIfAbsent(readerId, created);
        if (raced == null) {
            return created;
        }
        try {
            created.close();
        } catch (Exception e) {
            // Best effort, same policy as close(): the duplicate is discarded either way.
            LOG.warn("Error shutting down duplicate Pulsar reader: {}", e.getMessage(), e);
        }
        return raced;
    }

    /**
     * Closes every registered reader and empties the registry. A failure to close one reader
     * is logged as a warning and does not prevent the remaining readers from being closed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        for (Reader<?> reader : this.readers.values()) {
            try {
                reader.close();
            } catch (Exception e) {
                LOG.warn("Error shutting down Pulsar reader: {}", e.getMessage(), e);
            }
        }
        this.readers.clear();
    }

    /**
     * @param str id under which the reader was registered
     * @return the registered reader, or {@code null} if none is registered under that id
     */
    @Override // io.micronaut.pulsar.PulsarReaderRegistry
    public Reader<?> getReader(String str) {
        return this.readers.get(str);
    }

    /**
     * @return the registered readers; this is the registry map's own values view, not a copy
     */
    @Override // io.micronaut.pulsar.PulsarReaderRegistry
    public Collection<Reader<?>> getReaders() {
        return this.readers.values();
    }
}
