package org.apache.druid.messages.client;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.messages.MessageBatch;
import org.apache.druid.messages.server.Outbox;
import org.apache.druid.messages.server.OutboxImpl;
import org.apache.druid.server.DruidNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/messages/client/MessageRelaysTest.class */
public class MessageRelaysTest {
    private static final String MY_HOST = "me";
    private static final DruidNode OUTBOX_NODE = new DruidNode("service", "host", false, 80, -1, true, false);
    private static final DiscoveryDruidNode OUTBOX_DISCO_NODE = new DiscoveryDruidNode(new DruidNode("service", "host", false, 80, -1, true, false), NodeRole.HISTORICAL, Collections.emptyMap());
    private Outbox<String> outbox;
    private TestMessageListener messageListener;
    private TestDiscovery discovery;
    private MessageRelays<String> messageRelays;

    /* loaded from: input_file:org/apache/druid/messages/client/MessageRelaysTest$OutboxMessageRelayClient.class */
    private static class OutboxMessageRelayClient implements MessageRelayClient<String> {
        private final Outbox<String> outbox;

        public OutboxMessageRelayClient(Outbox<String> outbox) {
            this.outbox = outbox;
        }

        public ListenableFuture<MessageBatch<String>> getMessages(String str, long j, long j2) {
            return this.outbox.getMessages(str, j, j2);
        }
    }

    /* loaded from: input_file:org/apache/druid/messages/client/MessageRelaysTest$TestDiscovery.class */
    private static class TestDiscovery implements DruidNodeDiscovery {

        @GuardedBy("this")
        private final List<DruidNodeDiscovery.Listener> listeners = new ArrayList();

        public Collection<DiscoveryDruidNode> getAllNodes() {
            throw new UnsupportedOperationException();
        }

        public synchronized void registerListener(DruidNodeDiscovery.Listener listener) {
            this.listeners.add(listener);
        }

        public synchronized void removeListener(DruidNodeDiscovery.Listener listener) {
            this.listeners.remove(listener);
        }

        public synchronized List<DruidNodeDiscovery.Listener> getListeners() {
            return ImmutableList.copyOf(this.listeners);
        }

        public synchronized void fire(Consumer<DruidNodeDiscovery.Listener> consumer) {
            Iterator<DruidNodeDiscovery.Listener> it = this.listeners.iterator();
            while (it.hasNext()) {
                consumer.accept(it.next());
            }
        }
    }

    /* loaded from: input_file:org/apache/druid/messages/client/MessageRelaysTest$TestMessageListener.class */
    private static class TestMessageListener implements MessageListener<String> {

        @GuardedBy("this")
        private long adds;

        @GuardedBy("this")
        private long removes;

        @GuardedBy("this")
        private final List<String> messages = new ArrayList();

        private TestMessageListener() {
        }

        public synchronized void serverAdded(DruidNode druidNode) {
            this.adds++;
        }

        public synchronized void messageReceived(String str) {
            this.messages.add(str);
        }

        public synchronized void serverRemoved(DruidNode druidNode) {
            this.removes++;
        }

        public synchronized long getAdds() {
            return this.adds;
        }

        public synchronized long getRemoves() {
            return this.removes;
        }

        public synchronized List<String> getMessages() {
            return ImmutableList.copyOf(this.messages);
        }
    }

    @Before
    public void setUp() {
        this.outbox = new OutboxImpl();
        this.messageListener = new TestMessageListener();
        this.discovery = new TestDiscovery();
        this.messageRelays = new MessageRelays<>(() -> {
            return this.discovery;
        }, druidNode -> {
            Assert.assertEquals(OUTBOX_NODE, druidNode);
            return new MessageRelay(MY_HOST, druidNode, new OutboxMessageRelayClient(this.outbox), this.messageListener);
        });
        this.messageRelays.start();
    }

    @After
    public void tearDown() {
        this.messageRelays.stop();
        Assert.assertEquals(Collections.emptyList(), this.discovery.getListeners());
    }

    @Test
    public void test_serverAdded_thenRemoved() {
        this.discovery.fire(listener -> {
            listener.nodesAdded(Collections.singletonList(OUTBOX_DISCO_NODE));
        });
        this.discovery.fire(listener2 -> {
            listener2.nodesRemoved(Collections.singletonList(OUTBOX_DISCO_NODE));
        });
        Assert.assertEquals(1L, this.messageListener.getAdds());
        Assert.assertEquals(1L, this.messageListener.getRemoves());
    }

    @Test
    public void test_messageListener() {
        this.discovery.fire(listener -> {
            listener.nodesAdded(Collections.singletonList(OUTBOX_DISCO_NODE));
        });
        Assert.assertEquals(1L, this.messageListener.getAdds());
        Assert.assertEquals(0L, this.messageListener.getRemoves());
        ListenableFuture sendMessage = this.outbox.sendMessage(MY_HOST, "foo");
        Assert.assertEquals(ImmutableList.of("foo"), this.messageListener.getMessages());
        Assert.assertTrue(sendMessage.isDone());
        ListenableFuture sendMessage2 = this.outbox.sendMessage(MY_HOST, "bar");
        Assert.assertEquals(ImmutableList.of("foo", "bar"), this.messageListener.getMessages());
        Assert.assertTrue(sendMessage2.isDone());
    }
}
