package io.gravitee.exchange.connector.websocket;

import io.gravitee.common.utils.RxHelper;
import io.gravitee.exchange.api.command.Command;
import io.gravitee.exchange.api.command.CommandAdapter;
import io.gravitee.exchange.api.command.CommandHandler;
import io.gravitee.exchange.api.command.Reply;
import io.gravitee.exchange.api.command.ReplyAdapter;
import io.gravitee.exchange.api.websocket.command.ExchangeSerDe;
import io.gravitee.exchange.api.websocket.protocol.ProtocolAdapter;
import io.gravitee.exchange.api.websocket.protocol.ProtocolVersion;
import io.gravitee.exchange.connector.embedded.EmbeddedExchangeConnector;
import io.gravitee.exchange.connector.websocket.channel.WebSocketConnectorChannel;
import io.gravitee.exchange.connector.websocket.client.WebSocketConnectorClientFactory;
import io.gravitee.exchange.connector.websocket.exception.WebSocketConnectorException;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.core.http.WebSocketConnectOptions;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.http.HttpClient;
import io.vertx.rxjava3.core.http.WebSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/exchange/connector/websocket/WebSocketExchangeConnector.class */
public class WebSocketExchangeConnector extends EmbeddedExchangeConnector {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(WebSocketExchangeConnector.class);
    // Supplies the ProtocolAdapter factory used when the channel is built and the version
    // string sent in the "X-Gravitee-Exchange-Protocol" handshake header.
    private final ProtocolVersion protocolVersion;
    // Handlers handed to every WebSocketConnectorChannel; addCommandHandlers appends to this list.
    private final List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> commandHandlers;
    // Command adapters handed to every WebSocketConnectorChannel.
    private final List<CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> commandAdapters;
    // Reply adapters handed to every WebSocketConnectorChannel.
    private final List<ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> replyAdapters;
    // Vert.x instance handed to every WebSocketConnectorChannel.
    private final Vertx vertx;
    // Provides the next endpoint, the HttpClient for it, extra handshake headers and retry counters.
    private final WebSocketConnectorClientFactory webSocketConnectorClientFactory;
    // Passed to the protocol version's adapter factory to obtain the ProtocolAdapter.
    private final ExchangeSerDe exchangeSerDe;

    /* loaded from: input_file:io/gravitee/exchange/connector/websocket/WebSocketExchangeConnector$WebSocketExchangeConnectorBuilder.class */
    /**
     * Fluent, self-typed builder collecting the collaborators of a {@link WebSocketExchangeConnector}.
     *
     * <p>Every setter stores its argument as-is and returns {@link #mo2self()} so that calls can be
     * chained on the concrete builder type.
     *
     * @param <C> the connector type produced by {@link #mo1build()}
     * @param <B> the concrete builder type returned by the fluent setters
     */
    public abstract static class WebSocketExchangeConnectorBuilder<C extends WebSocketExchangeConnector, B extends WebSocketExchangeConnectorBuilder<C, B>> extends EmbeddedExchangeConnector.EmbeddedExchangeConnectorBuilder<C, B> {

        private ProtocolVersion protocolVersion;
        private List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> commandHandlers;
        private List<CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> commandAdapters;
        private List<ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> replyAdapters;
        private Vertx vertx;
        private WebSocketConnectorClientFactory webSocketConnectorClientFactory;
        private ExchangeSerDe exchangeSerDe;

        /** Records the protocol version to use; returns this builder. */
        public B protocolVersion(ProtocolVersion protocolVersion) {
            this.protocolVersion = protocolVersion;
            return mo2self();
        }

        /** Records the command handlers (the list reference is kept, not copied); returns this builder. */
        public B commandHandlers(List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> list) {
            this.commandHandlers = list;
            return mo2self();
        }

        /** Records the command adapters (the list reference is kept, not copied); returns this builder. */
        public B commandAdapters(List<CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> list) {
            this.commandAdapters = list;
            return mo2self();
        }

        /** Records the reply adapters (the list reference is kept, not copied); returns this builder. */
        public B replyAdapters(List<ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> list) {
            this.replyAdapters = list;
            return mo2self();
        }

        /** Records the Vert.x instance; returns this builder. */
        public B vertx(Vertx vertx) {
            this.vertx = vertx;
            return mo2self();
        }

        /** Records the client factory; returns this builder. */
        public B webSocketConnectorClientFactory(WebSocketConnectorClientFactory webSocketConnectorClientFactory) {
            this.webSocketConnectorClientFactory = webSocketConnectorClientFactory;
            return mo2self();
        }

        /** Records the serializer/deserializer; returns this builder. */
        public B exchangeSerDe(ExchangeSerDe exchangeSerDe) {
            this.exchangeSerDe = exchangeSerDe;
            return mo2self();
        }

        /**
         * Returns this builder typed as {@code B}.
         * The {@code mo2} prefix is a decompiler rename of {@code self()}.
         */
        @Override
        public abstract B mo2self();

        /**
         * Creates the connector from the values collected so far.
         * The {@code mo1} prefix is a decompiler rename of {@code build()}.
         */
        @Override
        public abstract C mo1build();

        /** Renders the parent builder state followed by every field of this builder. */
        public String toString() {
            final StringBuilder text = new StringBuilder("WebSocketExchangeConnector.WebSocketExchangeConnectorBuilder(super=");
            text.append(super.toString());
            text.append(", protocolVersion=").append(this.protocolVersion);
            text.append(", commandHandlers=").append(this.commandHandlers);
            text.append(", commandAdapters=").append(this.commandAdapters);
            text.append(", replyAdapters=").append(this.replyAdapters);
            text.append(", vertx=").append(this.vertx);
            text.append(", webSocketConnectorClientFactory=").append(this.webSocketConnectorClientFactory);
            text.append(", exchangeSerDe=").append(this.exchangeSerDe);
            return text.append(")").toString();
        }
    }

    /* loaded from: input_file:io/gravitee/exchange/connector/websocket/WebSocketExchangeConnector$WebSocketExchangeConnectorBuilderImpl.class */
    /**
     * Concrete builder bound to {@link WebSocketExchangeConnector} itself; it is the type
     * instantiated by {@code builder()}.
     */
    private static final class WebSocketExchangeConnectorBuilderImpl extends WebSocketExchangeConnectorBuilder<WebSocketExchangeConnector, WebSocketExchangeConnectorBuilderImpl> {

        /** Private: instances are only created inside the enclosing class. */
        private WebSocketExchangeConnectorBuilderImpl() {}

        /** Returns this instance, which already has the concrete builder type. */
        @Override
        public WebSocketExchangeConnectorBuilderImpl mo2self() {
            return this;
        }

        /** Creates a connector through the builder-based constructor. */
        @Override
        public WebSocketExchangeConnector mo1build() {
            return new WebSocketExchangeConnector(this);
        }
    }

    /**
     * Creates a connector from explicitly supplied collaborators.
     *
     * <p>Each list argument is copied into a fresh mutable {@link ArrayList}; a {@code null} list
     * is replaced by an empty one. Every list is null-checked on its own: previously all three
     * copies were guarded by the handlers list, so a {@code null} adapter list combined with
     * non-null handlers raised a {@link NullPointerException}, and adapters were silently dropped
     * whenever the handlers list was {@code null}.
     *
     * @param protocolVersion protocol version used for the handshake header and adapter creation
     * @param list command handlers, may be {@code null}
     * @param list2 command adapters, may be {@code null}
     * @param list3 reply adapters, may be {@code null}
     * @param vertx Vert.x instance handed to the connector channel
     * @param webSocketConnectorClientFactory factory providing endpoints and HTTP clients
     * @param exchangeSerDe serializer/deserializer handed to the protocol adapter factory
     */
    public WebSocketExchangeConnector(ProtocolVersion protocolVersion, List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> list, List<CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> list2, List<ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> list3, Vertx vertx, WebSocketConnectorClientFactory webSocketConnectorClientFactory, ExchangeSerDe exchangeSerDe) {
        this.protocolVersion = protocolVersion;
        this.commandHandlers = list != null ? new ArrayList<>(list) : new ArrayList<>();
        this.commandAdapters = list2 != null ? new ArrayList<>(list2) : new ArrayList<>();
        this.replyAdapters = list3 != null ? new ArrayList<>(list3) : new ArrayList<>();
        this.vertx = vertx;
        this.webSocketConnectorClientFactory = webSocketConnectorClientFactory;
        this.exchangeSerDe = exchangeSerDe;
    }

    /**
     * Connects to the controller and prepares the connector channel.
     *
     * <p>On subscription the connector is first flagged as non-primary, then a WebSocket is
     * obtained from {@code connect()} and wrapped in a new {@link WebSocketConnectorChannel}.
     * Once that channel has initialized, a close handler is installed on the socket: any close
     * status other than 1000 (normal closure) triggers a fresh, fire-and-forget call to this
     * method on the IO scheduler, with errors of that re-initialization swallowed.
     *
     * <p>Failures that are retryable {@link WebSocketConnectorException}s are retried through
     * {@code RxHelper.retryExponentialBackoff(1, 300, SECONDS, 1.5, ...)} (presumably initial
     * delay, maximum delay and growth factor; confirm against RxHelper). Other failures are
     * logged and propagated.
     *
     * @return a {@link Completable} completing once the channel is initialized
     */
    public Completable initialize() {
        return Completable
            .fromRunnable(() -> setPrimary(false))
            .andThen(connect())
            .flatMapCompletable(socket -> {
                final ProtocolAdapter adapter = (ProtocolAdapter) this.protocolVersion.adapterFactory().apply(this.exchangeSerDe);
                this.connectorChannel = new WebSocketConnectorChannel(this.commandHandlers, this.commandAdapters, this.replyAdapters, this.vertx, socket, adapter);
                return this.connectorChannel
                    .initialize()
                    .doOnComplete(() ->
                        socket.closeHandler(unused -> {
                            log.debug("Exchange Connector has been closed with status code '{}'", socket.closeStatusCode());
                            // 1000 is the normal-closure status: only other codes trigger a reconnect.
                            if (!Objects.equals(socket.closeStatusCode(), (short) 1000)) {
                                log.warn("Exchange Connector closed abnormally, reconnecting...");
                                initialize().onErrorComplete().subscribeOn(Schedulers.io()).subscribe();
                            }
                        })
                    );
            })
            .retryWhen(
                RxHelper.retryExponentialBackoff(
                    1L,
                    300L,
                    TimeUnit.SECONDS,
                    1.5d,
                    failure -> failure instanceof WebSocketConnectorException && ((WebSocketConnectorException) failure).isRetryable()
                )
            )
            .doOnError(failure -> log.error("Unable to connect to Exchange Controller Endpoint."))
            .doOnComplete(() -> log.info("Exchange Connector is now connected and ready"));
    }

    /**
     * Opens a WebSocket to the next endpoint offered by the client factory.
     *
     * <p>If the factory yields no endpoint, the returned {@link Single} fails with a
     * non-retryable {@link WebSocketConnectorException}. Otherwise an {@link HttpClient} is
     * created for the endpoint and a WebSocket handshake is attempted on
     * {@code /exchange/controller}, carrying the protocol version header plus any headers from
     * the factory configuration. On success the factory's endpoint retry counter is reset; on
     * failure the client is closed and the error is rethrown as a retryable
     * {@link WebSocketConnectorException} wrapping the cause.
     *
     * @return a {@link Single} emitting the connected WebSocket
     */
    private Single<WebSocket> connect() {
        return Maybe
            .fromCallable(this.webSocketConnectorClientFactory::nextEndpoint)
            .switchIfEmpty(
                Maybe.fromRunnable(() -> {
                    throw new WebSocketConnectorException("No Exchange Controller Endpoint is defined. Please check your configuration", false);
                })
            )
            .toSingle()
            .flatMap(endpoint -> {
                log.debug("Trying to connect to the Exchange Controller WebSocket '{}'", endpoint.getUrl());
                final HttpClient httpClient = this.webSocketConnectorClientFactory.createHttpClient(endpoint);
                final WebSocketConnectOptions connectOptions = new WebSocketConnectOptions()
                    .setURI(endpoint.resolvePath("/exchange/controller"))
                    .addHeader("X-Gravitee-Exchange-Protocol", this.protocolVersion.version());
                // Extra headers are optional in the configuration.
                if (this.webSocketConnectorClientFactory.getConfiguration().headers() != null) {
                    final Map<String, String> extraHeaders = this.webSocketConnectorClientFactory.getConfiguration().headers();
                    extraHeaders.forEach((name, value) -> connectOptions.addHeader(name, value));
                }
                return httpClient
                    .rxWebSocket(connectOptions)
                    .doOnSuccess(connected -> {
                        this.webSocketConnectorClientFactory.resetEndpointRetries();
                        log.debug("Exchange Connector has successfully connected to the Exchange Controller WebSocket");
                    })
                    .onErrorResumeNext(cause -> {
                        log.error("Unable to connect to the Exchange Controller Endpoint {} times, retrying...", Integer.valueOf(this.webSocketConnectorClientFactory.endpointRetries()), cause);
                        // Release the client of the failed attempt before surfacing a retryable error.
                        return httpClient.close().andThen(Single.error(new WebSocketConnectorException("Unable to connect to Exchange Controller Endpoint", cause, true)));
                    });
            });
    }

    /**
     * Registers additional command handlers on this connector and on its parent.
     *
     * <p>A handler is appended to the local list only when no already registered handler reports
     * an equal {@code supportType()}. The previous decompiled form reused the same name for the
     * outer and inner lambda parameters, which does not compile and compared each handler's type
     * with itself. The complete, unfiltered list is then forwarded to the superclass.
     *
     * @param list handlers to register; {@code null} skips local registration but is still
     *             forwarded to the superclass
     */
    public void addCommandHandlers(List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> list) {
        if (list != null) {
            for (CommandHandler<? extends Command<?>, ? extends Reply<?>> candidate : list) {
                boolean alreadyRegistered = this.commandHandlers
                    .stream()
                    .anyMatch(registered -> registered.supportType().equals(candidate.supportType()));
                if (!alreadyRegistered) {
                    this.commandHandlers.add(candidate);
                }
            }
        }
        super.addCommandHandlers(list);
    }

    /**
     * Creates a connector from the values collected by a builder.
     *
     * <p>The three lists are copied into fresh mutable {@link ArrayList}s, with {@code null}
     * treated as empty, matching the public constructor. {@code addCommandHandlers} streams over
     * and appends to {@code commandHandlers}, so keeping the builder's own reference would fail
     * for an unset ({@code null}) or immutable list and would let later changes to the caller's
     * list leak into the connector.
     *
     * @param webSocketExchangeConnectorBuilder builder holding the connector's collaborators
     */
    protected WebSocketExchangeConnector(WebSocketExchangeConnectorBuilder<?, ?> webSocketExchangeConnectorBuilder) {
        super(webSocketExchangeConnectorBuilder);
        this.protocolVersion = webSocketExchangeConnectorBuilder.protocolVersion;
        this.commandHandlers = webSocketExchangeConnectorBuilder.commandHandlers != null
            ? new ArrayList<>(webSocketExchangeConnectorBuilder.commandHandlers)
            : new ArrayList<>();
        this.commandAdapters = webSocketExchangeConnectorBuilder.commandAdapters != null
            ? new ArrayList<>(webSocketExchangeConnectorBuilder.commandAdapters)
            : new ArrayList<>();
        this.replyAdapters = webSocketExchangeConnectorBuilder.replyAdapters != null
            ? new ArrayList<>(webSocketExchangeConnectorBuilder.replyAdapters)
            : new ArrayList<>();
        this.vertx = webSocketExchangeConnectorBuilder.vertx;
        this.webSocketConnectorClientFactory = webSocketExchangeConnectorBuilder.webSocketConnectorClientFactory;
        this.exchangeSerDe = webSocketExchangeConnectorBuilder.exchangeSerDe;
    }

    /**
     * Entry point of the fluent builder API.
     *
     * @return a new, empty {@code WebSocketExchangeConnectorBuilderImpl}
     */
    public static WebSocketExchangeConnectorBuilder<?, ?> builder() {
        return new WebSocketExchangeConnectorBuilderImpl();
    }
}
