package org.apache.beam.runners.fnexecution.state;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/state/GrpcStateService.class */
/**
 * A state service that routes every incoming {@link BeamFnApi.StateRequest} to the
 * {@link StateRequestHandler} registered for the request's instruction id.
 *
 * <p>Handlers are registered through
 * {@link #registerForProcessBundleInstructionId(String, StateRequestHandler)} and removed again via
 * the returned {@link StateDelegator.Registration}. Requests whose instruction id has no registered
 * handler are answered with an error response instead of failing the stream.
 *
 * <p>Every stream opened through {@link #state(StreamObserver)} is tracked until it terminates so
 * that {@link #close()} can complete the streams that are still open.
 */
public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase implements StateDelegator, FnService {
    /** Handlers keyed by process bundle instruction id. */
    private final ConcurrentMap<String, StateRequestHandler> requestHandlers = new ConcurrentHashMap<>();

    /** Streams that have been opened and have not yet reported completion or an error. */
    private final ConcurrentLinkedQueue<Inbound> clients = new ConcurrentLinkedQueue<>();

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/state/GrpcStateService$Inbound.class */
    /**
     * Receives the requests of one client stream and writes the matching responses to that
     * stream's outbound observer.
     */
    private class Inbound implements StreamObserver<BeamFnApi.StateRequest> {
        private final StreamObserver<BeamFnApi.StateResponse> outboundObserver;

        Inbound(StreamObserver<BeamFnApi.StateResponse> streamObserver) {
            this.outboundObserver = streamObserver;
        }

        /**
         * Dispatches the request to the handler registered for its instruction id and emits the
         * handler's response, stamped with the request id, once the returned stage completes.
         *
         * <p>A failure, whether thrown synchronously by the handler or delivered through the
         * returned stage, is reported to the client as an error response carrying the stack trace.
         */
        @Override
        public void onNext(BeamFnApi.StateRequest stateRequest) {
            try {
                StateRequestHandler handler =
                        GrpcStateService.this.requestHandlers.getOrDefault(
                                stateRequest.getInstructionId(), this::handlerNotFound);
                handler.handle(stateRequest).whenComplete((builder, th) -> {
                    BeamFnApi.StateResponse response =
                            th == null
                                    ? builder.setId(stateRequest.getId()).build()
                                    : createErrorResponse(stateRequest.getId(), th);
                    this.outboundObserver.onNext(response);
                });
            } catch (Exception e) {
                this.outboundObserver.onNext(createErrorResponse(stateRequest.getId(), e));
            }
        }

        /**
         * Stops tracking this stream and completes the outbound side. The inbound error itself is
         * not propagated to the outbound observer.
         */
        @Override
        public void onError(Throwable th) {
            // Forget the stream first so close() does not complete it a second time.
            GrpcStateService.this.clients.remove(this);
            this.outboundObserver.onCompleted();
        }

        /** Stops tracking this stream and completes the outbound side. */
        @Override
        public void onCompleted() {
            // Forget the stream first so close() does not complete it a second time.
            GrpcStateService.this.clients.remove(this);
            this.outboundObserver.onCompleted();
        }

        /**
         * Fallback handler used when no handler is registered for the request's instruction id.
         *
         * @return an already-completed stage holding an error response builder
         */
        private CompletionStage<BeamFnApi.StateResponse.Builder> handlerNotFound(BeamFnApi.StateRequest stateRequest) {
            return CompletableFuture.completedFuture(
                    BeamFnApi.StateResponse.newBuilder()
                            .setError(String.format("Unknown process bundle instruction id '%s'", stateRequest.getInstructionId())));
        }

        /** Builds an error response for request {@code str} whose error text is the stack trace of {@code th}. */
        private BeamFnApi.StateResponse createErrorResponse(String str, Throwable th) {
            return BeamFnApi.StateResponse.newBuilder().setId(str).setError(Throwables.getStackTraceAsString(th)).build();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/state/GrpcStateService$Registration.class */
    /** Handle that removes the handler mapping for one process bundle instruction id. */
    private class Registration implements StateDelegator.Registration {
        private final String processBundleInstructionId;

        private Registration(String str) {
            this.processBundleInstructionId = str;
        }

        /** Removes whatever handler is currently mapped to this registration's instruction id. */
        @Override // org.apache.beam.runners.fnexecution.state.StateDelegator.Registration
        public void deregister() {
            GrpcStateService.this.requestHandlers.remove(this.processBundleInstructionId);
        }

        /** Equivalent to {@link #deregister()}. */
        @Override // org.apache.beam.runners.fnexecution.state.StateDelegator.Registration
        public void abort() {
            deregister();
        }
    }

    /** Creates a new service with no registered handlers. */
    public static GrpcStateService create() {
        return new GrpcStateService();
    }

    private GrpcStateService() {
    }

    /**
     * Completes the outbound side of every stream that is still tracked.
     *
     * <p>Streams whose outbound observer is a {@link ServerCallStreamObserver} that reports
     * cancellation are skipped. All streams are attempted even if some fail.
     *
     * @throws Exception the first exception raised while completing a stream, with any later ones
     *     attached as suppressed exceptions
     */
    @Override // org.apache.beam.sdk.fn.server.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
        Exception thrown = null;
        for (Inbound client : this.clients) {
            try {
                StreamObserver<BeamFnApi.StateResponse> outbound = client.outboundObserver;
                // isCancelled() is declared on ServerCallStreamObserver, not StreamObserver, so the
                // observer must be cast after the instanceof check.
                if (outbound instanceof ServerCallStreamObserver
                        && ((ServerCallStreamObserver<?>) outbound).isCancelled()) {
                    continue;
                }
                outbound.onCompleted();
            } catch (Exception e) {
                if (thrown == null) {
                    thrown = e;
                } else {
                    thrown.addSuppressed(e);
                }
            }
        }
        if (thrown != null) {
            throw thrown;
        }
    }

    /**
     * Opens a new state stream.
     *
     * @param streamObserver observer used to send responses back to the client
     * @return the observer that will receive the client's requests
     */
    @Override
    public StreamObserver<BeamFnApi.StateRequest> state(StreamObserver<BeamFnApi.StateResponse> streamObserver) {
        Inbound inbound = new Inbound(streamObserver);
        this.clients.add(inbound);
        return inbound;
    }

    /**
     * Registers {@code stateRequestHandler} for the given instruction id.
     *
     * <p>If a handler is already mapped to {@code str}, the existing mapping is kept and the new
     * handler is ignored; the returned registration still removes the mapping for {@code str}.
     *
     * @param str the process bundle instruction id
     * @param stateRequestHandler the handler to receive requests for that instruction id
     * @return a registration that removes the mapping for {@code str}
     */
    @Override // org.apache.beam.runners.fnexecution.state.StateDelegator
    public StateDelegator.Registration registerForProcessBundleInstructionId(String str, StateRequestHandler stateRequestHandler) {
        this.requestHandlers.putIfAbsent(str, stateRequestHandler);
        return new Registration(str);
    }
}
