package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.RowUtils;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
import org.apache.beam.sdk.schemas.io.SchemaIO;
import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

@Internal
@AutoService({SchemaIOProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.class */
public class PubsubSchemaIOProvider implements SchemaIOProvider {
    /** Map-shaped attributes field type: string keys mapped to string values. */
    public static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
            Schema.FieldType.map(
                    Schema.FieldType.STRING.withNullable(false),
                    Schema.FieldType.STRING);

    /**
     * Schema of a single entry when attributes are modelled as an array of rows: two string
     * fields, the first named by {@code RowUtils.KEY} and the second named {@code "value"}.
     *
     * <p>NOTE(review): {@code RowUtils} lives in the Bigtable package; this reference looks like
     * a decompiler substituting a constant for a string literal - confirm against the original
     * source before relying on it.
     */
    public static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
            Schema.builder()
                    .addStringField(RowUtils.KEY)
                    .addStringField("value")
                    .build();

    /** Array-shaped attributes field type: an array of {@link #ATTRIBUTE_ARRAY_ENTRY_SCHEMA} rows. */
    public static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
            Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));

    /**
     * Configuration of a Pubsub schema IO, materialised from the configuration {@link Row} via
     * {@link AutoValueSchema}.
     *
     * <p>Every getter may return {@code null}; the helper methods below treat {@code null} as
     * "not configured". Decompiler notes on the original indicate that this class and its
     * accessors were package-private before decompilation.
     */
    @AutoValue
    public abstract static class Config implements Serializable {
        /** Returns the timestamp attribute key, or {@code null} when none is configured. */
        public abstract String getTimestampAttributeKey();

        /** Returns the dead letter queue topic, or {@code null} when none is configured. */
        public abstract String getDeadLetterQueue();

        /** Returns the payload format id, or {@code null} to fall back to {@code "json"}. */
        public abstract String getFormat();

        /** Returns the value forwarded as the {@code thriftClass} serializer parameter, if any. */
        public abstract String getThriftClass();

        /**
         * Returns the value forwarded as the {@code thriftProtocolFactoryClass} serializer
         * parameter, if any.
         */
        public abstract String getThriftProtocolFactoryClass();

        /** Returns the value forwarded as the {@code protoClass} serializer parameter, if any. */
        public abstract String getProtoClass();

        /** Returns {@code true} when a dead letter queue has been configured. */
        boolean useDeadLetterQueue() {
            return getDeadLetterQueue() != null;
        }

        /** Returns {@code true} when a timestamp attribute key has been configured. */
        boolean useTimestampAttribute() {
            return getTimestampAttributeKey() != null;
        }

        /**
         * Looks up the payload serializer for the configured format.
         *
         * @param schema the schema handed to the serializer lookup
         * @return the serializer returned by {@link PayloadSerializers#getSerializer}
         */
        PayloadSerializer serializer(Schema schema) {
            // Read the getter once so the null check and the value used cannot diverge.
            String configuredFormat = getFormat();
            String format = configuredFormat == null ? "json" : configuredFormat;

            // Only parameters that were actually configured are forwarded; ImmutableMap
            // rejects null values, so each one is guarded.
            ImmutableMap.Builder<String, Object> params = ImmutableMap.builder();
            String thriftClass = getThriftClass();
            if (thriftClass != null) {
                params.put("thriftClass", thriftClass);
            }
            String thriftProtocolFactoryClass = getThriftProtocolFactoryClass();
            if (thriftProtocolFactoryClass != null) {
                params.put("thriftProtocolFactoryClass", thriftProtocolFactoryClass);
            }
            String protoClass = getProtoClass();
            if (protoClass != null) {
                params.put("protoClass", protoClass);
            }
            return PayloadSerializers.getSerializer(format, schema, params.build());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider$PubsubSchemaIO.class */
    public static class PubsubSchemaIO implements SchemaIO, Serializable {
        /** Schema of the rows read from or written to Pubsub. */
        protected final Schema dataSchema;

        /** Location string passed to the Pubsub read and write transforms as the topic. */
        protected final String location;

        /** {@code true} unless the data schema qualifies for the nested layout. */
        protected final boolean useFlatSchema;

        /** Typed view of the configuration row. */
        protected final Config config;

        /**
         * Creates the IO for the given location, configuration row and data schema.
         *
         * @param location the location stored in {@link #location}
         * @param configuration row converted into a {@link Config} through {@link AutoValueSchema}
         * @param dataSchema the data schema; also decides between the flat and nested layout
         */
        private PubsubSchemaIO(String location, Row configuration, Schema dataSchema) {
            this.dataSchema = dataSchema;
            this.location = location;
            boolean nested = shouldUseNestedSchema(dataSchema);
            this.useFlatSchema = !nested;
            this.config =
                    (Config)
                            new AutoValueSchema()
                                    .fromRowFunction(TypeDescriptor.of(Config.class))
                                    .apply(configuration);
        }

        /** Returns the data schema this IO was created with. */
        @Override
        public Schema schema() {
            return dataSchema;
        }

        /**
         * Tells whether a payload serializer is required.
         *
         * @return {@code true} for the flat layout, or when the schema has no {@code payload}
         *     field equivalent to {@code BYTES}
         */
        public boolean needsSerializer() {
            if (useFlatSchema) {
                return true;
            }
            boolean hasBytesPayload = fieldPresent(schema(), "payload", Schema.FieldType.BYTES);
            return !hasBytesPayload;
        }

        /**
         * Builds the transform that reads Pubsub messages and converts them to rows.
         *
         * <p>The returned transform reads messages with attributes, converts them with
         * {@link PubsubMessageToRow}, sets {@link #dataSchema} as the row schema of the main
         * output and, when a dead letter queue is configured, writes the DLQ output to it.
         *
         * @return a transform producing the main-output rows
         */
        @Override
        public PTransform<PBegin, PCollection<Row>> buildReader() {
            // The decompiler-emitted synthetic $deserializeLambda$ method is intentionally not
            // reproduced: the compiler generates it for serializable lambdas and method refs.
            return new PTransform<PBegin, PCollection<Row>>() {
                @Override
                public PCollection<Row> expand(PBegin pBegin) {
                    PubsubMessageToRow.Builder converter =
                            PubsubMessageToRow.builder()
                                    .messageSchema(dataSchema)
                                    .useDlq(config.useDeadLetterQueue())
                                    .useFlatSchema(useFlatSchema);
                    if (needsSerializer()) {
                        // A bound method reference null-checks its receiver on evaluation, so
                        // no explicit requireNonNull is needed.
                        converter.serializerProvider(config::serializer);
                    }

                    PCollectionTuple converted =
                            pBegin.apply("ReadFromPubsub", readMessagesWithAttributes())
                                    .apply("PubsubMessageToRow", converter.build());
                    converted.get(PubsubMessageToRow.MAIN_TAG).setRowSchema(dataSchema);

                    if (config.useDeadLetterQueue()) {
                        converted.get(PubsubMessageToRow.DLQ_TAG).apply(writeMessagesToDlq());
                    }
                    return converted.get(PubsubMessageToRow.MAIN_TAG);
                }
            };
        }

        /**
         * Builds the transform that converts rows to Pubsub messages and writes them.
         *
         * <p>Rows first pass through {@code AddTimestampAttribute}. In the flat layout each row
         * is serialized into the message payload with an empty attribute map; in the nested
         * layout {@code NestedRowToMessage} performs the conversion.
         *
         * @return a transform writing the converted messages via
         *     {@link #createPubsubMessageWrite()}
         */
        @Override
        public PTransform<PCollection<Row>, POutput> buildWriter() {
            // Null only when needsSerializer() is false, which implies the nested layout; the
            // flat branch below therefore always sees a non-null serializer.
            final PayloadSerializer serializer =
                    needsSerializer()
                            ? config.serializer(
                                    PubsubSchemaIOProvider.stripFromTimestampField(dataSchema))
                            : null;

            // The decompiler-emitted synthetic $deserializeLambda$ method is intentionally not
            // reproduced: the compiler generates it for serializable lambdas.
            return new PTransform<PCollection<Row>, POutput>() {
                @Override
                public POutput expand(PCollection<Row> pCollection) {
                    PCollection<Row> stamped =
                            pCollection.apply(
                                    new AddTimestampAttribute(config.useTimestampAttribute()));

                    PCollection<PubsubMessage> messages;
                    if (useFlatSchema) {
                        messages =
                                stamped.apply(
                                        "Transform Flat Schema",
                                        MapElements.into(TypeDescriptor.of(PubsubMessage.class))
                                                .via(
                                                        row ->
                                                                new PubsubMessage(
                                                                        serializer.serialize(row),
                                                                        ImmutableMap.of())));
                    } else {
                        messages =
                                stamped.apply(
                                        "Transform Nested Schema",
                                        MapElements.via(
                                                new NestedRowToMessage(
                                                        serializer, stamped.getSchema())));
                    }
                    return messages.apply(createPubsubMessageWrite());
                }
            };
        }

        /**
         * Creates the Pubsub read for {@link #location}, adding the configured timestamp
         * attribute when one is set.
         */
        public PubsubIO.Read<PubsubMessage> readMessagesWithAttributes() {
            PubsubIO.Read<PubsubMessage> read =
                    PubsubIO.readMessagesWithAttributes().fromTopic(this.location);
            if (!this.config.useTimestampAttribute()) {
                return read;
            }
            return read.withTimestampAttribute(this.config.getTimestampAttributeKey());
        }

        /**
         * Creates the Pubsub write targeting {@link #location}, adding the configured timestamp
         * attribute when one is set.
         */
        public PubsubIO.Write<PubsubMessage> createPubsubMessageWrite() {
            PubsubIO.Write<PubsubMessage> baseWrite = PubsubIO.writeMessages().to(this.location);
            return this.config.useTimestampAttribute()
                    ? baseWrite.withTimestampAttribute(this.config.getTimestampAttributeKey())
                    : baseWrite;
        }

        /**
         * Creates the Pubsub write targeting the configured dead letter queue, adding the
         * configured timestamp attribute when one is set.
         */
        public PubsubIO.Write<PubsubMessage> writeMessagesToDlq() {
            PubsubIO.Write<PubsubMessage> dlqWrite =
                    PubsubIO.writeMessages().to(this.config.getDeadLetterQueue());
            if (this.config.useTimestampAttribute()) {
                return dlqWrite.withTimestampAttribute(this.config.getTimestampAttributeKey());
            }
            return dlqWrite;
        }

        /**
         * Checks for an {@code attributes} field whose type matches either the map or the array
         * attribute representation.
         */
        private boolean hasValidAttributesField(Schema schema) {
            if (fieldPresent(schema, "attributes", PubsubSchemaIOProvider.ATTRIBUTE_MAP_FIELD_TYPE)) {
                return true;
            }
            return fieldPresent(
                    schema, "attributes", PubsubSchemaIOProvider.ATTRIBUTE_ARRAY_FIELD_TYPE);
        }

        /**
         * Checks for a {@code payload} field that is either equivalent to {@code BYTES} or has
         * type name {@code ROW}.
         */
        private boolean hasValidPayloadField(Schema schema) {
            // fieldPresent already returns false when the field is missing.
            if (fieldPresent(schema, "payload", Schema.FieldType.BYTES)) {
                return true;
            }
            return schema.hasField("payload")
                    && schema.getField("payload")
                            .getType()
                            .getTypeName()
                            .equals(Schema.TypeName.ROW);
        }

        /**
         * The nested layout applies only when the schema has both a valid {@code payload} field
         * and a valid {@code attributes} field.
         */
        private boolean shouldUseNestedSchema(Schema schema) {
            if (!hasValidPayloadField(schema)) {
                return false;
            }
            return hasValidAttributesField(schema);
        }

        /**
         * Tells whether {@code schema} has a field named {@code str} whose type is equivalent to
         * {@code fieldType}, ignoring nullability.
         *
         * @param schema the schema to inspect
         * @param str the field name to look up
         * @param fieldType the expected field type
         * @return {@code true} when the field exists and its type is equivalent
         */
        public static boolean fieldPresent(Schema schema, String str, Schema.FieldType fieldType) {
            if (!schema.hasField(str)) {
                return false;
            }
            Schema.FieldType actualType = schema.getField(str).getType();
            return fieldType.equivalent(actualType, Schema.EquivalenceNullablePolicy.IGNORE);
        }
    }

    /** Returns the identifier of this provider: {@code "pubsub"}. */
    @Override
    public String identifier() {
        final String id = "pubsub";
        return id;
    }

    /**
     * Returns the expected configuration schema: six nullable string fields, in the order
     * listed below.
     */
    @Override
    public Schema configurationSchema() {
        String[] optionNames = {
            "timestampAttributeKey",
            "deadLetterQueue",
            "format",
            "thriftClass",
            "thriftProtocolFactoryClass",
            "protoClass",
        };
        Schema.Builder builder = Schema.builder();
        for (String optionName : optionNames) {
            builder = builder.addNullableField(optionName, Schema.FieldType.STRING);
        }
        return builder.build();
    }

    /**
     * Validates the inputs and creates the Pubsub schema IO.
     *
     * <p>Validation runs in this order: the configuration row's schema, the
     * {@code deadLetterQueue} value, then the data schema.
     *
     * @param location the location handed to the created IO
     * @param configuration the configuration row; its schema must equal
     *     {@link #configurationSchema()}
     * @param dataSchema the schema of the data rows
     * @return the configured IO
     * @throws InvalidConfigurationException if the configuration row is rejected
     * @throws InvalidSchemaException if the data schema is rejected
     */
    @Override
    public PubsubSchemaIO from(String location, Row configuration, Schema dataSchema) {
        validateConfigurationSchema(configuration);
        validateDlq(configuration.getValue("deadLetterQueue"));
        validateDataSchema(dataSchema);
        return new PubsubSchemaIO(location, configuration, dataSchema);
    }

    /**
     * Decompiler-mangled alias of {@link #from(String, Row, Schema)}, kept so that existing
     * callers of this name continue to compile.
     *
     * @deprecated use {@link #from(String, Row, Schema)}
     */
    @Deprecated
    public PubsubSchemaIO m243from(String str, Row row, Schema schema) {
        return from(str, row, schema);
    }

    /** Always {@code true}: this provider reports that a data schema is required. */
    @Override
    public boolean requiresDataSchema() {
        final boolean dataSchemaRequired = true;
        return dataSchemaRequired;
    }

    /** Always reports {@link PCollection.IsBounded#UNBOUNDED}. */
    @Override
    public PCollection.IsBounded isBounded() {
        final PCollection.IsBounded boundedness = PCollection.IsBounded.UNBOUNDED;
        return boundedness;
    }

    /**
     * Rejects data schemas this provider cannot handle.
     *
     * @param schema the data schema to check
     * @throws InvalidSchemaException if {@code schema} is {@code null} or lacks an
     *     {@code event_timestamp} field equivalent to {@code DATETIME}
     */
    private void validateDataSchema(Schema schema) {
        // The two sentences of each message were previously joined without a separating space.
        if (schema == null) {
            throw new InvalidSchemaException(
                    "Unsupported schema specified for Pubsub source in CREATE TABLE. "
                            + "CREATE TABLE for Pubsub topic must not be null");
        }
        if (!PubsubSchemaIO.fieldPresent(schema, "event_timestamp", Schema.FieldType.DATETIME)) {
            throw new InvalidSchemaException(
                    "Unsupported schema specified for Pubsub source in CREATE TABLE. "
                            + "CREATE TABLE for Pubsub topic must include at least "
                            + "'event_timestamp' field of type 'TIMESTAMP'");
        }
    }

    /**
     * Rejects an empty dead letter queue name; {@code null} (no DLQ) is accepted.
     *
     * @param deadLetterQueue the configured dead letter queue, possibly {@code null}
     * @throws InvalidConfigurationException if the value is the empty string
     */
    private void validateDlq(String deadLetterQueue) {
        // "".equals(x) is true exactly when x is non-null and empty.
        boolean emptyName = "".equals(deadLetterQueue);
        if (!emptyName) {
            return;
        }
        throw new InvalidConfigurationException("Dead letter queue topic name is not specified");
    }

    /**
     * Ensures the configuration row's schema equals {@link #configurationSchema()}.
     *
     * @param row the configuration row to check
     * @throws InvalidConfigurationException if the schemas differ
     */
    private void validateConfigurationSchema(Row row) {
        Schema actual = row.getSchema();
        Schema expected = configurationSchema();
        if (actual.equals(expected)) {
            return;
        }
        throw new InvalidConfigurationException(
                "Configuration schema provided does not match expected");
    }

    /**
     * Returns a schema containing every field of {@code schema} except those named
     * {@code event_timestamp}, in their original order.
     *
     * @param schema the schema to copy fields from
     * @return a new schema built from the remaining fields
     */
    public static Schema stripFromTimestampField(Schema schema) {
        Schema.Field[] keptFields =
                schema.getFields().stream()
                        .filter(field -> !"event_timestamp".equals(field.getName()))
                        .toArray(Schema.Field[]::new);
        return Schema.of(keptFields);
    }
}
