package io.openlineage.spark.agent;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.openlineage.client.Environment;
import io.openlineage.client.OpenLineageConfig;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.client.circuitBreaker.CircuitBreakerFactory;
import io.openlineage.client.circuitBreaker.NoOpCircuitBreaker;
import io.openlineage.client.metrics.MicrometerProvider;
import io.openlineage.client.utils.RuntimeUtils;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.api.SparkOpenLineageConfig;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.package$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.Option;

/* loaded from: input_file:io/openlineage/spark/agent/OpenLineageSparkListener.class */
public class OpenLineageSparkListener extends SparkListener {
    private static ContextFactory contextFactory;
    private static final Function0<Option<SparkContext>> activeSparkContext;
    private static CircuitBreaker circuitBreaker;
    private static MeterRegistry meterRegistry;
    private static String sparkVersion;
    private final boolean isDisabled = checkIfDisabled();
    private Optional<Integer> activeJobId = Optional.empty();
    private static final Logger log = LoggerFactory.getLogger(OpenLineageSparkListener.class);
    private static final Map<Long, ExecutionContext> sparkSqlExecutionRegistry = Collections.synchronizedMap(new HashMap());
    private static final Map<Integer, ExecutionContext> rddExecutionRegistry = Collections.synchronizedMap(new HashMap());
    private static WeakHashMap<RDD<?>, Configuration> outputs = new WeakHashMap<>();
    private static JobMetricsHolder jobMetrics = JobMetricsHolder.getInstance();
    private static final Function1<SparkSession, SparkContext> sparkContextFromSession = ScalaConversionUtils.toScalaFn((v0) -> {
        return v0.sparkContext();
    });

    public static void init(ContextFactory contextFactory2) {
        contextFactory = contextFactory2;
        meterRegistry = contextFactory2.getMeterRegistry();
        clear();
    }

    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        if (this.isDisabled) {
            return;
        }
        initializeContextFactoryIfNotInitialized();
        if (sparkListenerEvent instanceof SparkListenerSQLExecutionStart) {
            sparkSQLExecStart((SparkListenerSQLExecutionStart) sparkListenerEvent);
        } else if (sparkListenerEvent instanceof SparkListenerSQLExecutionEnd) {
            sparkSQLExecEnd((SparkListenerSQLExecutionEnd) sparkListenerEvent);
        }
    }

    private void sparkSQLExecStart(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
        getSparkSQLExecutionContext(sparkListenerSQLExecutionStart.executionId()).ifPresent(executionContext -> {
            meterRegistry.counter("openlineage.spark.event.sql.start", new String[0]).increment();
            circuitBreaker.run(() -> {
                this.activeJobId.ifPresent(num -> {
                    executionContext.setActiveJobId(num);
                });
                executionContext.start(sparkListenerSQLExecutionStart);
                return null;
            });
        });
    }

    private void sparkSQLExecEnd(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
        log.debug("sparkSQLExecEnd with activeJobId {}", this.activeJobId);
        ExecutionContext remove = sparkSqlExecutionRegistry.remove(Long.valueOf(sparkListenerSQLExecutionEnd.executionId()));
        meterRegistry.counter("openlineage.spark.event.sql.end", new String[0]).increment();
        if (remove != null) {
            circuitBreaker.run(() -> {
                this.activeJobId.ifPresent(num -> {
                    remove.setActiveJobId(num);
                });
                remove.end(sparkListenerSQLExecutionEnd);
                return null;
            });
        } else {
            contextFactory.createSparkSQLExecutionContext(sparkListenerSQLExecutionEnd).ifPresent(executionContext -> {
                circuitBreaker.run(() -> {
                    this.activeJobId.ifPresent(num -> {
                        executionContext.setActiveJobId(num);
                    });
                    executionContext.end(sparkListenerSQLExecutionEnd);
                    return null;
                });
            });
        }
    }

    public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        if (this.isDisabled) {
            return;
        }
        this.activeJobId = Optional.of(Integer.valueOf(sparkListenerJobStart.jobId()));
        log.debug("onJobStart called {}", sparkListenerJobStart);
        initializeContextFactoryIfNotInitialized();
        meterRegistry.counter("openlineage.spark.event.job.start", new String[0]).increment();
        Optional flatMap = ScalaConversionUtils.asJavaOptional(SparkSession.getDefaultSession().map(sparkContextFromSession).orElse(activeSparkContext)).flatMap(sparkContext -> {
            return Optional.ofNullable(sparkContext.dagScheduler()).map(dAGScheduler -> {
                return dAGScheduler.jobIdToActiveJob().get(Integer.valueOf(sparkListenerJobStart.jobId()));
            });
        }).flatMap(ScalaConversionUtils::asJavaOptional);
        Stream stream = ScalaConversionUtils.fromSeq(sparkListenerJobStart.stageIds()).stream();
        Class<Integer> cls = Integer.class;
        Integer.class.getClass();
        Set<Integer> set = (Set) stream.map(cls::cast).collect(Collectors.toSet());
        if (sparkVersion.startsWith("3")) {
            jobMetrics.addJobStages(sparkListenerJobStart.jobId(), set);
        }
        ((Optional) ((Optional) Optional.ofNullable(getSqlExecutionId(sparkListenerJobStart.properties())).map((v0) -> {
            return Optional.of(v0);
        }).orElseGet(() -> {
            return ScalaConversionUtils.asJavaOptional(SparkSession.getDefaultSession().map(sparkContextFromSession).orElse(activeSparkContext)).flatMap(sparkContext2 -> {
                return Optional.ofNullable(sparkContext2.dagScheduler()).map(dAGScheduler -> {
                    return dAGScheduler.jobIdToActiveJob().get(Integer.valueOf(sparkListenerJobStart.jobId()));
                }).flatMap(ScalaConversionUtils::asJavaOptional);
            }).map(activeJob -> {
                return getSqlExecutionId(activeJob.properties());
            });
        })).map(Long::parseLong).map(l -> {
            return getExecutionContext(sparkListenerJobStart.jobId(), l.longValue());
        }).orElseGet(() -> {
            return getExecutionContext(sparkListenerJobStart.jobId());
        })).ifPresent(executionContext -> {
            executionContext.getClass();
            flatMap.ifPresent(executionContext::setActiveJob);
            circuitBreaker.run(() -> {
                executionContext.start(sparkListenerJobStart);
                return null;
            });
        });
    }

    private String getSqlExecutionId(Properties properties) {
        return properties.getProperty("spark.sql.execution.id");
    }

    public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        if (this.isDisabled) {
            return;
        }
        ExecutionContext remove = rddExecutionRegistry.remove(Integer.valueOf(sparkListenerJobEnd.jobId()));
        meterRegistry.counter("openlineage.spark.event.job.end", new String[0]).increment();
        circuitBreaker.run(() -> {
            if (remove == null) {
                return null;
            }
            remove.end(sparkListenerJobEnd);
            return null;
        });
        if (sparkVersion.startsWith("3")) {
            jobMetrics.cleanUp(sparkListenerJobEnd.jobId());
        }
    }

    public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
        if (this.isDisabled || sparkVersion.startsWith("2")) {
            return;
        }
        log.debug("onTaskEnd {}", sparkListenerTaskEnd);
        jobMetrics.addMetrics(sparkListenerTaskEnd.stageId(), sparkListenerTaskEnd.taskMetrics());
    }

    public static ExecutionContext getSparkApplicationExecutionContext() {
        return contextFactory.createSparkApplicationExecutionContext((SparkContext) ScalaConversionUtils.asJavaOptional(SparkSession.getDefaultSession().map(sparkContextFromSession).orElse(activeSparkContext)).orElse(null));
    }

    public static Optional<ExecutionContext> getSparkSQLExecutionContext(long j) {
        return Optional.ofNullable(sparkSqlExecutionRegistry.computeIfAbsent(Long.valueOf(j), l -> {
            return contextFactory.createSparkSQLExecutionContext(j).orElse(null);
        }));
    }

    public static Optional<ExecutionContext> getExecutionContext(int i) {
        return Optional.ofNullable(rddExecutionRegistry.computeIfAbsent(Integer.valueOf(i), num -> {
            return contextFactory.createRddExecutionContext(i);
        }));
    }

    public static Optional<ExecutionContext> getExecutionContext(int i, long j) {
        Optional<ExecutionContext> sparkSQLExecutionContext = getSparkSQLExecutionContext(j);
        sparkSQLExecutionContext.ifPresent(executionContext -> {
            rddExecutionRegistry.put(Integer.valueOf(i), executionContext);
        });
        return sparkSQLExecutionContext;
    }

    public static Configuration getConfigForRDD(RDD<?> rdd) {
        return outputs.get(rdd);
    }

    private static void clear() {
        sparkSqlExecutionRegistry.clear();
        rddExecutionRegistry.clear();
        outputs.clear();
    }

    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        if (this.isDisabled) {
            return;
        }
        meterRegistry.counter("openlineage.spark.event.app.end", new String[0]).increment();
        meterRegistry.counter("openlineage.spark.event.app.end.memoryusage", new String[0]).increment(RuntimeUtils.getMemoryFractionUsage());
        circuitBreaker.run(() -> {
            getSparkApplicationExecutionContext().end(sparkListenerApplicationEnd);
            return null;
        });
        close();
        super.onApplicationEnd(sparkListenerApplicationEnd);
    }

    public static void close() {
        clear();
    }

    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        if (this.isDisabled) {
            return;
        }
        initializeContextFactoryIfNotInitialized(sparkListenerApplicationStart.appName());
        meterRegistry.counter("openlineage.spark.event.app.start", new String[0]).increment();
        meterRegistry.counter("openlineage.spark.event.app.start.memoryusage", new String[0]).increment(RuntimeUtils.getMemoryFractionUsage());
        circuitBreaker.run(() -> {
            getSparkApplicationExecutionContext().start(sparkListenerApplicationStart);
            return null;
        });
    }

    private void initializeContextFactoryIfNotInitialized() {
        if (contextFactory != null) {
            return;
        }
        ScalaConversionUtils.asJavaOptional((Option) activeSparkContext.apply()).ifPresent(sparkContext -> {
            initializeContextFactoryIfNotInitialized(sparkContext.appName());
        });
    }

    private void initializeContextFactoryIfNotInitialized(String str) {
        if (contextFactory != null) {
            return;
        }
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv == null) {
            log.warn("OpenLineage listener instantiated, but no configuration could be found. Lineage events will not be collected");
        } else {
            initializeContextFactoryIfNotInitialized(sparkEnv.conf(), str);
        }
    }

    private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, String str) {
        if (contextFactory != null) {
            return;
        }
        try {
            SparkOpenLineageConfig parse = ArgumentParser.parse(sparkConf);
            initializeMetrics(parse);
            contextFactory = new ContextFactory(new EventEmitter(parse, str), meterRegistry, parse);
            circuitBreaker = new CircuitBreakerFactory(parse.getCircuitBreaker()).build();
        } catch (URISyntaxException e) {
            log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", e);
        }
    }

    private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
        meterRegistry = MicrometerProvider.addMeterRegistryFromConfig(openLineageConfig.getMetricsConfig());
        String join = (openLineageConfig.getFacetsConfig() == null || openLineageConfig.getFacetsConfig().getDisabledFacets() == null) ? "" : String.join(ArgumentParser.DISABLED_FACETS_SEPARATOR, openLineageConfig.getFacetsConfig().getDisabledFacets());
        meterRegistry.config().commonTags(Tags.of(Tag.of("openlineage.spark.integration.version", Versions.getVersion()), Tag.of("openlineage.spark.version", sparkVersion), Tag.of("openlineage.spark.disabled.facets", join)));
        String str = join;
        ((CompositeMeterRegistry) meterRegistry).getRegistries().forEach(meterRegistry2 -> {
            meterRegistry2.config().commonTags(Tags.of(Tag.of("openlineage.spark.integration.version", Versions.getVersion()), Tag.of("openlineage.spark.version", sparkVersion), Tag.of("openlineage.spark.disabled.facets", str)));
        });
    }

    private static boolean checkIfDisabled() {
        return Boolean.parseBoolean(Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED"));
    }

    static {
        SparkContext$ sparkContext$ = SparkContext$.MODULE$;
        sparkContext$.getClass();
        activeSparkContext = ScalaConversionUtils.toScalaFn(sparkContext$::getActive);
        circuitBreaker = new NoOpCircuitBreaker();
        sparkVersion = package$.MODULE$.SPARK_VERSION();
    }
}
