package io.smallrye.reactive.messaging.mqtt.session.impl;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.ReconnectDelayProvider;
import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
import io.smallrye.reactive.messaging.mqtt.session.SessionEvent;
import io.smallrye.reactive.messaging.mqtt.session.SessionState;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.class */
public class MqttClientSessionImpl implements MqttClientSession {
    private static final Logger log = LoggerFactory.getLogger(MqttClientSessionImpl.class);
    private final VertxInternal vertx;
    private final MqttClientSessionOptions options;
    private final ReconnectDelayProvider reconnectDelay;
    private volatile boolean running;
    private MqttClient client;
    private Long reconnectTimer;
    private volatile Handler<MqttPublishMessage> messageHandler;
    private volatile Handler<Throwable> exceptionHandler;
    private volatile Handler<SessionEvent> sessionStateHandler;
    private volatile Handler<SubscriptionEvent> subscriptionStateHandler;
    private volatile Handler<Integer> publishCompleteHandler;
    private volatile Handler<Integer> publishCompletionExpirationHandler;
    private volatile Handler<Integer> publishCompletionUnknownPacketIdHandler;
    private final Map<String, RequestedQoS> subscriptions = new HashMap();
    private final Map<Integer, LinkedHashMap<String, RequestedQoS>> pendingSubscribes = new HashMap();
    private final Map<Integer, List<String>> pendingUnsubscribes = new HashMap();
    private volatile SessionState state = SessionState.DISCONNECTED;
    private final Map<String, SubscriptionState> subscriptionStates = new ConcurrentHashMap();
    private final List<Handler<AsyncResult<Void>>> notifyConnected = new LinkedList();
    private final List<Handler<AsyncResult<Void>>> notifyStopped = new LinkedList();
    private final Map<String, List<Handler<AsyncResult<Integer>>>> notifySubscribed = new HashMap();
    private final Map<String, List<Handler<AsyncResult<Void>>>> notifyUnsubscribed = new HashMap();

    public MqttClientSessionImpl(Vertx vertx, MqttClientSessionOptions mqttClientSessionOptions) {
        this.vertx = (VertxInternal) vertx;
        this.options = mqttClientSessionOptions;
        this.reconnectDelay = mqttClientSessionOptions.getReconnectDelay().createProvider();
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public Future<Void> start() {
        Promise promise = Promise.promise();
        this.vertx.runOnContext(r5 -> {
            doStart(promise);
        });
        return promise.future();
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public Future<Void> stop() {
        Promise promise = Promise.promise();
        try {
            this.vertx.runOnContext(r5 -> {
                doStop(promise);
            });
        } catch (RejectedExecutionException e) {
        }
        return promise.future();
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public SessionState getState() {
        return this.state;
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public SubscriptionState getSubscriptionState(String str) {
        return this.subscriptionStates.get(str);
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public Future<Integer> subscribe(String str, RequestedQoS requestedQoS) {
        Promise promise = Promise.promise();
        this.vertx.runOnContext(r9 -> {
            doSubscribe(str, requestedQoS, promise);
        });
        return promise.future();
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public Future<Void> unsubscribe(String str) {
        Promise promise = Promise.promise();
        this.vertx.runOnContext(r7 -> {
            doUnsubscribe(str, promise);
        });
        return promise.future();
    }

    private void doStart(Handler<AsyncResult<Void>> handler) {
        if (this.running) {
            if (handler != null) {
                if (this.state == SessionState.CONNECTED) {
                    handler.handle(Future.succeededFuture());
                    return;
                } else {
                    this.notifyConnected.add(handler);
                    return;
                }
            }
            return;
        }
        if (handler != null) {
            this.notifyConnected.add(handler);
        }
        this.reconnectDelay.reset();
        this.running = true;
        switch (this.state) {
            case DISCONNECTED:
                createConnection();
                return;
            case CONNECTING:
            case CONNECTED:
            case DISCONNECTING:
            default:
                return;
        }
    }

    private void doStop(Handler<AsyncResult<Void>> handler) {
        if (!this.running) {
            if (handler != null) {
                if (this.state == SessionState.DISCONNECTED) {
                    handler.handle(Future.succeededFuture());
                    return;
                } else {
                    this.notifyStopped.add(handler);
                    return;
                }
            }
            return;
        }
        if (handler != null) {
            this.notifyStopped.add(handler);
        }
        this.running = false;
        if (this.reconnectTimer != null) {
            this.vertx.cancelTimer(this.reconnectTimer.longValue());
        }
        switch (this.state) {
            case DISCONNECTED:
            case CONNECTING:
            case DISCONNECTING:
            default:
                return;
            case CONNECTED:
                closeConnection((Throwable) new VertxException("Stop requested"));
                return;
        }
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public MqttClientSession exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public MqttClientSession sessionStateHandler(Handler<SessionEvent> handler) {
        this.sessionStateHandler = handler;
        return this;
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public MqttClientSession subscriptionStateHandler(Handler<SubscriptionEvent> handler) {
        this.subscriptionStateHandler = handler;
        return this;
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public MqttClientSession publishCompletionHandler(Handler<Integer> handler) {
        this.publishCompleteHandler = handler;
        return this;
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public MqttClientSession publishCompletionExpirationHandler(Handler<Integer> handler) {
        this.publishCompletionExpirationHandler = handler;
        return this;
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public MqttClientSession publishCompletionUnknownPacketIdHandler(Handler<Integer> handler) {
        this.publishCompletionUnknownPacketIdHandler = handler;
        return this;
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public MqttClientSession messageHandler(Handler<MqttPublishMessage> handler) {
        this.messageHandler = handler;
        return this;
    }

    private void setState(SessionState sessionState, Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("setState - current: %s, next: %s", this.state, sessionState), th);
        }
        switch (sessionState) {
            case DISCONNECTED:
                this.pendingUnsubscribes.clear();
                this.pendingSubscribes.clear();
                Iterator<String> it = this.subscriptions.keySet().iterator();
                while (it.hasNext()) {
                    notifySubscriptionState(it.next(), SubscriptionState.UNSUBSCRIBED, null);
                }
                break;
            case CONNECTED:
                this.reconnectDelay.reset();
                break;
        }
        if (this.state != sessionState) {
            this.state = sessionState;
            Handler<SessionEvent> handler = this.sessionStateHandler;
            if (handler != null) {
                handler.handle(new SessionEventImpl(sessionState, th));
            }
        }
        switch (this.state) {
            case DISCONNECTED:
                if (this.running) {
                    scheduleReconnect();
                    return;
                }
                Iterator<Handler<AsyncResult<Void>>> it2 = this.notifyConnected.iterator();
                while (it2.hasNext()) {
                    it2.next().handle(Future.failedFuture("Session stopped"));
                }
                this.notifyConnected.clear();
                Iterator<Handler<AsyncResult<Void>>> it3 = this.notifyStopped.iterator();
                while (it3.hasNext()) {
                    it3.next().handle(Future.succeededFuture());
                }
                this.notifyStopped.clear();
                return;
            case CONNECTING:
            case DISCONNECTING:
            default:
                return;
            case CONNECTED:
                if (!this.running) {
                    closeConnection((Throwable) null);
                    return;
                }
                Iterator<Handler<AsyncResult<Void>>> it4 = this.notifyConnected.iterator();
                while (it4.hasNext()) {
                    it4.next().handle(Future.succeededFuture());
                }
                this.notifyConnected.clear();
                return;
        }
    }

    private void notifySubscriptionState(String str, SubscriptionState subscriptionState, Integer num) {
        List<Handler<AsyncResult<Integer>>> remove;
        List<Handler<AsyncResult<Void>>> remove2;
        if (log.isDebugEnabled()) {
            log.debug(String.format("notifySubscriptionState - topic: %s, state: %s, grantedQoS: %s", str, subscriptionState, num));
        }
        this.subscriptionStates.put(str, subscriptionState);
        Handler<SubscriptionEvent> handler = this.subscriptionStateHandler;
        if (handler != null) {
            handler.handle(new SubscriptionEventImpl(str, subscriptionState, num));
        }
        if ((subscriptionState == SubscriptionState.SUBSCRIBED || subscriptionState == SubscriptionState.FAILED) && (remove = this.notifySubscribed.remove(str)) != null) {
            for (Handler<AsyncResult<Integer>> handler2 : remove) {
                if (num != null) {
                    handler2.handle(Future.succeededFuture(num));
                } else {
                    handler2.handle(Future.failedFuture("Unable to subscribe"));
                }
            }
        }
        if (subscriptionState != SubscriptionState.UNSUBSCRIBED || (remove2 = this.notifyUnsubscribed.remove(str)) == null) {
            return;
        }
        for (Handler<AsyncResult<Void>> handler3 : remove2) {
            if (num != null) {
                handler3.handle(Future.succeededFuture());
            } else {
                handler3.handle(Future.failedFuture("Unable to subscribe"));
            }
        }
    }

    private void scheduleReconnect() {
        log.debug("Scheduling reconnect");
        if (this.reconnectTimer == null) {
            Duration nextDelay = nextDelay();
            if (log.isDebugEnabled()) {
                log.debug("Next delay: " + String.valueOf(nextDelay));
            }
            long timer = this.vertx.setTimer(nextDelay.toMillis(), l -> {
                createConnection();
            });
            if (log.isDebugEnabled()) {
                log.debug("Timer set: " + timer);
            }
            this.reconnectTimer = Long.valueOf(timer);
        }
    }

    private Duration nextDelay() {
        return this.reconnectDelay.nextDelay();
    }

    private void createConnection() {
        log.debug("Creating connection");
        this.reconnectTimer = null;
        this.client = MqttClient.create(this.vertx, this.options);
        this.client.exceptionHandler(this::exceptionCaught);
        this.client.closeHandler(r3 -> {
            connectionClosed();
        });
        this.client.publishHandler(this::serverPublished);
        this.client.subscribeCompletionHandler(this::subscribeCompleted);
        this.client.unsubscribeCompletionHandler(this::unsubscribeCompleted);
        this.client.publishCompletionHandler(this::publishComplete);
        this.client.publishCompletionExpirationHandler(this::publishExpired);
        this.client.publishCompletionUnknownPacketIdHandler(this::publishCompletionUnknown);
        setState(SessionState.CONNECTING, null);
        this.client.connect(this.options.getPort(), this.options.getHostname(), this.options.getServerName().orElse(this.options.getHostname()), this::connectCompleted);
    }

    private void exceptionCaught(Throwable th) {
        log.debug("Caught exception", th);
        closeConnection(th);
        Handler<Throwable> handler = this.exceptionHandler;
        if (handler != null) {
            handler.handle(th);
        }
    }

    private void closeConnection(Throwable th) {
        log.debug("Closing connection", th);
        setState(SessionState.DISCONNECTING, th);
        this.client.disconnect().onComplete(this::disconnectCompleted);
    }

    private void connectCompleted(AsyncResult<MqttConnAckMessage> asyncResult) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Connect completed - result: %s, cause: %s", asyncResult.result(), asyncResult.cause()));
        }
        if (asyncResult.failed() || asyncResult.result() == null) {
            setState(SessionState.DISCONNECTED, asyncResult.cause());
            return;
        }
        MqttConnAckMessage mqttConnAckMessage = (MqttConnAckMessage) asyncResult.result();
        setState(SessionState.CONNECTED, null);
        if (log.isDebugEnabled()) {
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = Boolean.valueOf(!this.subscriptions.isEmpty());
            objArr[1] = Boolean.valueOf(this.options.isCleanSession());
            objArr[2] = Boolean.valueOf(mqttConnAckMessage.isSessionPresent());
            logger.debug(String.format("Subscriptions: %s, cleanSession: %s, sessionPresent: %s", objArr));
        }
        if (this.options.isCleanSession() || !mqttConnAckMessage.isSessionPresent()) {
            requestSubscribe(new LinkedHashMap<>(this.subscriptions));
        } else {
            log.debug("Session present on broker, subscriptions request not sent. Be sure that the subscriptions on the broker side are the same that this client needs.");
            this.subscriptions.forEach((str, requestedQoS) -> {
                notifySubscriptionState(str, SubscriptionState.SUBSCRIBED, Integer.valueOf(requestedQoS.toInteger()));
            });
        }
    }

    private void disconnectCompleted(AsyncResult<?> asyncResult) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Disconnect completed - result: %s, cause: %s", asyncResult.result(), asyncResult.cause()));
        }
        connectionClosed(asyncResult.cause());
    }

    private void closeConnection(String str) {
        closeConnection(new VertxException(str).fillInStackTrace());
    }

    private void connectionClosed() {
        if (this.state != SessionState.DISCONNECTING) {
            connectionClosed(new VertxException("Connection closed"));
        }
    }

    private void connectionClosed(Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug("Connection closed", th);
        } else {
            log.info("Connection closed: " + (th != null ? th.getMessage() : "<unknown>"));
        }
        if (this.client != null) {
            this.client.exceptionHandler((Handler) null);
            this.client.publishHandler((Handler) null);
            this.client.closeHandler((Handler) null);
            this.client.subscribeCompletionHandler((Handler) null);
            this.client.publishCompletionHandler((Handler) null);
            this.client.publishCompletionExpirationHandler((Handler) null);
            this.client.publishCompletionUnknownPacketIdHandler((Handler) null);
            this.client = null;
        }
        setState(SessionState.DISCONNECTED, th);
    }

    private void serverPublished(MqttPublishMessage mqttPublishMessage) {
        if (log.isDebugEnabled()) {
            log.debug("Server published: " + String.valueOf(mqttPublishMessage));
        }
        Handler<MqttPublishMessage> handler = this.messageHandler;
        if (handler != null) {
            handler.handle(mqttPublishMessage);
        }
    }

    private void doSubscribe(String str, RequestedQoS requestedQoS, Handler<AsyncResult<Integer>> handler) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Request to subscribe to: %s / %s", str, requestedQoS));
        }
        RequestedQoS requestedQoS2 = this.subscriptions.get(str);
        if (requestedQoS2 != null) {
            if (log.isDebugEnabled()) {
                log.debug("Already subscribed with: " + String.valueOf(requestedQoS2));
            }
            if (handler != null) {
                handler.handle(Future.succeededFuture(Integer.valueOf(requestedQoS2.toInteger())));
                return;
            }
            return;
        }
        this.subscriptions.put(str, requestedQoS);
        if (handler != null) {
            this.notifySubscribed.computeIfAbsent(str, str2 -> {
                return new LinkedList();
            }).add(handler);
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format("Requesting subscribe: %s / %s", str, requestedQoS));
        }
        requestSubscribe(new LinkedHashMap<>(Collections.singletonMap(str, requestedQoS)));
    }

    private void doUnsubscribe(String str, Handler<AsyncResult<Void>> handler) {
        if (this.subscriptions.remove(str) == null) {
            handler.handle(Future.succeededFuture());
            return;
        }
        if (handler != null) {
            this.notifyUnsubscribed.computeIfAbsent(str, str2 -> {
                return new LinkedList();
            }).add(handler);
        }
        if (log.isDebugEnabled()) {
            log.debug("Requesting unsubscribe: " + str);
        }
        requestUnsubscribe(Collections.singletonList(str));
    }

    private void requestSubscribe(LinkedHashMap<String, RequestedQoS> linkedHashMap) {
        if (linkedHashMap.isEmpty() || this.client == null || !this.client.isConnected()) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Request Subscribe to: " + String.valueOf(linkedHashMap));
        }
        this.client.subscribe((Map) linkedHashMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Integer.valueOf(((RequestedQoS) entry.getValue()).toInteger());
        }))).onComplete(asyncResult -> {
            subscribeSent(asyncResult, linkedHashMap);
        });
    }

    private void requestUnsubscribe(List<String> list) {
        if (list.isEmpty() || this.client == null || !this.client.isConnected()) {
            return;
        }
        for (String str : list) {
            this.client.unsubscribe(str).onComplete(asyncResult -> {
                unsubscribeSent(asyncResult, Collections.singletonList(str));
            });
        }
    }

    private void subscribeSent(AsyncResult<Integer> asyncResult, LinkedHashMap<String, RequestedQoS> linkedHashMap) {
        if (asyncResult.failed() || asyncResult.result() == null) {
            Iterator<String> it = linkedHashMap.keySet().iterator();
            while (it.hasNext()) {
                notifySubscriptionState(it.next(), SubscriptionState.UNSUBSCRIBED, null);
            }
        } else {
            Iterator<String> it2 = linkedHashMap.keySet().iterator();
            while (it2.hasNext()) {
                notifySubscriptionState(it2.next(), SubscriptionState.SUBSCRIBING, null);
            }
            this.pendingSubscribes.put((Integer) asyncResult.result(), linkedHashMap);
        }
    }

    private void unsubscribeSent(AsyncResult<Integer> asyncResult, List<String> list) {
        if (asyncResult.failed() || asyncResult.result() == null) {
            closeConnection(String.format("Failed to send unsubscribe request: %s", asyncResult.cause()));
        } else {
            this.pendingUnsubscribes.put((Integer) asyncResult.result(), list);
        }
    }

    private void subscribeCompleted(MqttSubAckMessage mqttSubAckMessage) {
        LinkedHashMap<String, RequestedQoS> remove = this.pendingSubscribes.remove(Integer.valueOf(mqttSubAckMessage.messageId()));
        if (remove == null) {
            closeConnection(String.format("Unexpected subscription ack response - messageId: %s", Integer.valueOf(mqttSubAckMessage.messageId())));
            return;
        }
        if (remove.size() != mqttSubAckMessage.grantedQoSLevels().size()) {
            closeConnection(String.format("Mismatch of topics on subscription ack - expected: %d, actual: %d", Integer.valueOf(remove.size()), Integer.valueOf(mqttSubAckMessage.grantedQoSLevels().size())));
            return;
        }
        int i = 0;
        Iterator<String> it = remove.keySet().iterator();
        while (it.hasNext()) {
            notifySubscriptionState(it.next(), SubscriptionState.SUBSCRIBED, (Integer) mqttSubAckMessage.grantedQoSLevels().get(i));
            i++;
        }
    }

    private void unsubscribeCompleted(Integer num) {
        List<String> remove = this.pendingUnsubscribes.remove(num);
        if (remove != null) {
            Iterator<String> it = remove.iterator();
            while (it.hasNext()) {
                notifySubscriptionState(it.next(), SubscriptionState.UNSUBSCRIBED, null);
            }
        }
    }

    @Override // io.smallrye.reactive.messaging.mqtt.session.MqttClientSession
    public Future<Integer> publish(String str, Buffer buffer, MqttQoS mqttQoS, boolean z, boolean z2) {
        Promise promise = Promise.promise();
        this.vertx.runOnContext(r14 -> {
            doPublish(str, buffer, mqttQoS, z, z2).onComplete(promise);
        });
        return promise.future();
    }

    private Future<Integer> doPublish(String str, Buffer buffer, MqttQoS mqttQoS, boolean z, boolean z2) {
        return (this.client == null || !this.client.isConnected()) ? Future.failedFuture("Session is not connected") : this.client.publish(str, buffer, mqttQoS, z, z2);
    }

    private void publishComplete(Integer num) {
        Handler<Integer> handler = this.publishCompleteHandler;
        if (handler != null) {
            handler.handle(num);
        }
    }

    private void publishExpired(Integer num) {
        Handler<Integer> handler = this.publishCompletionExpirationHandler;
        if (handler != null) {
            handler.handle(num);
        }
    }

    private void publishCompletionUnknown(Integer num) {
        Handler<Integer> handler = this.publishCompletionUnknownPacketIdHandler;
        if (handler != null) {
            handler.handle(num);
        }
    }
}
