package io.vertx.core.eventbus.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.concurrent.InboundMessageChannel;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;

/* loaded from: input_file:io/vertx/core/eventbus/impl/MessageConsumerImpl.class */
/**
 * {@link MessageConsumer} implementation that buffers inbound event-bus messages in an
 * {@link InboundMessageChannel} and delivers them to a user-supplied handler.
 *
 * <p>The fields {@code handler}, {@code endHandler}, {@code discardHandler}, {@code result} and
 * {@code registered} are read and written while holding this instance's monitor. The {@code full}
 * flag is written from the channel's pause/resume callbacks and read in {@link #doReceive}
 * without that monitor.
 *
 * <p>NOTE(review): this source is decompiler output; method names ending in {@code 2}
 * ({@code handler2}, {@code pause2}, ...) carry a "renamed from" marker and are kept as-is so that
 * existing callers of this file keep working.
 *
 * @param <T> the message body type
 */
public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements MessageConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);

    /** Forwarded to {@code register(...)} when the first handler is installed. */
    private final boolean localOnly;
    /** Current message handler; {@code null} while no handler is set or after unregistration. */
    private Handler<Message<T>> handler;
    /** Wrapper around the user end handler, invoked from {@link #unregister()}. */
    private Handler<Void> endHandler;
    /** Optional callback notified of every discarded message. */
    private Handler<Message<T>> discardHandler;
    /** Buffer limit handed to the pending channel; also reported in the "buffer full" warning. */
    private final int maxBufferedMessages;
    /** Channel buffering messages between reception and delivery to the handler. */
    private final InboundMessageChannel<Message<T>> pending;
    /** Promise backing {@link #completion()}; replaced with a fresh one on unregistration. */
    private Promise<Void> result;
    /** Whether {@code register(...)} has been called and not yet undone by {@link #unregister()}. */
    private boolean registered;
    /** Set by the channel's pause callback and cleared by its resume callback. */
    private boolean full;

    /**
     * Creates an unregistered consumer. Registration is triggered by the first non-null handler
     * passed to {@link #handler2(Handler)}, not by this constructor.
     *
     * @param contextInternal context providing the channel executor, the promises and the
     *                        duplicated contexts used for dispatch
     * @param eventBusImpl    event bus handed to the superclass
     * @param str             value handed to the superclass (presumably the address - confirm)
     * @param z               local-only flag forwarded to {@code register(...)}
     * @param i               maximum number of buffered messages; passed twice to the channel
     *                        constructor (presumably as both water marks - confirm)
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageConsumerImpl(final ContextInternal contextInternal, EventBusImpl eventBusImpl, String str, boolean z, int i) {
        super(contextInternal, eventBusImpl, str, false);
        this.localOnly = z;
        this.result = contextInternal.promise();
        this.maxBufferedMessages = i;
        this.pending = new InboundMessageChannel<Message<T>>(contextInternal.executor(), contextInternal.executor(), i, i) { // from class: io.vertx.core.eventbus.impl.MessageConsumerImpl.1
            @Override // io.vertx.core.internal.concurrent.InboundMessageChannel
            protected void handleResume() {
                MessageConsumerImpl.this.full = false;
            }

            @Override // io.vertx.core.internal.concurrent.InboundMessageChannel
            protected void handlePause() {
                MessageConsumerImpl.this.full = true;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // io.vertx.core.internal.concurrent.InboundMessageChannel
            public void handleMessage(Message<T> message) {
                // Snapshot the handler under the consumer's monitor, then dispatch outside of it.
                Handler<Message<T>> current;
                synchronized (MessageConsumerImpl.this) {
                    current = MessageConsumerImpl.this.handler;
                }
                if (current == null) {
                    // No handler installed: treat as "not registered" rather than "buffer full".
                    MessageConsumerImpl.this.handleDiscard(message, false);
                    return;
                }
                MessageConsumerImpl.this.dispatch(current, message, contextInternal.duplicate());
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // io.vertx.core.internal.concurrent.InboundMessageChannel
            public void handleDispose(Message<T> message) {
                MessageConsumerImpl.this.handleDiscard(message, false);
            }
        };
    }

    /**
     * Returns the future of the current registration promise.
     *
     * @return the future backing the current value of {@code result}
     */
    @Override // io.vertx.core.eventbus.MessageConsumer
    public synchronized Future<Void> completion() {
        return this.result.future();
    }

    /**
     * Clears the handler, notifies the end handler (if any), closes the pending channel and
     * delegates to the superclass unregistration.
     *
     * <p>If the consumer was registered, the promise that was current at the time of the call is
     * failed (via {@code tryFail}) once the superclass unregistration completes, and a fresh
     * promise is installed for a later registration.
     *
     * @return the future returned by {@code super.unregister()}
     */
    @Override // io.vertx.core.eventbus.impl.HandlerRegistration, io.vertx.core.eventbus.MessageConsumer
    public synchronized Future<Void> unregister() {
        this.handler = null;
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
        this.pending.close();
        Future<Void> unregistration = super.unregister();
        if (this.registered) {
            this.registered = false;
            // Keep a reference to the old promise: the field is replaced right below, before
            // the completion callback gets a chance to run.
            Promise<Void> previous = this.result;
            unregistration.onComplete2(asyncResult -> {
                previous.tryFail("Consumer unregistered before registration completed");
            });
            this.result = this.context.promise();
        }
        return unregistration;
    }

    /**
     * Notifies the discard handler, or logs a warning when none is set, and then hands the
     * message to {@code discardMessage}.
     *
     * @param message the message being dropped
     * @param z       {@code true} when the drop is caused by the full buffer (see
     *                {@link #doReceive}); {@code false} when it comes from the channel's
     *                message/dispose callbacks
     */
    private void handleDiscard(Message<T> message, boolean z) {
        // Read the field once under the monitor used by its setter, so a concurrent
        // discardHandler(null) cannot cause an NPE between the null check and the call.
        Handler<Message<T>> discard;
        synchronized (this) {
            discard = this.discardHandler;
        }
        if (discard != null) {
            discard.handle(message);
        } else if (log.isWarnEnabled()) {
            if (z) {
                log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address);
            } else {
                log.warn("Discarding message since the consumer is not registered. address: " + this.address);
            }
        }
        discardMessage(message);
    }

    /**
     * Receives a message: discards it when the channel reported itself full, otherwise writes it
     * to the pending channel.
     *
     * @param message the received message
     */
    @Override // io.vertx.core.eventbus.impl.HandlerRegistration
    protected void doReceive(Message<T> message) {
        if (this.full) {
            handleDiscard(message, true);
            return;
        }
        // The channel's element type is Message<T>; the message is written as-is. (The former
        // cast of the message to InboundMessageChannel could only fail at runtime.)
        this.pending.write(message);
    }

    /**
     * Dispatches the message to the handler through the given context.
     *
     * @param message         the message to deliver
     * @param contextInternal the context performing the dispatch
     * @param handler         the handler to invoke; must not be {@code null}
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override // io.vertx.core.eventbus.impl.HandlerRegistration
    protected void dispatch(Message<T> message, ContextInternal contextInternal, Handler<Message<T>> handler) {
        if (handler == null) {
            throw new NullPointerException();
        }
        contextInternal.dispatch(message, handler);
    }

    /**
     * Sets the callback notified for each discarded message.
     *
     * @param handler the discard callback, or {@code null} to fall back to logging a warning
     */
    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    /**
     * Installs the message handler, registering the consumer on first use; a {@code null}
     * handler unregisters the consumer instead.
     *
     * <p>On first registration, the outcome of {@code register(...)} is propagated to the
     * promise that backed {@link #completion()} at the time of the call.
     *
     * @param handler the new message handler, or {@code null} to unregister
     * @return this consumer
     */
    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: handler */
    public synchronized MessageConsumer<T> handler2(Handler<Message<T>> handler) {
        if (handler == null) {
            unregister();
            return this;
        }
        // The method already holds the monitor, so no nested synchronized block is needed.
        this.handler = handler;
        if (!this.registered) {
            this.registered = true;
            // Capture the promise now: unregister() may swap the field before completion.
            Promise<Void> completion = this.result;
            // The registration promise carries no value, hence Promise<Void>.
            Promise<Void> registration = this.context.promise();
            register(true, this.localOnly, registration);
            registration.future().onComplete2(ar -> {
                if (ar.succeeded()) {
                    completion.tryComplete();
                } else {
                    completion.tryFail(ar.cause());
                }
            });
        }
        return this;
    }

    /**
     * Wraps this consumer in a {@code BodyReadStream}.
     *
     * @return a new {@code BodyReadStream} built from this consumer
     */
    @Override // io.vertx.core.eventbus.MessageConsumer
    public ReadStream<T> bodyStream() {
        return new BodyReadStream(this);
    }

    /**
     * Pauses the pending channel.
     *
     * @return this consumer
     */
    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: pause */
    public synchronized MessageConsumer<T> pause2() {
        this.pending.pause();
        return this;
    }

    /**
     * Resumes delivery by fetching {@link Long#MAX_VALUE} messages.
     *
     * @return this consumer
     */
    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: resume */
    public MessageConsumer<T> resume2() {
        return fetch2(Long.MAX_VALUE);
    }

    /**
     * Requests {@code j} messages from the pending channel.
     *
     * @param j the amount forwarded to the channel's {@code fetch}
     * @return this consumer
     */
    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: fetch */
    public synchronized MessageConsumer<T> fetch2(long j) {
        this.pending.fetch(j);
        return this;
    }

    /**
     * Sets the handler notified when the consumer is unregistered.
     *
     * <p>A non-null handler is wrapped so that it is run via {@code runOnContext} on the context
     * obtained from {@code getOrCreateContext()} at the time this method is called.
     *
     * @param handler the end handler, or {@code null} to clear it
     * @return this consumer
     */
    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    public synchronized MessageConsumer<T> endHandler(Handler<Void> handler) {
        if (handler == null) {
            this.endHandler = null;
            return this;
        }
        ContextInternal endContext = this.context.owner().getOrCreateContext();
        this.endHandler = ignoredEnd -> {
            endContext.runOnContext(ignoredTask -> {
                handler.handle(null);
            });
        };
        return this;
    }

    /**
     * Accepts an exception handler but does not store or use it.
     *
     * @param handler ignored
     * @return this consumer
     */
    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    /**
     * Returns the currently installed message handler.
     *
     * @return the handler, or {@code null} when none is set
     */
    public synchronized Handler<Message<T>> getHandler() {
        return this.handler;
    }

    // The three methods below are marked by the decompiler as synthetic bridge methods; they only
    // forward to the typed overloads above and are kept unchanged.

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    public /* bridge */ /* synthetic */ ReadStream endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ ReadStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
