package io.vertx.core.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.test.core.VertxTestBase;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;

/* loaded from: input_file:io/vertx/core/eventbus/MessageConsumerTest.class */
public class MessageConsumerTest extends VertxTestBase {

    /* loaded from: input_file:io/vertx/core/eventbus/MessageConsumerTest$TestVerticle.class */
    private static class TestVerticle extends AbstractVerticle {
        private final CountDownLatch msgLatch;
        private final Map<String, Boolean> messageArrivedOnWorkerThread;

        private TestVerticle(Integer num) {
            this.messageArrivedOnWorkerThread = new HashMap();
            this.msgLatch = new CountDownLatch(num.intValue());
        }

        public void start() {
            handleMessages(this.vertx.eventBus().localConsumer("testAddress"));
        }

        private void handleMessages(MessageConsumer<String> messageConsumer) {
            messageConsumer.handler(message -> {
                messageConsumer.pause();
                this.messageArrivedOnWorkerThread.putIfAbsent(message.body(), Boolean.valueOf(Context.isOnWorkerThread()));
                this.msgLatch.countDown();
                this.vertx.setTimer(20L, l -> {
                    messageConsumer.resume();
                });
            });
        }
    }

    @Test
    public void testMessageConsumptionStayOnWorkerThreadAfterResume() throws Exception {
        TestVerticle testVerticle = new TestVerticle(2);
        Future deployVerticle = this.vertx.deployVerticle(testVerticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        deployVerticle.onComplete(onSuccess(str -> {
            countDownLatch.countDown();
        }));
        awaitLatch(countDownLatch);
        this.vertx.eventBus().send("testAddress", "message1");
        this.vertx.eventBus().send("testAddress", "message2");
        awaitLatch(testVerticle.msgLatch);
        assertEquals(2L, testVerticle.messageArrivedOnWorkerThread.size());
        assertTrue("message1 should be processed on worker thread", ((Boolean) testVerticle.messageArrivedOnWorkerThread.get("message1")).booleanValue());
        assertTrue("message2 should be processed on worker thread", ((Boolean) testVerticle.messageArrivedOnWorkerThread.get("message2")).booleanValue());
    }
}
