package org.apache.hudi.utilities;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

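/* loaded from: input_file:org/apache/hudi/utilities/HoodieClusteringJob.class */
/**
 * Spark utility job that schedules, executes, schedules-and-executes, or purges clustering plans
 * for a Hudi table, depending on the configured running mode.
 */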
public class HoodieClusteringJob {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(HoodieClusteringJob.class);
    /** Parsed job options; {@code runningMode} and {@code clusteringInstantTime} may be filled in while the job runs. */
    private final Config cfg;
    /** Client properties passed to every write client this job creates; the constructor adds overrides to them. */
    private final TypedProperties props;
    /** Spark context used to build write clients. */
    private final JavaSparkContext jsc;
    /** Table meta client; replaced by a reloaded instance at the start of each operation. */
    private HoodieTableMetaClient metaClient;

    /* loaded from: input_file:org/apache/hudi/utilities/HoodieClusteringJob$Config.class */
    /**
     * Command-line options of the clustering job, bound by JCommander through the
     * {@link Parameter} annotations on each public field.
     */
    public static class Config implements Serializable {

        @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
        public String basePath = null;

        @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
        public String tableName = null;

        @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only used when set --mode execute. If the instant time is not provided with --mode execute, the earliest scheduled clustering instant time is used by default. When set \"--mode scheduleAndExecute\" this instant-time will be ignored.")
        public String clusteringInstantTime = null;

        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert")
        public int parallelism = 1;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
        public String sparkMemory = null;

        @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
        public int retry = 0;

        @Parameter(names = {"--skip-clean", "-sc"}, description = "do not trigger clean after clustering", required = false)
        public Boolean skipClean = true;

        // Legacy switch: only consulted when --mode is not given (see validateRunningMode).
        @Parameter(names = {"--schedule", "-sch"}, description = "Schedule clustering @deprecated soon please use \"--mode schedule\" instead")
        public Boolean runSchedule = false;

        @Parameter(names = {"--retry-last-failed-clustering-job", "-rc"}, description = "Take effect when using --mode/-m scheduleAndExecute. Set true means check, rollback and execute last failed clustering plan instead of planing a new clustering job directly.")
        public Boolean retryLastFailedClusteringJob = false;

        @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately")
        public String runningMode = null;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        @Parameter(names = {"--job-max-processing-time-ms", "-jt"}, description = "Take effect when using --mode/-m scheduleAndExecute and --retry-last-failed-clustering-job/-rc true. If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.")
        public long maxProcessingTimeMs = 0;

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for clustering")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();

        /**
         * Renders every option (except {@code --help}) as a multi-line, human-readable dump.
         *
         * @return the formatted option listing
         */
        @Override
        public String toString() {
            return "HoodieClusteringJobConfig{\n"
                + "   --base-path " + this.basePath + ", \n"
                + "   --table-name " + this.tableName + ", \n"
                + "   --instant-time " + this.clusteringInstantTime + ", \n"
                + "   --parallelism " + this.parallelism + ", \n"
                + "   --spark-master " + this.sparkMaster + ", \n"
                + "   --spark-memory " + this.sparkMemory + ", \n"
                + "   --retry " + this.retry + ", \n"
                + "   --skipClean " + this.skipClean + ", \n"
                + "   --schedule " + this.runSchedule + ", \n"
                + "   --retry-last-failed-clustering-job " + this.retryLastFailedClusteringJob + ", \n"
                + "   --mode " + this.runningMode + ", \n"
                + "   --job-max-processing-time-ms " + this.maxProcessingTimeMs + ", \n"
                + "   --props " + this.propsFilePath + ", \n"
                + "   --hoodie-conf " + this.configs + ", \n"
                + "\n}";
        }
    }

    /**
     * Creates a job whose client properties are built from {@code config.propsFilePath} plus
     * {@code config.configs}, and whose meta client is created for {@code config.basePath}.
     *
     * @param javaSparkContext Spark context to run on
     * @param config           parsed job options
     */
    public HoodieClusteringJob(JavaSparkContext javaSparkContext, Config config) {
        this(
            javaSparkContext,
            config,
            UtilHelpers.buildProperties(javaSparkContext.hadoopConfiguration(), config.propsFilePath, config.configs),
            UtilHelpers.createMetaClient(javaSparkContext, config.basePath, true));
    }

    public HoodieClusteringJob(JavaSparkContext javaSparkContext, Config config, TypedProperties typedProperties, HoodieTableMetaClient hoodieTableMetaClient) {
        this.cfg = config;
        this.jsc = javaSparkContext;
        this.props = typedProperties;
        this.metaClient = hoodieTableMetaClient;
        this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
        if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
            UtilHelpers.addLockOptions(config.basePath, this.metaClient.getBasePath().toUri().getScheme(), this.props);
        }
    }

    /**
     * Command-line entry point: parses the options, runs the clustering job and stops the
     * Spark context afterwards.
     *
     * @param strArr raw command-line arguments
     * @throws HoodieException when help is requested or no arguments are given (after printing
     *                         usage), or when the job finishes with a non-zero result
     */
    public static void main(String[] strArr) {
        final Config config = new Config();
        final JCommander jCommander = new JCommander(config, null, strArr);
        if (config.help || strArr.length == 0) {
            jCommander.usage();
            throw new HoodieException("Clustering failed for basePath: " + config.basePath);
        }

        final JavaSparkContext sparkContext =
            UtilHelpers.buildSparkContext("clustering-" + config.tableName, config.sparkMaster, config.sparkMemory);
        try {
            final int result = new HoodieClusteringJob(sparkContext, config).cluster(config.retry);
            // Built after cluster() so that a defaulted runningMode shows up in the message.
            final String summary = String.format("Clustering with basePath: %s, tableName: %s, runningMode: %s",
                config.basePath, config.tableName, config.runningMode);
            if (result != 0) {
                throw new HoodieException(summary + " failed");
            }
            LOG.info(summary + " success");
        } finally {
            // Stop the context on failure paths too, not only after a successful run.
            sparkContext.stop();
        }
    }

    /**
     * Fills in {@code config.runningMode} when it is null or empty: {@code "schedule"} if the
     * legacy {@code runSchedule} flag is set, {@code "execute"} otherwise. An explicitly
     * provided mode is left untouched.
     *
     * @param config job options, updated in place
     */
    private static void validateRunningMode(Config config) {
        if (!StringUtils.isNullOrEmpty(config.runningMode)) {
            return;
        }
        if (config.runSchedule) {
            config.runningMode = "schedule";
        } else {
            config.runningMode = "execute";
        }
    }

    /**
     * Runs the job in the configured running mode, wrapped in {@code UtilHelpers.retry}.
     *
     * <p>The mode is matched case-insensitively against {@code schedule},
     * {@code scheduleandexecute}, {@code execute} and {@code UtilHelpers.PURGE_PENDING_INSTANT}.
     * Any other value is logged as unsupported and yields {@code -1}.
     *
     * @param i retry count handed to {@code UtilHelpers.retry}
     * @return {@code 0} on success; in schedule mode {@code -1} when no instant was scheduled;
     *         otherwise whatever the selected operation returns
     */
    public int cluster(int i) {
        validateRunningMode(this.cfg);
        return UtilHelpers.retry(i, () -> {
            // Locale.ROOT keeps mode matching independent of the JVM's default locale.
            switch (this.cfg.runningMode.toLowerCase(Locale.ROOT)) {
                case "schedule": {
                    LOG.info("Running Mode: [schedule]; Do schedule");
                    Option<String> scheduledInstant = doSchedule(this.jsc);
                    if (!scheduledInstant.isPresent()) {
                        return -1;
                    }
                    LOG.info("The schedule instant time is " + scheduledInstant.get());
                    return 0;
                }
                case "scheduleandexecute":
                    LOG.info("Running Mode: [scheduleandexecute]");
                    return doScheduleAndCluster(this.jsc);
                case "execute":
                    LOG.info("Running Mode: [execute]; Do cluster");
                    return doCluster(this.jsc);
                case UtilHelpers.PURGE_PENDING_INSTANT:
                    LOG.info("Running Mode: [purge_pending_instant];");
                    return doPurgePendingInstant(this.jsc);
                default:
                    LOG.error("Unsupported running mode [" + this.cfg.runningMode + "], quit the job directly");
                    return -1;
            }
        }, "Cluster failed");
    }

    /**
     * Executes a pending clustering instant.
     *
     * <p>When no instant time is configured, the first pending clustering instant on the active
     * timeline is used and stored back into the configuration. If none is pending, the method
     * returns {@code 0} without clustering. After clustering, {@code clean} is invoked.
     *
     * @param javaSparkContext Spark context used to create the write client
     * @return {@code 0} when nothing is pending, otherwise the result of
     *         {@code UtilHelpers.handleErrors} for the clustering commit metadata
     * @throws Exception if clustering or closing the write client fails
     */
    private int doCluster(JavaSparkContext javaSparkContext) throws Exception {
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        // try-with-resources closes the client on every path and keeps the primary exception
        // if close() itself fails (the close failure is attached as suppressed).
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
                javaSparkContext, this.cfg.basePath, UtilHelpers.getSchemaFromLatestInstant(this.metaClient),
                this.cfg.parallelism, Option.empty(), this.props)) {
            if (StringUtils.isNullOrEmpty(this.cfg.clusteringInstantTime)) {
                // No instant given: fall back to the first pending clustering instant.
                Option<HoodieInstant> firstPending = this.metaClient.getActiveTimeline().getFirstPendingClusterInstant();
                if (!firstPending.isPresent()) {
                    LOG.info("There is no scheduled clustering in the table.");
                    return 0;
                }
                this.cfg.clusteringInstantTime = firstPending.get().requestedTime();
                LOG.info("Found the earliest scheduled clustering instant which will be executed: " + this.cfg.clusteringInstantTime);
            }
            Option<HoodieCommitMetadata> commitMetadata = client.cluster(this.cfg.clusteringInstantTime).getCommitMetadata();
            clean(client);
            return UtilHelpers.handleErrors(commitMetadata.get(), this.cfg.clusteringInstantTime);
        }
    }

    /**
     * Schedules clustering using this job's own Spark context.
     *
     * @return the scheduled instant time, or an empty option when nothing was scheduled
     * @throws Exception if scheduling fails
     */
    public Option<String> doSchedule() throws Exception {
        final JavaSparkContext sparkContext = this.jsc;
        return doSchedule(sparkContext);
    }

    /**
     * Reloads the meta client, creates a write client and schedules clustering with it.
     *
     * @param javaSparkContext Spark context used to create the write client
     * @return the scheduled instant time, or an empty option when nothing was scheduled
     * @throws Exception if scheduling or closing the write client fails
     */
    private Option<String> doSchedule(JavaSparkContext javaSparkContext) throws Exception {
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        // try-with-resources closes the client on every path and keeps the primary exception
        // if close() itself fails (the close failure is attached as suppressed).
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
                javaSparkContext, this.cfg.basePath, UtilHelpers.getSchemaFromLatestInstant(this.metaClient),
                this.cfg.parallelism, Option.empty(), this.props)) {
            return doSchedule(client);
        }
    }

    /**
     * Schedules clustering through the given write client.
     *
     * <p>With a configured instant time, the plan is scheduled at exactly that instant and the
     * instant is returned. Otherwise the client chooses and the client's result is returned.
     *
     * @param sparkRDDWriteClient write client to schedule with
     * @return the scheduled instant time, or whatever the client returns when it chooses
     */
    private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> sparkRDDWriteClient) {
        final String requestedInstant = this.cfg.clusteringInstantTime;
        if (requestedInstant != null) {
            // NOTE(review): the outcome of scheduleClusteringAtInstant is not inspected here;
            // confirm whether an unsuccessful schedule should yield an empty option instead.
            sparkRDDWriteClient.scheduleClusteringAtInstant(requestedInstant, Option.empty());
            return Option.of(requestedInstant);
        }
        return sparkRDDWriteClient.scheduleClustering(Option.empty());
    }

    /**
     * Schedules a clustering plan (or picks up a stale pending one) and executes it immediately.
     *
     * <p>When {@code retryLastFailedClusteringJob} is set and the last pending clustering instant
     * is older than {@code maxProcessingTimeMs}, that instant is re-executed instead of
     * scheduling a new plan. After clustering, {@code clean} is invoked.
     *
     * @param javaSparkContext Spark context used to create the write client
     * @return {@code -1} when no plan could be generated, otherwise the result of
     *         {@code UtilHelpers.handleErrors} for the clustering commit metadata
     * @throws Exception if scheduling, clustering or closing the write client fails
     */
    private int doScheduleAndCluster(JavaSparkContext javaSparkContext) throws Exception {
        LOG.info("Step 1: Do schedule");
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        // try-with-resources closes the client on every path and keeps the primary exception
        // if close() itself fails (the close failure is attached as suppressed).
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
                javaSparkContext, this.cfg.basePath, UtilHelpers.getSchemaFromLatestInstant(this.metaClient),
                this.cfg.parallelism, Option.empty(), this.props)) {
            Option<String> instantToRetry = findFailedClusteringInstantToRetry(client);
            Option<String> instantTime = instantToRetry.isPresent() ? instantToRetry : doSchedule(client);
            if (!instantTime.isPresent()) {
                LOG.info("Couldn't generate cluster plan");
                return -1;
            }
            LOG.info("The schedule instant time is " + instantTime.get());
            LOG.info("Step 2: Do cluster");
            Option<HoodieCommitMetadata> commitMetadata = client.cluster(instantTime.get()).getCommitMetadata();
            clean(client);
            return UtilHelpers.handleErrors(commitMetadata.get(), instantTime.get());
        }
    }

    /**
     * Finds a previously scheduled clustering instant that should be re-run rather than
     * scheduling a new plan.
     *
     * @param client write client whose config and engine context are used to look up the table
     * @return the requested time of the last pending clustering instant when retrying is enabled
     *         and that instant is older than {@code maxProcessingTimeMs}; otherwise empty
     * @throws Exception if table validation or instant-time parsing fails
     */
    private Option<String> findFailedClusteringInstantToRetry(SparkRDDWriteClient<HoodieRecordPayload> client) throws Exception {
        if (!this.cfg.retryLastFailedClusteringJob) {
            return Option.empty();
        }
        HoodieSparkTable<?> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
        client.validateAgainstTableProperties(table.getMetaClient().getTableConfig(), client.getConfig());
        Option<HoodieInstant> lastPending = table.getActiveTimeline().getLastPendingClusterInstant();
        if (!lastPending.isPresent()) {
            return Option.empty();
        }
        HoodieInstant pendingInstant = lastPending.get();
        long deadlineMs = TimelineUtils.parseDateFromInstantTime(pendingInstant.requestedTime()).getTime()
            + this.cfg.maxProcessingTimeMs;
        if (deadlineMs < System.currentTimeMillis()) {
            // Pending for longer than the allowed processing time: treat it as failed.
            LOG.info("Found failed clustering instant at : " + pendingInstant + "; Will rollback the failed clustering and re-trigger again.");
            return Option.of(pendingInstant.requestedTime());
        }
        LOG.info(pendingInstant + " might still be in progress, will trigger a new clustering job.");
        return Option.empty();
    }

    /**
     * Purges the pending clustering instant named by {@code cfg.clusteringInstantTime}.
     *
     * @param javaSparkContext Spark context used to create the write client
     * @return always {@code 0} when the purge call completes without throwing
     * @throws Exception if the purge or closing the write client fails
     */
    private int doPurgePendingInstant(JavaSparkContext javaSparkContext) throws Exception {
        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        // try-with-resources closes the client on every path and keeps the primary exception
        // if close() itself fails (the close failure is attached as suppressed).
        try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(
                javaSparkContext, this.cfg.basePath, UtilHelpers.getSchemaFromLatestInstant(this.metaClient),
                this.cfg.parallelism, Option.empty(), this.props)) {
            client.purgePendingClustering(this.cfg.clusteringInstantTime);
            return 0;
        }
    }

    /**
     * Triggers a clean on the given client, unless cleaning is skipped by configuration or the
     * client's config reports auto-clean as disabled.
     *
     * @param sparkRDDWriteClient write client to clean with
     */
    private void clean(SparkRDDWriteClient<?> sparkRDDWriteClient) {
        final boolean shouldClean = !this.cfg.skipClean && sparkRDDWriteClient.getConfig().isAutoClean();
        if (shouldClean) {
            sparkRDDWriteClient.clean();
        }
    }
}
