package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.core.MqttComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.class */
public class Mqttv5PahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter<IMqttAsyncClient, MqttConnectionOptions> implements MqttCallback, MqttComponent<MqttConnectionOptions> {
    // Held for the duration of connect().
    private final Lock lock;
    // Connection options handed to the client on connect; returned as-is by getConnectionInfo().
    private final MqttConnectionOptions connectionOptions;
    // Created in onInit() when there is no client manager; otherwise obtained from the client manager.
    private IMqttAsyncClient mqttClient;

    // Optional persistence passed to the MqttAsyncClient constructor in onInit(); may be null.
    @Nullable
    private MqttClientPersistence persistence;
    // Used by messageArrived() when the payload type is neither MqttMessage nor byte[]; defaulted in onInit().
    private SmartMessageConverter messageConverter;
    // Target payload type for inbound messages; every constructor initializes it to byte[].class.
    private Class<?> payloadType;
    // Maps MQTT properties to message headers; every constructor initializes it to a MqttHeaderMapper.
    private HeaderMapper<MqttProperties> headerMapper;
    // When true, doStart() subscribes right after connecting.
    private volatile boolean readyToSubscribeOnStart;

    /* loaded from: input_file:org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter$AcknowledgmentImpl.class */
    /**
     * {@link SimpleAcknowledgment} that completes delivery of a single received MQTT
     * message on the client it was created with, using the message id and QoS captured
     * at construction time.
     */
    private static class AcknowledgmentImpl implements SimpleAcknowledgment {
        private final int id;
        private final int qos;
        private final IMqttAsyncClient ackClient;

        /**
         * @param messageId id of the message to acknowledge
         * @param messageQos QoS the message was received with
         * @param client client on which the acknowledgment is performed
         */
        AcknowledgmentImpl(int messageId, int messageQos, IMqttAsyncClient client) {
            this.ackClient = client;
            this.qos = messageQos;
            this.id = messageId;
        }

        /**
         * Calls {@code messageArrivedComplete} on the client for the captured id and QoS.
         * @throws IllegalStateException wrapping the {@link MqttException} if the client call fails
         */
        public void acknowledge() {
            try {
                this.ackClient.messageArrivedComplete(this.id, this.qos);
            } catch (MqttException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    public Mqttv5PahoMessageDrivenChannelAdapter(String str, String str2, String... strArr) {
        super(str, str2, strArr);
        this.lock = new ReentrantLock();
        this.payloadType = byte[].class;
        this.headerMapper = new MqttHeaderMapper();
        Assert.hasText(str, "'url' cannot be null or empty");
        this.connectionOptions = new MqttConnectionOptions();
        this.connectionOptions.setServerURIs(new String[]{str});
        this.connectionOptions.setAutomaticReconnect(true);
    }

    /**
     * Creates an adapter that uses the supplied connection options. The first entry of
     * the options' server URIs is passed to the superclass as the URL.
     * A warning is logged when the options do not enable automatic reconnect.
     * @param mqttConnectionOptions the connection options; must not be null and must carry server URIs
     * @param str forwarded to the superclass constructor (presumably the client id - confirm)
     * @param strArr forwarded to the superclass constructor (presumably the topics - confirm)
     */
    public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions mqttConnectionOptions, String str, String... strArr) {
        super(obtainServerUrlFromOptions(mqttConnectionOptions), str, strArr);
        this.connectionOptions = mqttConnectionOptions;
        this.headerMapper = new MqttHeaderMapper();
        this.payloadType = byte[].class;
        this.lock = new ReentrantLock();
        if (!mqttConnectionOptions.isAutomaticReconnect()) {
            this.logger.warn("It is recommended to set 'automaticReconnect' MQTT client option. Otherwise the current channel adapter restart should be used explicitly, e.g. via handling 'MqttConnectionFailedEvent' on client disconnection.");
        }
    }

    /**
     * Creates an adapter that shares a client supplied by a {@link ClientManager}.
     * The connection options are taken from the manager's connection info.
     * @param clientManager the manager providing the client and connection options
     * @param strArr forwarded to the superclass constructor (presumably the topics - confirm)
     */
    public Mqttv5PahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager, String... strArr) {
        super(clientManager, strArr);
        this.connectionOptions = clientManager.getConnectionInfo();
        this.headerMapper = new MqttHeaderMapper();
        this.payloadType = byte[].class;
        this.lock = new ReentrantLock();
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Returns the connection options this adapter was configured with.
     * @return the connection options instance held by this adapter (not a copy)
     */
    @Override // org.springframework.integration.mqtt.core.MqttComponent
    public MqttConnectionOptions getConnectionInfo() {
        return this.connectionOptions;
    }

    /**
     * Sets the persistence that {@link #onInit()} passes to the {@code MqttAsyncClient}
     * constructor when this adapter creates its own client.
     * @param mqttClientPersistence the persistence to use; may be null
     */
    public void setPersistence(@Nullable MqttClientPersistence mqttClientPersistence) {
        this.persistence = mqttClientPersistence;
    }

    /**
     * Not supported by this adapter; use {@link #setMessageConverter(SmartMessageConverter)} instead.
     * @param mqttMessageConverter ignored
     * @throws UnsupportedOperationException always
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void setConverter(MqttMessageConverter mqttMessageConverter) {
        throw new UnsupportedOperationException("Use setMessageConverter(SmartMessageConverter) instead");
    }

    /**
     * Sets the converter used by {@link #messageArrived(String, MqttMessage)} when the
     * configured payload type is neither {@code MqttMessage} nor {@code byte[]}.
     * If none is set, {@link #onInit()} looks one up from the bean factory.
     * @param smartMessageConverter the converter to use
     */
    public void setMessageConverter(SmartMessageConverter smartMessageConverter) {
        this.messageConverter = smartMessageConverter;
    }

    /**
     * Sets the payload type produced for inbound messages. In
     * {@link #messageArrived(String, MqttMessage)}, a {@code MqttMessage} type yields the
     * MQTT message itself as payload, {@code byte[]} yields the raw payload bytes, and any
     * other type is produced through the message converter.
     * @param cls the payload type; must not be null
     */
    public void setPayloadType(Class<?> cls) {
        Assert.notNull(cls, "'payloadType' must not be null.");
        this.payloadType = cls;
    }

    /**
     * Sets the mapper that turns the MQTT properties of a received message into message headers.
     * @param headerMapper the header mapper; must not be null
     */
    public void setHeaderMapper(HeaderMapper<MqttProperties> headerMapper) {
        Assert.notNull(headerMapper, "'headerMapper' must not be null.");
        this.headerMapper = headerMapper;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Completes initialization after the superclass has initialized.
     * When no client manager is configured and no client exists yet, creates a
     * {@code MqttAsyncClient} for this adapter's URL and client id, registers this
     * adapter as its callback and applies the manual-acks setting. When no message
     * converter has been set, looks up the
     * {@code integrationArgumentResolverMessageConverter} bean and uses it.
     * @throws BeanCreationException if the client cannot be created
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void onInit() {
        super.onInit();
        boolean clientNeeded = getClientManager() == null && this.mqttClient == null;
        if (clientNeeded) {
            try {
                this.mqttClient = new MqttAsyncClient(getUrl(), getClientId(), this.persistence);
                this.mqttClient.setCallback(this);
                this.mqttClient.setManualAcks(isManualAcks());
            } catch (MqttException ex) {
                throw new BeanCreationException("Cannot create 'MqttAsyncClient' for: " + getComponentName(), ex);
            }
        }
        if (this.messageConverter != null) {
            return;
        }
        SmartMessageConverter defaultConverter = getBeanFactory().getBean("integrationArgumentResolverMessageConverter", SmartMessageConverter.class);
        setMessageConverter(defaultConverter);
    }

    /**
     * Connects the client and, if a subscription is pending from an earlier
     * connect/stop, subscribes to the configured topics.
     * <p>On a connect failure: with automatic reconnect enabled, asks the client to
     * reconnect; otherwise publishes a {@link MqttConnectionFailedEvent} (when a
     * publisher is available) and logs the error. No exception is propagated.
     */
    protected void doStart() {
        try {
            connect();
            if (this.readyToSubscribeOnStart) {
                subscribe();
            }
        } catch (MqttException connectFailure) {
            if (!getConnectionInfo().isAutomaticReconnect()) {
                ApplicationEventPublisher publisher = getApplicationEventPublisher();
                if (publisher != null) {
                    publisher.publishEvent(new MqttConnectionFailedEvent(this, connectFailure));
                }
                this.logger.error(connectFailure, "MQTT client failed to connect.");
                return;
            }
            try {
                this.mqttClient.reconnect();
            } catch (MqttException reconnectFailure) {
                this.logger.error(reconnectFailure, "MQTT client failed to connect. Never happens.");
            }
        }
    }

    /**
     * Establishes the client to use while holding {@code lock}.
     * With a client manager, the manager's client is adopted; otherwise this adapter's
     * own client is connected with the configured options, waiting up to the completion
     * timeout.
     * @throws MqttException if connecting or waiting for completion fails
     */
    private void connect() throws MqttException {
        this.lock.lock();
        try {
            ClientManager<IMqttAsyncClient, MqttConnectionOptions> manager = getClientManager();
            if (manager != null) {
                this.mqttClient = manager.getClient();
                return;
            }
            IMqttToken connectToken = this.mqttClient.connect(this.connectionOptions);
            connectToken.waitForCompletion(getCompletionTimeout());
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Stops the adapter while holding {@code topicLock}.
     * If the client is connected: with clean start enabled, unsubscribes from all
     * topics and marks that a re-subscribe is needed on the next start; when this
     * adapter owns the client (no client manager), forcibly disconnects it and, with
     * automatic reconnect enabled, stops the client's reconnect cycle.
     * An {@link MqttException} is logged, not propagated.
     */
    protected void doStop() {
        this.topicLock.lock();
        this.readyToSubscribeOnStart = false;
        String[] topics = getTopic();
        try {
            if (this.mqttClient == null || !this.mqttClient.isConnected()) {
                return;
            }
            if (this.connectionOptions.isCleanStart()) {
                unsubscribe(topics);
                // The connection was alive when we unsubscribed, so the next start must subscribe again.
                this.readyToSubscribeOnStart = true;
            }
            if (getClientManager() != null) {
                // A managed client is not ours to disconnect.
                return;
            }
            this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
            if (getConnectionInfo().isAutomaticReconnect()) {
                MqttUtils.stopClientReconnectCycle(this.mqttClient);
            }
        } catch (MqttException ex) {
            this.logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
        } finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Unsubscribes the client from the given topics, waiting up to the completion timeout.
     * A {@link ConcurrentModificationException} raised by the call is logged and swallowed.
     * @param strArr the topics to unsubscribe from
     * @throws MqttException if the unsubscribe request or waiting for it fails
     */
    private void unsubscribe(String... strArr) throws MqttException {
        try {
            IMqttToken unsubscribeToken = this.mqttClient.unsubscribe(strArr);
            unsubscribeToken.waitForCompletion(getCompletionTimeout());
        } catch (ConcurrentModificationException ex) {
            this.logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(strArr));
        }
    }

    /**
     * Destroys the adapter: after the superclass cleanup, closes the client when this
     * adapter owns it (no client manager) and a client exists. A failure to close is
     * logged, not propagated.
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void destroy() {
        super.destroy();
        if (getClientManager() != null || this.mqttClient == null) {
            return;
        }
        try {
            this.mqttClient.close();
        } catch (MqttException ex) {
            this.logger.error(ex, "Failed to close 'MqttAsyncClient'");
        }
    }

    /**
     * Registers a topic with this adapter and, when the client is currently connected,
     * subscribes to it on the broker, waiting up to the completion timeout.
     * <p>If the broker subscription fails, the local registration made by
     * {@code super.addTopic} is rolled back, so the adapter does not keep reporting a
     * topic whose subscription the caller was told has failed.
     * @param str the topic to add
     * @param i the QoS requested for the topic
     * @throws MessagingException if subscribing on the broker fails
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void addTopic(String str, int i) {
        this.topicLock.lock();
        try {
            super.addTopic(str, i);
            if (this.mqttClient != null && this.mqttClient.isConnected()) {
                MqttProperties subscriptionProperties = new MqttProperties();
                subscriptionProperties.setSubscriptionIdentifiers(List.of(0));
                this.mqttClient.subscribe(new MqttSubscription[]{new MqttSubscription(str, i)}, (Object) null, (MqttActionListener) null, this::messageArrived, subscriptionProperties).waitForCompletion(getCompletionTimeout());
            }
        } catch (MqttException e) {
            // Undo the local registration: only the subscribe step can raise MqttException,
            // so at this point the topic was added above but never subscribed.
            super.removeTopic(str);
            throw new MessagingException("Failed to subscribe to topic " + str, e);
        } finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Removes topics from this adapter while holding {@code topicLock}. When the client
     * is connected, it is unsubscribed from the topics first; the topics are then
     * removed from the superclass registry.
     * @param strArr the topics to remove
     * @throws MessagingException if unsubscribing on the broker fails
     */
    @Override // org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter
    public void removeTopic(String... strArr) {
        this.topicLock.lock();
        try {
            if (this.mqttClient != null && this.mqttClient.isConnected()) {
                unsubscribe(strArr);
            }
            super.removeTopic(strArr);
        } catch (MqttException ex) {
            throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(strArr), ex);
        } finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Handles a message delivered by the MQTT client: maps its properties to headers,
     * adds the MQTT id, received QoS, duplicate flag, retained flag and received topic,
     * and sends the resulting message downstream.
     * <p>With manual acks enabled, an acknowledgment callback bound to the current
     * client is added under the {@code acknowledgmentCallback} header.
     * <p>The payload is the {@link MqttMessage} itself when the configured payload type
     * is {@code MqttMessage}, the raw bytes when it is {@code byte[]}, and otherwise
     * the result of the message converter applied to the raw bytes.
     * @param str the topic the message was received on
     * @param mqttMessage the received MQTT message
     * @throws RuntimeException any runtime exception from sending is logged and rethrown
     */
    public void messageArrived(String str, MqttMessage mqttMessage) {
        Map<String, Object> headers = this.headerMapper.toHeaders(mqttMessage.getProperties());
        headers.put(MqttHeaders.ID, mqttMessage.getId());
        headers.put(MqttHeaders.RECEIVED_QOS, mqttMessage.getQos());
        headers.put(MqttHeaders.DUPLICATE, mqttMessage.isDuplicate());
        headers.put(MqttHeaders.RECEIVED_RETAINED, mqttMessage.isRetained());
        headers.put(MqttHeaders.RECEIVED_TOPIC, str);
        if (isManualAcks()) {
            headers.put("acknowledgmentCallback", new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.mqttClient));
        }

        // Read the mutable field once so every decision below sees the same payload type.
        Class<?> targetType = this.payloadType;
        boolean wholeMqttMessage = MqttMessage.class.isAssignableFrom(targetType);
        Object payload = wholeMqttMessage ? mqttMessage : mqttMessage.getPayload();

        final Message<?> message;
        if (wholeMqttMessage || byte[].class.isAssignableFrom(targetType)) {
            // No conversion needed: the payload is already in the requested form.
            message = new GenericMessage<>(payload, headers);
        } else {
            message = this.messageConverter.toMessage(payload, new MessageHeaders(headers), targetType);
        }

        try {
            sendMessage(message);
        } catch (RuntimeException e) {
            this.logger.error(e, () -> "Unhandled exception for " + message);
            throw e;
        }
    }

    /**
     * Client callback for a lost or closed connection.
     * While the adapter is running, publishes a {@link MqttConnectionFailedEvent}
     * carrying the disconnect exception (when a publisher is available); otherwise
     * clears the pending-subscribe flag.
     * @param mqttDisconnectResponse the disconnect details reported by the client
     */
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        if (isRunning()) {
            MqttException cause = mqttDisconnectResponse.getException();
            ApplicationEventPublisher publisher = getApplicationEventPublisher();
            if (publisher != null) {
                publisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
            }
        } else {
            this.readyToSubscribeOnStart = false;
        }
    }

    /**
     * Client callback for an MQTT error: publishes a {@link MqttProtocolErrorEvent}
     * when an event publisher is available, and does nothing otherwise.
     * @param mqttException the error reported by the client
     */
    public void mqttErrorOccurred(MqttException mqttException) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        publisher.publishEvent(new MqttProtocolErrorEvent(this, mqttException));
    }

    /**
     * Client callback for completed deliveries; intentionally a no-op in this adapter.
     * @param iMqttToken the delivery token (unused)
     */
    public void deliveryComplete(IMqttToken iMqttToken) {
    }

    /**
     * Delegates to {@link #connectComplete(boolean, String)} using this adapter's URL.
     * @param z passed through unchanged to the two-argument overload
     */
    @Override // org.springframework.integration.mqtt.core.ClientManager.ConnectCallback
    public void connectComplete(boolean z) {
        connectComplete(z, getUrl());
    }

    /**
     * Client callback for a completed connection. If the adapter is active, subscribes
     * to the configured topics immediately; otherwise records that a subscription must
     * be made on the next start. Neither argument is used by this implementation.
     * @param z unused here
     * @param str unused here
     */
    public void connectComplete(boolean z, String str) {
        if (!isActive()) {
            this.readyToSubscribeOnStart = true;
            return;
        }
        subscribe();
    }

    /**
     * Subscribes the client to every configured topic with its configured QoS,
     * waiting up to the completion timeout.
     * <p>When a client manager is configured and no client has been adopted yet, the
     * manager's client is adopted first. Nothing is sent when there are no topics.
     * On success a {@link MqttSubscribedEvent} is published; on an
     * {@link MqttException} a {@link MqttConnectionFailedEvent} is published and the
     * error is logged (events only when a publisher is available). The exception is
     * not propagated.
     */
    private void subscribe() {
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager = getClientManager();
        if (clientManager != null && this.mqttClient == null) {
            this.mqttClient = clientManager.getClient();
        }
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        this.topicLock.lock();
        try {
            // Take the topic and QoS snapshots under the same lock hold so the two arrays
            // always describe the same set of topics, even if topics are being added or
            // removed concurrently.
            String[] topics = getTopic();
            if (topics.length == 0) {
                return;
            }
            int[] requestedQos = getQos();
            MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
                    .mapToObj(index -> new MqttSubscription(topics[index], requestedQos[index]))
                    .toArray(MqttSubscription[]::new);
            MqttProperties subscriptionProperties = new MqttProperties();
            subscriptionProperties.setSubscriptionIdentifiers(List.of(0));
            try {
                this.mqttClient.subscribe(subscriptions, (Object) null, (MqttActionListener) null, this::messageArrived, subscriptionProperties).waitForCompletion(getCompletionTimeout());
                String description = "Connected and subscribed to " + Arrays.toString(topics);
                this.logger.debug(description);
                if (applicationEventPublisher != null) {
                    applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, description));
                }
            } catch (MqttException e) {
                if (applicationEventPublisher != null) {
                    applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
                }
                this.logger.error(e, () -> "Error subscribing to " + Arrays.toString(topics));
            }
        } finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Client callback for an arrived AUTH packet; intentionally a no-op in this adapter.
     * @param i unused here
     * @param mqttProperties unused here
     */
    public void authPacketArrived(int i, MqttProperties mqttProperties) {
    }

    /**
     * Extracts the first server URI from the given connection options.
     * @param mqttConnectionOptions the options to read; must not be null
     * @return the first entry of the options' server URIs
     * @throws IllegalArgumentException if the options are null or carry no server URIs
     */
    private static String obtainServerUrlFromOptions(MqttConnectionOptions mqttConnectionOptions) {
        Assert.notNull(mqttConnectionOptions, "'connectionOptions' must not be null");
        String[] configuredUris = mqttConnectionOptions.getServerURIs();
        Assert.notEmpty(configuredUris, "'serverURIs' must be provided in the 'MqttConnectionOptions'");
        String firstUri = configuredUris[0];
        return firstUri;
    }
}
