package org.apache.paimon.service.client;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.query.QueryLocation;
import org.apache.paimon.service.exceptions.UnknownPartitionBucketException;
import org.apache.paimon.service.messages.KvRequest;
import org.apache.paimon.service.messages.KvResponse;
import org.apache.paimon.service.network.NetworkClient;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.utils.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/service/client/KvQueryClient.class */
public class KvQueryClient {
    private static final Logger LOG = LoggerFactory.getLogger(KvQueryClient.class);
    private final NetworkClient<KvRequest, KvResponse> networkClient;
    private final QueryLocation queryLocation;

    public KvQueryClient(QueryLocation queryLocation, int i) {
        this.queryLocation = queryLocation;
        this.networkClient = new NetworkClient<>("Kv Query Client", i, new MessageSerializer(new KvRequest.KvRequestDeserializer(), new KvResponse.KvResponseDeserializer()), new DisabledServiceRequestStats());
    }

    public CompletableFuture<BinaryRow[]> getValues(BinaryRow binaryRow, int i, BinaryRow[] binaryRowArr) {
        CompletableFuture<BinaryRow[]> completableFuture = new CompletableFuture<>();
        executeActionAsync(completableFuture, new KvRequest(binaryRow, i, binaryRowArr), false);
        return completableFuture;
    }

    private void executeActionAsync(CompletableFuture<BinaryRow[]> completableFuture, KvRequest kvRequest, boolean z) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture<KvResponse> response = getResponse(kvRequest, z);
        response.whenCompleteAsync((kvResponse, th) -> {
            if (th == null) {
                completableFuture.complete(kvResponse.values());
            } else if (!(th instanceof UnknownPartitionBucketException) && !(th.getCause() instanceof ConnectException)) {
                completableFuture.completeExceptionally(th);
            } else {
                LOG.debug("Retrying after failing to retrieve state due to: {}.", th.getMessage());
                executeActionAsync(completableFuture, kvRequest, true);
            }
        });
        completableFuture.whenComplete((binaryRowArr, th2) -> {
            response.cancel(false);
        });
    }

    private CompletableFuture<KvResponse> getResponse(KvRequest kvRequest, boolean z) {
        InetSocketAddress location = this.queryLocation.getLocation(kvRequest.partition(), kvRequest.bucket(), z);
        return location == null ? FutureUtils.completedExceptionally(new RuntimeException("Cannot find address for bucket: " + kvRequest.bucket())) : this.networkClient.sendRequest(location, kvRequest);
    }

    public void shutdown() {
        try {
            shutdownFuture().get(60L, TimeUnit.SECONDS);
            LOG.info("{} was shutdown successfully.", this.networkClient.getClientName());
        } catch (Exception e) {
            LOG.warn(String.format("%s shutdown failed.", this.networkClient.getClientName()), e);
        }
    }

    public CompletableFuture<Void> shutdownFuture() {
        return this.networkClient.shutdown();
    }
}
