package org.apache.paimon.service.network;

import java.nio.channels.ClosedChannelException;
import org.apache.paimon.service.network.messages.MessageBody;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.messages.MessageType;
import org.apache.paimon.service.network.messages.RequestFailure;
import org.apache.paimon.shade.netty4.io.netty.buffer.ByteBuf;
import org.apache.paimon.shade.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.paimon.shade.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.paimon.shade.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/service/network/ClientHandler.class */
public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
    private final MessageSerializer<REQ, RESP> serializer;
    private final ClientHandlerCallback<RESP> callback;

    public ClientHandler(MessageSerializer<REQ, RESP> messageSerializer, ClientHandlerCallback<RESP> clientHandlerCallback) {
        this.serializer = (MessageSerializer) Preconditions.checkNotNull(messageSerializer);
        this.callback = (ClientHandlerCallback) Preconditions.checkNotNull(clientHandlerCallback);
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        try {
            try {
                ByteBuf byteBuf = (ByteBuf) obj;
                MessageType deserializeHeader = MessageSerializer.deserializeHeader(byteBuf);
                if (deserializeHeader == MessageType.REQUEST_RESULT) {
                    this.callback.onRequestResult(MessageSerializer.getRequestId(byteBuf), this.serializer.deserializeResponse(byteBuf));
                } else {
                    if (deserializeHeader != MessageType.REQUEST_FAILURE) {
                        if (deserializeHeader != MessageType.SERVER_FAILURE) {
                            throw new IllegalStateException("Unexpected response type '" + deserializeHeader + "'");
                        }
                        throw MessageSerializer.deserializeServerFailure(byteBuf);
                    }
                    RequestFailure deserializeRequestFailure = MessageSerializer.deserializeRequestFailure(byteBuf);
                    this.callback.onRequestFailure(deserializeRequestFailure.getRequestId(), deserializeRequestFailure.getCause());
                }
                ReferenceCountUtil.release(obj);
            } catch (Throwable th) {
                try {
                    this.callback.onFailure(th);
                } catch (Throwable th2) {
                    LOG.error("Failed to notify callback about failure", th2);
                }
                ReferenceCountUtil.release(obj);
            }
        } catch (Throwable th3) {
            ReferenceCountUtil.release(obj);
            throw th3;
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        try {
            this.callback.onFailure(th);
        } catch (Throwable th2) {
            LOG.error("Failed to notify callback about failure", th2);
        }
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        try {
            this.callback.onFailure(new ClosedChannelException());
        } catch (Throwable th) {
            LOG.error("Failed to notify callback about failure", th);
        }
    }
}
