package org.apache.beam.runners.spark.structuredstreaming;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.core.metrics.NoOpMetricsSink;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.metrics.SparkBeamMetricSource;
import org.apache.beam.runners.spark.structuredstreaming.translation.EvaluationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.SparkSessionFactory;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.util.construction.SplittableParDo;
import org.apache.beam.sdk.util.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.spark.SparkEnv$;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.class */
public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkStructuredStreamingPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkStructuredStreamingRunner.class);
    private final SparkStructuredStreamingPipelineOptions options;

    public static SparkStructuredStreamingRunner create() {
        SparkStructuredStreamingPipelineOptions sparkStructuredStreamingPipelineOptions = (SparkStructuredStreamingPipelineOptions) PipelineOptionsFactory.as(SparkStructuredStreamingPipelineOptions.class);
        sparkStructuredStreamingPipelineOptions.setRunner(SparkStructuredStreamingRunner.class);
        return new SparkStructuredStreamingRunner(sparkStructuredStreamingPipelineOptions);
    }

    public static SparkStructuredStreamingRunner create(SparkStructuredStreamingPipelineOptions sparkStructuredStreamingPipelineOptions) {
        return new SparkStructuredStreamingRunner(sparkStructuredStreamingPipelineOptions);
    }

    public static SparkStructuredStreamingRunner fromOptions(PipelineOptions pipelineOptions) {
        return new SparkStructuredStreamingRunner((SparkStructuredStreamingPipelineOptions) PipelineOptionsValidator.validate(SparkStructuredStreamingPipelineOptions.class, pipelineOptions));
    }

    private SparkStructuredStreamingRunner(SparkStructuredStreamingPipelineOptions sparkStructuredStreamingPipelineOptions) {
        this.options = sparkStructuredStreamingPipelineOptions;
    }

    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public SparkStructuredStreamingPipelineResult m57run(Pipeline pipeline) {
        MetricsEnvironment.setMetricsSupported(true);
        MetricsAccumulator.clear();
        LOG.info("*** SparkStructuredStreamingRunner is based on spark structured streaming framework and is no more \n based on RDD/DStream API. See\n https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html\n It is still experimental, its coverage of the Beam model is partial. ***");
        PipelineTranslator.detectStreamingMode(pipeline, this.options);
        Preconditions.checkArgument(!this.options.isStreaming(), "Streaming is not supported.");
        SparkSession orCreateSession = SparkSessionFactory.getOrCreateSession(this.options);
        MetricsAccumulator metricsAccumulator = MetricsAccumulator.getInstance(orCreateSession);
        SparkStructuredStreamingPipelineResult sparkStructuredStreamingPipelineResult = new SparkStructuredStreamingPipelineResult(runAsync(() -> {
            translatePipeline(orCreateSession, pipeline).evaluate();
        }), metricsAccumulator, sparkStopFn(orCreateSession, this.options.getUseActiveSparkSession()));
        if (this.options.getEnableSparkMetricSinks().booleanValue()) {
            registerMetricsSource(this.options.getAppName(), metricsAccumulator);
        }
        startMetricsPusher(sparkStructuredStreamingPipelineResult, metricsAccumulator);
        if (this.options.getTestMode()) {
            sparkStructuredStreamingPipelineResult.waitUntilFinish();
        }
        return sparkStructuredStreamingPipelineResult;
    }

    private EvaluationContext translatePipeline(SparkSession sparkSession, Pipeline pipeline) {
        if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
        }
        if (!ExperimentalOptions.hasExperiment(this.options, "disable_projection_pushdown")) {
            ProjectionPushdownOptimizer.optimize(pipeline);
        }
        PipelineTranslator.replaceTransforms(pipeline, this.options);
        return new PipelineTranslatorBatch().translate(pipeline, sparkSession, this.options);
    }

    private void registerMetricsSource(String str, MetricsAccumulator metricsAccumulator) {
        MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
        SparkBeamMetricSource sparkBeamMetricSource = new SparkBeamMetricSource(str + ".Beam", metricsAccumulator);
        metricsSystem.removeSource(sparkBeamMetricSource);
        metricsSystem.registerSource(sparkBeamMetricSource);
    }

    private void startMetricsPusher(SparkStructuredStreamingPipelineResult sparkStructuredStreamingPipelineResult, MetricsAccumulator metricsAccumulator) {
        MetricsOptions as = this.options.as(MetricsOptions.class);
        Class metricsSink = as.getMetricsSink();
        if (metricsSink == null || metricsSink.equals(NoOpMetricsSink.class)) {
            return;
        }
        new MetricsPusher(metricsAccumulator.m63value(), as, sparkStructuredStreamingPipelineResult).start();
    }

    private static Future<?> runAsync(Runnable runnable) {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SparkStructuredStreamingRunner-thread").build());
        Future<?> submit = newSingleThreadExecutor.submit(runnable);
        newSingleThreadExecutor.shutdown();
        return submit;
    }

    @Nullable
    private static Runnable sparkStopFn(SparkSession sparkSession, boolean z) {
        if (z) {
            return null;
        }
        return () -> {
            sparkSession.stop();
        };
    }
}
