package org.apache.camel.component.salesforce.internal.streaming;

import java.io.IOException;
import java.net.CookieManager;
import java.net.CookiePolicy;
import java.net.CookieStore;
import java.net.HttpCookie;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.StreamingApiConsumer;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.support.service.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.http.jetty.JettyHttpClientTransport;
import org.cometd.client.transport.ClientTransport;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.http.HttpCookieStore;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.class */
public class SubscriptionHelper extends ServiceSupport {
    /** Replay extension shared by every client built by {@code createClient}; also holds fallback replay ids. */
    static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
    /** Handshake timeout in seconds, reported in the timeout error raised by {@code handshake()}. */
    private static final int HANDSHAKE_TIMEOUT_SEC = 120;
    /** Message field holding a map that may itself carry an {@link #EXCEPTION_FIELD} entry. */
    private static final String FAILURE_FIELD = "failure";
    /** Message field (top-level or inside {@link #FAILURE_FIELD}) holding the failure exception. */
    private static final String EXCEPTION_FIELD = "exception";
    /** Key inside the message {@code ext} map under which the failure reason map is stored. */
    private static final String SFDC_FIELD = "sfdc";
    /** Key inside the {@link #SFDC_FIELD} map holding the failure reason text. */
    private static final String FAILURE_REASON_FIELD = "failureReason";
    /** Failure-reason prefix that marks a subscribe error as temporary (retried with backoff). */
    private static final String SERVER_TOO_BUSY_ERROR = "503::";
    /** Error text that triggers a fresh login attempt in the handshake and connect listeners. */
    private static final String AUTHENTICATION_INVALID = "401::Authentication invalid";
    /** Regex matched against a subscribe error to detect an invalid replay id. */
    private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId \\{.*} you provided was invalid.*";
    /** Bayeux client; created in {@code doStart()} and set back to {@code null} in {@code doStop()}. */
    BayeuxClient client;
    private final SalesforceComponent component;
    private SalesforceSession session;
    private final long maxBackoff;
    private final long backoffIncrement;
    // Last handshake/connect failure details; written by the listeners, read by handshake().
    private volatile String handshakeError;
    private volatile Exception handshakeException;
    private volatile String connectError;
    private volatile Exception connectException;
    /** Consumers registered per channel name. */
    private final Map<String, Set<StreamingApiConsumer>> channelToConsumers = new ConcurrentHashMap<>();
    /** Message listener created for each subscribed consumer. */
    private final Map<StreamingApiConsumer, ClientSessionChannel.MessageListener> consumerToListener = new ConcurrentHashMap<>();
    /** Channels awaiting a successful subscribe response. */
    private final Set<String> channelsToSubscribe = ConcurrentHashMap.newKeySet();
    private final ClientSessionChannel.MessageListener handshakeListener = createHandshakeListener();
    private final ClientSessionChannel.MessageListener subscriptionListener = createSubscriptionListener();
    private final ClientSessionChannel.MessageListener connectListener = createConnectionListener();
    /** Backoff (msecs) applied before retrying a temporarily failed subscribe; reset on subscribe success. */
    private final AtomicLong handshakeBackoff = new AtomicLong();

    public SubscriptionHelper(SalesforceComponent salesforceComponent) {
        this.component = salesforceComponent;
        this.backoffIncrement = salesforceComponent.getConfig().getBackoffIncrement();
        this.maxBackoff = salesforceComponent.getConfig().getMaxBackoff();
    }

    /**
     * Builds the listener for handshake responses. The handling is dispatched to the HTTP client's worker pool.
     * <p>
     * On success, every channel that currently has consumers is queued for (re)subscription. On failure the
     * error details are recorded, a new login is attempted when the failure is a 403 caused by
     * "401::Authentication invalid", and the handshake is retried.
     *
     * @return the handshake listener
     */
    private ClientSessionChannel.MessageListener createHandshakeListener() {
        return (clientSessionChannel, message) -> {
            this.component.getHttpClient().getWorkerPool().execute(() -> {
                LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
                if (message.isSuccessful()) {
                    if (this.channelToConsumers.isEmpty()) {
                        return;
                    }
                    this.channelsToSubscribe.clear();
                    this.channelsToSubscribe.addAll(this.channelToConsumers.keySet());
                    LOG.info("Handshake successful. Channels to subscribe: {}", this.channelsToSubscribe);
                    return;
                }
                LOG.warn("Handshake failure: {}", message);
                final String error = (String) message.get("error");
                this.handshakeError = error;
                this.handshakeException = getFailure(message);
                // Constant-first equals: the failure reason is null when the message carries no
                // ext/sfdc data, and an NPE here would abort the task before the retry below.
                if (error != null && error.startsWith("403::") && AUTHENTICATION_INVALID.equals(getFailureReason(message))) {
                    LOG.debug("attempting login due to handshake error: 403 -> 401::Authentication invalid");
                    this.session.attemptLoginUntilSuccessful(this.backoffIncrement, this.maxBackoff);
                }
                LOG.debug("Handshake failed, so try again.");
                this.client.handshake();
            });
        };
    }

    /**
     * Builds the listener for connect responses. The handling is dispatched to the HTTP client's worker pool.
     * <p>
     * On success, all consumers of every channel still awaiting subscription are subscribed. On failure the
     * error details are recorded, a new login is attempted when the error is "401::Authentication invalid",
     * and a handshake is started when the message has no advice or the reconnect advice is "none".
     *
     * @return the connect listener
     */
    private ClientSessionChannel.MessageListener createConnectionListener() {
        return (clientSessionChannel, message) -> {
            this.component.getHttpClient().getWorkerPool().execute(() -> {
                LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
                if (message.isSuccessful()) {
                    if (this.channelsToSubscribe.isEmpty()) {
                        return;
                    }
                    LOG.info("Subscribing to channels: {}", this.channelsToSubscribe);
                    for (String channelName : this.channelsToSubscribe) {
                        // unsubscribe() can drop a channel's consumer set while the channel name is still
                        // pending here, so fall back to an empty set instead of dereferencing null.
                        for (StreamingApiConsumer consumer : this.channelToConsumers.getOrDefault(channelName, Collections.emptySet())) {
                            subscribe(consumer);
                        }
                    }
                    return;
                }
                LOG.warn("Connect failure: {}", message);
                final String error = (String) message.get("error");
                this.connectError = error;
                this.connectException = getFailure(message);
                if (AUTHENTICATION_INVALID.equals(error)) {
                    LOG.debug("connectError: {}", error);
                    LOG.debug("Attempting login...");
                    this.session.attemptLoginUntilSuccessful(this.backoffIncrement, this.maxBackoff);
                }
                if (message.getAdvice() == null || "none".equals(message.getAdvice().get("reconnect"))) {
                    LOG.debug("Advice == none, so handshaking");
                    this.client.handshake();
                }
            });
        };
    }

    /**
     * Builds the listener for subscribe responses. The handling is dispatched to the HTTP client's worker pool.
     * <p>
     * A successful response removes the channel from the pending set and resets the backoff; a failed one is
     * handed to {@code subscriptionFailed} together with the first consumer registered for the channel, if any.
     *
     * @return the subscribe listener
     */
    private ClientSessionChannel.MessageListener createSubscriptionListener() {
        return (clientSessionChannel, message) -> this.component.getHttpClient().getWorkerPool().execute(() -> {
            LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
            final String channelName = message.getOrDefault("subscription", "").toString();

            if (message.isSuccessful()) {
                LOG.info("Subscribed to channel {}", channelName);
                this.channelsToSubscribe.remove(channelName);
                this.handshakeBackoff.set(0L);
                return;
            }

            LOG.warn("Subscription failure: {}", message);
            final Set<StreamingApiConsumer> consumers = this.channelToConsumers.getOrDefault(channelName, Collections.emptySet());
            final Iterator<StreamingApiConsumer> first = consumers.iterator();
            if (first.hasNext()) {
                subscriptionFailed(first.next(), message);
            }
        });
    }

    /**
     * Reacts to a failed subscribe response.
     * <ul>
     * <li>Temporary errors (see {@code isTemporaryError}): sleep for the current backoff and re-subscribe all
     * consumers of the channel, unless the backoff already exceeds {@code maxBackoff}.</li>
     * <li>Invalid replay id errors: register the endpoint's fallback replay id for the channel and
     * re-subscribe all consumers of the channel.</li>
     * <li>Anything else (or an exceeded backoff): report the failure to every consumer of the channel,
     * provided the client still exists.</li>
     * </ul>
     *
     * @param streamingApiConsumer consumer used for the topic name in the error text and for the fallback replay id
     * @param message              the failed subscribe response
     */
    private void subscriptionFailed(StreamingApiConsumer streamingApiConsumer, Message message) {
        final String channelName = message.getOrDefault("subscription", "").toString();
        final Set<StreamingApiConsumer> consumers = this.channelToConsumers.getOrDefault(channelName, Collections.emptySet());

        String error = (String) message.get("error");
        if (error == null) {
            error = "Missing error message";
        }
        final Exception failure = getFailure(message);
        final String description = String.format("Error subscribing to %s: %s",
                streamingApiConsumer.getTopicName(), failure != null ? failure.getMessage() : error);

        boolean abort = true;
        LOG.warn(description);

        if (isTemporaryError(message)) {
            final long backoff = this.handshakeBackoff.getAndAdd(this.backoffIncrement);
            if (backoff > this.maxBackoff) {
                LOG.error("Subscribe aborted after exceeding {} msecs backoff", this.maxBackoff);
            } else {
                abort = false;
                try {
                    LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
                    Thread.sleep(backoff);
                    for (StreamingApiConsumer consumer : consumers) {
                        subscribe(consumer);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Aborting subscribe on interrupt!", e);
                    // Restore the interrupt flag so the owning thread/pool can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
            abort = false;
            final long fallBackReplayId = streamingApiConsumer.m636getEndpoint().getConfiguration().getFallBackReplayId().longValue();
            LOG.warn(error);
            LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
            REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, fallBackReplayId);
            for (StreamingApiConsumer consumer : consumers) {
                subscribe(consumer);
            }
        }

        if (!abort || this.client == null) {
            return;
        }
        for (StreamingApiConsumer consumer : consumers) {
            consumer.handleException(description, new SalesforceException(description, failure));
        }
    }

    /**
     * Starts the helper: takes the session from the component, creates the Bayeux client, registers the meta
     * channel listeners and performs the initial handshake.
     *
     * @throws Exception if lazy login is configured, or if client creation or the handshake fails
     */
    protected void doStart() throws Exception {
        this.session = this.component.getSession();

        final boolean lazyLogin = this.component.getLoginConfig().isLazyLogin();
        if (lazyLogin) {
            throw new CamelException("Lazy login is not supported by salesforce consumers.");
        }

        this.client = createClient(this.component, this.session);
        initMessageListeners();
        handshake();
    }

    /** Attaches the handshake, subscribe and connect listeners to their meta channels on the current client. */
    private void initMessageListeners() {
        final ClientSessionChannel handshakeChannel = this.client.getChannel("/meta/handshake");
        handshakeChannel.addListener(this.handshakeListener);

        final ClientSessionChannel subscribeChannel = this.client.getChannel("/meta/subscribe");
        subscribeChannel.addListener(this.subscriptionListener);

        final ClientSessionChannel connectChannel = this.client.getChannel("/meta/connect");
        connectChannel.addListener(this.connectListener);
    }

    /**
     * Starts a handshake and waits up to {@code HANDSHAKE_TIMEOUT_SEC} seconds for the client to reach the
     * CONNECTED state.
     *
     * @throws CamelException if the client is not connected in time; the message reports, in order of
     *                        precedence, the recorded handshake exception, handshake error, connect exception,
     *                        connect error, or a plain timeout
     */
    private void handshake() throws CamelException {
        this.client.handshake();

        final long timeoutMillis = TimeUnit.SECONDS.toMillis(HANDSHAKE_TIMEOUT_SEC);
        final boolean connected = this.client.waitFor(timeoutMillis, BayeuxClient.State.CONNECTED);
        if (connected) {
            return;
        }

        if (this.handshakeException != null) {
            final String text = String.format("Exception during HANDSHAKE: %s", this.handshakeException.getMessage());
            throw new CamelException(text, this.handshakeException);
        }
        if (this.handshakeError != null) {
            throw new CamelException(String.format("Error during HANDSHAKE: %s", this.handshakeError));
        }
        if (this.connectException != null) {
            final String text = String.format("Exception during CONNECT: %s", this.connectException.getMessage());
            throw new CamelException(text, this.connectException);
        }
        if (this.connectError != null) {
            throw new CamelException(String.format("Error during CONNECT: %s", this.connectError));
        }
        throw new CamelException(String.format("Handshake request timeout after %s seconds", HANDSHAKE_TIMEOUT_SEC));
    }

    /**
     * Extracts the failure cause carried by a message, if any.
     * <p>
     * Lookup order: the top-level "exception" field; otherwise the "exception" entry of the "failure" map;
     * otherwise a {@link SalesforceException} built from the failure reason returned by
     * {@code getFailureReason}.
     *
     * @param  message the message to inspect
     * @return         the failure, or {@code null} when the message carries none
     */
    private static Exception getFailure(Message message) {
        final Object directException = message.get(EXCEPTION_FIELD);
        if (directException != null) {
            return (Exception) directException;
        }

        final Object failureDetails = message.get(FAILURE_FIELD);
        if (failureDetails != null) {
            // May be null when the failure map has no "exception" entry.
            return (Exception) ((Map<?, ?>) failureDetails).get(EXCEPTION_FIELD);
        }

        final String failureReason = getFailureReason(message);
        if (failureReason != null) {
            return new SalesforceException(failureReason, (Throwable) null);
        }
        return null;
    }

    /**
     * Removes every listener from the named channel and releases it. Does nothing when no client exists.
     *
     * @param str name of the channel to close
     */
    private void closeChannel(String str) {
        if (this.client != null) {
            final ClientSessionChannel channel = this.client.getChannel(str);
            for (ClientSessionChannel.ClientSessionChannelListener listener : channel.getListeners()) {
                channel.removeListener(listener);
            }
            channel.release();
        }
    }

    /**
     * Stops the helper: closes the meta channels, disconnects the client (aborting it when it does not reach
     * DISCONNECTED within 60 seconds), drops the client reference and logs out the session.
     * When no client exists, only the channel cleanup is attempted and the session is left untouched.
     *
     * @throws Exception if disconnecting or logging out fails
     */
    protected void doStop() throws Exception {
        closeChannel("/meta/connect");
        closeChannel("/meta/subscribe");
        closeChannel("/meta/handshake");

        if (this.client != null) {
            this.client.disconnect();
            final boolean disconnected = this.client.waitFor(60000L, BayeuxClient.State.DISCONNECTED);
            if (!disconnected) {
                LOG.warn("Could not disconnect client connected to: {}", getEndpointUrl(this.component));
                this.client.abort();
            }
            this.client = null;

            if (this.session != null) {
                this.session.logout();
            }
            LOG.debug("Stopped the helper and destroyed the client");
        }
    }

    /**
     * Creates a Bayeux client for the component's streaming endpoint.
     * <p>
     * The transport starts from a "maxNetworkDelay" of 120000 and is then overlaid with the component's
     * long-polling transport properties, if any. Every request gets an "OAuth &lt;token&gt;" authorization
     * header (logging in first when the session has no token), and received cookies are copied into a
     * Jetty cookie store owned by the transport. The shared replay extension is registered on the client.
     *
     * @param  salesforceComponent component providing the HTTP client, configuration and transport properties
     * @param  salesforceSession   session supplying (and, if needed, obtaining) the access token
     * @return                     the configured, not yet handshaken client
     * @throws SalesforceException if the initial login fails
     */
    static BayeuxClient createClient(SalesforceComponent salesforceComponent, final SalesforceSession salesforceSession) throws SalesforceException {
        final SalesforceHttpClient httpClient = salesforceComponent.getConfig().getHttpClient();

        final Map<String, Object> transportOptions = new HashMap<>();
        transportOptions.put("maxNetworkDelay", 120000);
        if (salesforceComponent.getLongPollingTransportProperties() != null) {
            transportOptions.putAll(salesforceComponent.getLongPollingTransportProperties());
        }

        // Log in eagerly unless lazy login was requested.
        if (salesforceSession.getAccessToken() == null && !salesforceComponent.getLoginConfig().isLazyLogin()) {
            salesforceSession.login(null);
        }

        final CookieStore javaCookieStore = new CookieManager().getCookieStore();
        final HttpCookieStore.Default jettyCookieStore = new HttpCookieStore.Default();

        // Resolve the URL before building the transport, as the original expression did.
        final String endpointUrl = getEndpointUrl(salesforceComponent);
        final JettyHttpClientTransport transport = new JettyHttpClientTransport(transportOptions, httpClient) {
            protected void customize(Request request) {
                super.customize(request);
                String token = salesforceSession.getAccessToken();
                if (token == null) {
                    try {
                        token = salesforceSession.login(null);
                    } catch (SalesforceException e) {
                        throw new RuntimeException(e);
                    }
                }
                // Copy into an effectively final local so the lambda below can capture it.
                final String headerToken = new String(token);
                request.headers(headers -> headers.add(HttpHeader.AUTHORIZATION, "OAuth " + headerToken));
            }

            protected void storeCookies(URI uri, Map<String, List<String>> map) {
                try {
                    final CookieManager cookieManager = new CookieManager(javaCookieStore, CookiePolicy.ACCEPT_ALL);
                    cookieManager.put(uri, map);
                    for (HttpCookie cookie : cookieManager.getCookieStore().getCookies()) {
                        jettyCookieStore.add(uri, org.eclipse.jetty.http.HttpCookie.from(cookie));
                    }
                } catch (IOException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Could not parse cookies", e);
                    }
                }
            }

            protected HttpCookieStore getHttpCookieStore() {
                return jettyCookieStore;
            }
        };

        final BayeuxClient bayeuxClient = new BayeuxClient(endpointUrl, transport);
        bayeuxClient.addExtension(REPLAY_EXTENSION);
        return bayeuxClient;
    }

    /**
     * Registers the consumer for its channel and subscribes it on the client. Runs under {@code lock}.
     * <p>
     * The consumer is added to the channel's consumer set, the channel is marked as pending, a replay id is
     * set for the channel if one can be determined and none is set yet, and the consumer's message listener
     * (created on first use) is subscribed to the channel.
     *
     * @param streamingApiConsumer the consumer to subscribe
     */
    public void subscribe(StreamingApiConsumer streamingApiConsumer) {
        this.lock.lock();
        try {
            final String channelName = getChannelName(streamingApiConsumer.getTopicName());

            final Set<StreamingApiConsumer> consumers = this.channelToConsumers.computeIfAbsent(channelName, key -> ConcurrentHashMap.newKeySet());
            consumers.add(streamingApiConsumer);
            this.channelsToSubscribe.add(channelName);

            setReplayIdIfAbsent(streamingApiConsumer.m636getEndpoint());

            LOG.info("Subscribing to channel {}...", channelName);
            // Resolve the channel before creating the listener, matching the original evaluation order.
            final ClientSessionChannel channel = this.client.getChannel(channelName);
            final ClientSessionChannel.MessageListener listener = this.consumerToListener.computeIfAbsent(streamingApiConsumer,
                    key -> (clientSessionChannel, message) -> {
                        LOG.debug("Received Message: {}", message);
                        streamingApiConsumer.processMessage(clientSessionChannel, message);
                    });
            channel.subscribe(listener);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Tells whether the message's failure reason starts with the "503::" prefix.
     *
     * @param  message the failed message
     * @return         {@code true} for a 503 failure reason, {@code false} otherwise or when no reason is present
     */
    private static boolean isTemporaryError(Message message) {
        final String reason = getFailureReason(message);
        if (reason == null) {
            return false;
        }
        return reason.startsWith(SERVER_TOO_BUSY_ERROR);
    }

    /**
     * Reads the failure reason stored under {@code ext -> sfdc -> failureReason} of the message.
     *
     * @param  message the message to inspect
     * @return         the failure reason, or {@code null} when the ext map, the sfdc map or the entry is missing
     */
    private static String getFailureReason(Message message) {
        final Map<String, Object> ext = message.getExt();
        if (ext == null) {
            return null;
        }
        final Map<?, ?> sfdc = (Map<?, ?>) ext.get(SFDC_FIELD);
        if (sfdc == null) {
            return null;
        }
        return (String) sfdc.get(FAILURE_REASON_FIELD);
    }

    /**
     * Registers the replay id determined for the endpoint's topic with the replay extension, when one can
     * be determined. The extension call is "if absent", so an already registered id is left to the extension.
     *
     * @param salesforceEndpoint endpoint whose topic and replay settings are used
     */
    private void setReplayIdIfAbsent(SalesforceEndpoint salesforceEndpoint) {
        final String topicName = salesforceEndpoint.getTopicName();
        determineReplayIdFor(salesforceEndpoint, topicName)
                .ifPresent(replayId -> REPLAY_EXTENSION.setReplayIdIfAbsent(getChannelName(topicName), replayId.longValue()));
    }

    /**
     * Determines the replay id to use for a topic. The first non-null candidate wins, in this order:
     * <ol>
     * <li>the endpoint's replay id,</li>
     * <li>the endpoint configuration's initial replay id for the topic (looked up by topic name, then by channel name),</li>
     * <li>the component configuration's initial replay id for the topic (same lookup),</li>
     * <li>the endpoint configuration's default replay id,</li>
     * <li>the component configuration's default replay id.</li>
     * </ol>
     *
     * @param  salesforceEndpoint endpoint whose own and component-level settings are consulted
     * @param  str                topic name
     * @return                    the chosen replay id, or an empty Optional when none is configured
     */
    static Optional<Long> determineReplayIdFor(SalesforceEndpoint salesforceEndpoint, String str) {
        final String channelName = getChannelName(str);

        final Long endpointReplayId = salesforceEndpoint.getReplayId();

        final SalesforceComponent component = salesforceEndpoint.m628getComponent();

        final SalesforceEndpointConfig endpointConfig = salesforceEndpoint.getConfiguration();
        final Map<String, Long> endpointInitialIds = endpointConfig.getInitialReplayIdMap();
        final Long endpointInitialId = endpointInitialIds.getOrDefault(str, endpointInitialIds.get(channelName));
        final Long endpointDefaultId = endpointConfig.getDefaultReplayId();

        final SalesforceEndpointConfig componentConfig = component.getConfig();
        final Map<String, Long> componentInitialIds = componentConfig.getInitialReplayIdMap();
        final Long componentInitialId = componentInitialIds.getOrDefault(str, componentInitialIds.get(channelName));
        final Long componentDefaultId = componentConfig.getDefaultReplayId();

        // Keep the stream typed as Stream<Long>; an Object[] cast would turn the result into Optional<Object>.
        return Stream.of(endpointReplayId, endpointInitialId, componentInitialId, endpointDefaultId, componentDefaultId)
                .filter(Objects::nonNull)
                .findFirst();
    }

    /**
     * Converts a topic name into a channel name that always starts with a single '/'.
     * <ul>
     * <li>A name containing a '/' after its first character (e.g. {@code "event/Foo__e"} or
     * {@code "/topic/Foo"}) is kept as is, with a leading '/' added when missing.</li>
     * <li>Any other name is treated as a bare topic and placed under {@code "/topic/"}; a leading '/' on
     * the bare name is dropped so that {@code "/Foo"} yields {@code "/topic/Foo"} rather than a name
     * with no leading slash and a doubled separator.</li>
     * </ul>
     *
     * @param  str topic name; must be non-empty
     * @return     the channel name
     */
    static String getChannelName(String str) {
        final boolean hasLeadingSlash = str.charAt(0) == '/';
        if (str.indexOf('/', 1) > 0) {
            // Already qualified with a channel prefix: only guarantee the leading slash.
            return hasLeadingSlash ? str : "/" + str;
        }
        final String bareTopic = hasLeadingSlash ? str.substring(1) : str;
        return "/topic/" + bareTopic;
    }

    /**
     * Removes the consumer from its channel and, when a listener had been registered for it, unsubscribes
     * that listener and releases the channel. Runs under {@code lock}.
     * <p>
     * The channel's entry in the consumer map is dropped once its last consumer is removed.
     *
     * @param streamingApiConsumer the consumer to unsubscribe
     */
    public void unsubscribe(StreamingApiConsumer streamingApiConsumer) {
        this.lock.lock();
        try {
            final String channelName = getChannelName(streamingApiConsumer.getTopicName());

            final Set<StreamingApiConsumer> consumers = this.channelToConsumers.get(channelName);
            if (consumers != null) {
                consumers.remove(streamingApiConsumer);
                if (consumers.isEmpty()) {
                    this.channelToConsumers.remove(channelName);
                }
            }

            final ClientSessionChannel.MessageListener listener = this.consumerToListener.remove(streamingApiConsumer);
            if (listener == null) {
                return;
            }
            LOG.debug("Unsubscribing from channel {}...", channelName);
            final ClientSessionChannel channel = this.client.getChannel(channelName);
            channel.unsubscribe(listener);
            channel.release();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Builds the streaming endpoint URL from the session's instance URL and the configured API version.
     * <p>
     * For API version 36.0 with replay options present (a default replay id or a non-empty initial replay id
     * map) the path is {@code /cometd/replay/<version>}; in every other case it is {@code /cometd/<version>}.
     *
     * @param  salesforceComponent component providing the configuration and session
     * @return                     the endpoint URL
     */
    static String getEndpointUrl(SalesforceComponent salesforceComponent) {
        final SalesforceEndpointConfig config = salesforceComponent.getConfig();
        final String apiVersion = config.getApiVersion();

        final boolean useReplayEndpoint = Double.parseDouble(apiVersion) == 36.0d
                && (config.getDefaultReplayId() != null || !config.getInitialReplayIdMap().isEmpty());
        final String path = useReplayEndpoint ? "/cometd/replay/" : "/cometd/";

        return salesforceComponent.getSession().getInstanceUrl() + path + apiVersion;
    }
}
