package org.apache.beam.runners.dataflow.util;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Seconds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/dataflow/util/PackageUtil.class */
public class PackageUtil implements Closeable {
    /** Classpath entry count above which {@code stageClasspathElements} logs a warning. */
    private static final int SANE_CLASSPATH_SIZE = 1000;
    /** Number of threads in the pool created by {@link #withDefaultThreadPool()}. */
    private static final int DEFAULT_THREAD_POOL_SIZE = 32;
    /** Executor on which attribute computation and staging tasks are submitted; shut down by {@link #close()}. */
    private final ExecutorService executorService;
    private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
    /** Sleeper used between retries when the caller does not supply one. */
    private static final Sleeper DEFAULT_SLEEPER = Sleeper.DEFAULT;
    /** Create options used when the caller supplies none: 1048576-byte (1 MiB) upload buffer, binary MIME type. */
    private static final CreateOptions DEFAULT_CREATE_OPTIONS = GcsCreateOptions.builder().setGcsUploadBufferSizeBytes(1048576).setMimeType("application/octet-stream").build();
    /** Backoff policy for failed uploads: at most 4 retries, starting with a 5 second delay. */
    private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5));
    /** Used to recognise access-denied failures, which are never retried. */
    private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Describes a single artifact to stage: its content (either a local file or an in-memory byte
     * array), the {@link DataflowPackage} holding its name and destination location, its size in
     * bytes and its hash.
     */
    @AutoValue
    public static abstract class PackageAttributes {
        /**
         * Builds the attributes for staging a local file.
         *
         * @param str path of the local file to stage
         * @param str2 hash of the file, stored as-is in the returned attributes
         * @param str3 package name; also used as the staged file name except for
         *     {@code "dataflow-worker.jar"} and {@code "windmill_main"}
         * @param str4 staging directory the destination is resolved against
         * @return attributes whose source is the file and whose byte content is {@code null}
         * @throws FileNotFoundException if the file does not exist
         * @throws IllegalStateException if the path denotes a directory
         * @throws IOException if hashing the file fails
         */
        public static PackageAttributes forFileToStage(String str, String str2, String str3, String str4) throws IOException {
            File file = new File(str);
            if (!file.exists()) {
                throw new FileNotFoundException(String.format("Non-existent file to stage: %s", file.getAbsolutePath()));
            }
            Preconditions.checkState(!file.isDirectory(), "Source file must not be a directory.");
            String stagedFileName;
            switch (str3) {
                case "dataflow-worker.jar":
                case "windmill_main":
                    // These two package names are staged under a name derived from the file and its
                    // SHA-256 content hash instead of under the package name itself.
                    stagedFileName = Environments.createStagingFileName(file, Files.asByteSource(file).hash(Hashing.sha256()));
                    PackageUtil.LOG.info("Staging custom {} as {}", str3, stagedFileName);
                    break;
                default:
                    stagedFileName = str3;
                    break;
            }
            DataflowPackage dataflowPackage = new DataflowPackage();
            dataflowPackage.setLocation(FileSystems.matchNewResource(str4, true).resolve(stagedFileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
            dataflowPackage.setName(str3);
            return new AutoValue_PackageUtil_PackageAttributes(file, null, dataflowPackage, file.length(), str2);
        }

        /**
         * Builds the attributes for staging an in-memory byte array.
         *
         * @param bArr content to stage
         * @param str file name from which the staging file name is derived, together with the
         *     SHA-256 hash of the content
         * @param str2 staging directory the destination is resolved against
         * @return attributes whose byte content is {@code bArr} and whose source file is {@code null};
         *     the package name is the derived staging file name
         */
        public static PackageAttributes forBytesToStage(byte[] bArr, String str, String str2) {
            HashCode hash = Hashing.sha256().newHasher().putBytes(bArr).hash();
            long length = bArr.length;
            String stagingFileName = Environments.createStagingFileName(new File(str), hash);
            String location = FileSystems.matchNewResource(str2, true).resolve(stagingFileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
            DataflowPackage dataflowPackage = new DataflowPackage();
            dataflowPackage.setName(stagingFileName);
            dataflowPackage.setLocation(location);
            return new AutoValue_PackageUtil_PackageAttributes(null, bArr, dataflowPackage, length, hash.toString());
        }

        /**
         * Returns a copy of these attributes whose destination package carries the given name; the
         * destination location, source, bytes, size and hash are carried over unchanged.
         *
         * @param str the new package name
         */
        public PackageAttributes withPackageName(String str) {
            DataflowPackage renamed = new DataflowPackage();
            renamed.setName(str);
            renamed.setLocation(getDestination().getLocation());
            return new AutoValue_PackageUtil_PackageAttributes(getSource(), getBytes(), renamed, getSize(), getHash());
        }

        /** Returns the local file to stage, or {@code null} when the content is given as bytes. */
        public abstract File getSource();

        /** Returns the in-memory content to stage, or {@code null} when the content is a file. */
        public abstract byte[] getBytes();

        /** Returns the package holding the name and the destination location of the artifact. */
        public abstract DataflowPackage getDestination();

        /** Returns the size of the content in bytes. */
        public abstract long getSize();

        /** Returns the hash recorded for the content. */
        public abstract String getHash();

        /**
         * Returns a human-readable description of the content for log and error messages: the file
         * path when a source file is present, otherwise a summary of size and hash.
         */
        public String getSourceDescription() {
            File source = getSource();
            if (source != null) {
                return source.toString();
            }
            return String.format("<%s bytes, hash %s>", getSize(), getHash());
        }
    }

    /**
     * Value type naming one file to stage: the local source path, its SHA-256 hash and the
     * destination name it should be staged under.
     */
    @AutoValue
    public static abstract class StagedFile {
        /** Returns the path of the local file to stage. */
        public abstract String getSource();

        /** Returns the SHA-256 hash recorded for the file. */
        public abstract String getSha256();

        /** Returns the destination (package) name for the staged file. */
        public abstract String getDestination();

        /**
         * Creates a {@link StagedFile}.
         *
         * @param str value for {@link #getSource()} (presumably; argument order follows the
         *     generated AutoValue constructor)
         * @param str2 value for {@link #getSha256()} (presumably, see above)
         * @param str3 value for {@link #getDestination()} (presumably, see above)
         */
        public static StagedFile of(String str, String str2, String str3) {
            StagedFile stagedFile = new AutoValue_PackageUtil_StagedFile(str, str2, str3);
            return stagedFile;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Outcome of staging one package: its attributes and whether an upload was skipped. */
    @AutoValue
    public static abstract class StagingResult {
        /** Creates a result for a package that did not need to be uploaded. */
        public static StagingResult cached(PackageAttributes packageAttributes) {
            final boolean alreadyStaged = true;
            return new AutoValue_PackageUtil_StagingResult(packageAttributes, alreadyStaged);
        }

        /** Creates a result for a package that was uploaded. */
        public static StagingResult uploaded(PackageAttributes packageAttributes) {
            final boolean alreadyStaged = false;
            return new AutoValue_PackageUtil_StagingResult(packageAttributes, alreadyStaged);
        }

        /** Returns the attributes of the package this result refers to. */
        public abstract PackageAttributes getPackageAttributes();

        /** Returns {@code true} when the package was not uploaded by this staging attempt. */
        public abstract boolean alreadyStaged();
    }

    /**
     * Creates a utility that submits its work to the given executor; instances are obtained through
     * the static factory methods.
     */
    private PackageUtil(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /**
     * Creates a {@link PackageUtil} backed by a new fixed-size thread pool of
     * {@code DEFAULT_THREAD_POOL_SIZE} threads built with the platform thread factory.
     */
    public static PackageUtil withDefaultThreadPool() {
        ExecutorService fixedPool = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, MoreExecutors.platformThreadFactory());
        return withExecutorService(MoreExecutors.listeningDecorator(fixedPool));
    }

    /**
     * Creates a {@link PackageUtil} that submits its work to the given executor.
     *
     * @param executorService executor used for staging tasks; it is shut down by {@link #close()}
     */
    public static PackageUtil withExecutorService(ExecutorService executorService) {
        return new PackageUtil(executorService);
    }

    /** Shuts down the underlying executor service; does not wait for submitted tasks to finish. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.executorService.shutdown();
    }

    /**
     * Asynchronously computes the {@link PackageAttributes} of a local file on the executor service.
     * The arguments are forwarded unchanged to {@link PackageAttributes#forFileToStage}.
     */
    private CompletionStage<PackageAttributes> computePackageAttributes(String str, String str2, String str3, String str4) {
        return MoreFutures.supplyAsync(
                () -> PackageAttributes.forFileToStage(str, str2, str3, str4),
                this.executorService);
    }

    /**
     * Reports whether a file already exists at the package's destination with the same size as the
     * content to stage.
     *
     * @return {@code false} when no file exists at the destination or its size differs
     * @throws IOException if looking up the destination fails for any other reason
     */
    private boolean alreadyStaged(PackageAttributes packageAttributes) throws IOException {
        String destination = packageAttributes.getDestination().getLocation();
        long stagedSize;
        try {
            stagedSize = FileSystems.matchSingleFileSpec(destination).sizeBytes();
        } catch (FileNotFoundException notStagedYet) {
            return false;
        }
        return stagedSize == packageAttributes.getSize();
    }

    /**
     * Asynchronously stages one package on the executor service.
     *
     * @param packageAttributes the package to stage
     * @param sleeper used to wait between retries
     * @param createOptions options used when creating the destination file
     * @return a stage completing with the staging result, or exceptionally if staging fails
     */
    public CompletionStage<StagingResult> stagePackage(PackageAttributes packageAttributes, Sleeper sleeper, CreateOptions createOptions) {
        return MoreFutures.supplyAsync(
                () -> stagePackageSynchronously(packageAttributes, sleeper, createOptions),
                this.executorService);
    }

    /**
     * Stages one package on the calling thread, retrying as needed.
     *
     * <p>Any failure is rethrown as a {@link RuntimeException} naming the source and the
     * destination. If the failure is an {@link InterruptedException}, the thread's interrupt flag
     * is restored first so the interruption is not lost to code further up the stack.
     *
     * @param packageAttributes the package to stage
     * @param sleeper used to wait between retries
     * @param createOptions options used when creating the destination file
     * @return the staging result
     */
    private StagingResult stagePackageSynchronously(PackageAttributes packageAttributes, Sleeper sleeper, CreateOptions createOptions) throws IOException, InterruptedException {
        String sourceDescription = packageAttributes.getSourceDescription();
        String location = packageAttributes.getDestination().getLocation();
        try {
            return tryStagePackageWithRetry(packageAttributes, sleeper, createOptions);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise swallow the interruption.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(String.format("Could not stage %s to %s", sourceDescription, location), e);
        }
    }

    /**
     * Uploads a package, retrying failed attempts according to {@code BACKOFF_FACTORY}.
     *
     * <p>Before every attempt the destination is checked; if a file of matching size is already
     * there the upload is skipped and a cached result is returned. Access-denied failures are not
     * retried.
     *
     * @param packageAttributes the package to stage
     * @param sleeper used to wait between attempts
     * @param createOptions options used when creating the destination file
     * @return an uploaded result on success, or a cached result if the file was already staged
     * @throws IOException if the upload is rejected for lack of permissions, or the staged-file
     *     check fails
     * @throws RuntimeException if the upload still fails once the retries are exhausted
     * @throws InterruptedException if interrupted while sleeping between attempts
     */
    private StagingResult tryStagePackageWithRetry(PackageAttributes packageAttributes, Sleeper sleeper, CreateOptions createOptions) throws IOException, InterruptedException {
        final String description = packageAttributes.getSourceDescription();
        final String target = packageAttributes.getDestination().getLocation();
        final BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
        while (true) {
            if (alreadyStaged(packageAttributes)) {
                LOG.debug("Skipping file already staged: {} at {}", description, target);
                return StagingResult.cached(packageAttributes);
            }
            try {
                return tryStagePackage(packageAttributes, createOptions);
            } catch (IOException uploadFailure) {
                // Permission problems will not go away by retrying: fail immediately.
                if (ERROR_EXTRACTOR.accessDenied(uploadFailure)) {
                    String message = String.format("Uploaded failed due to permissions error, will NOT retry staging of %s. Please verify credentials are valid and that you have write access to %s. Stale credentials can be resolved by executing 'gcloud auth application-default login'.", description, target);
                    LOG.error(message);
                    throw new IOException(message, uploadFailure);
                }
                long delayMillis = backoff.nextBackOffMillis();
                if (delayMillis == BackOff.STOP) {
                    LOG.error("Upload failed, will NOT retry staging of package: {}", description, uploadFailure);
                    throw new RuntimeException(String.format("Could not stage %s to %s", description, target), uploadFailure);
                }
                LOG.warn("Upload attempt failed, sleeping before retrying staging of package: {}", description, uploadFailure);
                sleeper.sleep(delayMillis);
            }
        }
    }

    /**
     * Performs a single upload attempt of the package content to its destination.
     *
     * <p>The content is taken from the in-memory bytes when present, otherwise from the source
     * file. The destination channel is always closed; if both the copy and the close fail, the
     * close failure is attached to the copy failure as a suppressed exception.
     *
     * @param packageAttributes the package to upload
     * @param createOptions options used when creating the destination file
     * @return an uploaded result
     * @throws IOException if creating, writing or closing the destination fails
     * @throws IllegalStateException if the package has neither bytes nor a source file
     */
    private StagingResult tryStagePackage(PackageAttributes packageAttributes, CreateOptions createOptions) throws IOException, InterruptedException {
        String sourceDescription = packageAttributes.getSourceDescription();
        String location = packageAttributes.getDestination().getLocation();
        LOG.info("Uploading {} to {}", sourceDescription, location);
        try (WritableByteChannel writer = FileSystems.create(FileSystems.matchNewResource(location, false), createOptions)) {
            if (packageAttributes.getBytes() != null) {
                ByteSource.wrap(packageAttributes.getBytes()).copyTo(Channels.newOutputStream(writer));
            } else {
                File source = packageAttributes.getSource();
                Preconditions.checkState(source != null, "Internal inconsistency: we tried to stage something to %s, but neither a source file nor the byte content was specified", location);
                Files.asByteSource(source).copyTo(Channels.newOutputStream(writer));
            }
        }
        return StagingResult.uploaded(packageAttributes);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stages the given files to the staging location using the default sleeper for retries.
     *
     * @param collection the files to stage
     * @param str the staging location
     * @param createOptions options used when creating the destination files
     * @return the destination packages of the staged files
     */
    public List<DataflowPackage> stageClasspathElements(Collection<StagedFile> collection, String str, CreateOptions createOptions) {
        return stageClasspathElements(collection, str, DEFAULT_SLEEPER, createOptions);
    }

    /**
     * Stages the given files to the staging location using the default sleeper and the default
     * create options.
     *
     * @param collection the files to stage
     * @param str the staging location
     * @return the destination packages of the staged files
     */
    List<DataflowPackage> stageClasspathElements(Collection<StagedFile> collection, String str) {
        return stageClasspathElements(collection, str, DEFAULT_SLEEPER, DEFAULT_CREATE_OPTIONS);
    }

    /**
     * Stages an in-memory byte array and blocks until staging has finished.
     *
     * @param bArr content to stage
     * @param str file name from which the staging file name is derived
     * @param str2 staging directory the destination is resolved against
     * @param createOptions options used when creating the destination file
     * @return the destination package of the staged content
     * @throws RuntimeException if staging fails or the calling thread is interrupted; on
     *     interruption the thread's interrupt flag is restored
     */
    public DataflowPackage stageToFile(byte[] bArr, String str, String str2, CreateOptions createOptions) {
        PackageAttributes attributes = PackageAttributes.forBytesToStage(bArr, str, str2);
        CompletionStage<StagingResult> pending = stagePackage(attributes, DEFAULT_SLEEPER, createOptions);
        try {
            StagingResult result = MoreFutures.get(pending);
            return result.getPackageAttributes().getDestination();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while staging pipeline", interrupted);
        } catch (ExecutionException failed) {
            throw new RuntimeException("Error while staging pipeline", failed.getCause());
        }
    }

    /**
     * Stages the given files to the staging location and blocks until all of them are done.
     *
     * <p>Files that do not exist locally are skipped with a warning. For every remaining file the
     * package attributes are computed and the package is staged asynchronously; when several files
     * resolve to the same destination location only the first one queued is uploaded and the
     * others are counted as cached. While waiting, a progress message is logged every 3 minutes.
     *
     * @param collection the files to stage
     * @param str the staging location; must not be {@code null}
     * @param sleeper used to wait between upload retries
     * @param createOptions options used when creating the destination files
     * @return the destination packages of the files that exist locally
     * @throws IllegalArgumentException if {@code str} is {@code null}
     * @throws RuntimeException if staging any file fails or the calling thread is interrupted; on
     *     interruption the thread's interrupt flag is restored
     */
    List<DataflowPackage> stageClasspathElements(Collection<StagedFile> collection, String str, Sleeper sleeper, CreateOptions createOptions) {
        LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to prepare for execution.", collection.size());
        Instant start = Instant.now();
        if (collection.size() > SANE_CLASSPATH_SIZE) {
            LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically copies to all workers. Having this many entries on your classpath may be indicative of an issue in your pipeline. You may want to consider trimming the classpath to necessary dependencies only, using --filesToStage pipeline option to override what files are being staged, or bundling several dependencies into one.", collection.size());
        }
        Preconditions.checkArgument(str != null, "Can't stage classpath elements because no staging location has been provided");
        AtomicInteger numUploaded = new AtomicInteger(0);
        AtomicInteger numCached = new AtomicInteger(0);
        List<CompletionStage<DataflowPackage>> destinationPackages = new ArrayList<>();
        // Destination locations already queued for upload; shared with the asynchronous callbacks.
        Set<String> queuedLocations = Sets.newConcurrentHashSet();
        for (StagedFile stagedFile : collection) {
            String destination = stagedFile.getDestination();
            String source = stagedFile.getSource();
            String sha256 = stagedFile.getSha256();
            File file = new File(source);
            if (!file.exists()) {
                LOG.warn("Skipping non-existent file to stage {}.", file);
                continue;
            }
            CompletionStage<StagingResult> stagingResult = computePackageAttributes(source, sha256, destination, str).thenComposeAsync(packageAttributes -> {
                String location = packageAttributes.getDestination().getLocation();
                if (queuedLocations.add(location)) {
                    return stagePackage(packageAttributes, sleeper, createOptions);
                }
                LOG.debug("Upload of {} skipped because it was already queued", location);
                return CompletableFuture.completedFuture(StagingResult.cached(packageAttributes));
            });
            CompletionStage<DataflowPackage> stagedPackage = stagingResult.thenApply(result -> {
                if (result.alreadyStaged()) {
                    numCached.incrementAndGet();
                } else {
                    numUploaded.incrementAndGet();
                }
                return result.getPackageAttributes().getDestination();
            });
            destinationPackages.add(stagedPackage);
        }
        try {
            CompletionStage<List<DataflowPackage>> stagingFutures = MoreFutures.allAsList(destinationPackages);
            boolean finished = false;
            do {
                try {
                    MoreFutures.get(stagingFutures, 3L, TimeUnit.MINUTES);
                    finished = true;
                } catch (TimeoutException e) {
                    // Not finished yet: report progress and keep waiting.
                    LOG.info("Still staging {} files", collection.size());
                }
            } while (!finished);
            List<DataflowPackage> stagedPackages = MoreFutures.get(stagingFutures);
            LOG.info("Staging files complete: {} files cached, {} files newly uploaded in {} seconds", numCached.get(), numUploaded.get(), Seconds.secondsBetween(start, Instant.now()).getSeconds());
            return stagedPackages;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while staging packages", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error while staging packages", e.getCause());
        }
    }
}
