package org.apache.beam.runners.dataflow;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/dataflow/DataflowPipelineJob.class */
public class DataflowPipelineJob implements PipelineResult {
    private final String jobId;
    private final DataflowPipelineOptions dataflowOptions;
    private final DataflowClient dataflowClient;
    private final DataflowMetrics dataflowMetrics;
    private PipelineResult.State terminalState;
    private DataflowPipelineJob replacedByJob;
    private final BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames;
    private long lastTimestamp;
    private String latestStateString;
    private final RunnerApi.Pipeline pipelineProto;
    private AtomicReference<FutureTask<PipelineResult.State>> cancelState;
    private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineJob.class);
    static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2);
    static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds(2);
    static final Duration DEFAULT_MAX_BACKOFF = Duration.standardMinutes(2);
    static final int MESSAGES_POLLING_RETRIES = 11;
    static final double DEFAULT_BACKOFF_EXPONENT = 1.5d;
    private static final FluentBackoff MESSAGES_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(MESSAGES_POLLING_INTERVAL).withMaxRetries(MESSAGES_POLLING_RETRIES).withExponent(DEFAULT_BACKOFF_EXPONENT).withMaxBackoff(DEFAULT_MAX_BACKOFF);
    static final int STATUS_POLLING_RETRIES = 4;
    protected static final FluentBackoff STATUS_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(STATUS_POLLING_INTERVAL).withMaxRetries(STATUS_POLLING_RETRIES).withExponent(DEFAULT_BACKOFF_EXPONENT);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.runners.dataflow.DataflowPipelineJob$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/DataflowPipelineJob$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$PipelineResult$State = new int[PipelineResult.State.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.DONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.CANCELLED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$PipelineResult$State[PipelineResult.State.UPDATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public DataflowPipelineJob(DataflowClient dataflowClient, String str, DataflowPipelineOptions dataflowPipelineOptions, Map<AppliedPTransform<?, ?, ?>, String> map, RunnerApi.Pipeline pipeline) {
        this.terminalState = null;
        this.replacedByJob = null;
        this.lastTimestamp = Long.MIN_VALUE;
        this.cancelState = new AtomicReference<>();
        this.dataflowClient = dataflowClient;
        this.jobId = str;
        this.dataflowOptions = dataflowPipelineOptions;
        this.transformStepNames = HashBiMap.create((Map) MoreObjects.firstNonNull(map, ImmutableMap.of()));
        this.dataflowMetrics = new DataflowMetrics(this, this.dataflowClient);
        this.pipelineProto = pipeline;
    }

    public DataflowPipelineJob(DataflowClient dataflowClient, String str, DataflowPipelineOptions dataflowPipelineOptions, Map<AppliedPTransform<?, ?, ?>, String> map) {
        this(dataflowClient, str, dataflowPipelineOptions, map, null);
    }

    public String getJobId() {
        return this.jobId;
    }

    public String getProjectId() {
        return this.dataflowOptions.getProject();
    }

    public DataflowPipelineOptions getDataflowOptions() {
        return this.dataflowOptions;
    }

    public RunnerApi.Pipeline getPipelineProto() {
        return this.pipelineProto;
    }

    public String getRegion() {
        return this.dataflowOptions.getRegion();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BiMap<AppliedPTransform<?, ?, ?>, String> getTransformStepNames() {
        return this.transformStepNames;
    }

    public DataflowPipelineJob getReplacedByJob() {
        if (this.terminalState == null) {
            throw new IllegalStateException("getReplacedByJob() called before job terminated");
        }
        if (this.replacedByJob == null) {
            throw new IllegalStateException("getReplacedByJob() called for job that was not replaced");
        }
        return this.replacedByJob;
    }

    public PipelineResult.State waitUntilFinish() {
        return waitUntilFinish(Duration.millis(-1L));
    }

    public PipelineResult.State waitUntilFinish(Duration duration) {
        try {
            return waitUntilFinish(duration, new MonitoringUtil.LoggingHandler());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            if (e instanceof RuntimeException) {
                throw ((RuntimeException) e);
            }
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    public PipelineResult.State waitUntilFinish(Duration duration, MonitoringUtil.JobMessagesHandler jobMessagesHandler) throws IOException, InterruptedException {
        Thread thread = new Thread(() -> {
            LOG.warn("Job is already running in Google Cloud Platform, Ctrl-C will not cancel it.\nTo cancel the job in the cloud, run:\n> {}", MonitoringUtil.getGcloudCancelCommand(this.dataflowOptions, getJobId()));
        });
        try {
            Runtime.getRuntime().addShutdownHook(thread);
            PipelineResult.State waitUntilFinish = waitUntilFinish(duration, jobMessagesHandler, Sleeper.DEFAULT, NanoClock.SYSTEM, new MonitoringUtil(this.dataflowClient));
            Runtime.getRuntime().removeShutdownHook(thread);
            return waitUntilFinish;
        } catch (Throwable th) {
            Runtime.getRuntime().removeShutdownHook(thread);
            throw th;
        }
    }

    @VisibleForTesting
    PipelineResult.State waitUntilFinish(Duration duration, MonitoringUtil.JobMessagesHandler jobMessagesHandler, Sleeper sleeper, NanoClock nanoClock) throws IOException, InterruptedException {
        return waitUntilFinish(duration, jobMessagesHandler, sleeper, nanoClock, new MonitoringUtil(this.dataflowClient));
    }

    private static BackOff getMessagesBackoff(Duration duration) {
        FluentBackoff fluentBackoff = MESSAGES_BACKOFF_FACTORY;
        if (!duration.isShorterThan(Duration.ZERO)) {
            fluentBackoff = fluentBackoff.withMaxCumulativeBackoff(duration);
        }
        return BackOffAdapter.toGcpBackOff(fluentBackoff.backoff());
    }

    @VisibleForTesting
    PipelineResult.State waitUntilFinish(Duration duration, MonitoringUtil.JobMessagesHandler jobMessagesHandler, Sleeper sleeper, NanoClock nanoClock, MonitoringUtil monitoringUtil) throws IOException, InterruptedException {
        Exception exc;
        BackOff messagesBackoff = getMessagesBackoff(duration);
        long nanoTime = nanoClock.nanoTime();
        PipelineResult.State state = PipelineResult.State.UNKNOWN;
        do {
            try {
                state = getStateWithRetries(BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff()), sleeper);
                exc = processJobMessages(jobMessagesHandler, monitoringUtil);
                if (exc == null) {
                    if (state.isTerminal()) {
                        logTerminalState(state);
                        return state;
                    }
                    messagesBackoff = resetBackoff(duration, nanoClock, nanoTime);
                }
            } catch (IOException e) {
                exc = e;
                LOG.warn("Failed to get job state: {}", e.getMessage());
                LOG.debug("Failed to get job state.", e);
            }
        } while (BackOffUtils.next(sleeper, messagesBackoff));
        if (exc == null) {
            LOG.warn("No terminal state was returned within allotted timeout. State value {}", state);
            return null;
        }
        LOG.error("Failed to fetch job metadata.", exc);
        return null;
    }

    private void logTerminalState(PipelineResult.State state) {
        switch (AnonymousClass1.$SwitchMap$org$apache$beam$sdk$PipelineResult$State[state.ordinal()]) {
            case 1:
            case IsmFormat.Footer.VERSION /* 2 */:
                LOG.info("Job {} finished with status {}.", getJobId(), state);
                return;
            case 3:
                LOG.info("Job {} has been updated and is running as the new job with id {}. To access the updated job on the Dataflow monitoring console, please navigate to {}", new Object[]{getJobId(), getReplacedByJob().getJobId(), MonitoringUtil.getJobMonitoringPageURL(getReplacedByJob().getProjectId(), getRegion(), getReplacedByJob().getJobId())});
                return;
            default:
                LOG.info("Job {} failed with status {}.", getJobId(), state);
                return;
        }
    }

    private static BackOff resetBackoff(Duration duration, NanoClock nanoClock, long j) {
        BackOff messagesBackoff;
        if (duration.isLongerThan(Duration.ZERO)) {
            Duration minus = duration.minus(Duration.millis(((nanoClock.nanoTime() - j) + 999999) / 1000000));
            messagesBackoff = minus.isLongerThan(Duration.ZERO) ? getMessagesBackoff(minus) : BackOff.STOP_BACKOFF;
        } else {
            messagesBackoff = getMessagesBackoff(duration);
        }
        return messagesBackoff;
    }

    private Exception processJobMessages(MonitoringUtil.JobMessagesHandler jobMessagesHandler, MonitoringUtil monitoringUtil) throws IOException {
        if (jobMessagesHandler == null) {
            return null;
        }
        try {
            List<JobMessage> jobMessages = monitoringUtil.getJobMessages(getJobId(), this.lastTimestamp);
            if (!jobMessages.isEmpty()) {
                this.lastTimestamp = TimeUtil.fromCloudTime(jobMessages.get(jobMessages.size() - 1).getTime()).getMillis();
                jobMessagesHandler.process(jobMessages);
            }
            return null;
        } catch (GoogleJsonResponseException | SocketTimeoutException e) {
            LOG.warn("Failed to get job messages: {}", e.getMessage());
            LOG.debug("Failed to get job messages.", e);
            return e;
        }
    }

    public PipelineResult.State cancel() throws IOException {
        if (this.cancelState.compareAndSet(null, new FutureTask<>(() -> {
            Job job = new Job();
            job.setProjectId(getProjectId());
            String jobId = getJobId();
            job.setId(jobId);
            job.setRequestedState("JOB_STATE_CANCELLED");
            try {
                return MonitoringUtil.toState(this.dataflowClient.updateJob(jobId, job).getCurrentState());
            } catch (IOException e) {
                PipelineResult.State state = getState();
                if (state.isTerminal()) {
                    LOG.warn("Cancel failed because job is already terminated. State is {}", state);
                    return state;
                }
                if (e.getMessage().contains("has terminated")) {
                    LOG.warn("Cancel failed because job is already terminated.", e);
                    return state;
                }
                String format = String.format("Failed to cancel job in state %s, please go to the Developers Console to cancel it manually: %s", state, MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getRegion(), getJobId()));
                LOG.warn(format);
                throw new IOException(format, e);
            }
        }))) {
            this.cancelState.get().run();
        }
        try {
            return this.cancelState.get().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException(e);
        }
    }

    public PipelineResult.State getState() {
        return this.terminalState != null ? this.terminalState : getStateWithRetriesOrUnknownOnException(BackOffAdapter.toGcpBackOff(STATUS_BACKOFF_FACTORY.backoff()), Sleeper.DEFAULT);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getLatestStateString() {
        return this.latestStateString;
    }

    @VisibleForTesting
    PipelineResult.State getStateWithRetriesOrUnknownOnException(BackOff backOff, Sleeper sleeper) {
        try {
            return getStateWithRetries(backOff, sleeper);
        } catch (IOException e) {
            return PipelineResult.State.UNKNOWN;
        }
    }

    PipelineResult.State getStateWithRetries(BackOff backOff, Sleeper sleeper) throws IOException {
        if (this.terminalState != null) {
            return this.terminalState;
        }
        this.latestStateString = getJobWithRetries(backOff, sleeper).getCurrentState();
        return MonitoringUtil.toState(this.latestStateString);
    }

    private Job getJobWithRetries(BackOff backOff, Sleeper sleeper) throws IOException {
        do {
            try {
                Job job = this.dataflowClient.getJob(getJobId());
                PipelineResult.State state = MonitoringUtil.toState(job.getCurrentState());
                if (state.isTerminal()) {
                    this.terminalState = state;
                    this.replacedByJob = new DataflowPipelineJob(this.dataflowClient, job.getReplacedByJobId(), this.dataflowOptions, this.transformStepNames, this.pipelineProto);
                }
                return job;
            } catch (IOException e) {
                LOG.warn("There were problems getting current job status: {}.", e.getMessage());
                LOG.debug("Exception information:", e);
            }
        } while (nextBackOff(sleeper, backOff));
        throw e;
    }

    private boolean nextBackOff(Sleeper sleeper, BackOff backOff) {
        try {
            return BackOffUtils.next(sleeper, backOff);
        } catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    public MetricResults metrics() {
        return this.dataflowMetrics;
    }
}
