package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.database.dialect.service.api.ColumnDefinition;
import org.apache.nifi.database.dialect.service.api.DatabaseDialectService;
import org.apache.nifi.database.dialect.service.api.StandardColumnDefinition;
import org.apache.nifi.database.dialect.service.api.StandardStatementRequest;
import org.apache.nifi.database.dialect.service.api.StatementResponse;
import org.apache.nifi.database.dialect.service.api.StatementType;
import org.apache.nifi.database.dialect.service.api.TableDefinition;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.standard.db.ColumnDescription;
import org.apache.nifi.processors.standard.db.DatabaseAdapterDescriptor;
import org.apache.nifi.processors.standard.db.NameNormalizer;
import org.apache.nifi.processors.standard.db.NameNormalizerFactory;
import org.apache.nifi.processors.standard.db.TableNotFoundException;
import org.apache.nifi.processors.standard.db.TableSchema;
import org.apache.nifi.processors.standard.db.TranslationStrategy;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("This processor uses a JDBC connection and incoming records to generate any database table changes needed to support the incoming records. It expects a 'flat' record layout, meaning none of the top-level record fields has nested fields that are intended to become columns themselves.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"metadata", "jdbc", "database", "table", "update", "alter"})
@WritesAttributes({@WritesAttribute(attribute = UpdateDatabaseTable.ATTR_OUTPUT_TABLE, description = "This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the target table name."), @WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned)."), @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer, only if a Record Writer is specified and Update Field Names is 'true'."), @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile, only if a Record Writer is specified and Update Field Names is 'true'.")})
/* loaded from: input_file:org/apache/nifi/processors/standard/UpdateDatabaseTable.class */
public class UpdateDatabaseTable extends AbstractProcessor {
    // FlowFile attribute key under which the target table name is written.
    static final String ATTR_OUTPUT_TABLE = "output.table";

    // Allowable values of the CREATE_TABLE ("Create Table Strategy") property.
    static final AllowableValue CREATE_IF_NOT_EXISTS = new AllowableValue("Create If Not Exists", "Create If Not Exists", "Create a table with the given schema if it does not already exist");
    static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists", "If the target does not already exist, log an error and route the flowfile to failure");

    // Required controller services: the record reader (used to obtain the record schema) and the JDBC connection pool.
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The service for reading incoming flow files. The reader is only used to determine the schema of the records, the actual records will not be processed.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("updatedatabasetable-dbcp-service").displayName("Database Connection Pooling Service").description("The Controller Service that is used to obtain connection(s) to the database").required(true).identifiesControllerService(DBCPService.class).build();

    // Target table coordinates. All three support Expression Language evaluated against FlowFile attributes;
    // only the table name is required.
    static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder().name("updatedatabasetable-catalog-name").displayName("Catalog Name").description("The name of the catalog that the statement should update. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the property is set and the database is case-sensitive, the catalog name must match the database's catalog name exactly.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder().name("updatedatabasetable-schema-name").displayName("Schema Name").description("The name of the database schema that the table belongs to. This may not apply for the database that you are updating. In this case, leave the field empty. Note that if the property is set and the database is case-sensitive, the schema name must match the database's schema name exactly.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("updatedatabasetable-table-name").displayName("Table Name").description("The name of the database table to update. If the table does not exist, then it will either be created or an error thrown, depending on the value of the Create Table property.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    // Behaviour when the target table is missing; defaults to FAIL_IF_NOT_EXISTS.
    static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder().name("updatedatabasetable-create-table").displayName("Create Table Strategy").description("Specifies how to process the target table when it does not exist (create it, fail, e.g.).").required(true).addValidator(Validator.VALID).allowableValues(new DescribedValue[]{CREATE_IF_NOT_EXISTS, FAIL_IF_NOT_EXISTS}).defaultValue(FAIL_IF_NOT_EXISTS.getValue()).build();
    // Optional comma-separated key list; only offered when CREATE_TABLE is CREATE_IF_NOT_EXISTS.
    static final PropertyDescriptor PRIMARY_KEY_FIELDS = new PropertyDescriptor.Builder().name("updatedatabasetable-primary-keys").displayName("Primary Key Fields").description("A comma-separated list of record field names that uniquely identifies a row in the database. This property is only used if the specified table needs to be created, in which case the Primary Key Fields will be used to specify the primary keys of the newly-created table. IMPORTANT: Primary Key Fields must match the record field names exactly unless 'Quote Column Identifiers' is false and the database allows for case-insensitive column names. In practice it is best to specify Primary Key Fields that exactly match the record field names, and those will become the column names in the created table.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(CREATE_TABLE, new AllowableValue[]{CREATE_IF_NOT_EXISTS}).build();

    // Field-name translation settings. TRANSLATION_STRATEGY and TRANSLATION_PATTERN depend on TRANSLATE_FIELD_NAMES
    // having its default value ("true"); TRANSLATION_PATTERN additionally depends on the PATTERN strategy.
    static final PropertyDescriptor TRANSLATE_FIELD_NAMES = new PropertyDescriptor.Builder().name("updatedatabasetable-translate-field-names").displayName("Translate Field Names").description("If true, the Processor will attempt to translate field names into the corresponding column names for the table specified, for the purposes of determining whether the field name exists as a column in the target table. NOTE: If the target table does not exist and is to be created, this property is ignored and the field names will be used as-is. If false, the field names must match the column names exactly, or the column may not be found and instead an error my be reported that the column already exists.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final PropertyDescriptor TRANSLATION_STRATEGY = new PropertyDescriptor.Builder().required(true).name("Column Name Translation Strategy").description("The strategy used to normalize table column name. Column Name will be uppercased to do case-insensitive matching irrespective of strategy").allowableValues(TranslationStrategy.class).defaultValue(TranslationStrategy.REMOVE_UNDERSCORE.getValue()).dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue(), new String[0]).build();
    public static final PropertyDescriptor TRANSLATION_PATTERN = new PropertyDescriptor.Builder().name("Column Name Translation Pattern").displayName("Column Name Translation Pattern").description("Column name will be normalized with this regular expression").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).dependsOn(TRANSLATE_FIELD_NAMES, TRANSLATE_FIELD_NAMES.getDefaultValue(), new String[0]).dependsOn(TRANSLATION_STRATEGY, TranslationStrategy.PATTERN.getValue(), new String[0]).build();

    // Output rewriting: when UPDATE_FIELD_NAMES is "true" a Record Writer is required (RECORD_WRITER_FACTORY depends on it).
    static final PropertyDescriptor UPDATE_FIELD_NAMES = new PropertyDescriptor.Builder().name("updatedatabasetable-update-field-names").displayName("Update Field Names").description("This property indicates whether to update the output schema such that the field names are set to the exact column names from the specified table. This should be used if the incoming record field names may not match the table's column names in terms of upper- and lower-case. For example, this property should be set to true if the output FlowFile is destined for Oracle e.g., which expects the field names to match the column names exactly. NOTE: The value of the 'Translate Field Names' property is ignored when updating field names; instead they are updated to match the column name as returned by the database.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("updatedatabasetable-record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer should use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types. If Create Table Strategy is set 'Create If Not Exists', the Record Writer's output format must match the Record Reader's format in order for the data to be placed in the created table location. Note that this property is only used if 'Update Field Names' is set to true and the field names do not all match the column names exactly. If no update is needed for any field names (or 'Update Field Names' is false), the Record Writer is not used and instead the input FlowFile is routed to success or failure without modification.").identifiesControllerService(RecordSetWriterFactory.class).dependsOn(UPDATE_FIELD_NAMES, "true", new String[0]).required(true).build();

    // Identifier quoting switches (both default to "false") and the statement timeout in seconds (default "0").
    static final PropertyDescriptor QUOTE_COLUMN_IDENTIFIERS = new PropertyDescriptor.Builder().name("updatedatabasetable-quoted-column-identifiers").displayName("Quote Column Identifiers").description("Enabling this option will cause all column names to be quoted, allowing you to use reserved words as column names in your tables and/or forcing the record field names to match the column names exactly.").allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    static final PropertyDescriptor QUOTE_TABLE_IDENTIFIER = new PropertyDescriptor.Builder().name("updatedatabasetable-quoted-table-identifiers").displayName("Quote Table Identifiers").description("Enabling this option will cause the table name to be quoted to support the use of special characters in the table name and/or forcing the value of the Table Name property to match the target table name exactly.").allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder().name("updatedatabasetable-query-timeout").displayName("Query Timeout").description("Sets the number of seconds the driver will wait for a query to execute. A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.").defaultValue("0").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();

    // Database type / dialect service descriptors are produced by the shared DatabaseAdapterDescriptor helper;
    // the dialect-service descriptor is derived from DB_TYPE.
    static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor("db-type");
    static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

    // Outgoing relationships of this processor.
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile containing records routed to this relationship after the record has been successfully transmitted to the database.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile containing records routed to this relationship if the record could not be transmitted to the database.").build();
    /** Relationships returned by {@code getRelationships()}: {@code success} and {@code failure}. */
    protected static Set<Relationship> relationships = Set.of(REL_SUCCESS, REL_FAILURE);

    /**
     * Immutable list of the supported property descriptors, in declaration order.
     *
     * <p>The descriptors are passed directly as varargs: casting the array to {@code Object[]} would make
     * {@code List.of} infer {@code List<Object>}, which is not assignable to {@code List<PropertyDescriptor>}.
     */
    private static final List<PropertyDescriptor> properties = List.of(
            RECORD_READER,
            DBCP_SERVICE,
            DB_TYPE,
            DATABASE_DIALECT_SERVICE,
            CATALOG_NAME,
            SCHEMA_NAME,
            TABLE_NAME,
            CREATE_TABLE,
            PRIMARY_KEY_FIELDS,
            TRANSLATE_FIELD_NAMES,
            TRANSLATION_STRATEGY,
            TRANSLATION_PATTERN,
            UPDATE_FIELD_NAMES,
            RECORD_WRITER_FACTORY,
            QUOTE_TABLE_IDENTIFIER,
            QUOTE_COLUMN_IDENTIFIERS,
            QUERY_TIMEOUT
    );

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/standard/UpdateDatabaseTable$OutputMetadataHolder.class */
    /**
     * Immutable pairing of an output record schema with a string-to-string field map.
     *
     * <p>Both values are stored exactly as supplied; no defensive copy is made.
     * NOTE(review): the map presumably relates incoming field names to table column names — confirm
     * against the code that populates it.
     */
    public static class OutputMetadataHolder {
        private final RecordSchema outputSchema;
        private final Map<String, String> fieldMap;

        /**
         * @param outputSchema schema handed back by {@link #getOutputSchema()}
         * @param fieldMap     map handed back by {@link #getFieldMap()}
         */
        public OutputMetadataHolder(RecordSchema outputSchema, Map<String, String> fieldMap) {
            this.fieldMap = fieldMap;
            this.outputSchema = outputSchema;
        }

        /** @return the field map given at construction time (same instance) */
        public Map<String, String> getFieldMap() {
            return fieldMap;
        }

        /** @return the output schema given at construction time */
        public RecordSchema getOutputSchema() {
            return outputSchema;
        }
    }

    /**
     * Returns the property descriptors supported by this processor.
     *
     * @return the shared, class-level {@code properties} list
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return the shared, class-level {@code relationships} set
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Adds a processor-specific check on top of the inherited validation: a Record Writer must be
     * configured whenever 'Update Field Names' is true.
     *
     * @param validationContext the context holding the property values being validated
     * @return the inherited validation results plus, when the check fails, one invalid result whose
     *         subject is the Record Writer property
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));

        final boolean recordWriterSet = validationContext.getProperty(RECORD_WRITER_FACTORY).isSet();
        final boolean updateFieldNames = validationContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();

        if (updateFieldNames && !recordWriterSet) {
            results.add(new ValidationResult.Builder()
                    .subject(RECORD_WRITER_FACTORY.getDisplayName())
                    .explanation("Record Writer must be set if 'Update Field Names' is true")
                    .valid(false)
                    .build());
        }
        return results;
    }

    /**
     * Processes one incoming FlowFile: determines the record schema, lets
     * {@code checkAndUpdateTableSchema} bring the target table in line with it, optionally rewrites the
     * FlowFile content with the returned output schema, and routes the FlowFile.
     *
     * <p>Routing:
     * <ul>
     *   <li>Record reader cannot be created: {@code failure}, with {@code record.error.message} set.</li>
     *   <li>Rewriting the content fails: {@code failure}, with {@code record.error.message} set.</li>
     *   <li>{@link IOException} or {@link SQLException} elsewhere: {@code failure}, with
     *       {@code output.table} set.</li>
     *   <li>{@link DiscontinuedException}: the FlowFile is transferred to {@link Relationship#SELF}.</li>
     *   <li>Otherwise: {@code success}, with {@code output.table} (and, when rewritten,
     *       {@code record.count}, the MIME type and the writer's attributes) set.</li>
     * </ul>
     *
     * @param processContext provides the configured property values and controller services
     * @param processSession session used to read, rewrite and transfer the FlowFile
     * @throws ProcessException for any other failure; a {@code ProcessException} is rethrown as-is and
     *         any other throwable is wrapped in one
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final RecordReaderFactory recordReaderFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory recordWriterFactory = processContext.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        final String catalogName = processContext.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String schemaName = processContext.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String primaryKeyFields = processContext.getProperty(PRIMARY_KEY_FIELDS).evaluateAttributeExpressions(flowFile).getValue();
        final ComponentLog logger = getLogger();

        try {
            // A reader-creation failure is remembered and handled only after the input stream has been
            // closed, so that the FlowFile is no longer being read when it is modified and transferred.
            RecordReader reader = null;
            ProcessException readerFailure = null;
            try (InputStream contentStream = processSession.read(flowFile)) {
                try {
                    reader = recordReaderFactory.createRecordReader(flowFile, contentStream, getLogger());
                } catch (Exception e) {
                    readerFailure = new ProcessException("Unable to create RecordReader", e);
                }
            }
            if (readerFailure != null) {
                logger.error("Failed to create {} for {} - routing to failure", RecordReader.class.getSimpleName(), flowFile, readerFailure);
                flowFile = processSession.putAttribute(flowFile, "record.error.message", recordErrorMessageFor(readerFailure));
                processSession.transfer(flowFile, REL_FAILURE);
                return;
            }

            final RecordSchema recordSchema = reader.getSchema();
            final boolean createIfNotExists = processContext.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
            final boolean updateFieldNames = processContext.getProperty(UPDATE_FIELD_NAMES).asBoolean();
            final boolean translateFieldNames = processContext.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
            final TranslationStrategy translationStrategy = TranslationStrategy.valueOf(processContext.getProperty(TRANSLATION_STRATEGY).getValue());
            final String translationPatternValue = processContext.getProperty(TRANSLATION_PATTERN).getValue();
            final Pattern translationPattern = translationPatternValue == null ? null : Pattern.compile(translationPatternValue);
            final NameNormalizer normalizer = translateFieldNames
                    ? NameNormalizerFactory.getNormalizer(translationStrategy, translationPattern)
                    : null;

            if (recordWriterFactory == null && updateFieldNames) {
                throw new ProcessException("Record Writer must be set if 'Update Field Names' is true");
            }

            final DBCPService dbcpService = processContext.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
            final DatabaseDialectService databaseDialectService =
                    DatabaseAdapterDescriptor.getDatabaseDialectService(processContext, DATABASE_DIALECT_SERVICE, processContext.getProperty(DB_TYPE).getValue());

            try (Connection connection = dbcpService.getConnection(flowFile.getAttributes())) {
                final boolean quoteTableName = processContext.getProperty(QUOTE_TABLE_IDENTIFIER).asBoolean();
                final boolean quoteColumnNames = processContext.getProperty(QUOTE_COLUMN_IDENTIFIERS).asBoolean();
                final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

                // Primary keys are only parsed when the table may have to be created; null otherwise.
                final Set<String> primaryKeyColumnNames;
                if (createIfNotExists && primaryKeyFields != null) {
                    primaryKeyColumnNames = new HashSet<>();
                    for (String field : primaryKeyFields.split(",")) {
                        final String trimmed = field.trim();
                        if (!trimmed.isEmpty()) {
                            primaryKeyColumnNames.add(trimmed);
                        }
                    }
                } else {
                    primaryKeyColumnNames = null;
                }

                final OutputMetadataHolder outputMetadataHolder = checkAndUpdateTableSchema(connection, databaseDialectService, recordSchema,
                        catalogName, schemaName, tableName, createIfNotExists, translateFieldNames, normalizer, updateFieldNames,
                        primaryKeyColumnNames, quoteTableName, quoteColumnNames);

                if (outputMetadataHolder != null) {
                    // A non-null holder carries the schema to rewrite the content with.
                    try {
                        // The lambda needs an effectively final reference; flowFile itself is reassigned below.
                        final FlowFile inputFlowFile = flowFile;
                        flowFile = processSession.write(flowFile, (rawIn, rawOut) -> {
                            try {
                                final RecordReader recordReader = recordReaderFactory.createRecordReader(inputFlowFile, rawIn, getLogger());
                                final RecordSetWriter recordSetWriter = recordWriterFactory.createWriter(getLogger(), outputMetadataHolder.getOutputSchema(), rawOut, attributes);
                                final WriteResult writeResult = updateRecords(recordSchema, outputMetadataHolder, recordReader, recordSetWriter);
                                recordSetWriter.flush();
                                recordSetWriter.close();
                                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                                attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
                                attributes.putAll(writeResult.getAttributes());
                            } catch (Exception callbackFailure) {
                                if (callbackFailure instanceof IOException) {
                                    throw (IOException) callbackFailure;
                                }
                                throw new IOException("Unable to create RecordReader", callbackFailure);
                            }
                        });
                    } catch (Exception writeFailure) {
                        getLogger().error("Failed to process {}; will route to failure", flowFile, writeFailure);
                        flowFile = processSession.putAttribute(flowFile, "record.error.message", recordErrorMessageFor(writeFailure));
                        processSession.transfer(flowFile, REL_FAILURE);
                        return;
                    }
                }

                attributes.put(ATTR_OUTPUT_TABLE, tableName);
                flowFile = processSession.putAllAttributes(flowFile, attributes);
                processSession.getProvenanceReporter().invokeRemoteProcess(flowFile, getJdbcUrl(connection));
                processSession.transfer(flowFile, REL_SUCCESS);
            }
        } catch (IOException | SQLException e) {
            flowFile = processSession.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
            logger.error("Exception while processing {} - routing to failure", flowFile, e);
            processSession.transfer(flowFile, REL_FAILURE);
        } catch (DiscontinuedException e) {
            getLogger().warn("Discontinued processing for {} due to {}", flowFile, e, e);
            processSession.transfer(flowFile, Relationship.SELF);
        } catch (Throwable t) {
            // Never swallow: a ProcessException is propagated unchanged, anything else is wrapped.
            throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
        }
    }

    /**
     * Builds the value stored in the {@code record.error.message} attribute for a failure.
     *
     * @param failure the exception that was caught
     * @return the cause's localized message if there is a cause with a message; otherwise the canonical
     *         class name of the cause (or of {@code failure} itself when it has no cause) followed by
     *         {@code " Thrown"}
     */
    private static String recordErrorMessageFor(Exception failure) {
        final Throwable cause = failure.getCause();
        if (cause == null) {
            return failure.getClass().getCanonicalName() + " Thrown";
        }
        final String message = cause.getLocalizedMessage();
        return message != null ? message : cause.getClass().getCanonicalName() + " Thrown";
    }

    private synchronized OutputMetadataHolder checkAndUpdateTableSchema(Connection connection, DatabaseDialectService databaseDialectService, RecordSchema recordSchema, String str, String str2, String str3, boolean z, boolean z2, NameNormalizer nameNormalizer, boolean z3, Set<String> set, boolean z4, boolean z5) throws IOException {
        OutputMetadataHolder outputMetadataHolder;
        try {
            Statement createStatement = connection.createStatement();
            try {
                String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString();
                TableSchema tableSchema = null;
                try {
                    tableSchema = TableSchema.from(connection, str, str2, str3, z2, nameNormalizer, null, getLogger());
                } catch (TableNotFoundException e) {
                }
                ArrayList arrayList = new ArrayList();
                boolean z6 = false;
                if (tableSchema == null) {
                    if (!z) {
                        throw new IOException("The table " + str3 + " could not be found in the database and the processor is configured not to create it.");
                    }
                    for (RecordField recordField : recordSchema.getFields()) {
                        String fieldName = recordField.getFieldName();
                        boolean z7 = recordField.getDefaultValue() != null;
                        String normalizedName = z2 ? nameNormalizer.getNormalizedName(fieldName) : fieldName;
                        arrayList.add(new ColumnDescription(enquoteIdentifier(normalizedName, identifierQuoteString, z5), DataTypeUtils.getSQLTypeValue(recordField.getDataType()), z7, null, recordField.isNullable()));
                        getLogger().debug("Adding column {} to table {}", new Object[]{normalizedName, str3});
                    }
                    tableSchema = new TableSchema(enquoteIdentifier(str, identifierQuoteString, z4), enquoteIdentifier(str2, identifierQuoteString, z4), enquoteIdentifier(str3, identifierQuoteString, z4), arrayList, z2, nameNormalizer, set, identifierQuoteString);
                    String sql = databaseDialectService.getStatement(new StandardStatementRequest(StatementType.CREATE, getTableDefinition(tableSchema))).sql();
                    if (StringUtils.isNotEmpty(sql)) {
                        getLogger().info("Executing DDL: {}", new Object[]{sql});
                        createStatement.execute(sql);
                    }
                    z6 = true;
                }
                ArrayList arrayList2 = new ArrayList();
                Iterator<ColumnDescription> it = tableSchema.getColumnsAsList().iterator();
                while (it.hasNext()) {
                    arrayList2.add(TableSchema.normalizedName(it.next().getColumnName(), z2, nameNormalizer));
                }
                ArrayList arrayList3 = new ArrayList();
                if (!z6) {
                    for (RecordField recordField2 : recordSchema.getFields()) {
                        String fieldName2 = recordField2.getFieldName();
                        String normalizedName2 = TableSchema.normalizedName(fieldName2, z2, nameNormalizer);
                        if (!arrayList2.contains(normalizedName2)) {
                            arrayList3.add(new ColumnDescription(normalizedName2, DataTypeUtils.getSQLTypeValue(recordField2.getDataType()), recordField2.getDefaultValue() != null, null, recordField2.isNullable()));
                            arrayList2.add(fieldName2);
                            getLogger().debug("Adding column {} to table {}", new Object[]{fieldName2, str3});
                        }
                    }
                    if (!arrayList3.isEmpty()) {
                        Stream map = arrayList3.stream().map(columnDescription -> {
                            return new StandardColumnDefinition(enquoteIdentifier(columnDescription.getColumnName(), identifierQuoteString, z5), columnDescription.getDataType(), columnDescription.isNullable() ? ColumnDefinition.Nullable.YES : ColumnDefinition.Nullable.UNKNOWN, columnDescription.isRequired());
                        });
                        Class<ColumnDefinition> cls = ColumnDefinition.class;
                        Objects.requireNonNull(ColumnDefinition.class);
                        StatementResponse statement = databaseDialectService.getStatement(new StandardStatementRequest(StatementType.ALTER, new TableDefinition(Optional.empty(), Optional.empty(), enquoteIdentifier(str3, identifierQuoteString, z4), map.map((v1) -> {
                            return r1.cast(v1);
                        }).toList())));
                        getLogger().info("Executing DDL: {}", new Object[]{statement.sql()});
                        createStatement.execute(statement.sql());
                    }
                }
                if (z3) {
                    List<RecordField> fields = recordSchema.getFields();
                    ArrayList arrayList4 = new ArrayList();
                    HashMap hashMap = new HashMap();
                    boolean z8 = false;
                    for (RecordField recordField3 : fields) {
                        String fieldName3 = recordField3.getFieldName();
                        boolean z9 = false;
                        Iterator it2 = arrayList2.iterator();
                        while (true) {
                            if (!it2.hasNext()) {
                                break;
                            }
                            String str4 = (String) it2.next();
                            if (fieldName3.equalsIgnoreCase(str4)) {
                                if (!fieldName3.equals(str4)) {
                                    z8 = true;
                                }
                                hashMap.put(fieldName3, str4);
                                arrayList4.add(new RecordField(str4, recordField3.getDataType(), recordField3.getDefaultValue(), recordField3.isNullable()));
                                z9 = true;
                            }
                        }
                        if (!z9) {
                            hashMap.put(fieldName3, fieldName3);
                        }
                    }
                    outputMetadataHolder = z8 ? new OutputMetadataHolder(new SimpleRecordSchema(arrayList4), hashMap) : null;
                } else {
                    outputMetadataHolder = null;
                }
                OutputMetadataHolder outputMetadataHolder2 = outputMetadataHolder;
                if (createStatement != null) {
                    createStatement.close();
                }
                return outputMetadataHolder2;
            } catch (Throwable th) {
                if (createStatement != null) {
                    try {
                        createStatement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Exception e2) {
            throw new IOException(e2);
        }
    }

    /**
     * Copies every record from the reader to the writer, renaming fields along the way.
     *
     * <p>For each incoming record a new value map is built by walking the field map held by
     * {@code outputMetadataHolder}: the entry key is the name used to read the value from the
     * incoming record, the entry value is the name the value is stored under in the outgoing
     * record. The outgoing record is created against the holder's output schema.
     *
     * <p>The method is declared {@code synchronized}, so calls on the same processor instance
     * do not overlap.
     *
     * @param recordSchema         schema of the incoming records; only its field count is used, to presize the value map
     * @param outputMetadataHolder supplies the input-to-output field name map and the output schema
     * @param recordReader         source of records; reading stops at the first {@code null} record
     * @param recordSetWriter      destination record set, begun before the first read and finished after the last write
     * @return the result of finishing the record set
     * @throws IOException if reading or writing fails, or wrapping a {@code MalformedRecordException} raised by the reader
     */
    private synchronized WriteResult updateRecords(RecordSchema recordSchema, OutputMetadataHolder outputMetadataHolder, RecordReader recordReader, RecordSetWriter recordSetWriter) throws IOException {
        try {
            recordSetWriter.beginRecordSet();

            Record inputRecord;
            while ((inputRecord = recordReader.nextRecord()) != null) {
                final Map<String, Object> renamedValues = new HashMap<>(recordSchema.getFields().size());

                // Carry each value over from its input field name to its output field name.
                for (final Map.Entry<String, String> mapping : outputMetadataHolder.getFieldMap().entrySet()) {
                    final String outputName = mapping.getValue();
                    final Object fieldValue = inputRecord.getValue(mapping.getKey());
                    renamedValues.put(outputName, fieldValue);
                }

                final Record outputRecord = new MapRecord(outputMetadataHolder.getOutputSchema(), renamedValues);
                recordSetWriter.write(outputRecord);
            }

            return recordSetWriter.finishRecordSet();
        } catch (MalformedRecordException malformed) {
            throw new IOException("Error reading records: " + malformed.getMessage(), malformed);
        }
    }

    /**
     * Resolves a JDBC URL for the given connection from its database metadata.
     *
     * <p>The literal {@code "DBCPService"} is returned instead when the connection exposes no
     * metadata, when the driver reports no URL ({@link DatabaseMetaData#getURL()} is documented
     * to return {@code null} if the URL cannot be generated), or when the lookup throws.
     *
     * @param connection connection whose metadata is queried
     * @return the URL reported by the driver, or {@code "DBCPService"}; never {@code null}
     */
    private String getJdbcUrl(Connection connection) {
        final String fallback = "DBCPService";
        try {
            final DatabaseMetaData metaData = connection.getMetaData();
            final String url = metaData == null ? null : metaData.getURL();
            // A null URL is treated like any other "could not determine" outcome so callers
            // always receive a usable, non-null value.
            return url != null ? url : fallback;
        } catch (Exception e) {
            // Best effort only: a failed lookup is logged and the fallback is returned.
            getLogger().warn("Could not determine JDBC URL based on the Driver Connection.", e);
            return fallback;
        }
    }

    /**
     * Converts a {@code TableSchema} into the {@code TableDefinition} passed to the database
     * dialect service.
     *
     * <p>Each column of the schema becomes a {@code StandardColumnDefinition} carrying the
     * column name, its data type, a nullability of {@code YES} or {@code NO}, and a flag that is
     * {@code true} when the column name is among the schema's primary key column names. A
     * {@code null} primary key set is treated as empty.
     *
     * @param tableSchema schema to convert
     * @return a table definition with the schema's catalog and schema names (empty when
     *         {@code null}), its table name and the converted column list
     */
    private TableDefinition getTableDefinition(TableSchema tableSchema) {
        final Set<String> declaredKeys = tableSchema.getPrimaryKeyColumnNames();
        final Set<String> primaryKeys = declaredKeys == null ? Set.of() : declaredKeys;

        final List<ColumnDefinition> columnDefinitions = tableSchema.getColumnsAsList().stream()
                .map(columnDescription -> new StandardColumnDefinition(
                        columnDescription.getColumnName(),
                        columnDescription.getDataType(),
                        columnDescription.isNullable() ? ColumnDefinition.Nullable.YES : ColumnDefinition.Nullable.NO,
                        primaryKeys.contains(columnDescription.getColumnName())))
                // Widen the element type so the collected list is a List<ColumnDefinition>.
                .map(ColumnDefinition.class::cast)
                .toList();

        return new TableDefinition(
                Optional.ofNullable(tableSchema.getCatalogName()),
                Optional.ofNullable(tableSchema.getSchemaName()),
                tableSchema.getTableName(),
                columnDefinitions);
    }

    /**
     * Wraps an identifier in the given quote string when quoting is enabled.
     *
     * @param str  identifier to quote; returned unchanged (including {@code null}) when it is {@code null}
     * @param str2 quote string placed directly before and after the identifier; when {@code null}
     *             the identifier is returned unquoted rather than being surrounded by the text "null"
     * @param z    {@code true} to quote the identifier, {@code false} to return it as is
     * @return the quoted identifier, or {@code str} itself when no quoting is applied
     */
    private String enquoteIdentifier(String str, String str2, boolean z) {
        if (!z || str == null || str2 == null) {
            return str;
        }
        return str2 + str + str2;
    }
}
