package org.apache.paimon.flink.procedure;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.paimon.flink.action.MigrateDatabaseActionFactory;
import org.apache.paimon.flink.action.QueryServiceActionFactory;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.table.system.OptionsTable;
import org.apache.paimon.utils.ParameterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.class */
/**
 * Flink stored procedure that migrates every table of a source database.
 *
 * <p>For each {@link Migrator} returned by {@link TableMigrationUtils#getImporters}, the procedure
 * runs the migration and then calls {@code renameTable(false)}. A failure on one table is logged
 * and counted; it does not stop the remaining tables from being processed.
 */
public class MigrateDatabaseProcedure extends ProcedureBase {
    private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);

    /** Returns the identifier under which this procedure is registered. */
    @Override // org.apache.paimon.factories.Factory
    public String identifier() {
        return MigrateDatabaseActionFactory.IDENTIFIER;
    }

    /**
     * Migrates all tables of the given source database and reports how many succeeded and failed.
     *
     * @param procedureContext Flink procedure context (not used by this method)
     * @param str the {@code connector} argument, forwarded to {@code getImporters}
     * @param str2 the {@code source_database} argument, forwarded to {@code getImporters}
     * @param str3 optional comma-separated key/value options; passed through {@code notnull(...)}
     *     before parsing (presumably mapping {@code null} to an empty string - confirm in
     *     {@code ProcedureBase})
     * @param num optional parallelism; when {@code null}, the number of available processors of
     *     the current JVM is used
     * @return a single-element array holding a summary message with success and failure counts
     * @throws Exception if the importers cannot be created or the options cannot be parsed;
     *     failures of individual table migrations are caught, logged and counted instead
     */
    // NOTE(review): the argument names below reference constants from unrelated classes
    // (OptionsTable.OPTIONS, QueryServiceActionFactory.PARALLELISM); this looks like a
    // decompiler artifact for plain string literals - left untouched to preserve the values.
    @ProcedureHint(argument = {@ArgumentHint(name = "connector", type = @DataTypeHint("STRING")), @ArgumentHint(name = "source_database", type = @DataTypeHint("STRING")), @ArgumentHint(name = OptionsTable.OPTIONS, type = @DataTypeHint("STRING"), isOptional = true), @ArgumentHint(name = QueryServiceActionFactory.PARALLELISM, type = @DataTypeHint("Integer"), isOptional = true)})
    public String[] call(ProcedureContext procedureContext, String str, String str2, String str3, Integer num) throws Exception {
        // Readable aliases for the positional procedure arguments.
        final String connector = str;
        final String sourceDatabase = str2;
        final String rawOptions = str3;

        // Fall back to the JVM's processor count when no parallelism was supplied.
        final Integer parallelism = num != null ? num : Runtime.getRuntime().availableProcessors();

        int successCount = 0;
        int failedCount = 0;
        for (Migrator migrator : TableMigrationUtils.getImporters(connector, this.catalog, sourceDatabase, parallelism, ParameterUtils.parseCommaSeparatedKeyValues(notnull(rawOptions)))) {
            try {
                migrator.executeMigrate();
                migrator.renameTable(false);
                // Counted as success only after both the migration and the rename completed.
                successCount++;
            } catch (Exception e) {
                failedCount++;
                // Pass the throwable so the stack trace and cause chain are not lost.
                LOG.error("Call migrate_database error:" + e.getMessage(), e);
            }
        }
        return new String[]{String.format("migrate database is finished, success cnt: %s , failed cnt: %s", successCount, failedCount)};
    }
}
