package org.apache.paimon.flink.action;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.BucketsRowChannelComputer;
import org.apache.paimon.flink.sink.CombinedTableCompactorSink;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/CompactDatabaseAction.class */
public class CompactDatabaseAction extends ActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(CompactDatabaseAction.class);
    private Pattern includingPattern;

    @Nullable
    private Pattern excludingPattern;
    private Pattern databasePattern;
    private MultiTablesSinkMode databaseCompactMode;
    private final Map<String, FileStoreTable> tableMap;
    private Options tableOptions;

    @Nullable
    private Duration partitionIdleTime;
    private Boolean fullCompaction;
    private boolean isStreaming;

    public CompactDatabaseAction(Map<String, String> map) {
        super(map);
        this.includingPattern = Pattern.compile(".*");
        this.databasePattern = Pattern.compile(".*");
        this.databaseCompactMode = MultiTablesSinkMode.DIVIDED;
        this.tableMap = new HashMap();
        this.tableOptions = new Options();
        this.partitionIdleTime = null;
    }

    public CompactDatabaseAction includingDatabases(@Nullable String str) {
        if (str != null) {
            this.databasePattern = Pattern.compile(str);
        }
        return this;
    }

    public CompactDatabaseAction includingTables(@Nullable String str) {
        if (str != null) {
            this.includingPattern = Pattern.compile(str);
        }
        return this;
    }

    public CompactDatabaseAction excludingTables(@Nullable String str) {
        this.excludingPattern = str == null ? null : Pattern.compile(str);
        return this;
    }

    public CompactDatabaseAction withDatabaseCompactMode(@Nullable String str) {
        this.databaseCompactMode = MultiTablesSinkMode.fromString(str);
        return this;
    }

    public CompactDatabaseAction withTableOptions(Map<String, String> map) {
        this.tableOptions = Options.fromMap(map);
        return this;
    }

    public CompactDatabaseAction withPartitionIdleTime(@Nullable Duration duration) {
        this.partitionIdleTime = duration;
        return this;
    }

    public CompactDatabaseAction withFullCompaction(boolean z) {
        this.fullCompaction = Boolean.valueOf(z);
        return this;
    }

    private boolean shouldCompactionTable(String str) {
        boolean matches = this.includingPattern.matcher(str).matches();
        if (this.excludingPattern != null) {
            matches = matches && !this.excludingPattern.matcher(str).matches();
        }
        if (!matches) {
            LOG.debug("Source table '{}' is excluded.", str);
        }
        return matches;
    }

    @Override // org.apache.paimon.flink.action.Action
    public void build() {
        this.isStreaming = this.env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        if (this.fullCompaction == null) {
            this.fullCompaction = Boolean.valueOf(!this.isStreaming);
        }
        if (this.databaseCompactMode == MultiTablesSinkMode.DIVIDED) {
            buildForDividedMode();
        } else {
            buildForCombinedMode();
        }
    }

    private void buildForDividedMode() {
        try {
            for (String str : this.catalog.listDatabases()) {
                if (this.databasePattern.matcher(str).matches()) {
                    for (String str2 : this.catalog.listTables(str)) {
                        String format = String.format("%s.%s", str, str2);
                        if (shouldCompactionTable(format)) {
                            Table table = this.catalog.getTable(Identifier.create(str, str2));
                            if (table instanceof FileStoreTable) {
                                HashMap hashMap = new HashMap(this.tableOptions.toMap());
                                hashMap.put(CoreOptions.WRITE_ONLY.key(), "false");
                                this.tableMap.put(format, (FileStoreTable) table.copy(hashMap));
                            } else {
                                LOG.error(String.format("Only FileStoreTable supports compact action. The table type is '%s'.", table.getClass().getName()));
                            }
                        } else {
                            LOG.debug("The table {} is excluded.", format);
                        }
                    }
                }
            }
            Preconditions.checkState(!this.tableMap.isEmpty(), "no tables to be compacted. possible cause is that there are no tables detected after pattern matching");
            for (Map.Entry<String, FileStoreTable> entry : this.tableMap.entrySet()) {
                FileStoreTable value = entry.getValue();
                switch (value.bucketMode()) {
                    case BUCKET_UNAWARE:
                        buildForUnawareBucketCompaction(this.env, entry.getKey(), value);
                        break;
                    case HASH_FIXED:
                    case HASH_DYNAMIC:
                    default:
                        buildForTraditionalCompaction(this.env, entry.getKey(), value);
                        break;
                }
            }
        } catch (Catalog.DatabaseNotExistException | Catalog.TableNotExistException e) {
            throw new RuntimeException(e);
        }
    }

    private void buildForCombinedMode() {
        CombinedTableCompactorSourceBuilder withPartitionIdleTime = new CombinedTableCompactorSourceBuilder(catalogLoader(), this.databasePattern, this.includingPattern, this.excludingPattern, ((Duration) this.tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)).toMillis()).withPartitionIdleTime(this.partitionIdleTime);
        Integer valueOf = this.tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM) == null ? Integer.valueOf(this.env.getParallelism()) : (Integer) this.tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM);
        new CombinedTableCompactorSink(catalogLoader(), this.tableOptions, this.fullCompaction.booleanValue()).sinkFrom(FlinkStreamPartitioner.partition(withPartitionIdleTime.withEnv(this.env).withContinuousMode(this.isStreaming).buildAwareBucketTableSource(), new BucketsRowChannelComputer(), valueOf), FlinkStreamPartitioner.rebalance(withPartitionIdleTime.withEnv(this.env).withContinuousMode(this.isStreaming).buildForUnawareBucketsTableSource(), valueOf));
    }

    private void buildForTraditionalCompaction(StreamExecutionEnvironment streamExecutionEnvironment, String str, FileStoreTable fileStoreTable) {
        Preconditions.checkArgument((this.fullCompaction.booleanValue() && this.isStreaming) ? false : true, "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
        if (this.isStreaming) {
            fileStoreTable = fileStoreTable.copy((Map<String, String>) new HashMap<String, String>() { // from class: org.apache.paimon.flink.action.CompactDatabaseAction.1
                {
                    put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
                    put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
                    put(CoreOptions.LOOKUP_WAIT.key(), "false");
                }
            });
        }
        new CompactorSinkBuilder(fileStoreTable, this.fullCompaction.booleanValue()).withInput(new CompactorSourceBuilder(str, fileStoreTable).withPartitionIdleTime(this.partitionIdleTime).withEnv(streamExecutionEnvironment).withContinuousMode(this.isStreaming).build()).build();
    }

    private void buildForUnawareBucketCompaction(StreamExecutionEnvironment streamExecutionEnvironment, String str, FileStoreTable fileStoreTable) {
        UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder = new UnawareBucketCompactionTopoBuilder(streamExecutionEnvironment, str, fileStoreTable);
        unawareBucketCompactionTopoBuilder.withContinuousMode(this.isStreaming);
        unawareBucketCompactionTopoBuilder.withPartitionIdleTime(this.partitionIdleTime);
        unawareBucketCompactionTopoBuilder.build();
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        build();
        execute("Compact database job");
    }
}
