package org.apache.beam.sdk.io.gcp.bigquery.providers;

import com.google.auto.service.AutoService;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.joda.time.Duration;

@AutoService({SchemaTransformProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.class */
public class BigQueryStorageWriteApiSchemaTransformProvider extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
    // Name reported by inputCollectionNames().
    private static final String INPUT_ROWS_TAG = "input";
    // Tags referenced by description() and outputCollectionNames().
    private static final String FAILED_ROWS_TAG = "FailedRows";
    private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
    // Sentinel table name: when the configured table equals this value, the input schema is
    // validated as a dynamic-destinations schema and the nested record schema is used.
    protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
    // Name of the Row field that carries the CDC mutation information.
    protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
    // Fallback triggering frequency, used for unbounded input when the configured value is
    // null or not positive.
    private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5;
    private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS.intValue());
    // Names of the two string fields inside the mutation-info Row.
    protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
    protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
    // Exact schema the "row_mutation_info" field must have; checked by validateCdcSchema.
    protected static final Schema ROW_SCHEMA_MUTATION_INFO = Schema.builder().addStringField(ROW_PROPERTY_MUTATION_TYPE).addStringField(ROW_PROPERTY_MUTATION_SQN).build();

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform.class */
    public static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
        // Optional services override injected through setBigQueryServices; when non-null it is
        // passed to the write transform via withTestServices.
        private BigQueryServices testBigQueryServices = null;
        // Write configuration; validate() is called on it in the constructor before it is stored.
        private final BigQueryWriteConfiguration configuration;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform$ElementCounterFn.class */
        /**
         * Pass-through {@link DoFn} that forwards every element unchanged while counting how many
         * elements it has seen. The tally is added to a named {@link Counter} when the bundle
         * finishes and is then reset to zero.
         *
         * @param <T> element type; input and output types are the same
         */
        public static class ElementCounterFn<T> extends DoFn<T, T> {
            private Counter bqGenericElementCounter;
            private Long elementsInBundle = 0L;

            /**
             * @param str name of the counter, registered under the enclosing transform class
             */
            ElementCounterFn(String str) {
                bqGenericElementCounter = Metrics.counter(BigQueryStorageWriteApiSchemaTransform.class, str);
            }

            /** Counts the element and re-emits it untouched. */
            @DoFn.ProcessElement
            public void process(DoFn<T, T>.ProcessContext processContext) {
                elementsInBundle += 1L;
                T current = processContext.element();
                processContext.output(current);
            }

            /** Reports the tally accumulated since the last reset, then resets it. */
            @DoFn.FinishBundle
            public void finish(DoFn<T, T>.FinishBundleContext finishBundleContext) {
                long seenInBundle = elementsInBundle;
                bqGenericElementCounter.inc(seenInBundle);
                elementsInBundle = 0L;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform$FailOnError.class */
        /**
         * {@link DoFn} that throws a {@link RuntimeException} for every element it receives, using
         * the failed insert's error message as the exception message.
         */
        public static class FailOnError extends DoFn<BigQueryStorageApiInsertError, Void> {
            private FailOnError() {
            }

            /**
             * @throws RuntimeException always, carrying the element's error message
             */
            @DoFn.ProcessElement
            public void process(DoFn<BigQueryStorageApiInsertError, Void>.ProcessContext processContext) {
                BigQueryStorageApiInsertError failedInsert = processContext.element();
                String reason = failedInsert.getErrorMessage();
                throw new RuntimeException(reason);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform$NoOutputDoFn.class */
        /**
         * {@link DoFn} declared with {@link Row} output that never emits anything: its process
         * method has an empty body. It is applied in expand() to build the "post_write" output.
         *
         * @param <T> input element type
         */
        public static class NoOutputDoFn<T> extends DoFn<T, Row> {
            private NoOutputDoFn() {
            }

            @DoFn.ProcessElement
            public void process(DoFn<T, Row>.ProcessContext processContext) {
                // Intentionally empty: every input element is dropped.
            }
        }

        /**
         * Creates the transform for the given configuration.
         *
         * @param bigQueryWriteConfiguration write configuration; its validate() method is invoked
         *     before the configuration is stored, so any exception it raises propagates out of
         *     this constructor
         */
        BigQueryStorageWriteApiSchemaTransform(BigQueryWriteConfiguration bigQueryWriteConfiguration) {
            bigQueryWriteConfiguration.validate();
            this.configuration = bigQueryWriteConfiguration;
        }

        /**
         * Overrides the BigQuery services used by the write transform. Test hook: when set to a
         * non-null value, createStorageWriteApiTransform passes it to withTestServices.
         *
         * @param bigQueryServices services implementation to use, or null to clear the override
         */
        @VisibleForTesting
        public void setBigQueryServices(BigQueryServices bigQueryServices) {
            this.testBigQueryServices = bigQueryServices;
        }

        /**
         * Writes the input rows to BigQuery with the Storage Write API and exposes the outcome.
         *
         * <p>For unbounded input the write is additionally configured with a triggering frequency
         * (skipped when at-least-once semantics are enabled) and either a fixed number of streams
         * or auto-sharding.
         *
         * <p>When no error handling is configured, any failed insert makes the pipeline throw via
         * {@link FailOnError}. Otherwise failed inserts are counted and converted to rows with a
         * {@code failed_row} and an {@code error_message} field.
         *
         * @param pCollectionRowTuple input tuple; its collection is read through
         *     {@code getSinglePCollection()}
         * @return a tuple containing {@code "post_write"} (a collection that never receives
         *     elements) and, when error handling is configured, the failed-rows collection under
         *     the configured error output tag
         */
        public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
            PCollection<Row> inputRows = pCollectionRowTuple.getSinglePCollection();
            Schema inputSchema = inputRows.getSchema();
            BigQueryIO.Write<Row> write = createStorageWriteApiTransform(inputSchema);

            if (inputRows.isBounded() == PCollection.IsBounded.UNBOUNDED) {
                Long triggeringFrequencySeconds = this.configuration.getTriggeringFrequencySeconds();
                Boolean autoSharding = this.configuration.getAutoSharding();
                int numStreams = this.configuration.getNumStreams() == null ? 0 : this.configuration.getNumStreams().intValue();

                // The triggering frequency is only applied when at-least-once semantics are off.
                if (!Boolean.TRUE.equals(this.configuration.getUseAtLeastOnceSemantics())) {
                    Duration triggeringFrequency;
                    if (triggeringFrequencySeconds == null || triggeringFrequencySeconds.longValue() <= 0) {
                        triggeringFrequency = BigQueryStorageWriteApiSchemaTransformProvider.DEFAULT_TRIGGERING_FREQUENCY;
                    } else {
                        triggeringFrequency = Duration.standardSeconds(triggeringFrequencySeconds.longValue());
                    }
                    write = write.withTriggeringFrequency(triggeringFrequency);
                }

                // An explicit stream count wins; otherwise auto-sharding is on unless disabled.
                if (numStreams > 0) {
                    write = write.withNumStorageWriteApiStreams(numStreams);
                } else if (autoSharding == null || autoSharding.booleanValue()) {
                    write = write.withAutoSharding();
                }
            }

            WriteResult writeResult = inputRows
                    .apply("element-count", ParDo.of(new ElementCounterFn<Row>("BigQuery-write-element-counter")))
                    .setRowSchema(inputSchema)
                    .apply(write);

            // Empty-schema collection that never receives elements (NoOutputDoFn emits nothing).
            PCollection<Row> postWrite = writeResult.getFailedStorageApiInserts()
                    .apply("post-write", ParDo.of(new NoOutputDoFn<BigQueryStorageApiInsertError>()))
                    .setRowSchema(Schema.of());

            if (this.configuration.getErrorHandling() == null) {
                writeResult.getFailedStorageApiInserts().apply("Error on failed inserts", ParDo.of(new FailOnError()));
                return PCollectionRowTuple.of("post_write", postWrite);
            }

            writeResult.getFailedStorageApiInserts()
                    .apply("error-count", ParDo.of(new ElementCounterFn<BigQueryStorageApiInsertError>("BigQuery-write-error-counter")));

            // NOTE(review): failed rows are decoded with the full input schema even when only the
            // nested record is written (dynamic destinations / CDC) - confirm this is intended.
            Schema errorSchema = Schema.of(
                    Schema.Field.of("failed_row", Schema.FieldType.row(inputSchema)),
                    Schema.Field.of("error_message", Schema.FieldType.STRING));
            PCollection<Row> failedRowsWithErrors = writeResult.getFailedStorageApiInserts()
                    .apply("Construct failed rows and errors", MapElements.into(TypeDescriptors.rows()).via(bigQueryStorageApiInsertError -> {
                        return Row.withSchema(errorSchema)
                                .withFieldValue("error_message", bigQueryStorageApiInsertError.getErrorMessage())
                                .withFieldValue("failed_row", BigQueryUtils.toBeamRow(inputSchema, bigQueryStorageApiInsertError.getRow()))
                                .build();
                    }))
                    .setRowSchema(errorSchema);

            return PCollectionRowTuple.of("post_write", postWrite)
                    .and(this.configuration.getErrorHandling().getOutput(), failedRowsWithErrors);
        }

        /**
         * Builds the {@link BigQueryIO.Write} transform described by the configuration.
         *
         * <p>The write method is at-least-once when the configuration asks for it, otherwise the
         * regular Storage Write API. The default write disposition is {@code WRITE_APPEND};
         * create disposition, write disposition and KMS key are overridden when configured.
         * Disposition names are upper-cased with {@link Locale#ROOT} so the enum lookup does not
         * depend on the JVM's default locale.
         *
         * @param schema schema of the input rows
         * @return the configured write transform
         * @throws IllegalArgumentException if the schema fails dynamic-destinations or CDC
         *     validation, or a configured disposition is not a valid enum name
         */
        BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
            BigQueryIO.Write.Method writeMethod = Boolean.TRUE.equals(this.configuration.getUseAtLeastOnceSemantics())
                    ? BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE
                    : BigQueryIO.Write.Method.STORAGE_WRITE_API;
            BigQueryIO.Write<Row> write = BigQueryIO.<Row>write()
                    .withMethod(writeMethod)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

            // Schema of the rows handed to the destinations; switches to the nested record schema
            // for dynamic destinations and CDC writes.
            Schema recordSchema = schema;
            boolean usesNestedRecord = false;

            if (this.configuration.getTable().equals(BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS)) {
                validateDynamicDestinationsSchema(schema);
                recordSchema = schema.getField(PortableBigQueryDestinations.RECORD).getType().getRowSchema();
                usesNestedRecord = true;
            }

            if (Boolean.TRUE.equals(this.configuration.getUseCdcWrites())) {
                validateCdcSchema(schema);
                recordSchema = schema.getField(PortableBigQueryDestinations.RECORD).getType().getRowSchema();
                usesNestedRecord = true;
                // Mutation type and sequence number are read from the row's mutation-info field.
                write = write.withPrimaryKey(this.configuration.getPrimaryKey()).withRowMutationInformationFn(row -> {
                    return RowMutationInformation.of(RowMutationInformation.MutationType.valueOf(row.getRow(BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO).getString(BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_TYPE)), row.getRow(BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO).getString(BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_SQN));
                });
            }

            PortableBigQueryDestinations destinations = new PortableBigQueryDestinations(recordSchema, this.configuration);
            write = write.to(destinations).withFormatFunction(destinations.getFilterFormatFunction(usesNestedRecord));

            if (!Strings.isNullOrEmpty(this.configuration.getCreateDisposition())) {
                String createDisposition = this.configuration.getCreateDisposition().toUpperCase(Locale.ROOT);
                write = write.withCreateDisposition(BigQueryIO.Write.CreateDisposition.valueOf(createDisposition));
            }
            if (!Strings.isNullOrEmpty(this.configuration.getWriteDisposition())) {
                String writeDisposition = this.configuration.getWriteDisposition().toUpperCase(Locale.ROOT);
                write = write.withWriteDisposition(BigQueryIO.Write.WriteDisposition.valueOf(writeDisposition));
            }
            if (!Strings.isNullOrEmpty(this.configuration.getKmsKey())) {
                write = write.withKmsKey(this.configuration.getKmsKey());
            }
            if (this.testBigQueryServices != null) {
                write = write.withTestServices(this.testBigQueryServices);
            }
            return write;
        }

        /**
         * Checks that a schema used for dynamic destinations contains both the destination field
         * and the record field.
         *
         * @param schema input row schema to check
         * @throws IllegalArgumentException if either field name is missing
         */
        void validateDynamicDestinationsSchema(Schema schema) {
            List<String> requiredFields = Arrays.asList(PortableBigQueryDestinations.DESTINATION, PortableBigQueryDestinations.RECORD);
            boolean hasRequiredFields = schema.getFieldNames().containsAll(requiredFields);
            String message = String.format("When writing to dynamic destinations, we expect Row Schema with a \"%s\" string field and a \"%s\" Row field.", PortableBigQueryDestinations.DESTINATION, PortableBigQueryDestinations.RECORD);
            Preconditions.checkArgument(hasRequiredFields, message);
        }

        /**
         * Checks that a schema used for CDC writes has both the mutation-info field and the record
         * field, and that the mutation-info field is a Row whose schema equals
         * {@code ROW_SCHEMA_MUTATION_INFO}.
         *
         * @param schema input row schema to check
         * @throws IllegalArgumentException if a field is missing or the mutation-info schema
         *     does not match
         */
        private void validateCdcSchema(Schema schema) {
            List<String> requiredFields = Arrays.asList(BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO, PortableBigQueryDestinations.RECORD);
            boolean hasRequiredFields = schema.getFieldNames().containsAll(requiredFields);
            Preconditions.checkArgument(hasRequiredFields, "When writing using CDC functionality, we expect Row Schema with a \"row_mutation_info\" Row field and a \"record\" Row field.");

            Schema mutationInfoSchema = schema.getField(BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();
            boolean matchesExpected = mutationInfoSchema != null
                    && mutationInfoSchema.equals(BigQueryStorageWriteApiSchemaTransformProvider.ROW_SCHEMA_MUTATION_INFO);
            String mismatchMessage = "When writing using CDC functionality, we expect a \"row_mutation_info\" field of Row type with schema:\n"
                    + BigQueryStorageWriteApiSchemaTransformProvider.ROW_SCHEMA_MUTATION_INFO.toString()
                    + "\nReceived \""
                    + BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO
                    + "\" field with schema:\n"
                    + mutationInfoSchema;
            Preconditions.checkArgument(matchesExpected, mismatchMessage);
        }

        // The decompiled `$deserializeLambda$(SerializedLambda)` method was removed from this spot
        // on purpose. It is the hook the Java compiler synthesizes for every class that contains
        // serializable lambdas (here: the failed-row mapper in expand() and the row-mutation
        // function in createStorageWriteApiTransform()), so it is regenerated automatically when
        // this class is compiled. The decompiled copy was not valid Java source (a boolean
        // initialised with -1, a switch over a boolean, and a case label borrowed from an
        // unrelated Bigtable constant) and would have duplicated the compiler-generated method.
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the write transform for a configuration.
     *
     * @param bigQueryWriteConfiguration configuration to build the transform from; it is
     *     validated by the transform's constructor
     * @return a new {@link BigQueryStorageWriteApiSchemaTransform}
     */
    public SchemaTransform from(BigQueryWriteConfiguration bigQueryWriteConfiguration) {
        return new BigQueryStorageWriteApiSchemaTransform(bigQueryWriteConfiguration);
    }

    /**
     * @return the fixed identifier string of this schema transform
     */
    public String identifier() {
        return "beam:schematransform:org.apache.beam:bigquery_storage_write:v2";
    }

    /**
     * @return the human-readable description of this transform, with the two failed-row tag
     *     names substituted into the text
     */
    public String description() {
        // NOTE(review): this text advertises two dead-letter outputs, while the transform's
        // expand() emits "post_write" plus the configured error output - confirm which is current.
        String template = "Writes data to BigQuery using the Storage Write API (https://cloud.google.com/bigquery/docs/write-api).\n\nThis expects a single PCollection of Beam Rows and outputs two dead-letter queues (DLQ) that contain failed rows. The first DLQ has tag [%s] and contains the failed rows. The second DLQ has tag [%s] and contains failed rows and along with their respective errors.";
        return String.format(template, FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG);
    }

    /**
     * @return a single-element list containing the input tag {@code "input"}
     */
    public List<String> inputCollectionNames() {
        return Collections.singletonList(INPUT_ROWS_TAG);
    }

    /**
     * @return the advertised output names: the two failed-row tags and {@code "errors"}
     */
    public List<String> outputCollectionNames() {
        // NOTE(review): the transform's expand() emits "post_write" plus the configured error
        // output, neither of which appears in this list - confirm whether it should be updated.
        return Arrays.asList(FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG, "errors");
    }
}
