package org.apache.beam.runners.fnexecution.control;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.dataflow.qual.Pure;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.class */
/**
 * An {@link ExecutableStageContext.Factory} that hands out one shared, reference-counted
 * {@link ExecutableStageContext} per job id.
 *
 * <p>Every call to {@link #get(JobInfo)} increments the reference count of the cached wrapper for
 * that job id (creating it through the {@link Creator} on first use). Closing the returned context
 * does not close the underlying context directly; it triggers a release, either immediately or
 * after the job's environment-cache delay. The underlying context is closed once a release brings
 * the count to zero.
 *
 * <p>The cache and the executor are {@code transient} and are created lazily on first use.
 */
public class ReferenceCountingExecutableStageContextFactory implements ExecutableStageContext.Factory {
    private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountingExecutableStageContextFactory.class);

    /** Number of attempts {@link #get(JobInfo)} makes before giving up on obtaining a live wrapper. */
    private static final int MAX_RETRY = 3;

    /** Builds the underlying context the first time a job id is requested. */
    private final Creator creator;

    /** Lazily created scheduler used for delayed releases; see {@link #getExecutor()}. */
    private transient volatile ScheduledExecutorService executor;

    /** Lazily created map from job id to its live wrapper; see {@link #getCache()}. */
    private transient volatile ConcurrentHashMap<String, WrappedContext> keyRegistry;

    /** When this returns {@code true} for this factory, releases are never deferred. */
    private final SerializableFunction<Object, Boolean> isReleaseSynchronous;

    /** Serializable function that creates the underlying {@link ExecutableStageContext} for a job. */
    public interface Creator extends ThrowingFunction<JobInfo, ExecutableStageContext>, Serializable {
    }

    /**
     * Reference-counted wrapper around the context produced by the {@link Creator}.
     *
     * <p>{@code referenceCount} is read and written while holding this wrapper's monitor (see
     * {@link #get(JobInfo)} and {@link #release(ExecutableStageContext)}). A {@code null}
     * {@code referenceCount} marks the wrapper as tombstoned: its count reached zero and it must
     * not be handed out again.
     */
    @VisibleForTesting
    public class WrappedContext implements ExecutableStageContext {
        private final JobInfo jobInfo;
        private AtomicInteger referenceCount = new AtomicInteger(0);

        @VisibleForTesting
        ExecutableStageContext context;

        WrappedContext(JobInfo jobInfo, ExecutableStageContext executableStageContext) {
            this.jobInfo = jobInfo;
            this.context = executableStageContext;
        }

        /** Delegates to the wrapped context. */
        @Override // org.apache.beam.runners.fnexecution.control.ExecutableStageContext
        public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage) {
            return this.context.getStageBundleFactory(executableStage);
        }

        /**
         * Gives up one reference by asking the enclosing factory to release the wrapper for this
         * job; the wrapped context itself is only closed through {@link #closeActual()}.
         */
        @Override // java.lang.AutoCloseable
        public void close() {
            ReferenceCountingExecutableStageContextFactory.this.scheduleRelease(this.jobInfo);
        }

        /**
         * Closes the wrapped context.
         *
         * @throws Exception whatever the wrapped context's {@code close()} throws
         */
        public void closeActual() throws Exception {
            this.context.close();
        }

        /** Two wrappers are equal when they are of the same class and share the same job id. */
        @Override
        @EnsuresNonNullIf(expression = {"#1"}, result = true)
        @Pure
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            WrappedContext other = (WrappedContext) obj;
            return Objects.equals(this.jobInfo.jobId(), other.jobInfo.jobId());
        }

        /**
         * Hashes the job id only, so that it agrees with {@link #equals(Object)}, which also
         * compares nothing but the job id.
         */
        @Override
        @Pure
        public int hashCode() {
            return Objects.hashCode(this.jobInfo.jobId());
        }

        @Override
        @SideEffectFree
        public String toString() {
            return "ContextWrapper{jobId='" + this.jobInfo.jobId() + "', referenceCount=" + this.referenceCount + '}';
        }
    }

    /**
     * Creates a factory.
     *
     * @param creator builds the underlying context for a job on first request
     * @param serializableFunction applied to this factory on every close; a {@code true} result
     *     forces the release to run immediately instead of being scheduled
     * @return a new factory instance
     */
    public static ReferenceCountingExecutableStageContextFactory create(Creator creator, SerializableFunction<Object, Boolean> serializableFunction) {
        return new ReferenceCountingExecutableStageContextFactory(creator, serializableFunction);
    }

    private ReferenceCountingExecutableStageContextFactory(Creator creator, SerializableFunction<Object, Boolean> serializableFunction) {
        this.creator = creator;
        this.isReleaseSynchronous = serializableFunction;
    }

    /**
     * Returns the shared context for {@code jobInfo}'s job id with its reference count incremented,
     * creating it through the {@link Creator} if no wrapper is cached yet.
     *
     * @param jobInfo the job the context is requested for
     * @return the reference-counted wrapper for that job
     * @throws RuntimeException if the creator fails, or if no live wrapper could be obtained within
     *     {@code MAX_RETRY} attempts
     */
    @Override // org.apache.beam.runners.fnexecution.control.ExecutableStageContext.Factory
    public ExecutableStageContext get(JobInfo jobInfo) {
        // A wrapper fetched from the cache may be tombstoned by a concurrent release before its
        // lock is acquired here; in that case loop and fetch (or create) a fresh one.
        for (int attempt = 0; attempt < MAX_RETRY; attempt++) {
            WrappedContext wrapper = getCache().computeIfAbsent(jobInfo.jobId(), jobId -> {
                try {
                    return new WrappedContext(jobInfo, this.creator.apply(jobInfo));
                } catch (Exception e) {
                    throw new RuntimeException("Unable to create context for job " + jobInfo.jobId(), e);
                }
            });
            synchronized (wrapper) {
                // A null count is the tombstone written by release(); only live wrappers are returned.
                if (wrapper.referenceCount != null) {
                    wrapper.referenceCount.incrementAndGet();
                    return wrapper;
                }
            }
        }
        throw new RuntimeException(String.format("Max retry %s exhausted while creating Context for job %s", MAX_RETRY, jobInfo.jobId()));
    }

    /**
     * Releases one reference on the cached wrapper for {@code jobInfo}'s job id.
     *
     * <p>The release runs immediately when the job's environment-cache delay is not positive or
     * when {@code isReleaseSynchronous} returns {@code true} for this factory; otherwise it is
     * scheduled on the executor after that delay (in milliseconds).
     *
     * @param jobInfo the job whose context is being closed
     * @throws IllegalStateException if no wrapper is cached for the job id
     */
    public void scheduleRelease(JobInfo jobInfo) {
        WrappedContext wrapper = getCache().get(jobInfo.jobId());
        Preconditions.checkState(wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
        int environmentCacheMillis = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()).as(PortablePipelineOptions.class).getEnvironmentCacheMillis();
        // Short-circuit keeps the original evaluation order: the synchronous-release predicate is
        // only consulted when a positive delay is configured.
        if (environmentCacheMillis <= 0 || this.isReleaseSynchronous.apply(this)) {
            release(wrapper);
        } else {
            getExecutor().schedule(() -> release(wrapper), environmentCacheMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Returns the job-id cache, creating it on first use (double-checked locking on {@code this}). */
    private ConcurrentHashMap<String, WrappedContext> getCache() {
        ConcurrentHashMap<String, WrappedContext> cache = this.keyRegistry;
        if (cache != null) {
            return cache;
        }
        synchronized (this) {
            cache = this.keyRegistry;
            if (cache == null) {
                cache = new ConcurrentHashMap<>();
                this.keyRegistry = cache;
            }
            return cache;
        }
    }

    /**
     * Returns the scheduler for delayed releases, creating it on first use (double-checked locking
     * on {@code this}). The pool has a single daemon thread.
     */
    private ScheduledExecutorService getExecutor() {
        ScheduledExecutorService scheduler = this.executor;
        if (scheduler != null) {
            return scheduler;
        }
        synchronized (this) {
            scheduler = this.executor;
            if (scheduler == null) {
                scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ScheduledExecutor-thread").setDaemon(true).build());
                this.executor = scheduler;
            }
            return scheduler;
        }
    }

    /**
     * Decrements the reference count of the given wrapper and, when it reaches zero, tombstones
     * the wrapper, removes it from the cache and closes the underlying context. Failures while
     * closing are logged, not propagated.
     *
     * <p>NOTE(review): calling this on a wrapper that is already tombstoned dereferences a
     * {@code null} count and throws {@link NullPointerException}; that behavior is unchanged here.
     *
     * @param executableStageContext a context previously returned by {@link #get(JobInfo)}; it is
     *     cast to {@link WrappedContext}
     */
    @VisibleForTesting
    void release(ExecutableStageContext executableStageContext) {
        WrappedContext wrapper = (WrappedContext) executableStageContext;
        synchronized (wrapper) {
            if (wrapper.referenceCount.decrementAndGet() != 0) {
                return;
            }
            // Tombstone first so a concurrent get() holding a stale reference retries.
            wrapper.referenceCount = null;
            if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
                try {
                    wrapper.closeActual();
                } catch (Throwable th) {
                    LOG.error("Unable to close ExecutableStageContext.", th);
                }
            }
        }
    }
}
