package org.apache.paimon.flink.action.cdc.postgres;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.class */
/**
 * Table synchronization action whose source type is {@link SyncJobHandler.SourceType#POSTGRES}.
 *
 * <p>The schema information collected by {@link #retrieveSchema()} is cached in a field and later
 * consumed by {@link #mo231buildSource()}; the latter therefore relies on the former having been
 * invoked first.
 */
public class PostgresSyncTableAction extends SyncTableActionBase {
    /** Schema information of the monitored tables; assigned only by {@link #retrieveSchema()}. */
    private JdbcSchemasInfo postgresSchemasInfo;

    /**
     * Creates the action. All four arguments are forwarded unchanged to the base class together
     * with the {@code POSTGRES} source type.
     *
     * @param str first string argument of the base constructor (presumably the target database
     *     name - confirm against {@code SyncTableActionBase})
     * @param str2 second string argument of the base constructor (presumably the target table
     *     name - confirm against {@code SyncTableActionBase})
     * @param map first configuration map of the base constructor (presumably the catalog
     *     configuration - confirm)
     * @param map2 second configuration map of the base constructor (presumably the PostgreSQL CDC
     *     source configuration - confirm)
     */
    public PostgresSyncTableAction(String str, String str2, Map<String, String> map, Map<String, String> map2) {
        super(str, str2, map, map2, SyncJobHandler.SourceType.POSTGRES);
    }

    /**
     * Loads the schema information of every source table whose name matches the configured table
     * name pattern, validates it, and returns the schema obtained by merging all of them.
     *
     * @return the merged schema of all matched source tables
     * @throws Exception if retrieving the table information fails
     * @throws IllegalArgumentException presumably raised by the validation when a matched table has
     *     no primary key or when no table matched at all
     */
    @Override // org.apache.paimon.flink.action.cdc.SyncTableActionBase
    protected Schema retrieveSchema() throws Exception {
        this.postgresSchemasInfo =
                PostgresActionUtils.getPostgresTableInfos(
                        this.cdcSourceConfig,
                        monitorTablePredication(),
                        new ArrayList<>(),
                        this.typeMapping);
        validatePostgresTableInfos(this.postgresSchemasInfo);
        return this.postgresSchemasInfo.mergeAll().schema();
    }

    /**
     * Builds the incremental source covering every primary-key table found by
     * {@link #retrieveSchema()}.
     *
     * <p>Each table is passed on as {@code schemaName + Path.CUR_DIR + objectName}; the distinct
     * schema names are passed on separately.
     *
     * @return the source built by {@code PostgresActionUtils.buildPostgresSource}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    /* renamed from: buildSource, reason: merged with bridge method [inline-methods] */
    public JdbcIncrementalSource<CdcSourceRecord> mo231buildSource() {
        validateRuntimeExecutionMode();
        // NOTE: postgresSchemasInfo is still null here unless retrieveSchema() ran beforehand.
        List<JdbcSchemasInfo.JdbcSchemaInfo> pkTables = this.postgresSchemasInfo.pkTables();
        // A set, because several tables may live in the same schema.
        HashSet<String> schemaNames = new HashSet<>();
        String[] tableNames = new String[pkTables.size()];
        for (int i = 0; i < tableNames.length; i++) {
            JdbcSchemasInfo.JdbcSchemaInfo pkTable = pkTables.get(i);
            String schemaName = pkTable.schemaName();
            tableNames[i] = schemaName + Path.CUR_DIR + pkTable.identifier().getObjectName();
            schemaNames.add(schemaName);
        }
        return PostgresActionUtils.buildPostgresSource(
                this.cdcSourceConfig, schemaNames.toArray(new String[0]), tableNames);
    }

    /**
     * Ensures that the matched source tables are usable for synchronization: none of them may lack
     * a primary key, and at least one table with a primary key must have been matched.
     *
     * @param jdbcSchemasInfo the table information to validate
     */
    private void validatePostgresTableInfos(JdbcSchemasInfo jdbcSchemasInfo) {
        List<Identifier> nonPkTables = jdbcSchemasInfo.nonPkTables();
        String nonPkTableNames =
                nonPkTables.stream()
                        .map(Identifier::getFullName)
                        .collect(Collectors.joining(CoreOptions.FIELDS_SEPARATOR));
        Preconditions.checkArgument(
                nonPkTables.isEmpty(),
                "Source tables of PostgreSQL table synchronization job cannot contain table which doesn't have primary keys.\nThey are: %s",
                nonPkTableNames);
        Preconditions.checkArgument(
                !jdbcSchemasInfo.pkTables().isEmpty(),
                "No table satisfies the given database name and table name.");
    }

    /**
     * Creates the predicate that decides whether a table name is monitored: the whole name must
     * match the regular expression configured under {@link PostgresSourceOptions#TABLE_NAME}.
     *
     * <p>The expression is compiled once, when the predicate is created, rather than on every
     * evaluation. As a consequence a missing or malformed expression is reported by this method
     * instead of by the first evaluation of the predicate.
     *
     * @return a predicate accepting exactly the table names that fully match the configured pattern
     */
    private Predicate<String> monitorTablePredication() {
        final Pattern tableNamePattern =
                Pattern.compile((String) this.cdcSourceConfig.get(PostgresSourceOptions.TABLE_NAME));
        return tableName -> tableNamePattern.matcher(tableName).matches();
    }
}
