package io.helidon.messaging.connectors.mock;

import io.helidon.common.reactive.BufferedEmittingPublisher;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.FlowAdapters;

@ApplicationScoped
@TestConnector
@Connector(MockConnector.CONNECTOR_NAME)
/* loaded from: input_file:io/helidon/messaging/connectors/mock/MockConnector.class */
public class MockConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    public static final String CONNECTOR_NAME = "helidon-mock";
    private final Map<String, BufferedEmittingPublisher<Message<?>>> emitterMap = new HashMap();
    private final Map<String, MockOutgoing<?>> subscriberMap = new HashMap();
    private final ReentrantLock emitterLock = new ReentrantLock();
    private final ReentrantLock subscriberLock = new ReentrantLock();

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        BufferedEmittingPublisher<Message<?>> emitter = incoming((String) config.getValue("channel-name", String.class), Object.class).emitter();
        config.getOptionalValues("mock-data", (Class) config.getOptionalValue("mock-data-type", Class.class).orElse(String.class)).ifPresent(list -> {
            list.forEach(obj -> {
                emitter.emit(Message.of(obj));
            });
        });
        return ReactiveStreams.fromPublisher(FlowAdapters.toPublisher(emitter)).map(obj -> {
            return obj instanceof Message ? (Message) obj : Message.of(obj);
        });
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        return ReactiveStreams.builder().to(outgoing((String) config.getValue("channel-name", String.class), Object.class).subscriber());
    }

    public <P> MockIncoming<P> incoming(String str, Class<P> cls) {
        this.emitterLock.lock();
        try {
            MockIncoming<P> mockIncoming = new MockIncoming<>(this.emitterMap.computeIfAbsent(str, str2 -> {
                return BufferedEmittingPublisher.create();
            }));
            this.emitterLock.unlock();
            return mockIncoming;
        } catch (Throwable th) {
            this.emitterLock.unlock();
            throw th;
        }
    }

    public <P> MockOutgoing<P> outgoing(String str, Class<P> cls) {
        this.subscriberLock.lock();
        try {
            MockOutgoing<P> mockOutgoing = (MockOutgoing) this.subscriberMap.computeIfAbsent(str, str2 -> {
                return new MockOutgoing(new MockSubscriber());
            });
            this.subscriberLock.unlock();
            return mockOutgoing;
        } catch (Throwable th) {
            this.subscriberLock.unlock();
            throw th;
        }
    }
}
