package org.apache.paimon.flink.action;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/SortCompactAction.class */
public class SortCompactAction extends CompactAction {
    private static final Logger LOG = LoggerFactory.getLogger(SortCompactAction.class);
    private String sortStrategy;
    private List<String> orderColumns;

    public SortCompactAction(String str, String str2, Map<String, String> map, Map<String, String> map2) {
        super(str, str2, map, map2);
        this.table = this.table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
    }

    @Override // org.apache.paimon.flink.action.CompactAction, org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        build();
        execute("Sort Compact Job");
    }

    @Override // org.apache.paimon.flink.action.CompactAction, org.apache.paimon.flink.action.Action
    public void build() throws Exception {
        if (this.env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) != RuntimeExecutionMode.BATCH) {
            LOG.warn("Sort Compact only support batch mode yet. Please add -Dexecution.runtime-mode=BATCH. The action this time will shift to batch mode forcely.");
            this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        }
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        if (fileStoreTable.bucketMode() != BucketMode.BUCKET_UNAWARE && fileStoreTable.bucketMode() != BucketMode.HASH_DYNAMIC) {
            throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
        }
        Map<String, String> options = fileStoreTable.options();
        FlinkSourceBuilder sourceName = new FlinkSourceBuilder(fileStoreTable).sourceName(ObjectIdentifier.of(this.catalogName, this.identifier.getDatabaseName(), this.identifier.getObjectName()).asSummaryString());
        sourceName.predicate(getPredicate());
        String str = options.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
        if (str != null) {
            sourceName.sourceParallelism(Integer.valueOf(Integer.parseInt(str)));
        }
        DataStream<RowData> build = sourceName.env(this.env).sourceBounded(true).build();
        int intValue = ((FileStoreTable) this.table).coreOptions().getLocalSampleMagnification().intValue();
        if (intValue < 20) {
            throw new IllegalArgumentException(String.format("the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.", CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(), Integer.valueOf(intValue)));
        }
        String str2 = this.table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        int parallelism = str2 == null ? build.getParallelism() : Integer.parseInt(str2);
        new SortCompactSinkBuilder(fileStoreTable).forCompact(true).forRowData(TableSorter.getSorter(this.env, build, fileStoreTable, new TableSortInfo.Builder().setSortColumns(this.orderColumns).setSortStrategy(TableSorter.OrderType.of(this.sortStrategy)).setSinkParallelism(parallelism).setLocalSampleSize(parallelism * intValue).setGlobalSampleSize(parallelism * 1000).setRangeNumber(parallelism * 10).build()).sort()).overwrite().build();
    }

    public SortCompactAction withOrderStrategy(String str) {
        this.sortStrategy = str;
        return this;
    }

    public SortCompactAction withOrderColumns(String... strArr) {
        return withOrderColumns(Arrays.asList(strArr));
    }

    public SortCompactAction withOrderColumns(List<String> list) {
        this.orderColumns = (List) list.stream().map((v0) -> {
            return v0.trim();
        }).collect(Collectors.toList());
        return this;
    }
}
