package org.apache.beam.runners.spark;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.jobsubmission.PortablePipelineJarUtils;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkStreamingPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;
import org.apache.beam.runners.spark.translation.SparkTranslationContext;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
import org.apache.beam.sdk.util.construction.graph.TrivialNativeTransformExpander;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/SparkPipelineRunner.class */
public class SparkPipelineRunner implements PortablePipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);
    private final SparkPipelineOptions pipelineOptions;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/SparkPipelineRunner$SparkPipelineRunnerConfiguration.class */
    public static class SparkPipelineRunnerConfiguration {

        @Option(name = "--base-job-name", usage = "The job to run. This must correspond to a subdirectory of the jar's BEAM-PIPELINE directory. *Only needs to be specified if the jar contains multiple pipelines.*")
        @Nullable
        private String baseJobName;

        private SparkPipelineRunnerConfiguration() {
            this.baseJobName = null;
        }
    }

    public SparkPipelineRunner(SparkPipelineOptions sparkPipelineOptions) {
        this.pipelineOptions = sparkPipelineOptions;
    }

    public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
        SparkPipelineResult portableBatchMode;
        boolean z = this.pipelineOptions.isStreaming() || PipelineTranslatorUtils.hasUnboundedPCollections(pipeline);
        SparkPortablePipelineTranslator sparkStreamingPortablePipelineTranslator = z ? new SparkStreamingPortablePipelineTranslator() : new SparkBatchPortablePipelineTranslator();
        RunnerApi.Pipeline forKnownUrns = TrivialNativeTransformExpander.forKnownUrns(ProtoOverrides.updateTransform("beam:transform:pardo:v1", pipeline, SplittableParDoExpander.createSizedReplacement()), sparkStreamingPortablePipelineTranslator.knownUrns());
        RunnerApi.Pipeline pipeline2 = forKnownUrns.getComponents().getTransformsMap().values().stream().anyMatch(pTransform -> {
            return "beam:runner:executable_stage:v1".equals(pTransform.getSpec().getUrn());
        }) ? forKnownUrns : GreedyPipelineFuser.fuse(forKnownUrns).toPipeline();
        SparkCommonPipelineOptions.prepareFilesToStage(this.pipelineOptions);
        JavaSparkContext sparkContext = SparkContextFactory.getSparkContext(this.pipelineOptions);
        MetricsEnvironment.setMetricsSupported(true);
        MetricsAccumulator.init(this.pipelineOptions, sparkContext);
        SparkTranslationContext createTranslationContext = sparkStreamingPortablePipelineTranslator.createTranslationContext(sparkContext, this.pipelineOptions, jobInfo);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DefaultSparkRunner-thread").build());
        LOG.info("Running job {} on Spark master {}", jobInfo.jobId(), sparkContext.master());
        if (z) {
            JavaStreamingContext streamingContext = ((SparkStreamingTranslationContext) createTranslationContext).getStreamingContext();
            streamingContext.addStreamingListener(new JavaStreamingListenerWrapper(new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
            for (JavaStreamingListener javaStreamingListener : ((SparkContextOptions) this.pipelineOptions.as(SparkContextOptions.class)).getListeners()) {
                LOG.info("Registered listener {}." + javaStreamingListener.getClass().getSimpleName());
                streamingContext.addStreamingListener(new JavaStreamingListenerWrapper(javaStreamingListener));
            }
            streamingContext.addStreamingListener(new JavaStreamingListenerWrapper(new GlobalWatermarkHolder.WatermarkAdvancingStreamingListener()));
            streamingContext.checkpoint(this.pipelineOptions.getCheckpointDir());
            Long streamingTimeoutMs = ((SparkPortableStreamingPipelineOptions) this.pipelineOptions.as(SparkPortableStreamingPipelineOptions.class)).getStreamingTimeoutMs();
            SparkPortablePipelineTranslator sparkPortablePipelineTranslator = sparkStreamingPortablePipelineTranslator;
            portableBatchMode = new SparkPipelineResult.PortableStreamingMode(newSingleThreadExecutor.submit(() -> {
                sparkPortablePipelineTranslator.translate(pipeline2, createTranslationContext);
                LOG.info("Job {}: Pipeline translated successfully. Computing outputs", jobInfo.jobId());
                createTranslationContext.computeOutputs();
                streamingContext.start();
                try {
                    streamingContext.awaitTerminationOrTimeout(streamingTimeoutMs.longValue());
                } catch (InterruptedException e) {
                    LOG.warn("Streaming context interrupted, shutting down.", e);
                }
                streamingContext.stop();
                LOG.info("Job {} finished.", jobInfo.jobId());
            }), streamingContext);
        } else {
            SparkPortablePipelineTranslator sparkPortablePipelineTranslator2 = sparkStreamingPortablePipelineTranslator;
            portableBatchMode = new SparkPipelineResult.PortableBatchMode(newSingleThreadExecutor.submit(() -> {
                sparkPortablePipelineTranslator2.translate(pipeline2, createTranslationContext);
                LOG.info("Job {}: Pipeline translated successfully. Computing outputs", jobInfo.jobId());
                createTranslationContext.computeOutputs();
                LOG.info("Job {} finished.", jobInfo.jobId());
            }), sparkContext);
        }
        newSingleThreadExecutor.shutdown();
        portableBatchMode.waitUntilFinish();
        new MetricsPusher(MetricsAccumulator.getInstance().m26value(), this.pipelineOptions.as(MetricsOptions.class), portableBatchMode).start();
        return portableBatchMode;
    }

    public static void main(String[] strArr) throws Exception {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
        SparkPipelineRunnerConfiguration parseArgs = parseArgs(strArr);
        String defaultJobName = parseArgs.baseJobName == null ? PortablePipelineJarUtils.getDefaultJobName() : parseArgs.baseJobName;
        Preconditions.checkArgument(defaultJobName != null, "No default job name found. Job name must be set using --base-job-name.");
        RunnerApi.Pipeline pipelineFromClasspath = PortablePipelineJarUtils.getPipelineFromClasspath(defaultJobName);
        Struct pipelineOptionsFromClasspath = PortablePipelineJarUtils.getPipelineOptionsFromClasspath(defaultJobName);
        String str = (String) ArtifactApi.CommitManifestResponse.Constants.NO_ARTIFACTS_STAGED_TOKEN.getValueDescriptor().getOptions().getExtension(RunnerApi.beamConstant);
        SparkPipelineOptions sparkPipelineOptions = (SparkPipelineOptions) PipelineOptionsTranslation.fromProto(pipelineOptionsFromClasspath).as(SparkPipelineOptions.class);
        String format = String.format("%s_%s", sparkPipelineOptions.getJobName(), UUID.randomUUID().toString());
        if (sparkPipelineOptions.getAppName() == null) {
            LOG.debug("App name was null. Using invocationId {}", format);
            sparkPipelineOptions.setAppName(format);
        }
        try {
            new SparkPipelineRunner(sparkPipelineOptions).run(pipelineFromClasspath, JobInfo.create(format, sparkPipelineOptions.getJobName(), str, PipelineOptionsTranslation.toProto(sparkPipelineOptions)));
            LOG.info("Job {} finished successfully.", format);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Job %s failed.", format), e);
        }
    }

    private static SparkPipelineRunnerConfiguration parseArgs(String[] strArr) {
        SparkPipelineRunnerConfiguration sparkPipelineRunnerConfiguration = new SparkPipelineRunnerConfiguration();
        CmdLineParser cmdLineParser = new CmdLineParser(sparkPipelineRunnerConfiguration);
        try {
            cmdLineParser.parseArgument(strArr);
            return sparkPipelineRunnerConfiguration;
        } catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", e);
            cmdLineParser.printUsage(System.err);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
    }
}
