package org.apache.beam.runners.fnexecution.environment;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.ProcessManager;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.RemoteEnvironmentOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.class */
public class ProcessEnvironmentFactory implements EnvironmentFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessEnvironmentFactory.class);
    private final ProcessManager processManager;
    private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
    private final ControlClientPool.Source clientSource;
    private final PipelineOptions pipelineOptions;

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory$Provider.class */
    public static class Provider implements EnvironmentFactory.Provider {
        private final PipelineOptions pipelineOptions;

        public Provider(PipelineOptions pipelineOptions) {
            this.pipelineOptions = pipelineOptions;
        }

        @Override // org.apache.beam.runners.fnexecution.environment.EnvironmentFactory.Provider
        public EnvironmentFactory createEnvironmentFactory(GrpcFnServer<FnApiControlClientPoolService> grpcFnServer, GrpcFnServer<GrpcLoggingService> grpcFnServer2, GrpcFnServer<ArtifactRetrievalService> grpcFnServer3, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer4, ControlClientPool controlClientPool, IdGenerator idGenerator) {
            return ProcessEnvironmentFactory.create(ProcessManager.create(), grpcFnServer4, controlClientPool.getSource(), idGenerator, this.pipelineOptions);
        }
    }

    public static ProcessEnvironmentFactory create(ProcessManager processManager, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer, ControlClientPool.Source source, IdGenerator idGenerator, PipelineOptions pipelineOptions) {
        return new ProcessEnvironmentFactory(processManager, grpcFnServer, source, pipelineOptions);
    }

    private ProcessEnvironmentFactory(ProcessManager processManager, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer, ControlClientPool.Source source, PipelineOptions pipelineOptions) {
        this.processManager = processManager;
        this.provisioningServiceServer = grpcFnServer;
        this.clientSource = source;
        this.pipelineOptions = pipelineOptions;
    }

    @Override // org.apache.beam.runners.fnexecution.environment.EnvironmentFactory
    public RemoteEnvironment createEnvironment(RunnerApi.Environment environment, String str) throws Exception {
        Preconditions.checkState(environment.getUrn().equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.PROCESS)), "The passed environment does not contain a ProcessPayload.");
        RunnerApi.ProcessPayload parseFrom = RunnerApi.ProcessPayload.parseFrom(environment.getPayload());
        String command = parseFrom.getCommand();
        String url = this.provisioningServiceServer.getApiServiceDescriptor().getUrl();
        String semiPersistDir = this.pipelineOptions.as(RemoteEnvironmentOptions.class).getSemiPersistDir();
        ImmutableList.Builder add = ImmutableList.builder().add(String.format("--id=%s", str)).add(String.format("--provision_endpoint=%s", url));
        if (semiPersistDir != null) {
            add.add(String.format("--semi_persist_dir=%s", semiPersistDir));
        }
        LOG.debug("Creating Process for worker ID {}", str);
        InstructionRequestHandler instructionRequestHandler = null;
        try {
            ProcessManager.RunningProcess startProcess = this.processManager.startProcess(str, command, add.build(), parseFrom.getEnvMap());
            while (instructionRequestHandler == null) {
                try {
                    startProcess.isAliveOrThrow();
                    instructionRequestHandler = this.clientSource.take(str, Duration.ofSeconds(5L));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (TimeoutException e2) {
                    LOG.info("Still waiting for startup of environment '{}' for worker id {}", parseFrom.getCommand(), str);
                }
            }
            return ProcessEnvironment.create(this.processManager, environment, str, instructionRequestHandler);
        } catch (Exception e3) {
            try {
                this.processManager.stopProcess(str);
            } catch (Exception e4) {
                e3.addSuppressed(e4);
            }
            throw e3;
        }
    }
}
