package org.apache.druid.messages.client;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.messages.MessageBatch;
import org.apache.druid.rpc.ServiceClosedException;
import org.apache.druid.server.DruidNode;

/* loaded from: input_file:org/apache/druid/messages/client/MessageRelay.class */
public class MessageRelay<MessageType> implements Closeable {
    private static final Logger log = new Logger(MessageRelay.class);
    public static final long INIT = -1;
    private final String selfHost;
    private final DruidNode serverNode;
    private final MessageRelayClient<MessageType> client;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final MessageRelay<MessageType>.Collector collector;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/messages/client/MessageRelay$Collector.class */
    public class Collector {
        private final MessageListener<MessageType> listener;
        private final AtomicLong epoch = new AtomicLong(-1);
        private final AtomicLong watermark = new AtomicLong(-1);
        private final AtomicReference<ListenableFuture<?>> currentCall = new AtomicReference<>();

        public Collector(MessageListener<MessageType> messageListener) {
            this.listener = messageListener;
        }

        private void start() {
            if (!this.watermark.compareAndSet(-1L, 0L)) {
                throw new ISE("Already started", new Object[0]);
            }
            this.listener.serverAdded(MessageRelay.this.serverNode);
            issueNextGetMessagesCall();
        }

        private void issueNextGetMessagesCall() {
            if (MessageRelay.this.closed.get()) {
                return;
            }
            final long j = this.epoch.get();
            final long j2 = this.watermark.get();
            MessageRelay.log.debug("Getting messages from server[%s] for client[%s] (current state: epoch[%s] watermark[%s]).", new Object[]{MessageRelay.this.serverNode.getHostAndPortToUse(), MessageRelay.this.selfHost, Long.valueOf(j), Long.valueOf(j2)});
            final ListenableFuture<MessageBatch<MessageType>> messages = MessageRelay.this.client.getMessages(MessageRelay.this.selfHost, j, j2);
            if (this.currentCall.compareAndSet(null, messages)) {
                Futures.addCallback(messages, new FutureCallback<MessageBatch<MessageType>>() { // from class: org.apache.druid.messages.client.MessageRelay.Collector.1
                    public void onSuccess(MessageBatch<MessageType> messageBatch) {
                        MessageRelay.log.debug("Received message batch: %s", new Object[]{messageBatch});
                        Collector.this.currentCall.compareAndSet(messages, null);
                        long startWatermark = messageBatch.getStartWatermark() + messageBatch.getMessages().size();
                        if (j == -1) {
                            Collector.this.epoch.set(messageBatch.getEpoch());
                            Collector.this.watermark.set(startWatermark);
                        } else if (Collector.this.epoch.get() != messageBatch.getEpoch() || !Collector.this.watermark.compareAndSet(messageBatch.getStartWatermark(), startWatermark)) {
                            MessageRelay.log.error("Incorrect epoch + watermark from server[%s] for client[%s] (expected[%s:%s] but got[%s:%s]). Closing collector.", new Object[]{MessageRelay.this.serverNode.getHostAndPortToUse(), MessageRelay.this.selfHost, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(messageBatch.getEpoch()), Long.valueOf(messageBatch.getStartWatermark())});
                            MessageRelay.this.close();
                            return;
                        }
                        for (MessageType messagetype : messageBatch.getMessages()) {
                            try {
                                Collector.this.listener.messageReceived(messagetype);
                            } catch (Throwable th) {
                                MessageRelay.log.warn(th, "Failed to handle message[%s] from server[%s] for client[%s].", new Object[]{messagetype, MessageRelay.this.selfHost, MessageRelay.this.serverNode.getHostAndPortToUse()});
                            }
                        }
                        Collector.this.issueNextGetMessagesCall();
                    }

                    public void onFailure(Throwable th) {
                        Collector.this.currentCall.compareAndSet(messages, null);
                        if (!(th instanceof CancellationException) && !(th instanceof ServiceClosedException)) {
                            MessageRelay.log.error(th, "Fatal error contacting server[%s] for client[%s] (current state: epoch[%s] watermark[%s]). Closing collector.", new Object[]{MessageRelay.this.serverNode.getHostAndPortToUse(), MessageRelay.this.selfHost, Long.valueOf(j), Long.valueOf(j2)});
                        }
                        MessageRelay.this.close();
                    }
                }, Execs.directExecutor());
            } else {
                MessageRelay.log.error("Fatal error: too many outgoing calls to server[%s] for client[%s] (current state: epoch[%s] watermark[%s]). Closing collector.", new Object[]{MessageRelay.this.serverNode.getHostAndPortToUse(), MessageRelay.this.selfHost, Long.valueOf(j), Long.valueOf(j2)});
                MessageRelay.this.close();
            }
        }

        public void stop() {
            ListenableFuture<?> andSet = this.currentCall.getAndSet(null);
            if (andSet != null) {
                andSet.cancel(true);
            }
            try {
                this.listener.serverRemoved(MessageRelay.this.serverNode);
            } catch (Throwable th) {
                MessageRelay.log.warn(th, "Failed to close server[%s]", new Object[]{MessageRelay.this.serverNode.getHostAndPortToUse()});
            }
        }
    }

    public MessageRelay(String str, DruidNode druidNode, MessageRelayClient<MessageType> messageRelayClient, MessageListener<MessageType> messageListener) {
        this.selfHost = str;
        this.serverNode = druidNode;
        this.client = messageRelayClient;
        this.collector = new Collector(messageListener);
    }

    public void start() {
        this.collector.start();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.collector.stop();
        }
    }
}
