package io.envoyproxy.controlplane.server;

import io.envoyproxy.controlplane.cache.DeltaResponse;
import io.envoyproxy.controlplane.cache.DeltaWatch;
import io.envoyproxy.controlplane.cache.DeltaXdsRequest;
import io.envoyproxy.controlplane.cache.Resources;
import io.envoyproxy.controlplane.cache.VersionedResource;
import io.envoyproxy.controlplane.server.exception.RequestException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.class */
public abstract class DeltaDiscoveryRequestStreamObserver<V, X, Y> implements StreamObserver<V> {
    private static final AtomicLongFieldUpdater<DeltaDiscoveryRequestStreamObserver> streamNonceUpdater = AtomicLongFieldUpdater.newUpdater(DeltaDiscoveryRequestStreamObserver.class, "streamNonce");
    private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
    final long streamId;
    private final String defaultTypeUrl;
    private final StreamObserver<X> responseObserver;
    private final Executor executor;
    final DiscoveryServer<?, ?, V, X, Y> discoveryServer;
    private volatile boolean isClosing;
    private volatile long streamNonce = 0;
    volatile boolean hasClusterChanged = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DeltaDiscoveryRequestStreamObserver(String str, StreamObserver<X> streamObserver, long j, Executor executor, DiscoveryServer<?, ?, V, X, Y> discoveryServer) {
        this.defaultTypeUrl = str;
        this.responseObserver = streamObserver;
        this.streamId = j;
        this.executor = executor;
        this.discoveryServer = discoveryServer;
    }

    public void onNext(V v) {
        DeltaXdsRequest wrapDeltaXdsRequest = this.discoveryServer.wrapDeltaXdsRequest(v);
        String typeUrl = wrapDeltaXdsRequest.getTypeUrl().isEmpty() ? this.defaultTypeUrl : wrapDeltaXdsRequest.getTypeUrl();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}] request {}[{}] with nonce {} from versions {}", new Object[]{Long.valueOf(this.streamId), typeUrl, String.join(", ", wrapDeltaXdsRequest.getResourceNamesSubscribeList()), wrapDeltaXdsRequest.getResponseNonce(), wrapDeltaXdsRequest.getInitialResourceVersionsMap()});
        }
        try {
            this.discoveryServer.runStreamDeltaRequestCallbacks(this.streamId, v);
            String latestVersion = latestVersion(typeUrl) == null ? "" : latestVersion(typeUrl);
            updateSubscriptions(typeUrl, wrapDeltaXdsRequest.getResourceNamesSubscribeList(), wrapDeltaXdsRequest.getResourceNamesUnsubscribeList());
            if (!wrapDeltaXdsRequest.getResponseNonce().isEmpty()) {
                LatestDeltaDiscoveryResponse clearResponse = clearResponse(typeUrl, wrapDeltaXdsRequest.getResponseNonce());
                if (!wrapDeltaXdsRequest.hasErrorDetail()) {
                    updateTrackedResources(typeUrl, clearResponse.resourceVersions(), clearResponse.removedResources());
                }
            }
            if (responseCount(typeUrl) == 0) {
                String str = latestVersion;
                computeWatch(typeUrl, () -> {
                    return this.discoveryServer.configWatcher.createDeltaWatch(wrapDeltaXdsRequest, str, resourceVersions(typeUrl), pendingResources(typeUrl), isWildcard(typeUrl), deltaResponse -> {
                        this.executor.execute(() -> {
                            send(deltaResponse, typeUrl);
                        });
                    }, this.hasClusterChanged);
                });
            }
        } catch (RequestException e) {
            onError(e);
        }
    }

    public void onError(Throwable th) {
        if (!Status.fromThrowable(th).getCode().equals(Status.CANCELLED.getCode())) {
            LOGGER.error("[{}] stream closed with error", Long.valueOf(this.streamId), th);
        }
        try {
            this.discoveryServer.callbacks.forEach(discoveryServerCallbacks -> {
                discoveryServerCallbacks.onStreamCloseWithError(this.streamId, this.defaultTypeUrl, th);
            });
            closeWithError(Status.fromThrowable(th).asException());
        } finally {
            cancel();
        }
    }

    public void onCompleted() {
        LOGGER.debug("[{}] stream closed", Long.valueOf(this.streamId));
        try {
            this.discoveryServer.callbacks.forEach(discoveryServerCallbacks -> {
                discoveryServerCallbacks.onStreamClose(this.streamId, this.defaultTypeUrl);
            });
            synchronized (this.responseObserver) {
                if (!this.isClosing) {
                    this.isClosing = true;
                    this.responseObserver.onCompleted();
                }
            }
        } finally {
            cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCancelled() {
        LOGGER.info("[{}] stream cancelled", Long.valueOf(this.streamId));
        cancel();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeWithError(Throwable th) {
        synchronized (this.responseObserver) {
            if (!this.isClosing) {
                this.isClosing = true;
                this.responseObserver.onError(th);
            }
        }
        cancel();
    }

    private void send(DeltaResponse deltaResponse, String str) {
        String l = Long.toString(streamNonceUpdater.getAndIncrement(this));
        X makeDeltaResponse = this.discoveryServer.makeDeltaResponse(str, deltaResponse.version(), l, (List) deltaResponse.resources().entrySet().stream().map(entry -> {
            return this.discoveryServer.makeResource((String) entry.getKey(), ((VersionedResource) entry.getValue()).version(), this.discoveryServer.protoResourcesSerializer.serialize(((VersionedResource) entry.getValue()).resource(), Resources.getResourceApiVersion(str)));
        }).collect(Collectors.toList()), deltaResponse.removedResources());
        LOGGER.debug("[{}] response {} with nonce {} version {}", new Object[]{Long.valueOf(this.streamId), str, l, deltaResponse.version()});
        this.discoveryServer.runStreamDeltaResponseCallbacks(this.streamId, deltaResponse.request(), makeDeltaResponse);
        setResponse(str, l, LatestDeltaDiscoveryResponse.create(l, deltaResponse.version(), (Map) deltaResponse.resources().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return ((VersionedResource) entry2.getValue()).version();
        })), deltaResponse.removedResources()));
        setLatestVersion(str, deltaResponse.version());
        synchronized (this.responseObserver) {
            if (!this.isClosing) {
                try {
                    this.responseObserver.onNext(makeDeltaResponse);
                } catch (StatusRuntimeException e) {
                    if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
                        throw e;
                    }
                }
            }
        }
    }

    abstract void cancel();

    abstract boolean ads();

    abstract void setLatestVersion(String str, String str2);

    abstract String latestVersion(String str);

    abstract void setResponse(String str, String str2, LatestDeltaDiscoveryResponse latestDeltaDiscoveryResponse);

    abstract LatestDeltaDiscoveryResponse clearResponse(String str, String str2);

    abstract int responseCount(String str);

    abstract Map<String, String> resourceVersions(String str);

    abstract Set<String> pendingResources(String str);

    abstract boolean isWildcard(String str);

    abstract void updateTrackedResources(String str, Map<String, String> map, List<String> list);

    abstract void updateSubscriptions(String str, List<String> list, List<String> list2);

    abstract void computeWatch(String str, Supplier<DeltaWatch> supplier);
}
