package io.helidon.config.etcd.internal.client.v3;

import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.helidon.common.tls.Tls;
import io.helidon.config.etcd.internal.client.EtcdClient;
import io.helidon.config.etcd.internal.client.EtcdClientException;
import io.helidon.config.etcd.internal.client.proto.Event;
import io.helidon.config.etcd.internal.client.proto.KVGrpc;
import io.helidon.config.etcd.internal.client.proto.PutRequest;
import io.helidon.config.etcd.internal.client.proto.RangeRequest;
import io.helidon.config.etcd.internal.client.proto.RangeResponse;
import io.helidon.config.etcd.internal.client.proto.WatchCreateRequest;
import io.helidon.config.etcd.internal.client.proto.WatchGrpc;
import io.helidon.config.etcd.internal.client.proto.WatchRequest;
import io.helidon.config.etcd.internal.client.proto.WatchResponse;
import io.helidon.webclient.grpc.GrpcClient;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/* loaded from: input_file:io/helidon/config/etcd/internal/client/v3/EtcdV3Client.class */
public class EtcdV3Client implements EtcdClient {
    private static final Tls DISABLE_TLS = Tls.builder().enabled(false).build();
    private final Map<String, SubmissionPublisher<Long>> publishers = new ConcurrentHashMap();
    private final KVGrpc.KVBlockingStub kvStub;
    private final WatchGrpc.WatchStub watchStub;

    public EtcdV3Client(URI... uriArr) {
        if (uriArr.length != 1) {
            throw new IllegalArgumentException("EtcdV3Client only supports a single URI");
        }
        GrpcClient build = GrpcClient.builder().baseUri(uriArr[0]).tls(DISABLE_TLS).build();
        this.kvStub = KVGrpc.newBlockingStub(build.channel());
        this.watchStub = WatchGrpc.newStub(build.channel());
    }

    @Override // io.helidon.config.etcd.internal.client.EtcdClient
    public Long revision(String str) throws EtcdClientException {
        try {
            RangeResponse range = this.kvStub.range(RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(str)).m3213build());
            if (range.getCount() == 0) {
                return null;
            }
            return Long.valueOf(range.getKvs(0).getModRevision());
        } catch (StatusRuntimeException e) {
            throw new EtcdClientException("Cannot retrieve a value for the key: " + str, (Throwable) e);
        }
    }

    @Override // io.helidon.config.etcd.internal.client.EtcdClient
    public String get(String str) throws EtcdClientException {
        try {
            RangeResponse range = this.kvStub.range(RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(str)).m3213build());
            if (range.getCount() == 0) {
                return null;
            }
            return range.getKvs(0).getValue().toStringUtf8();
        } catch (StatusRuntimeException e) {
            throw new EtcdClientException("Cannot retrieve a value for the key: " + str, (Throwable) e);
        }
    }

    @Override // io.helidon.config.etcd.internal.client.EtcdClient
    public void put(String str, String str2) throws EtcdClientException {
        this.kvStub.put(PutRequest.newBuilder().setKey(ByteString.copyFromUtf8(str)).setValue(ByteString.copyFromUtf8(str2)).m3119build());
    }

    @Override // io.helidon.config.etcd.internal.client.EtcdClient
    public Flow.Publisher<Long> watch(String str, Executor executor) throws EtcdClientException {
        final SubmissionPublisher<Long> computeIfAbsent = this.publishers.computeIfAbsent(str, str2 -> {
            return new SubmissionPublisher(executor, Flow.defaultBufferSize());
        });
        StreamObserver<WatchResponse> streamObserver = new StreamObserver<WatchResponse>(this) { // from class: io.helidon.config.etcd.internal.client.v3.EtcdV3Client.1
            public void onNext(WatchResponse watchResponse) {
                List<Event> eventsList = watchResponse.getEventsList();
                SubmissionPublisher submissionPublisher = computeIfAbsent;
                eventsList.forEach(event -> {
                    submissionPublisher.submit(Long.valueOf(event.getKv().getVersion()));
                });
            }

            public void onError(Throwable th) {
                computeIfAbsent.closeExceptionally(th);
            }

            public void onCompleted() {
            }
        };
        this.watchStub.watch(streamObserver).onNext(WatchRequest.newBuilder().setCreateRequest(WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8(str))).m3933build());
        return computeIfAbsent;
    }

    @Override // io.helidon.config.etcd.internal.client.EtcdClient
    public Flow.Publisher<Long> watch(String str) throws EtcdClientException {
        return watch(str, (v0) -> {
            v0.run();
        });
    }

    @Override // io.helidon.config.etcd.internal.client.EtcdClient, java.lang.AutoCloseable
    public void close() {
        this.publishers.values().forEach((v0) -> {
            v0.close();
        });
    }
}
