package io.envoyproxy.controlplane.server;

import com.google.common.base.Strings;
import com.google.protobuf.Any;
import io.envoyproxy.controlplane.cache.Resources;
import io.envoyproxy.controlplane.cache.Response;
import io.envoyproxy.controlplane.cache.Watch;
import io.envoyproxy.controlplane.cache.XdsRequest;
import io.envoyproxy.controlplane.server.exception.RequestException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.class */
public abstract class DiscoveryRequestStreamObserver<T, U> implements StreamObserver<T> {
    private static final AtomicLongFieldUpdater<DiscoveryRequestStreamObserver> streamNonceUpdater = AtomicLongFieldUpdater.newUpdater(DiscoveryRequestStreamObserver.class, "streamNonce");
    private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
    final long streamId;
    final DiscoveryServer<T, U, ?, ?, ?> discoveryServer;
    private final String defaultTypeUrl;
    private final StreamObserver<U> responseObserver;
    private final Executor executor;
    private volatile boolean isClosing;
    private volatile long streamNonce = 0;
    volatile boolean hasClusterChanged = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DiscoveryRequestStreamObserver(String str, StreamObserver<U> streamObserver, long j, Executor executor, DiscoveryServer<T, U, ?, ?, ?> discoveryServer) {
        this.defaultTypeUrl = str;
        this.responseObserver = streamObserver;
        this.streamId = j;
        this.executor = executor;
        this.discoveryServer = discoveryServer;
    }

    public void onNext(T t) {
        XdsRequest wrapXdsRequest = this.discoveryServer.wrapXdsRequest(t);
        String typeUrl = wrapXdsRequest.getTypeUrl().isEmpty() ? this.defaultTypeUrl : wrapXdsRequest.getTypeUrl();
        String responseNonce = wrapXdsRequest.getResponseNonce();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}] request {}[{}] with nonce {} from version {}", new Object[]{Long.valueOf(this.streamId), typeUrl, String.join(", ", (Iterable<? extends CharSequence>) wrapXdsRequest.getResourceNamesList()), responseNonce, wrapXdsRequest.getVersionInfo()});
        }
        try {
            this.discoveryServer.runStreamRequestCallbacks(this.streamId, t);
            LatestDiscoveryResponse latestResponse = latestResponse(typeUrl);
            String nonce = latestResponse == null ? null : latestResponse.nonce();
            if (Strings.isNullOrEmpty(nonce) || nonce.equals(responseNonce)) {
                if (!wrapXdsRequest.hasErrorDetail() && latestResponse != null) {
                    setAckedResources(typeUrl, latestResponse.resourceNames());
                }
                computeWatch(typeUrl, () -> {
                    return this.discoveryServer.configWatcher.createWatch(ads(), wrapXdsRequest, ackedResources(typeUrl), response -> {
                        this.executor.execute(() -> {
                            send(response, typeUrl);
                        });
                    }, this.hasClusterChanged, this.discoveryServer.startupConfigs().allowDefaultEmptyEdsUpdate());
                });
            }
        } catch (RequestException e) {
            onError(e);
        }
    }

    public void onError(Throwable th) {
        if (!Status.fromThrowable(th).getCode().equals(Status.CANCELLED.getCode())) {
            LOGGER.error("[{}] stream closed with error", Long.valueOf(this.streamId), th);
        }
        try {
            this.discoveryServer.callbacks.forEach(discoveryServerCallbacks -> {
                discoveryServerCallbacks.onStreamCloseWithError(this.streamId, this.defaultTypeUrl, th);
            });
            closeWithError(Status.fromThrowable(th).asException());
        } finally {
            cancel();
        }
    }

    public void onCompleted() {
        LOGGER.debug("[{}] stream closed", Long.valueOf(this.streamId));
        try {
            this.discoveryServer.callbacks.forEach(discoveryServerCallbacks -> {
                discoveryServerCallbacks.onStreamClose(this.streamId, this.defaultTypeUrl);
            });
            synchronized (this.responseObserver) {
                if (!this.isClosing) {
                    this.isClosing = true;
                    this.responseObserver.onCompleted();
                }
            }
        } finally {
            cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCancelled() {
        LOGGER.info("[{}] stream cancelled", Long.valueOf(this.streamId));
        cancel();
    }

    void closeWithError(Throwable th) {
        synchronized (this.responseObserver) {
            if (!this.isClosing) {
                this.isClosing = true;
                this.responseObserver.onError(th);
            }
        }
        cancel();
    }

    private void send(Response response, String str) {
        String l = Long.toString(streamNonceUpdater.getAndIncrement(this));
        Collection<Any> serialize = this.discoveryServer.protoResourcesSerializer.serialize(response.resources(), Resources.getResourceApiVersion(str));
        LOGGER.debug("[{}] response {} with nonce {} version {}", new Object[]{Long.valueOf(this.streamId), str, l, response.version()});
        U makeResponse = this.discoveryServer.makeResponse(response.version(), serialize, str, l);
        this.discoveryServer.runStreamResponseCallbacks(this.streamId, response.request(), makeResponse);
        setLatestResponse(str, LatestDiscoveryResponse.create(l, (Set) response.resources().stream().map(Resources::getResourceName).collect(Collectors.toSet())));
        synchronized (this.responseObserver) {
            if (!this.isClosing) {
                try {
                    this.responseObserver.onNext(makeResponse);
                } catch (StatusRuntimeException e) {
                    if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
                        throw e;
                    }
                }
            }
        }
    }

    abstract void cancel();

    abstract boolean ads();

    abstract LatestDiscoveryResponse latestResponse(String str);

    abstract void setLatestResponse(String str, LatestDiscoveryResponse latestDiscoveryResponse);

    abstract Set<String> ackedResources(String str);

    abstract void setAckedResources(String str, Set<String> set);

    abstract void computeWatch(String str, Supplier<Watch> supplier);
}
