package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue;
import io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.ClientTopology;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;

/* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/OperationChannel.class */
public class OperationChannel implements MessagePassingQueue.Consumer<HotRodOperation<?>> {
    private static final Log log;
    public static final AttributeKey<OperationChannel> OPERATION_CHANNEL_ATTRIBUTE_KEY;
    private final SocketAddress address;
    private final ChannelInitializer newChannelInvoker;
    private final Function<String, ClientTopology> currentCacheTopologyFunction;
    private final BiConsumer<OperationChannel, Throwable> connectionFailureListener;
    private volatile Channel channel;
    private boolean acceptingRequests;
    Codec codec;
    HeaderDecoder headerDecoder;
    ByteBuf buffer;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Runnable SEND_OPERATIONS = this::sendOperations;
    private final AtomicReference<CompletableFuture<Void>> attemptedConnect = new AtomicReference<>();
    private final MpscUnboundedArrayQueue<HotRodOperation<?>> queue = new MpscUnboundedArrayQueue<>(HotRodConstants.COUNTER_GET_AND_SET_RESPONSE);

    OperationChannel(SocketAddress socketAddress, ChannelInitializer channelInitializer, Function<String, ClientTopology> function, BiConsumer<OperationChannel, Throwable> biConsumer) {
        if (!$assertionsDisabled && (socketAddress instanceof InetSocketAddress) && !((InetSocketAddress) socketAddress).isUnresolved()) {
            throw new AssertionError();
        }
        this.address = socketAddress;
        this.newChannelInvoker = channelInitializer;
        this.currentCacheTopologyFunction = function;
        this.connectionFailureListener = biConsumer;
    }

    public static OperationChannel createAndStart(SocketAddress socketAddress, ChannelInitializer channelInitializer, Function<String, ClientTopology> function, BiConsumer<OperationChannel, Throwable> biConsumer) {
        OperationChannel operationChannel = new OperationChannel(socketAddress, channelInitializer, function, biConsumer);
        operationChannel.attemptConnect();
        return operationChannel;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionStage<Void> attemptConnect() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        CompletableFuture<Void> compareAndExchange = this.attemptedConnect.compareAndExchange(null, completableFuture);
        if (compareAndExchange != null) {
            return compareAndExchange;
        }
        this.channel = null;
        ChannelFuture createChannel = this.newChannelInvoker.createChannel();
        createChannel.addListener(future -> {
            if (!future.isSuccess()) {
                Throwable cause = future.cause();
                log.tracef("Connection attempt to %s encountered exception for %s", this.address, cause);
                completableFuture.completeExceptionally(cause);
                this.attemptedConnect.compareAndSet(completableFuture, null);
                this.connectionFailureListener.accept(this, new TransportException(cause, this.address));
                return;
            }
            Channel channel = createChannel.channel();
            if (!$assertionsDisabled && !channel.eventLoop().inEventLoop()) {
                throw new AssertionError();
            }
            channel.attr(OPERATION_CHANNEL_ATTRIBUTE_KEY).set(this);
            this.channel = channel;
            this.headerDecoder = channel.pipeline().get(HeaderDecoder.class);
            this.codec = this.headerDecoder.getConfiguration().version().getCodec();
            this.connectionFailureListener.accept(this, null);
            this.channel.pipeline().fireUserEventTriggered(ActivationHandler.ACTIVATION_EVENT);
            log.tracef("OperationChannel %s connect complete to %s", this, channel);
        });
        return completableFuture;
    }

    public void setCodec(Codec codec) {
        if (!$assertionsDisabled && !this.channel.eventLoop().inEventLoop()) {
            throw new AssertionError();
        }
        this.codec = codec;
    }

    public boolean isAcceptingRequests() {
        CompletableFuture<Void> completableFuture = this.attemptedConnect.get();
        return completableFuture != null && completableFuture.isDone();
    }

    public void markAcceptingRequests() {
        this.channel.eventLoop().submit(() -> {
            this.attemptedConnect.get().complete(null);
            this.acceptingRequests = true;
            sendOperations();
        });
    }

    public void forceSendOperation(HotRodOperation<?> hotRodOperation) {
        if (!this.channel.eventLoop().inEventLoop()) {
            throw new IllegalArgumentException("Force sent operation " + String.valueOf(hotRodOperation) + " are required to be sent in the event loop only " + String.valueOf(this.channel.eventLoop()));
        }
        log.tracef("Immediately sending operation %s to channel %s", hotRodOperation, this.channel);
        long registerOperation = this.headerDecoder.registerOperation(hotRodOperation);
        Channel channel = this.channel;
        ByteBuf buffer = channel.alloc().buffer();
        Codec codec = this.codec.isUnsafeForTheHandshake() ? ProtocolVersion.SAFE_HANDSHAKE_PROTOCOL_VERSION.getCodec() : this.codec;
        codec.writeHeader(buffer, registerOperation, this.currentCacheTopologyFunction.apply(hotRodOperation.getCacheName()), hotRodOperation);
        hotRodOperation.writeOperationRequest(channel, buffer, codec);
        channel.writeAndFlush(buffer, channel.voidPromise());
    }

    public void sendOperation(HotRodOperation<?> hotRodOperation) {
        this.queue.offer(hotRodOperation);
        Channel channel = this.channel;
        if (channel != null) {
            log.tracef("Enqueued operation %s to send to channel %s", hotRodOperation, channel);
            channel.eventLoop().execute(this.SEND_OPERATIONS);
        } else {
            log.tracef("Enqueued operation %s to send to address %s when connected", hotRodOperation, this.address);
            attemptConnect();
        }
    }

    public Iterable<HotRodOperation<?>> reconnect(Throwable th) {
        if (!$assertionsDisabled && this.channel != null && !this.channel.eventLoop().inEventLoop()) {
            throw new AssertionError();
        }
        Channel channel = this.channel;
        this.channel = null;
        CompletableFuture<Void> andSet = this.attemptedConnect.getAndSet(null);
        if (andSet != null) {
            andSet.completeExceptionally(th);
        }
        if (this.acceptingRequests) {
            log.tracef("Attempting to reconnect channel %s after exception %s", channel, th);
            this.acceptingRequests = false;
            attemptConnect();
        } else {
            log.tracef("Channel %s was never fully accepted, not reconnecting after exception %s", channel, th);
        }
        ArrayList arrayList = new ArrayList();
        MpscUnboundedArrayQueue<HotRodOperation<?>> mpscUnboundedArrayQueue = this.queue;
        Objects.requireNonNull(arrayList);
        mpscUnboundedArrayQueue.drain((v1) -> {
            r1.add(v1);
        }, Integer.MAX_VALUE);
        return arrayList;
    }

    private void sendOperations() {
        Channel channel = this.channel;
        if (!$assertionsDisabled && channel != null && !channel.eventLoop().inEventLoop()) {
            throw new AssertionError();
        }
        if (!this.acceptingRequests || this.queue.isEmpty() || channel == null) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.tracef("OperationChannel %s Sending commands: %s enqueue to send to channel %s", this, Integer.valueOf(this.queue.size()), channel);
        }
        this.queue.drain(this, 256);
        if (this.buffer != null && this.buffer.isReadable()) {
            log.tracef("Flushing commands to channel %s", channel);
            channel.writeAndFlush(this.buffer, channel.voidPromise());
            this.buffer = null;
        }
        if (log.isTraceEnabled()) {
            log.tracef("Queue size after: %s", this.queue.size());
        }
        if (this.queue.isEmpty()) {
            return;
        }
        log.tracef("Resubmitting as more operations in queue after sending", new Object[0]);
        channel.eventLoop().execute(this.SEND_OPERATIONS);
    }

    public SocketAddress getAddress() {
        return this.address;
    }

    public void accept(HotRodOperation<?> hotRodOperation) {
        try {
            if (this.buffer == null) {
                this.buffer = this.channel.alloc().buffer();
            }
            this.codec.writeHeader(this.buffer, this.headerDecoder.registerOperation(hotRodOperation), this.currentCacheTopologyFunction.apply(hotRodOperation.getCacheName()), hotRodOperation);
            hotRodOperation.writeOperationRequest(this.channel, this.buffer, this.codec);
        } catch (Throwable th) {
            log.tracef(th, "Encountered exception while attempting to write to channel %s", this.channel);
        }
    }

    public Queue<HotRodOperation<?>> pendingChannelOperations() {
        return this.queue;
    }

    public List<HotRodOperation<?>> close() {
        this.acceptingRequests = false;
        CompletableFuture<Void> andSet = this.attemptedConnect.getAndSet(null);
        if (andSet != null) {
            andSet.complete(null);
        }
        if (this.channel != null) {
            this.channel.close();
        }
        ArrayList arrayList = new ArrayList();
        MpscUnboundedArrayQueue<HotRodOperation<?>> mpscUnboundedArrayQueue = this.queue;
        Objects.requireNonNull(arrayList);
        mpscUnboundedArrayQueue.drain((v1) -> {
            r1.add(v1);
        }, Integer.MAX_VALUE);
        return arrayList;
    }

    public Channel getChannel() {
        return this.channel;
    }

    public String toString() {
        return "OperationChannel{address=" + String.valueOf(this.address) + ", queue.size=" + this.queue.size() + ", channel=" + String.valueOf(this.channel) + ", acceptingRequests=" + this.acceptingRequests + "}";
    }

    static {
        $assertionsDisabled = !OperationChannel.class.desiredAssertionStatus();
        log = LogFactory.getLog(OperationChannel.class);
        OPERATION_CHANNEL_ATTRIBUTE_KEY = AttributeKey.newInstance("hotrod-operation");
    }
}
