package org.apache.nifi.processors.gettcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:org/apache/nifi/processors/gettcp/ReceivingClient.class */
/**
 * Socket handler that connects to a remote endpoint, registers the resulting
 * channel for reads with the inherited selector, and forwards every received
 * chunk of bytes to an optional {@link MessageHandler}.
 *
 * <p>Connection attempts run on the supplied {@link ScheduledExecutorService};
 * failed attempts are re-scheduled up to {@code reconnectAttempts} times with
 * {@code delayMillisBeforeReconnect} milliseconds between them.
 */
class ReceivingClient extends AbstractSocketHandler {
    /** Extra time, in milliseconds, added to the computed retry budget while waiting in {@link #connect()}. */
    private static final long CONNECT_WAIT_SLACK_MILLIS = 2000L;

    private final ScheduledExecutorService connectionScheduler;
    private volatile int reconnectAttempts;
    private volatile long delayMillisBeforeReconnect;
    private volatile MessageHandler messageHandler;
    private volatile InetSocketAddress connectedAddress;

    /**
     * @param inetSocketAddress        endpoint to connect to (passed to the superclass)
     * @param scheduledExecutorService executor on which connection attempts and retries run
     * @param i                        passed through to the superclass constructor
     * @param b                        passed through to the superclass constructor
     */
    public ReceivingClient(InetSocketAddress inetSocketAddress, ScheduledExecutorService scheduledExecutorService, int i, byte b) {
        super(inetSocketAddress, i, b);
        this.connectionScheduler = scheduledExecutorService;
    }

    /** Sets how many times a failed connection attempt is retried before giving up. */
    public void setReconnectAttempts(int i) {
        this.reconnectAttempts = i;
    }

    /** Sets the delay, in milliseconds, between a failed connection attempt and the next retry. */
    public void setDelayMillisBeforeReconnect(long j) {
        this.delayMillisBeforeReconnect = j;
    }

    /** Sets the handler that receives incoming data; when {@code null}, received data is dropped. */
    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    /**
     * Connects to {@code address} on the connection scheduler and blocks the
     * calling thread until the attempt succeeds, all retries are exhausted, or
     * {@code reconnectAttempts * delayMillisBeforeReconnect + 2000} milliseconds
     * have elapsed.
     *
     * @return the connected address; if the wait times out, an error is logged
     *         and the current value of {@code connectedAddress} is returned,
     *         which is {@code null} when no connection was ever established
     * @throws Exception             the failure of the last attempt once retries are exhausted
     * @throws IllegalStateException if the calling thread is interrupted while waiting
     */
    @Override // org.apache.nifi.processors.gettcp.AbstractSocketHandler
    InetSocketAddress connect() throws Exception {
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicInteger failedAttempts = new AtomicInteger();
        final AtomicReference<Exception> failure = new AtomicReference<Exception>();
        this.connectionScheduler.execute(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ReceivingClient.this.rootChannel = ReceivingClient.this.doConnect(ReceivingClient.this.address);
                    // Publish the address BEFORE releasing the waiter; otherwise the
                    // waiting thread can wake up and return a null/stale address.
                    ReceivingClient.this.connectedAddress = ReceivingClient.this.address;
                    finished.countDown();
                } catch (Exception e) {
                    if (ReceivingClient.this.logger.isInfoEnabled()) {
                        ReceivingClient.this.logger.info("Failed to connect to primary endpoint '" + ReceivingClient.this.address + "'.");
                    }
                    if (failedAttempts.incrementAndGet() <= ReceivingClient.this.reconnectAttempts) {
                        if (ReceivingClient.this.logger.isInfoEnabled()) {
                            ReceivingClient.this.logger.info("Will attempt to reconnect to '" + ReceivingClient.this.address + "'.");
                        }
                        // Re-submit this same task so the attempt counter is shared across retries.
                        ReceivingClient.this.connectionScheduler.schedule(this, ReceivingClient.this.delayMillisBeforeReconnect, TimeUnit.MILLISECONDS);
                    } else {
                        failure.set(e);
                        ReceivingClient.this.logger.error("Failed to connect to secondary endpoint.");
                        finished.countDown();
                    }
                }
            }
        });
        try {
            long maxWaitMillis = (this.reconnectAttempts * this.delayMillisBeforeReconnect) + CONNECT_WAIT_SLACK_MILLIS;
            if (!finished.await(maxWaitMillis, TimeUnit.MILLISECONDS)) {
                this.logger.error("Exceeded wait time to connect. Possible deadlock, please report!. Interrupting.");
            } else {
                Exception connectFailure = failure.get();
                if (connectFailure != null) {
                    throw connectFailure;
                }
            }
            return this.connectedAddress;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the original exception type/message but preserve the cause.
            throw new IllegalStateException("Current thread is interrupted", e);
        }
    }

    /**
     * Opens a channel, connects it (in blocking mode) to the given address,
     * switches it to non-blocking mode and registers it with the inherited
     * selector for read events.
     *
     * <p>If any step fails the freshly opened channel is closed before the
     * failure propagates, so failed attempts and retries do not leak sockets.
     *
     * @param inetSocketAddress endpoint to connect to
     * @return the connected, registered, non-blocking channel
     * @throws IOException           if opening, connecting, configuring or registering fails
     * @throws IllegalStateException if {@code connect} reports that the connection was not established
     */
    /* JADX INFO: Access modifiers changed from: private */
    public SocketChannel doConnect(InetSocketAddress inetSocketAddress) throws IOException {
        SocketChannel channel = SocketChannel.open();
        boolean ready = false;
        try {
            if (!channel.connect(inetSocketAddress)) {
                throw new IllegalStateException("Failed to connect to Server at: " + inetSocketAddress);
            }
            channel.configureBlocking(false);
            channel.register(this.selector, SelectionKey.OP_READ);
            ready = true;
            return channel;
        } finally {
            if (!ready) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup: the original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Copies the readable bytes out of the buffer and hands them to the
     * message handler, if one is set.
     *
     * <p>The boolean passed as the third argument to the handler is
     * {@code true} when the chunk does not end with {@code endOfMessageByte},
     * and also for the chunk that does end with it when the key's attachment
     * shows the preceding chunk did not. The key attachment ({@code 1} /
     * {@code 0}) carries that marker between calls.
     *
     * <p>A buffer with no readable bytes is logged and otherwise ignored.
     *
     * @param selectionKey key of the channel the data came from; its attachment is updated
     * @param byteBuffer   buffer holding the received bytes
     */
    @Override // org.apache.nifi.processors.gettcp.AbstractSocketHandler
    void processData(SelectionKey selectionKey, ByteBuffer byteBuffer) throws IOException {
        byte[] payload = new byte[byteBuffer.remaining()];
        this.logger.debug("Received message(size=" + payload.length + ")");
        if (payload.length == 0) {
            // Nothing to inspect; indexing the last byte would throw.
            return;
        }
        byteBuffer.get(payload);
        boolean partial = false;
        if (payload[payload.length - 1] != this.endOfMessageByte) {
            partial = true;
            selectionKey.attach(1);
        } else {
            Integer marker = (Integer) selectionKey.attachment();
            if (marker != null && marker.intValue() == 1) {
                partial = true;
                selectionKey.attach(0);
            }
        }
        // Read the volatile once so a concurrent setMessageHandler(null) cannot
        // slip in between the null check and the call.
        MessageHandler handler = this.messageHandler;
        if (handler != null) {
            handler.handle(this.connectedAddress, payload, partial);
        }
    }
}
