package io.smallrye.reactive.messaging.memory;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.memory.i18n.InMemoryExceptions;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(InMemoryConnector.CONNECTOR)
@ConnectorAttributes({@ConnectorAttribute(name = "run-on-vertx-context", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether messages are dispatched on the Vert.x context or not.", defaultValue = "false"), @ConnectorAttribute(name = "broadcast", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the messages are dispatched to multiple consumer", defaultValue = "false")})
/* loaded from: input_file:io/smallrye/reactive/messaging/memory/InMemoryConnector.class */
public class InMemoryConnector implements InboundConnector, OutboundConnector {
    public static final String CONNECTOR = "smallrye-in-memory";
    private final Map<String, InMemorySourceImpl<?>> sources = new HashMap();
    private final Map<String, InMemorySinkImpl<?>> sinks = new HashMap();

    @Inject
    ExecutionHolder executionHolder;

    /* loaded from: input_file:io/smallrye/reactive/messaging/memory/InMemoryConnector$InMemorySinkImpl.class */
    private static class InMemorySinkImpl<T> implements InMemorySink<T> {
        private final String name;
        private final List<Message<T>> list = new CopyOnWriteArrayList();
        private final AtomicReference<Throwable> failure = new AtomicReference<>();
        private final AtomicBoolean completed = new AtomicBoolean();
        private final Flow.Subscriber<? extends Message<T>> sink = MultiUtils.via(multi -> {
            return multi.call(message -> {
                this.list.add(message);
                UniCreate createFrom = Uni.createFrom();
                Objects.requireNonNull(message);
                Uni completionStage = createFrom.completionStage(message::ack);
                if (message.getMetadata(LocalContextMetadata.class).isPresent()) {
                    Context newInstance = Context.newInstance(((LocalContextMetadata) message.getMetadata(LocalContextMetadata.class).get()).context());
                    Objects.requireNonNull(newInstance);
                    completionStage = completionStage.emitOn(newInstance::runOnContext);
                }
                return completionStage;
            }).onFailure().invoke(th -> {
                this.failure.compareAndSet(null, th);
            }).onCompletion().invoke(() -> {
                this.completed.compareAndSet(false, true);
            });
        });

        private InMemorySinkImpl(String str) {
            this.name = str;
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySink
        public String name() {
            return this.name;
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySink
        public List<? extends Message<T>> received() {
            return new ArrayList(this.list);
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySink
        public void clear() {
            this.completed.set(false);
            this.failure.set(null);
            this.list.clear();
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySink
        public boolean hasCompleted() {
            return this.completed.get();
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySink
        public boolean hasFailed() {
            return getFailure() != null;
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySink
        public Throwable getFailure() {
            return this.failure.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/memory/InMemoryConnector$InMemorySourceImpl.class */
    public static class InMemorySourceImpl<T> implements InMemorySource<T> {
        private final Flow.Processor<Message<T>, Message<T>> processor;
        private final Flow.Publisher<? extends Message<T>> source;
        private final String name;
        private final Context context;
        private boolean runOnVertxContext;

        private InMemorySourceImpl(String str, Vertx vertx, boolean z, boolean z2) {
            this.name = str;
            this.context = vertx.getOrCreateContext();
            this.runOnVertxContext = z;
            if (z2) {
                this.processor = BroadcastProcessor.create();
            } else {
                this.processor = UnicastProcessor.create();
            }
            this.source = Multi.createFrom().publisher(this.processor);
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySource
        public String name() {
            return this.name;
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySource
        public InMemorySource<T> send(T t) {
            if (t instanceof Message) {
                if (this.runOnVertxContext) {
                    this.context.runOnContext(() -> {
                        this.processor.onNext(ContextAwareMessage.withContextMetadata((Message) t));
                    });
                } else {
                    this.processor.onNext(ContextAwareMessage.withContextMetadata((Message) t));
                }
            } else if (this.runOnVertxContext) {
                this.context.runOnContext(() -> {
                    this.processor.onNext(ContextAwareMessage.of(t));
                });
            } else {
                this.processor.onNext(ContextAwareMessage.of(t));
            }
            return this;
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySource
        public InMemorySource<T> runOnVertxContext(boolean z) {
            this.runOnVertxContext = z;
            return this;
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySource
        public void complete() {
            if (this.runOnVertxContext) {
                this.context.runOnContext(() -> {
                    this.processor.onComplete();
                });
            } else {
                this.processor.onComplete();
            }
        }

        @Override // io.smallrye.reactive.messaging.memory.InMemorySource
        public void fail(Throwable th) {
            if (this.runOnVertxContext) {
                this.context.runOnContext(() -> {
                    this.processor.onError(th);
                });
            } else {
                this.processor.onError(th);
            }
        }
    }

    public static Map<String, String> switchIncomingChannelsToInMemory(String... strArr) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str : strArr) {
            if (str == null || str.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String str2 = "mp.messaging.incoming." + str + ".connector";
            linkedHashMap.put(str2, CONNECTOR);
            System.setProperty(str2, CONNECTOR);
        }
        return linkedHashMap;
    }

    public static Map<String, String> switchOutgoingChannelsToInMemory(String... strArr) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str : strArr) {
            if (str == null || str.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String str2 = "mp.messaging.outgoing." + str + ".connector";
            linkedHashMap.put(str2, CONNECTOR);
            System.setProperty(str2, CONNECTOR);
        }
        return linkedHashMap;
    }

    public static void clear() {
        ((List) System.getProperties().entrySet().stream().filter(entry -> {
            return CONNECTOR.equals(entry.getValue());
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toList())).forEach(System::clearProperty);
    }

    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        InMemoryConnectorIncomingConfiguration inMemoryConnectorIncomingConfiguration = new InMemoryConnectorIncomingConfiguration(config);
        String channel = inMemoryConnectorIncomingConfiguration.getChannel();
        boolean booleanValue = inMemoryConnectorIncomingConfiguration.getBroadcast().booleanValue();
        Vertx vertx = this.executionHolder.vertx();
        boolean booleanValue2 = inMemoryConnectorIncomingConfiguration.getRunOnVertxContext().booleanValue();
        return ((InMemorySourceImpl) this.sources.computeIfAbsent(channel, str -> {
            return new InMemorySourceImpl(str, vertx, booleanValue2, booleanValue);
        })).source;
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        return ((InMemorySinkImpl) this.sinks.computeIfAbsent(new InMemoryConnectorOutgoingConfiguration(config).getChannel(), InMemorySinkImpl::new)).sink;
    }

    public <T> InMemorySource<T> source(String str) {
        if (str == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySourceImpl<?> inMemorySourceImpl = this.sources.get(str);
        if (inMemorySourceImpl == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(str);
        }
        return inMemorySourceImpl;
    }

    public <T> InMemorySink<T> sink(String str) {
        if (str == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySinkImpl<?> inMemorySinkImpl = this.sinks.get(str);
        if (inMemorySinkImpl == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(str);
        }
        return inMemorySinkImpl;
    }
}
