package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor;
import org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator;
import org.apache.hudi.table.action.compact.plan.generators.HoodieLogCompactionPlanGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.class */
public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseTableServicePlanActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {
    private static final Logger LOG = LoggerFactory.getLogger(ScheduleCompactionActionExecutor.class);
    private final WriteOperationType operationType;
    private final Option<Map<String, String>> extraMetadata;
    private BaseHoodieCompactionPlanGenerator planGenerator;

    public ScheduleCompactionActionExecutor(HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieTable<T, I, K, O> hoodieTable, String str, Option<Map<String, String>> option, WriteOperationType writeOperationType) {
        super(hoodieEngineContext, hoodieWriteConfig, hoodieTable, str);
        this.extraMetadata = option;
        this.operationType = writeOperationType;
        ValidationUtils.checkArgument(writeOperationType == WriteOperationType.COMPACT || writeOperationType == WriteOperationType.LOG_COMPACT, "Only COMPACT and LOG_COMPACT is supported");
        initPlanGenerator(hoodieEngineContext, hoodieWriteConfig, hoodieTable);
    }

    private void initPlanGenerator(HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieTable<T, I, K, O> hoodieTable) {
        if (WriteOperationType.COMPACT.equals(this.operationType)) {
            this.planGenerator = createCompactionPlanGenerator(ConfigUtils.getStringWithAltKeys((Properties) hoodieWriteConfig.getProps(), HoodieCompactionConfig.COMPACTION_PLAN_GENERATOR, true), hoodieTable, hoodieEngineContext, hoodieWriteConfig);
        } else {
            this.planGenerator = new HoodieLogCompactionPlanGenerator(hoodieTable, hoodieEngineContext, hoodieWriteConfig, this);
        }
    }

    @Override // org.apache.hudi.table.action.BaseActionExecutor
    public Option<HoodieCompactionPlan> execute() {
        ValidationUtils.checkArgument(this.table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not " + this.table.getMetaClient().getTableType().name());
        if (!this.table.getMetaClient().getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT) && !this.config.getWriteConcurrencyMode().supportsMultiWriter() && !this.config.getFailedWritesCleanPolicy().isLazy() && this.config.getEngineType() == EngineType.SPARK) {
            Option<HoodieInstant> firstInstant = this.table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction().firstInstant();
            if (firstInstant.isPresent() && !InstantComparison.compareTimestamps(firstInstant.get().requestedTime(), InstantComparison.GREATER_THAN, this.instantTime)) {
                LOG.warn("Earliest write inflight instant time must be later than compaction time. Earliest :" + firstInstant.get() + ", Compaction scheduled at " + this.instantTime + ". Hence skipping to schedule compaction");
                return Option.empty();
            }
        }
        HoodieCompactionPlan scheduleCompaction = scheduleCompaction();
        Option<HoodieCompactionPlan> empty = Option.empty();
        if (scheduleCompaction != null && CollectionUtils.nonEmpty(scheduleCompaction.getOperations())) {
            Option<Map<String, String>> option = this.extraMetadata;
            scheduleCompaction.getClass();
            option.ifPresent(scheduleCompaction::setExtraMetadata);
            if (this.operationType.equals(WriteOperationType.COMPACT)) {
                this.table.getActiveTimeline().saveToCompactionRequested(this.instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, this.instantTime), scheduleCompaction);
            } else {
                this.table.getActiveTimeline().saveToLogCompactionRequested(this.instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.LOG_COMPACTION_ACTION, this.instantTime), scheduleCompaction);
            }
            empty = Option.of(scheduleCompaction);
        }
        return empty;
    }

    @Nullable
    private HoodieCompactionPlan scheduleCompaction() {
        LOG.info("Checking if compaction needs to be run on " + this.config.getBasePath());
        if (!needCompact(this.config.getInlineCompactTriggerStrategy())) {
            return new HoodieCompactionPlan();
        }
        LOG.info("Generating compaction plan for merge on read table " + this.config.getBasePath());
        try {
            this.context.setJobStatus(getClass().getSimpleName(), "Compaction: generating compaction plan");
            return this.planGenerator.generateCompactionPlan(this.instantTime);
        } catch (IOException e) {
            throw new HoodieCompactionException("Could not schedule compaction " + this.config.getBasePath(), e);
        }
    }

    private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
        Option<Pair<HoodieTimeline, HoodieInstant>> completedDeltaCommitsSinceLatestCompaction = CompactionUtils.getCompletedDeltaCommitsSinceLatestCompaction(this.table.getActiveTimeline());
        return completedDeltaCommitsSinceLatestCompaction.isPresent() ? Option.of(Pair.of(Integer.valueOf(completedDeltaCommitsSinceLatestCompaction.get().getLeft().countInstants()), completedDeltaCommitsSinceLatestCompaction.get().getRight().requestedTime())) : Option.empty();
    }

    private Option<Pair<Integer, String>> getLatestDeltaCommitInfoSinceLastCompactionRequest() {
        Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsSinceLatestCompactionRequest = CompactionUtils.getDeltaCommitsSinceLatestCompactionRequest(this.table.getActiveTimeline());
        return deltaCommitsSinceLatestCompactionRequest.isPresent() ? Option.of(Pair.of(Integer.valueOf(deltaCommitsSinceLatestCompactionRequest.get().getLeft().countInstants()), deltaCommitsSinceLatestCompactionRequest.get().getRight().requestedTime())) : Option.empty();
    }

    private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
        boolean z;
        Option<Pair<Integer, String>> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
        if (!latestDeltaCommitInfo.isPresent()) {
            return false;
        }
        Pair<Integer, String> pair = latestDeltaCommitInfo.get();
        if (WriteOperationType.LOG_COMPACT.equals(this.operationType)) {
            return true;
        }
        int inlineCompactDeltaCommitMax = this.config.getInlineCompactDeltaCommitMax();
        int inlineCompactDeltaSecondsMax = this.config.getInlineCompactDeltaSecondsMax();
        switch (compactionTriggerStrategy) {
            case NUM_COMMITS:
                z = inlineCompactDeltaCommitMax <= pair.getLeft().intValue();
                if (z) {
                    LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", Integer.valueOf(inlineCompactDeltaCommitMax)));
                    break;
                }
                break;
            case NUM_COMMITS_AFTER_LAST_REQUEST:
                Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLastCompactionRequest = getLatestDeltaCommitInfoSinceLastCompactionRequest();
                if (!latestDeltaCommitInfoSinceLastCompactionRequest.isPresent()) {
                    return false;
                }
                z = inlineCompactDeltaCommitMax <= latestDeltaCommitInfoSinceLastCompactionRequest.get().getLeft().intValue();
                if (z) {
                    LOG.info(String.format("The delta commits >= %s since the last compaction request, trigger compaction scheduler.", Integer.valueOf(inlineCompactDeltaCommitMax)));
                    break;
                }
                break;
            case TIME_ELAPSED:
                z = ((long) inlineCompactDeltaSecondsMax) <= parsedToSeconds(this.instantTime).longValue() - parsedToSeconds(pair.getRight()).longValue();
                if (z) {
                    LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", Integer.valueOf(inlineCompactDeltaSecondsMax)));
                    break;
                }
                break;
            case NUM_OR_TIME:
                z = inlineCompactDeltaCommitMax <= pair.getLeft().intValue() || ((long) inlineCompactDeltaSecondsMax) <= parsedToSeconds(this.instantTime).longValue() - parsedToSeconds(pair.getRight()).longValue();
                if (z) {
                    LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", Integer.valueOf(inlineCompactDeltaCommitMax), Integer.valueOf(inlineCompactDeltaSecondsMax)));
                    break;
                }
                break;
            case NUM_AND_TIME:
                z = inlineCompactDeltaCommitMax <= pair.getLeft().intValue() && ((long) inlineCompactDeltaSecondsMax) <= parsedToSeconds(this.instantTime).longValue() - parsedToSeconds(pair.getRight()).longValue();
                if (z) {
                    LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", Integer.valueOf(inlineCompactDeltaCommitMax), Integer.valueOf(inlineCompactDeltaSecondsMax)));
                    break;
                }
                break;
            default:
                throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + this.config.getInlineCompactTriggerStrategy());
        }
        return z;
    }

    private Long parsedToSeconds(String str) {
        return Long.valueOf(TimelineUtils.parseDateFromInstantTimeSafely(str).orElseThrow(() -> {
            return new HoodieCompactionException("Failed to parse timestamp " + str);
        }).getTime() / 1000);
    }

    private BaseHoodieCompactionPlanGenerator createCompactionPlanGenerator(String str, HoodieTable hoodieTable, HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig) {
        return (BaseHoodieCompactionPlanGenerator) ReflectionUtils.loadClass(str, new Class[]{HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class, BaseTableServicePlanActionExecutor.class}, hoodieTable, hoodieEngineContext, hoodieWriteConfig, this);
    }
}
