package org.apache.druid.messages.server;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.druid.messages.MessageBatch;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/messages/server/OutboxImplTest.class */
/**
 * Unit tests for {@link OutboxImpl} using {@code String} messages.
 *
 * <p>The tests exercise the outbox purely through its public methods ({@code sendMessage},
 * {@code getMessages}, {@code getOutboxEpoch}, {@code resetOutbox}, {@code stop}) and check the
 * observable state of the returned futures. No threads are started by the tests themselves; every
 * assertion is made synchronously right after the call that is expected to change the state.
 */
public class OutboxImplTest {
    /** Host used by most tests. */
    private static final String HOST = "h1";

    /** Host that is never set up ahead of time by the tests that use it. */
    private static final String UNKNOWN_HOST = "nonexistent";

    /**
     * Epoch passed to {@code getMessages} by a caller that does not yet know the outbox epoch.
     * NOTE(review): presumably mirrors the relay's "initial epoch" sentinel; if such a constant is
     * accessible from this test, reference it instead of the literal. TODO confirm.
     */
    private static final long UNKNOWN_EPOCH = -1L;

    private OutboxImpl<String> outbox;

    @Before
    public void setUp() {
        outbox = new OutboxImpl<>();
    }

    @After
    public void tearDown() {
        // Several tests call stop() themselves, so this may be the second stop() on the same outbox.
        outbox.stop();
    }

    /**
     * Walks through send / fetch / acknowledge: a send future completes only once a later
     * {@code getMessages} call asks for an offset past that message, and a fetch with nothing to
     * return stays pending until the next message is sent.
     */
    @Test
    public void test_normalOperation() throws InterruptedException, ExecutionException {
        final ListenableFuture<?> send1 = outbox.sendMessage(HOST, "1");
        final ListenableFuture<?> send2 = outbox.sendMessage(HOST, "2");
        final ListenableFuture<?> send3 = outbox.sendMessage(HOST, "3");
        final long outboxEpoch = outbox.getOutboxEpoch(HOST);

        // Nothing has been fetched yet, so nothing is acknowledged.
        Assert.assertFalse(send1.isDone());
        Assert.assertFalse(send2.isDone());
        Assert.assertFalse(send3.isDone());

        // Fetch from offset 0 without knowing the epoch: all three messages come back.
        Assert.assertEquals(
            new MessageBatch<>(ImmutableList.of("1", "2", "3"), outboxEpoch, 0L),
            outbox.getMessages(HOST, UNKNOWN_EPOCH, 0L).get()
        );

        // Fetching from offset 0 acknowledges nothing.
        Assert.assertFalse(send1.isDone());
        Assert.assertFalse(send2.isDone());
        Assert.assertFalse(send3.isDone());

        // Fetch from offset 1: the remaining two messages come back...
        Assert.assertEquals(
            new MessageBatch<>(ImmutableList.of("2", "3"), outboxEpoch, 1L),
            outbox.getMessages(HOST, outboxEpoch, 1L).get()
        );

        // ...and only the first message is now acknowledged.
        Assert.assertTrue(send1.isDone());
        Assert.assertFalse(send2.isDone());
        Assert.assertFalse(send3.isDone());

        // Fetch from offset 3: there is nothing to return yet, so the future stays pending,
        // while all three earlier sends become acknowledged.
        final ListenableFuture<?> pendingBatch = outbox.getMessages(HOST, outboxEpoch, 3L);
        Assert.assertFalse(pendingBatch.isDone());
        Assert.assertTrue(send1.isDone());
        Assert.assertTrue(send2.isDone());
        Assert.assertTrue(send3.isDone());

        // Sending one more message resolves the pending fetch; the new send is not yet acknowledged.
        final ListenableFuture<?> send4 = outbox.sendMessage(HOST, "4");
        Assert.assertTrue(pendingBatch.isDone());
        // The resolved batch should carry exactly the new message at the requested offset.
        Assert.assertEquals(
            new MessageBatch<>(ImmutableList.of("4"), outboxEpoch, 3L),
            pendingBatch.get()
        );
        Assert.assertFalse(send4.isDone());
    }

    /**
     * A fetch with an epoch that does not match the outbox's epoch returns an empty batch carrying
     * the real epoch and offset 0, and does not acknowledge any message.
     */
    @Test
    public void test_getMessages_wrongEpoch() throws InterruptedException, ExecutionException {
        final ListenableFuture<?> send = outbox.sendMessage(HOST, "1");
        final long outboxEpoch = outbox.getOutboxEpoch(HOST);

        Assert.assertEquals(
            new MessageBatch<>(Collections.emptyList(), outboxEpoch, 0L),
            outbox.getMessages(HOST, outboxEpoch + 1, 0L).get()
        );
        Assert.assertFalse(send.isDone());
    }

    /**
     * Fetching for a host that has not been sent anything yet yields a pending future that is
     * resolved by the first message sent to that host.
     */
    @Test
    public void test_getMessages_nonexistentHost() throws InterruptedException, ExecutionException {
        final ListenableFuture<?> firstBatch = outbox.getMessages(UNKNOWN_HOST, UNKNOWN_EPOCH, 0L);
        Assert.assertFalse(firstBatch.isDone());

        // An epoch is available for the host even though no message has been sent to it.
        MatcherAssert.assertThat(outbox.getOutboxEpoch(UNKNOWN_HOST), Matchers.greaterThanOrEqualTo(0L));

        // The first message resolves the pending fetch but is not acknowledged by it.
        final ListenableFuture<?> send = outbox.sendMessage(UNKNOWN_HOST, "foo");
        final long outboxEpoch = outbox.getOutboxEpoch(UNKNOWN_HOST);
        Assert.assertTrue(firstBatch.isDone());
        Assert.assertEquals(new MessageBatch<>(ImmutableList.of("foo"), outboxEpoch, 0L), firstBatch.get());
        Assert.assertFalse(send.isDone());

        // Asking for offset 1 acknowledges the message.
        final ListenableFuture<?> secondBatch = outbox.getMessages(UNKNOWN_HOST, outboxEpoch, 1L);
        Assert.assertTrue(send.isDone());

        // Resetting the outbox completes the outstanding fetch.
        outbox.resetOutbox(UNKNOWN_HOST);
        Assert.assertTrue(secondBatch.isDone());
    }

    /** Stopping the outbox cancels a send that has not been acknowledged. */
    @Test
    public void test_stop_cancelsSendMessage() {
        final ListenableFuture<?> send = outbox.sendMessage(HOST, "1");
        outbox.stop();
        Assert.assertTrue(send.isCancelled());
    }

    /** Stopping the outbox cancels a fetch that is still pending. */
    @Test
    public void test_stop_cancelsGetMessages() {
        final ListenableFuture<?> batch = outbox.getMessages(HOST, UNKNOWN_EPOCH, 0L);
        outbox.stop();
        Assert.assertTrue(batch.isCancelled());
    }

    /** Resetting a host's outbox cancels a send that has not been acknowledged. */
    @Test
    public void test_reset_cancelsSendMessage() {
        final ListenableFuture<?> send = outbox.sendMessage(HOST, "1");
        outbox.resetOutbox(HOST);
        Assert.assertTrue(send.isCancelled());
    }

    /** Resetting a host's outbox cancels a fetch that is still pending. */
    @Test
    public void test_reset_cancelsGetMessages() {
        final ListenableFuture<?> batch = outbox.getMessages(HOST, UNKNOWN_EPOCH, 0L);
        outbox.resetOutbox(HOST);
        Assert.assertTrue(batch.isCancelled());
    }

    /** Resetting a host that was never used must not throw; there is nothing else to observe. */
    @Test
    public void test_reset_nonexistentHost_doesNothing() {
        outbox.resetOutbox(UNKNOWN_HOST);
    }

    /** After stop(), sendMessage returns an already-cancelled future. */
    @Test
    public void test_stop_preventsSendMessage() {
        outbox.stop();
        Assert.assertTrue(outbox.sendMessage(HOST, "1").isCancelled());
    }

    /** After stop(), getMessages returns an already-cancelled future. */
    @Test
    public void test_stop_preventsGetMessages() {
        outbox.stop();
        Assert.assertTrue(outbox.getMessages(HOST, UNKNOWN_EPOCH, 0L).isCancelled());
    }
}
