package io.gravitee.exchange.api.websocket.channel;

import io.gravitee.exchange.api.channel.Channel;
import io.gravitee.exchange.api.channel.exception.ChannelClosedException;
import io.gravitee.exchange.api.channel.exception.ChannelInactiveException;
import io.gravitee.exchange.api.channel.exception.ChannelInitializationException;
import io.gravitee.exchange.api.channel.exception.ChannelNoReplyException;
import io.gravitee.exchange.api.channel.exception.ChannelReplyException;
import io.gravitee.exchange.api.channel.exception.ChannelTimeoutException;
import io.gravitee.exchange.api.channel.exception.ChannelUnknownCommandException;
import io.gravitee.exchange.api.command.Command;
import io.gravitee.exchange.api.command.CommandAdapter;
import io.gravitee.exchange.api.command.CommandHandler;
import io.gravitee.exchange.api.command.CommandStatus;
import io.gravitee.exchange.api.command.Reply;
import io.gravitee.exchange.api.command.ReplyAdapter;
import io.gravitee.exchange.api.command.goodbye.GoodByeCommand;
import io.gravitee.exchange.api.command.goodbye.GoodByeCommandPayload;
import io.gravitee.exchange.api.command.hello.HelloCommand;
import io.gravitee.exchange.api.command.hello.HelloReply;
import io.gravitee.exchange.api.command.hello.HelloReplyPayload;
import io.gravitee.exchange.api.command.noreply.NoReply;
import io.gravitee.exchange.api.command.unknown.UnknownCommandHandler;
import io.gravitee.exchange.api.command.unknown.UnknownReply;
import io.gravitee.exchange.api.websocket.protocol.ProtocolAdapter;
import io.gravitee.exchange.api.websocket.protocol.ProtocolExchange;
import io.gravitee.exchange.api.websocket.protocol.legacy.ignored.IgnoredReply;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleEmitter;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.WebSocketBase;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/exchange/api/websocket/channel/AbstractWebSocketChannel.class */
public abstract class AbstractWebSocketChannel implements Channel {
    private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketChannel.class);
    private static final int PING_DELAY = 5000;
    protected final Vertx vertx;
    protected final WebSocketBase webSocket;
    protected final ProtocolAdapter protocolAdapter;
    protected String targetId;
    protected boolean active;
    protected final String id = UUID.randomUUID().toString();
    protected final Map<String, CommandHandler<? extends Command<?>, ? extends Reply<?>>> commandHandlers = new ConcurrentHashMap();
    protected final Map<String, CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> commandAdapters = new ConcurrentHashMap();
    protected final Map<String, ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> replyAdapters = new ConcurrentHashMap();
    protected final Map<String, SingleEmitter<? extends Reply<?>>> resultEmitters = new ConcurrentHashMap();
    protected AtomicInteger inflightCommandCount = new AtomicInteger(0);
    private long pingTaskId = -1;

    protected AbstractWebSocketChannel(List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> list, List<CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> list2, List<ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> list3, Vertx vertx, WebSocketBase webSocketBase, ProtocolAdapter protocolAdapter) {
        addCommandHandlers(list);
        addCommandHandlers(List.of(new UnknownCommandHandler()));
        addCommandHandlers(protocolAdapter.commandHandlers());
        addCommandAdapters(list2);
        addCommandAdapters(protocolAdapter.commandAdapters());
        addReplyAdapters(list3);
        addReplyAdapters(protocolAdapter.replyAdapters());
        this.vertx = vertx;
        this.webSocket = webSocketBase;
        this.protocolAdapter = protocolAdapter;
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public String id() {
        return this.id;
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public String targetId() {
        return this.targetId;
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public boolean isActive() {
        return this.active;
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public boolean hasPendingCommands() {
        return !this.resultEmitters.isEmpty() || this.inflightCommandCount.get() > 0;
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public Completable initialize() {
        return Completable.create(completableEmitter -> {
            this.webSocket.closeHandler(r6 -> {
                log.warn("Channel '{}' for target '{}' is closing", this.id, this.targetId);
                this.active = false;
                cleanChannel();
            });
            this.webSocket.pongHandler(buffer -> {
                log.trace("Receiving pong frame from channel '{}' for target '{}'", this.id, this.targetId);
            });
            this.webSocket.textMessageHandler(str -> {
                this.webSocket.close((short) 1003, "Unsupported text frame").subscribe();
            });
            this.webSocket.binaryMessageHandler(buffer2 -> {
                if (buffer2.length() > 0) {
                    ProtocolExchange read = this.protocolAdapter.read(buffer2);
                    try {
                        if (ProtocolExchange.Type.COMMAND == read.type()) {
                            receiveCommand(completableEmitter, read.asCommand());
                        } else if (ProtocolExchange.Type.REPLY == read.type()) {
                            receiveReply(read.asReply());
                        } else {
                            this.webSocket.close((short) 1002, "Exchange message unknown").subscribe();
                        }
                    } catch (Exception e) {
                        log.warn(String.format("An error occurred when trying to decode incoming websocket exchange [%s]. Closing Socket.", read), e);
                        this.webSocket.close((short) 1011, "Unexpected error while handling incoming websocket exchange").subscribe();
                    }
                }
            });
            if (expectHelloCommand()) {
                return;
            }
            this.active = true;
            completableEmitter.onComplete();
        }).doOnComplete(() -> {
            log.debug("Channel '{}' has been successfully initialized", this.id);
        }).doOnError(th -> {
            log.error("Unable to initialize channel '{}'", this.id);
        });
    }

    private <C extends Command<?>> void receiveCommand(CompletableEmitter completableEmitter, C c) {
        if (c == null) {
            this.webSocket.close((short) 1002, "Unrecognized incoming exchange").subscribe();
            completableEmitter.onError(new ChannelUnknownCommandException("Unrecognized incoming exchange"));
        } else {
            log.trace("Handling received command '{}' of type '{}'", c.getId(), c.getType());
            CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>> commandAdapter = this.commandAdapters.get(c.getType());
            (commandAdapter != null ? commandAdapter.adapt(this.targetId, c) : Single.just(c)).flatMapCompletable(command -> {
                CommandHandler<Command<?>, Reply<?>> commandHandler = (CommandHandler) this.commandHandlers.get(command.getType());
                if (expectHelloCommand() && !this.active && !Objects.equals(command.getType(), HelloCommand.COMMAND_TYPE)) {
                    this.webSocket.close((short) 1002, "Hello Command is first expected to initialize the exchange channel").subscribe();
                    completableEmitter.onError(new ChannelInitializationException("Hello Command is first expected to initialize the channel"));
                    return Completable.complete();
                }
                if (Objects.equals(command.getType(), HelloCommand.COMMAND_TYPE)) {
                    return handleHelloCommand(completableEmitter, command, commandHandler);
                }
                if (Objects.equals(command.getType(), GoodByeCommand.COMMAND_TYPE)) {
                    return handleGoodByeCommand(command, commandHandler);
                }
                if (commandHandler != null) {
                    return handleCommandAsync(command, commandHandler);
                }
                log.info("No handler found for command type '{}'. Ignoring", command.getType());
                return writeReply(new NoReply(command.getId(), "No handler found for command type '%s'. Ignoring".formatted(command.getType())));
            }).onErrorResumeNext(th -> {
                log.warn("Unexpected internal error occurred when handling command '%s' of type '%s'".formatted(c.getId(), c.getType()), th);
                return writeReply(new NoReply(c.getId(), "Unexpected internal error occurred"));
            }).subscribe();
        }
    }

    protected abstract boolean expectHelloCommand();

    private void receiveReply(Reply<?> reply) {
        SingleEmitter<? extends Reply<?>> remove = this.resultEmitters.remove(reply.getCommandId());
        if (remove == null) {
            log.debug("No reply emitter for received reply '{}' of type '{}'. Ignoring", reply.getCommandId(), reply.getType());
            return;
        }
        log.trace("Handling received reply '{}' of type '{}'", reply.getCommandId(), reply.getType());
        ReplyAdapter<? extends Reply<?>, ? extends Reply<?>> replyAdapter = this.replyAdapters.get(reply.getType());
        (replyAdapter != null ? replyAdapter.adapt(this.targetId, reply) : Single.just(reply)).doOnSuccess(reply2 -> {
            if (reply2 instanceof UnknownReply) {
                remove.onError(new ChannelUnknownCommandException(reply2.getErrorDetails()));
            } else if ((reply2 instanceof NoReply) || (reply2 instanceof IgnoredReply)) {
                remove.onError(new ChannelNoReplyException(reply2.getErrorDetails()));
            } else {
                remove.onSuccess(reply2);
            }
            if (reply2.stopOnErrorStatus() && reply2.getCommandStatus() == CommandStatus.ERROR) {
                this.webSocket.close().subscribe();
            }
        }).doOnError(th -> {
            log.warn("Unable to handle reply '{}' for command '{}'", reply.getType(), reply.getCommandId());
            remove.onError(new ChannelReplyException(th));
        }).subscribe();
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public Completable close() {
        return Completable.fromRunnable(() -> {
            this.webSocket.close((short) 1000).subscribe();
            cleanChannel();
        });
    }

    protected void cleanChannel() {
        this.active = false;
        this.resultEmitters.forEach((str, singleEmitter) -> {
            if (singleEmitter.isDisposed()) {
                return;
            }
            singleEmitter.onError(new ChannelClosedException());
        });
        this.resultEmitters.clear();
        if (this.pingTaskId != -1) {
            this.vertx.cancelTimer(this.pingTaskId);
            this.pingTaskId = -1L;
        }
        if (this.webSocket == null || this.webSocket.isClosed()) {
            return;
        }
        this.webSocket.close((short) 1011).subscribe();
    }

    protected Completable handleHelloCommand(CompletableEmitter completableEmitter, Command<?> command, CommandHandler<Command<?>, Reply<?>> commandHandler) {
        return commandHandler != null ? handleCommand(command, commandHandler, false).doOnSuccess(reply -> {
            if (reply.getCommandStatus() == CommandStatus.SUCCEEDED) {
                P payload = reply.getPayload();
                if (!(payload instanceof HelloReplyPayload)) {
                    completableEmitter.onError(new ChannelInitializationException("Unable to parse hello reply payload"));
                    return;
                }
                this.targetId = ((HelloReplyPayload) payload).getTargetId();
                this.active = true;
                startPingTask();
                completableEmitter.onComplete();
            }
        }).ignoreElement() : Completable.fromRunnable(() -> {
            startPingTask();
            completableEmitter.onComplete();
        });
    }

    private void startPingTask() {
        this.pingTaskId = this.vertx.setPeriodic(5000L, l -> {
            if (this.webSocket.isClosed()) {
                return;
            }
            this.webSocket.writePing(Buffer.buffer()).subscribe();
        });
    }

    protected Completable handleGoodByeCommand(Command<?> command, CommandHandler<Command<?>, Reply<?>> commandHandler) {
        return commandHandler != null ? handleCommand(command, commandHandler, true).doOnSuccess(reply -> {
            if (reply.getCommandStatus() == CommandStatus.SUCCEEDED) {
                P payload = command.getPayload();
                if ((payload instanceof GoodByeCommandPayload) && ((GoodByeCommandPayload) payload).isReconnect()) {
                    this.webSocket.close((short) 1013, "GoodBye Command with reconnection requested.").subscribe();
                } else {
                    this.webSocket.close((short) 1000, "GoodBye Command without reconnection.").subscribe();
                }
            }
        }).doFinally(this::cleanChannel).ignoreElement() : Completable.fromRunnable(() -> {
            this.webSocket.close((short) 1013).subscribe();
            cleanChannel();
        });
    }

    protected Completable handleCommandAsync(Command<?> command, CommandHandler<Command<?>, Reply<?>> commandHandler) {
        return handleCommand(command, commandHandler, false).ignoreElement();
    }

    protected Single<Reply<?>> handleCommand(Command<?> command, CommandHandler<Command<?>, Reply<?>> commandHandler, boolean z) {
        return Completable.fromRunnable(() -> {
            this.inflightCommandCount.incrementAndGet();
        }).andThen(Single.defer(() -> {
            return commandHandler.handle(command);
        })).flatMap(reply -> {
            return !z ? writeReply(reply).andThen(Single.just(reply)) : Single.just(reply);
        }).doOnError(th -> {
            log.warn("Unable to handle command '{}' with id '{}'", command.getType(), command.getId());
            this.webSocket.close((short) 1011, "Unexpected error").subscribe();
        }).doFinally(() -> {
            this.inflightCommandCount.decrementAndGet();
        });
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public <C extends Command<?>, R extends Reply<?>> Single<R> send(C c) {
        return send(c, false);
    }

    protected Single<HelloReply> sendHelloCommand(HelloCommand helloCommand) {
        return send(helloCommand, true);
    }

    protected <C extends Command<?>, R extends Reply<?>> Single<R> send(C c, boolean z) {
        return Single.defer(() -> {
            if (!z && !this.active) {
                return Single.error(new ChannelInactiveException());
            }
            CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>> commandAdapter = this.commandAdapters.get(c.getType());
            return commandAdapter != null ? commandAdapter.adapt(this.targetId, c) : Single.just(c);
        }).flatMap(command -> {
            return Single.create(singleEmitter -> {
                this.resultEmitters.put(command.getId(), singleEmitter);
                Completable writeCommand = writeCommand(command);
                Objects.requireNonNull(singleEmitter);
                writeCommand.doOnError(singleEmitter::onError).onErrorComplete().subscribe();
            }).timeout(command.getReplyTimeoutMs(), TimeUnit.MILLISECONDS, Single.error(() -> {
                if (command.getReplyTimeoutMs() > 0) {
                    log.warn("No reply received in time for command '{}' with id '{}'", command.getType(), command.getId());
                }
                throw new ChannelTimeoutException();
            }));
        }).onErrorResumeNext(th -> {
            CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>> commandAdapter = this.commandAdapters.get(c.getType());
            return commandAdapter != null ? commandAdapter.onError(c, th) : Single.error(th);
        }).doFinally(() -> {
            this.resultEmitters.remove(c.getId());
        });
    }

    protected <C extends Command<?>> Completable writeCommand(C c) {
        return writeToSocket(c.getId(), ProtocolExchange.builder().type(ProtocolExchange.Type.COMMAND).exchangeType(c.getType()).exchange(c).build());
    }

    protected <R extends Reply<?>> Completable writeReply(R r) {
        return Single.defer(() -> {
            ReplyAdapter<? extends Reply<?>, ? extends Reply<?>> replyAdapter = this.replyAdapters.get(r.getType());
            return replyAdapter != null ? replyAdapter.adapt(this.targetId, r) : Single.just(r);
        }).flatMapCompletable(reply -> {
            return writeToSocket(reply.getCommandId(), ProtocolExchange.builder().type(ProtocolExchange.Type.REPLY).exchangeType(reply.getType()).exchange(reply).build());
        });
    }

    private Completable writeToSocket(String str, ProtocolExchange protocolExchange) {
        return !this.webSocket.isClosed() ? this.webSocket.writeBinaryMessage(this.protocolAdapter.write(protocolExchange)).doOnComplete(() -> {
            log.trace("Write exchange '{}' for '{}' and id '{}' to websocket successfully", new Object[]{protocolExchange.type(), protocolExchange.exchangeType(), str});
        }).onErrorResumeNext(th -> {
            log.error("An error occurred when trying to send exchange '{}' for '{}' and id '{}'", new Object[]{protocolExchange.type(), protocolExchange.exchangeType(), str});
            return Completable.error(new Exception("Write to socket failed"));
        }) : Completable.error(new ChannelClosedException());
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public void addCommandHandlers(List<CommandHandler<? extends Command<?>, ? extends Reply<?>>> list) {
        if (list != null) {
            list.forEach(commandHandler -> {
                this.commandHandlers.putIfAbsent(commandHandler.supportType(), commandHandler);
            });
        }
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public void addCommandAdapters(List<CommandAdapter<? extends Command<?>, ? extends Command<?>, ? extends Reply<?>>> list) {
        if (list != null) {
            list.forEach(commandAdapter -> {
                this.commandAdapters.putIfAbsent(commandAdapter.supportType(), commandAdapter);
            });
        }
    }

    @Override // io.gravitee.exchange.api.channel.Channel
    public void addReplyAdapters(List<ReplyAdapter<? extends Reply<?>, ? extends Reply<?>>> list) {
        if (list != null) {
            list.forEach(replyAdapter -> {
                this.replyAdapters.putIfAbsent(replyAdapter.supportType(), replyAdapter);
            });
        }
    }
}
