package org.springframework.cloud.stream.function;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextSnapshotFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.NewDestinationBindingCallback;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.config.GlobalChannelInterceptorProcessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-4.1.4.jar:org/springframework/cloud/stream/function/StreamBridge.class */
public final class StreamBridge implements StreamOperations, SmartInitializingSingleton, DisposableBean {
    private static final String STREAM_BRIDGE_FUNC_NAME = "streamBridge";
    private final Map<String, MessageChannel> channelCache;
    private final FunctionCatalog functionCatalog;
    private final NewDestinationBindingCallback destinationBindingCallback;
    private final BindingServiceProperties bindingServiceProperties;
    private final ConfigurableApplicationContext applicationContext;
    private boolean initialized;
    private boolean async;
    private final BindingService bindingService;
    private final Map<Integer, SimpleFunctionRegistry.FunctionInvocationWrapper> streamBridgeFunctionCache;
    private final FunctionInvocationHelper<?> functionInvocationHelper;
    private static final boolean isContextPropagationPresent = ClassUtils.isPresent("io.micrometer.context.ContextSnapshotFactory", StreamBridge.class.getClassLoader());
    private static final ReentrantLock lock = new ReentrantLock();
    private final Log logger = LogFactory.getLog(getClass());
    private ExecutorService executorService = Executors.newCachedThreadPool();

    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-4.1.4.jar:org/springframework/cloud/stream/function/StreamBridge$ContextPropagationHelper.class */
    private static final class ContextPropagationHelper {
        private ContextPropagationHelper() {
        }

        static ExecutorService wrap(ExecutorService executorService) {
            return ContextExecutorService.wrap(executorService, () -> {
                return ContextSnapshotFactory.builder().build().captureAll(new Object[0]);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamBridge(FunctionCatalog functionCatalog, final BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext configurableApplicationContext, @Nullable NewDestinationBindingCallback newDestinationBindingCallback) {
        Assert.notNull(functionCatalog, "'functionCatalog' must not be null");
        Assert.notNull(configurableApplicationContext, "'applicationContext' must not be null");
        Assert.notNull(bindingServiceProperties, "'bindingServiceProperties' must not be null");
        this.bindingService = (BindingService) configurableApplicationContext.getBean(BindingService.class);
        this.functionCatalog = functionCatalog;
        this.applicationContext = configurableApplicationContext;
        this.bindingServiceProperties = bindingServiceProperties;
        this.destinationBindingCallback = newDestinationBindingCallback;
        this.channelCache = new LinkedHashMap<String, MessageChannel>() { // from class: org.springframework.cloud.stream.function.StreamBridge.1
            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> entry) {
                boolean z = size() > bindingServiceProperties.getDynamicDestinationCacheSize();
                if (z) {
                    if (StreamBridge.this.logger.isDebugEnabled()) {
                        StreamBridge.this.logger.debug("Removing message channel from cache " + entry.getKey());
                    }
                    StreamBridge.this.bindingService.unbindProducers(entry.getKey());
                }
                return z;
            }
        };
        this.functionInvocationHelper = (FunctionInvocationHelper) configurableApplicationContext.getBean(FunctionInvocationHelper.class);
        this.streamBridgeFunctionCache = new HashMap();
    }

    @Override // org.springframework.cloud.stream.function.StreamOperations
    public boolean send(String str, Object obj) {
        return send(str, obj, determineContentType(str, this.bindingServiceProperties));
    }

    @Override // org.springframework.cloud.stream.function.StreamOperations
    public boolean send(String str, Object obj, MimeType mimeType) {
        return send(str, null, obj, mimeType);
    }

    @Override // org.springframework.cloud.stream.function.StreamOperations
    public boolean send(String str, @Nullable String str2, Object obj) {
        return send(str, str2, obj, determineContentType(str, this.bindingServiceProperties));
    }

    private static MimeType determineContentType(String str, BindingServiceProperties bindingServiceProperties) {
        BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(str);
        return StringUtils.hasText(bindingProperties.getContentType()) ? MimeType.valueOf(bindingProperties.getContentType()) : MimeTypeUtils.APPLICATION_JSON;
    }

    @Override // org.springframework.cloud.stream.function.StreamOperations
    public boolean send(String str, @Nullable String str2, Object obj, MimeType mimeType) {
        Message message;
        if (!this.initialized) {
            afterSingletonsInstantiated();
        }
        ProducerProperties producerProperties = this.bindingServiceProperties.getProducerProperties(str);
        MessageChannel resolveDestination = resolveDestination(str, producerProperties, str2);
        Function streamBridgeFunction = getStreamBridgeFunction(mimeType.toString(), producerProperties);
        if (producerProperties != null && producerProperties.isPartitioned()) {
            streamBridgeFunction = new PartitionAwareFunctionWrapper(streamBridgeFunction, this.applicationContext, producerProperties);
        }
        String resolveBinderTargetType = resolveBinderTargetType(str, str2, MessageChannel.class, (BinderFactory) this.applicationContext.getBean(BinderFactory.class));
        Message build = obj instanceof Message ? MessageBuilder.fromMessage((Message) obj).setHeaderIfAbsent(MessageUtils.TARGET_PROTOCOL, (Object) resolveBinderTargetType).build() : new GenericMessage(obj, (Map<String, Object>) Collections.singletonMap(MessageUtils.TARGET_PROTOCOL, resolveBinderTargetType));
        synchronized (this) {
            message = (Message) streamBridgeFunction.apply(build);
        }
        if (message == null && build.getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
            message = build;
        }
        return resolveDestination.send((Message) this.functionInvocationHelper.postProcessResult(message, null));
    }

    private int hashProducerProperties(ProducerProperties producerProperties, String str) {
        return str.hashCode() + Boolean.hashCode(producerProperties.isUseNativeEncoding()) + Boolean.hashCode(producerProperties.isPartitioned()) + producerProperties.getPartitionCount();
    }

    private synchronized SimpleFunctionRegistry.FunctionInvocationWrapper getStreamBridgeFunction(String str, ProducerProperties producerProperties) {
        int hashProducerProperties = hashProducerProperties(producerProperties, str);
        if (this.streamBridgeFunctionCache.containsKey(Integer.valueOf(hashProducerProperties))) {
            return this.streamBridgeFunctionCache.get(Integer.valueOf(hashProducerProperties));
        }
        SimpleFunctionRegistry.FunctionInvocationWrapper functionInvocationWrapper = (SimpleFunctionRegistry.FunctionInvocationWrapper) this.functionCatalog.lookup(STREAM_BRIDGE_FUNC_NAME, str.toString());
        this.streamBridgeFunctionCache.put(Integer.valueOf(hashProducerProperties), functionInvocationWrapper);
        functionInvocationWrapper.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
        return functionInvocationWrapper;
    }

    @Override // org.springframework.beans.factory.SmartInitializingSingleton
    public void afterSingletonsInstantiated() {
        if (this.initialized) {
            return;
        }
        FunctionRegistration functionRegistration = new FunctionRegistration(new SimpleFunctionRegistry.PassThruFunction(), STREAM_BRIDGE_FUNC_NAME);
        functionRegistration.getProperties().put("singleton", "false");
        ((FunctionRegistry) this.functionCatalog).register(functionRegistration.type(ResolvableType.forClassWithGenerics((Class<?>) Function.class, (Class<?>[]) new Class[]{Object.class, Object.class}).getType()));
        this.initialized = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v58, types: [org.springframework.messaging.MessageChannel] */
    /* JADX WARN: Type inference failed for: r0v72, types: [org.springframework.messaging.MessageChannel] */
    /* JADX WARN: Type inference failed for: r0v9, types: [org.springframework.messaging.MessageChannel] */
    public MessageChannel resolveDestination(String str, ProducerProperties producerProperties, String str2) {
        lock.lock();
        try {
            AbstractSubscribableChannel abstractSubscribableChannel = StringUtils.hasText(str2) ? this.channelCache.get(str2 + ":" + str) : this.channelCache.get(str);
            if (abstractSubscribableChannel == null) {
                if (this.applicationContext.containsBean(str)) {
                    abstractSubscribableChannel = (MessageChannel) this.applicationContext.getBean(str, MessageChannel.class);
                    String[] consumerBindingNames = this.bindingService.getConsumerBindingNames();
                    if (abstractSubscribableChannel instanceof AbstractMessageChannel) {
                        addPartitioningInterceptorIfNeedBe(producerProperties, str, abstractSubscribableChannel);
                    }
                    if (ObjectUtils.containsElement(consumerBindingNames, str)) {
                        this.logger.warn("You seem to be sending data to the input binding.  It is not recommended, since you are bypassing the binder and this the messaging system exposed by the binder.");
                    }
                } else {
                    abstractSubscribableChannel = isAsync() ? new ExecutorChannel(this.executorService) : new DirectWithAttributesChannel();
                    abstractSubscribableChannel.setApplicationContext(this.applicationContext);
                    abstractSubscribableChannel.setComponentName(str);
                    if (this.destinationBindingCallback != null) {
                        this.destinationBindingCallback.configure(str, abstractSubscribableChannel, producerProperties, this.bindingService.getExtendedProducerProperties(abstractSubscribableChannel, str));
                    }
                    Binder binder = null;
                    if (StringUtils.hasText(str2)) {
                        binder = ((BinderFactory) this.applicationContext.getBean(BinderFactory.class)).getBinder(str2, abstractSubscribableChannel.getClass());
                    }
                    addPartitioningInterceptorIfNeedBe(producerProperties, str, abstractSubscribableChannel);
                    addGlobalChannelInterceptorProcessor(abstractSubscribableChannel, str);
                    this.bindingService.bindProducer(abstractSubscribableChannel, str, true, binder);
                    if (StringUtils.hasText(str2)) {
                        this.channelCache.put(str2 + ":" + str, abstractSubscribableChannel);
                    } else {
                        this.channelCache.put(str, abstractSubscribableChannel);
                    }
                }
            }
            AbstractSubscribableChannel abstractSubscribableChannel2 = abstractSubscribableChannel;
            lock.unlock();
            return abstractSubscribableChannel2;
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }

    private void addPartitioningInterceptorIfNeedBe(ProducerProperties producerProperties, String str, AbstractMessageChannel abstractMessageChannel) {
        if (producerProperties != null && producerProperties.isPartitioned() && producerProperties.isUseNativeEncoding()) {
            abstractMessageChannel.addInterceptor(new DefaultPartitioningInterceptor(this.bindingServiceProperties.getBindingProperties(str), this.applicationContext.getBeanFactory()));
        }
    }

    private String resolveBinderTargetType(String str, String str2, Class<?> cls, BinderFactory binderFactory) {
        return binderFactory.getBinder(str2 != null ? str2 : this.bindingServiceProperties.getBinder(str), cls).getClass().getSimpleName().startsWith("Rabbit") ? "amqp" : "kafka";
    }

    private void addGlobalChannelInterceptorProcessor(AbstractMessageChannel abstractMessageChannel, String str) {
        ((GlobalChannelInterceptorProcessor) this.applicationContext.getBean(GlobalChannelInterceptorProcessor.class)).postProcessAfterInitialization(abstractMessageChannel, str);
    }

    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() throws Exception {
        this.executorService.shutdown();
        if (!this.executorService.awaitTermination(AbstractComponentTracker.LINGERING_TIMEOUT, TimeUnit.MILLISECONDS)) {
            this.logger.warn("Failed to terminate executor. Terminating current tasks.");
            this.executorService.shutdownNow();
        }
        this.executorService = null;
        this.async = false;
        Set<String> keySet = this.channelCache.keySet();
        BindingService bindingService = this.bindingService;
        Objects.requireNonNull(bindingService);
        keySet.forEach(bindingService::unbindProducers);
        this.channelCache.clear();
    }

    public boolean isAsync() {
        return this.async;
    }

    public void setAsync(boolean z) {
        if (isContextPropagationPresent) {
            this.executorService = ContextPropagationHelper.wrap(this.executorService);
        }
        this.async = z;
    }
}
