package org.apache.nifi.processors.gcp.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.Status;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;

@CapabilityDescription("Writes the contents of a FlowFile to a Google BigQuery table. The processor is record based so the schema that is used is driven by the RecordReader. Attributes that are not matched to the target schema are skipped. Exactly once delivery semantics are achieved via stream offsets.")
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "google cloud", "bq", "bigquery"})
@WritesAttributes({@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)})
/* loaded from: input_file:org/apache/nifi/processors/gcp/bigquery/PutBigQuery.class */
public class PutBigQuery extends AbstractBigQueryProcessor {
    /** First failure recorded for the FlowFile being processed; written by append callbacks and by onTrigger. */
    private final AtomicReference<Exception> error = new AtomicReference<>();
    /** Number of append requests whose callback reported success. */
    private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
    /** Tracks in-flight append requests; created with a single party on which finishProcessing awaits. */
    private final Phaser inflightRequestCount = new Phaser(1);
    /** Write client created in onScheduled; null until then. */
    private BigQueryWriteClient writeClient = null;
    /** Stream writer created per FlowFile in onTrigger. */
    private StreamWriter streamWriter = null;
    /** Value of the Transfer Type property, captured in onScheduled. */
    private String transferType;
    /** Value of the BigQuery API Endpoint property, captured in onScheduled. */
    private String endpoint;
    /** Maximum number of retries per append request, captured in onScheduled. */
    private int maxRetryCount;
    /** Number of records sent per append request, captured in onScheduled. */
    private int recordBatchCount;

    static final String STREAM = "STREAM";
    static final AllowableValue STREAM_TYPE = new AllowableValue(STREAM, STREAM, "Use streaming record handling strategy");
    static final String BATCH = "BATCH";
    static final AllowableValue BATCH_TYPE = new AllowableValue(BATCH, BATCH, "Use batching record handling strategy");

    /** gRPC status codes for which a failed append is retried (up to maxRetryCount times). */
    private static final List<Status.Code> RETRYABLE_ERROR_CODES = Arrays.asList(Status.Code.INTERNAL, Status.Code.ABORTED, Status.Code.CANCELLED);

    public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AbstractBigQueryProcessor.PROJECT_ID)
            .required(true)
            .build();

    public static final PropertyDescriptor BIGQUERY_API_ENDPOINT = new PropertyDescriptor.Builder()
            .name("bigquery-api-endpoint")
            .displayName("BigQuery API Endpoint")
            .description("Can be used to override the default BigQuery endpoint. Default is " + BigQueryWriteStubSettings.getDefaultEndpoint() + ". Format must be hostname:port.")
            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .required(true)
            .defaultValue(BigQueryWriteStubSettings.getDefaultEndpoint())
            .build();

    private static final String TRANSFER_TYPE_NAME = "bq.transfer.type";
    private static final String TRANSFER_TYPE_DESC = "Defines the preferred transfer type streaming or batching";
    static final PropertyDescriptor TRANSFER_TYPE = new PropertyDescriptor.Builder()
            .name(TRANSFER_TYPE_NAME)
            .displayName("Transfer Type")
            .description(TRANSFER_TYPE_DESC)
            .required(true)
            .defaultValue(STREAM_TYPE.getValue())
            .allowableValues(new DescribedValue[]{STREAM_TYPE, BATCH_TYPE})
            .build();

    private static final String APPEND_RECORD_COUNT_NAME = "bq.append.record.count";
    private static final String APPEND_RECORD_COUNT_DESC = "The number of records to be appended to the write stream at once. Applicable for both batch and stream types";
    // NOTE(review): the validator below only appears to check for a non-empty value, while the
    // property is read with asInteger() and used as a modulus divisor in writeRecordsToStream;
    // a value of 0 would raise ArithmeticException there. Consider a positive-integer validator.
    static final PropertyDescriptor APPEND_RECORD_COUNT = new PropertyDescriptor.Builder()
            .name(APPEND_RECORD_COUNT_NAME)
            .displayName("Append Record Count")
            .description(APPEND_RECORD_COUNT_DESC)
            .required(true)
            .defaultValue("20")
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name(BigQueryAttributes.RECORD_READER_ATTR)
            .displayName("Record Reader")
            .description(BigQueryAttributes.RECORD_READER_DESC)
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
            .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
            .displayName("Skip Invalid Rows")
            .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
            .required(true)
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("false")
            .build();

    /**
     * Properties exposed by this processor, in display order.
     * Built with the plain varargs form of List.of so the element type is PropertyDescriptor
     * (an Object[] cast here would not be assignable to List&lt;PropertyDescriptor&gt;).
     */
    private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
            GCP_CREDENTIALS_PROVIDER_SERVICE,
            PROJECT_ID,
            BIGQUERY_API_ENDPOINT,
            DATASET,
            TABLE_NAME,
            RECORD_READER,
            TRANSFER_TYPE,
            APPEND_RECORD_COUNT,
            RETRY_COUNT,
            SKIP_INVALID_ROWS,
            PROXY_CONFIGURATION_SERVICE);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/nifi/processors/gcp/bigquery/PutBigQuery$AppendCompleteCallback.class */
    public class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
        private final AppendContext appendContext;

        public AppendCompleteCallback(AppendContext appendContext) {
            this.appendContext = appendContext;
        }

        public void onSuccess(AppendRowsResponse appendRowsResponse) {
            PutBigQuery.this.getLogger().info("Append success with offset: {}", new Object[]{Long.valueOf(this.appendContext.getOffset())});
            PutBigQuery.this.appendSuccessCount.incrementAndGet();
            PutBigQuery.this.inflightRequestCount.arriveAndDeregister();
        }

        public void onFailure(Throwable th) {
            Status fromThrowable = Status.fromThrowable(th);
            if (this.appendContext.getRetryCount() < PutBigQuery.this.maxRetryCount && PutBigQuery.RETRYABLE_ERROR_CODES.contains(fromThrowable.getCode())) {
                this.appendContext.incrementRetryCount();
                try {
                    PutBigQuery.this.append(this.appendContext);
                    PutBigQuery.this.inflightRequestCount.arriveAndDeregister();
                    return;
                } catch (Exception e) {
                    PutBigQuery.this.getLogger().error("Failed to retry append", e);
                }
            }
            AtomicReference<Exception> atomicReference = PutBigQuery.this.error;
            Optional ofNullable = Optional.ofNullable(Exceptions.toStorageException(th));
            Class<RuntimeException> cls = RuntimeException.class;
            Objects.requireNonNull(RuntimeException.class);
            atomicReference.compareAndSet(null, (Exception) ofNullable.map((v1) -> {
                return r3.cast(v1);
            }).orElse(new RuntimeException(th)));
            PutBigQuery.this.getLogger().error("Failure during appending data", th);
            PutBigQuery.this.inflightRequestCount.arriveAndDeregister();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/gcp/bigquery/PutBigQuery$AppendContext.class */
    /**
     * Mutable holder for one append request: the serialized rows, the stream offset they are
     * written at, and how many times the request has been retried so far.
     */
    public static class AppendContext {
        private final ProtoRows data;
        private final long offset;
        private int retryCount;

        /**
         * @param rows        rows to append
         * @param startOffset stream offset passed along with the append
         */
        AppendContext(ProtoRows rows, long startOffset) {
            data = rows;
            offset = startOffset;
            retryCount = 0;
        }

        /** @return the rows to append */
        public ProtoRows getData() {
            return data;
        }

        /** @return the stream offset for this append */
        public long getOffset() {
            return offset;
        }

        /** @return the number of retries performed so far */
        public int getRetryCount() {
            return retryCount;
        }

        /** Records one more retry attempt. */
        public void incrementRetryCount() {
            retryCount += 1;
        }
    }

    /**
     * Returns the fixed list of property descriptors this processor supports.
     *
     * @return the shared {@code DESCRIPTORS} list
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PutBigQuery.DESCRIPTORS;
    }

    /**
     * Captures the processor configuration and creates the BigQuery write client.
     * The endpoint is stored before the client is built because client creation reads it.
     *
     * @param processContext the context supplying the configured property values
     */
    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);

        transferType = processContext.getProperty(TRANSFER_TYPE).getValue();

        final Integer configuredRetries = processContext.getProperty(RETRY_COUNT).asInteger();
        maxRetryCount = configuredRetries.intValue();

        final Integer configuredBatchSize = processContext.getProperty(APPEND_RECORD_COUNT).asInteger();
        recordBatchCount = configuredBatchSize.intValue();

        endpoint = processContext.getProperty(BIGQUERY_API_ENDPOINT).evaluateAttributeExpressions().getValue();

        final GoogleCredentials credentials = getGoogleCredentials(processContext);
        final ProxyConfiguration proxy = ProxyConfiguration.getConfiguration(processContext);
        writeClient = createWriteClient(credentials, proxy);
    }

    /**
     * Shuts down the BigQuery write client when the processor is unscheduled.
     * <p>
     * The client field starts out as {@code null} and is only assigned as the last step of
     * {@code onScheduled}, so it is checked here to avoid a NullPointerException when no
     * client was ever created.
     */
    @OnUnscheduled
    public void onUnscheduled() {
        final BigQueryWriteClient client = this.writeClient;
        if (client != null) {
            client.shutdown();
        }
    }

    /**
     * Streams the records of one FlowFile into the configured BigQuery table.
     * <p>
     * A write stream and a stream writer are created for the target table; if that fails with
     * a descriptor or I/O error the session is rolled back and the processor yields. Otherwise
     * the FlowFile content is read record by record and appended, and
     * {@code finishProcessing} is invoked exactly once to await outstanding appends and route
     * the FlowFile, whether or not reading/appending failed.
     *
     * @param processContext the context supplying property values and controller services
     * @param processSession the session providing the FlowFile to process
     */
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }

        final String projectId = processContext.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
        final String dataset = processContext.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
        final String table = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final TableName tableName = TableName.of(projectId, dataset, table);

        final WriteStream writeStream;
        final TableSchema tableSchema;
        final Descriptors.Descriptor protoDescriptor;
        try {
            writeStream = createWriteStream(tableName);
            tableSchema = writeStream.getTableSchema();
            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
            this.streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(processContext), ProxyConfiguration.getConfiguration(processContext));
        } catch (Descriptors.DescriptorValidationException | IOException e) {
            getLogger().error("Failed to create Big Query Stream Writer for writing", e);
            processContext.yield();
            processSession.rollback();
            return;
        }

        final boolean skipInvalidRows = processContext.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean().booleanValue();
        final RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);

        // The success counter feeds the record-count attribute written on failure, so it must
        // describe this FlowFile only rather than accumulate over the processor's lifetime.
        this.appendSuccessCount.set(0);

        try {
            final int recordCount;
            try (InputStream in = processSession.read(flowFile);
                 RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
                recordCount = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows, tableSchema);
            }
            // The input stream is closed at this point, so the FlowFile can be modified.
            flowFile = processSession.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordCount));
        } catch (Exception e) {
            // Record the failure before finishing so the FlowFile is routed to failure.
            this.error.set(e);
        } finally {
            // Runs exactly once per FlowFile: awaits in-flight appends, closes the writer and
            // transfers the FlowFile according to the recorded error state.
            finishProcessing(processSession, flowFile, this.streamWriter, writeStream.getName(), tableName.toString());
        }
    }

    /**
     * Reads every record from the reader, converts it to a protobuf message and appends the
     * messages to the stream in groups of {@code recordBatchCount}. Records for which the
     * conversion yields {@code null} are not counted and not sent.
     *
     * @param recordReader    source of records
     * @param descriptor      protobuf descriptor used for conversion
     * @param skipInvalidRows passed through to the record conversion
     * @param tableSchema     table schema used for conversion
     * @return the number of records handed to append requests
     * @throws Exception if reading, converting or appending fails
     */
    private int writeRecordsToStream(RecordReader recordReader, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) throws Exception {
        int batchStartOffset = 0;
        int convertedCount = 0;
        ProtoRows.Builder pendingRows = ProtoRows.newBuilder();

        for (Record current = recordReader.nextRecord(); current != null; current = recordReader.nextRecord()) {
            final DynamicMessage message = recordToProtoMessage(current, descriptor, skipInvalidRows, tableSchema);
            if (message == null) {
                continue;
            }

            pendingRows.addSerializedRows(message.toByteString());
            convertedCount++;

            final boolean batchFull = convertedCount % this.recordBatchCount == 0;
            if (batchFull) {
                append(new AppendContext(pendingRows.build(), batchStartOffset));
                pendingRows = ProtoRows.newBuilder();
                batchStartOffset = convertedCount;
            }
        }

        // Send whatever is left over after the last full batch.
        if (batchStartOffset < convertedCount) {
            append(new AppendContext(pendingRows.build(), batchStartOffset));
        }
        return convertedCount;
    }

    /**
     * Converts a record into a protobuf message for the given descriptor.
     *
     * @param record          the record to convert
     * @param descriptor      protobuf descriptor of the target message
     * @param skipInvalidRows when true, a conversion failure is logged and {@code null} is returned;
     *                        when false, the failure is logged and rethrown
     * @param tableSchema     table schema passed to the message factory
     * @return the converted message, or {@code null} for a skipped record
     */
    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) {
        try {
            final Map<String, Object> values = convertMapRecord(record.toMap());
            return ProtoUtils.createMessage(descriptor, values, tableSchema);
        } catch (RuntimeException e) {
            getLogger().error("Cannot convert record to message", e);
            if (skipInvalidRows) {
                return null;
            }
            throw e;
        }
    }

    /**
     * Submits one append request to the stream writer and attaches a completion callback.
     * <p>
     * The request is registered with {@code inflightRequestCount} <em>before</em> it is
     * submitted. The callback runs on a direct executor, so for an already-completed future it
     * executes inside {@code addCallback}; registering afterwards would let the callback
     * deregister the phaser's only remaining party, terminating the phaser and disabling all
     * later waits for in-flight requests.
     *
     * @param appendContext rows, offset and retry state of the request
     * @throws Exception the previously recorded failure, if any, or whatever the stream writer
     *                   throws while submitting the request
     */
    private void append(AppendContext appendContext) throws Exception {
        // Read once so the checked value and the thrown value are the same object.
        final Exception previousFailure = this.error.get();
        if (previousFailure != null) {
            throw previousFailure;
        }

        this.inflightRequestCount.register();
        final ApiFuture<AppendRowsResponse> pending;
        try {
            pending = this.streamWriter.append(appendContext.getData(), appendContext.getOffset());
        } catch (Throwable t) {
            // No callback will ever fire for this request, so release its registration here.
            this.inflightRequestCount.arriveAndDeregister();
            throw t;
        }
        ApiFutures.addCallback(pending, new AppendCompleteCallback(appendContext), Runnable::run);
    }

    /**
     * Waits for all in-flight appends, closes the stream writer and routes the FlowFile.
     * <p>
     * If an error was recorded, the FlowFile gets a record-count attribute, is penalized and
     * sent to failure, and the error is cleared. Otherwise, in batch mode the write stream is
     * finalized and committed (a commit without commit time routes to failure); in stream
     * mode the FlowFile goes straight to success.
     *
     * @param processSession session used to update and transfer the FlowFile
     * @param flowFile       the FlowFile being processed
     * @param streamWriter   writer to close
     * @param streamName     name of the write stream to finalize/commit in batch mode
     * @param parentTable    parent table path used for the batch commit request
     */
    private void finishProcessing(ProcessSession processSession, FlowFile flowFile, StreamWriter streamWriter, String streamName, String parentTable) {
        this.inflightRequestCount.arriveAndAwaitAdvance();
        streamWriter.close();

        final Exception failure = this.error.get();
        if (failure != null) {
            getLogger().error("Stream processing failed", failure);
            final String writtenRecords;
            if (isBatch()) {
                writtenRecords = "0";
            } else {
                writtenRecords = String.valueOf(this.appendSuccessCount.get() * this.recordBatchCount);
            }
            final FlowFile failed = processSession.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, writtenRecords);
            processSession.penalize(failed);
            processSession.transfer(failed, REL_FAILURE);
            // Clear the error so the next FlowFile starts clean.
            this.error.set(null);
            return;
        }

        if (!isBatch()) {
            processSession.transfer(flowFile, REL_SUCCESS);
            return;
        }

        this.writeClient.finalizeWriteStream(streamName);
        final BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder()
                .setParent(parentTable)
                .addWriteStreams(streamName)
                .build();
        final BatchCommitWriteStreamsResponse commitResponse = this.writeClient.batchCommitWriteStreams(commitRequest);

        if (commitResponse.hasCommitTime()) {
            getLogger().info("Appended and committed all records successfully.");
            processSession.transfer(flowFile, REL_SUCCESS);
            return;
        }

        // No commit time on the response: log each reported stream error and fail the FlowFile.
        for (StorageError streamError : commitResponse.getStreamErrorsList()) {
            getLogger().error("Commit Storage Error Code: {} with message {}", new Object[]{streamError.getCode().name(), streamError.getErrorMessage()});
        }
        processSession.penalize(flowFile);
        processSession.transfer(flowFile, REL_FAILURE);
    }

    /**
     * Creates a write stream on the given table: PENDING type in batch mode, COMMITTED otherwise.
     *
     * @param tableName the table the stream is created under
     * @return the write stream returned by the write client
     */
    private WriteStream createWriteStream(TableName tableName) {
        final WriteStream.Type streamType;
        if (isBatch()) {
            streamType = WriteStream.Type.PENDING;
        } else {
            streamType = WriteStream.Type.COMMITTED;
        }

        final WriteStream streamTemplate = WriteStream.newBuilder().setType(streamType).build();
        final CreateWriteStreamRequest request = CreateWriteStreamRequest.newBuilder()
                .setParent(tableName.toString())
                .setWriteStream(streamTemplate)
                .build();
        return this.writeClient.createWriteStream(request);
    }

    /**
     * Builds a BigQuery write client for the configured endpoint using fixed credentials and
     * a transport channel derived from the proxy configuration.
     *
     * @param googleCredentials  credentials the client authenticates with
     * @param proxyConfiguration proxy settings passed to the channel provider factory
     * @return the created write client
     * @throws ProcessException wrapping any exception raised while building the client
     */
    protected BigQueryWriteClient createWriteClient(GoogleCredentials googleCredentials, ProxyConfiguration proxyConfiguration) {
        final BigQueryWriteClient client;
        try {
            final BigQueryWriteSettings.Builder settings = BigQueryWriteSettings.newBuilder();
            settings.setCredentialsProvider(FixedCredentialsProvider.create(googleCredentials));
            settings.setEndpoint(this.endpoint);
            final TransportChannelProvider channelProvider = createTransportChannelProvider(proxyConfiguration);
            settings.setTransportChannelProvider(channelProvider);
            client = BigQueryWriteClient.create(settings.build());
        } catch (Exception e) {
            throw new ProcessException("Failed to create Big Query Write Client for writing", e);
        }
        return client;
    }

    /**
     * Builds a stream writer for the named write stream, using the proto schema derived from
     * the descriptor, fixed credentials, the configured endpoint and a proxy-aware channel.
     *
     * @param str                name of the write stream
     * @param descriptor         protobuf descriptor converted into the writer schema
     * @param googleCredentials  credentials the writer authenticates with
     * @param proxyConfiguration proxy settings passed to the channel provider factory
     * @return the built stream writer
     * @throws IOException if building the writer fails
     */
    protected StreamWriter createStreamWriter(String str, Descriptors.Descriptor descriptor, GoogleCredentials googleCredentials, ProxyConfiguration proxyConfiguration) throws IOException {
        final ProtoSchema writerSchema = ProtoSchemaConverter.convert(descriptor);

        final StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(str);
        writerBuilder.setWriterSchema(writerSchema);
        writerBuilder.setCredentialsProvider(FixedCredentialsProvider.create(googleCredentials));
        writerBuilder.setEndpoint(this.endpoint);
        final TransportChannelProvider channelProvider = createTransportChannelProvider(proxyConfiguration);
        writerBuilder.setChannelProvider(channelProvider);

        return writerBuilder.build();
    }

    /**
     * Creates the gRPC channel provider. With an HTTP proxy configured, every target address
     * is routed through an HTTP CONNECT proxy built from the proxy host, port and credentials.
     * A SOCKS proxy is not supported: a warning is logged and no proxy is applied.
     *
     * @param proxyConfiguration proxy settings, may be {@code null}
     * @return the channel provider
     */
    private TransportChannelProvider createTransportChannelProvider(ProxyConfiguration proxyConfiguration) {
        final InstantiatingGrpcChannelProvider.Builder providerBuilder = InstantiatingGrpcChannelProvider.newBuilder();
        if (proxyConfiguration == null) {
            return providerBuilder.build();
        }

        final Proxy.Type proxyType = proxyConfiguration.getProxyType();
        if (proxyType == Proxy.Type.HTTP) {
            // The proxy address is built per lookup inside the detector, not once up front.
            providerBuilder.setChannelConfigurator(channelBuilder -> channelBuilder.proxyDetector(target ->
                    HttpConnectProxiedSocketAddress.newBuilder()
                            .setTargetAddress((InetSocketAddress) target)
                            .setProxyAddress(new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort().intValue()))
                            .setUsername(proxyConfiguration.getProxyUserName())
                            .setPassword(proxyConfiguration.getProxyUserPassword())
                            .build()));
        } else if (proxyType == Proxy.Type.SOCKS) {
            getLogger().warn("Proxy type SOCKS is not supported, the proxy configuration will be ignored");
        }
        return providerBuilder.build();
    }

    /**
     * @return {@code true} when the configured transfer type equals the batch allowable value
     */
    private boolean isBatch() {
        final String batchValue = BATCH_TYPE.getValue();
        return batchValue.equals(transferType);
    }

    /**
     * Produces a copy of a record's value map prepared for protobuf message creation.
     * <ul>
     *   <li>Keys are lower-cased with {@link Locale#ROOT} so the result does not depend on the
     *       JVM default locale.</li>
     *   <li>Nested {@link MapRecord} values, and non-empty arrays whose first element is a
     *       {@link MapRecord}, are converted recursively.</li>
     *   <li>{@link Timestamp} values become microseconds since the epoch, keeping the
     *       sub-millisecond part carried in {@link Timestamp#getNanos()}.</li>
     *   <li>{@link Time} values become a packed time-of-day (hour, minute, second) via
     *       {@code CivilTimeEncoder}.</li>
     *   <li>{@link Date} values become the epoch-day number of the local date.</li>
     *   <li>All other values are copied unchanged.</li>
     * </ul>
     *
     * @param map the record values keyed by field name
     * @return a new map with lower-cased keys and converted values
     */
    private static Map<String, Object> convertMapRecord(Map<String, Object> map) {
        final Map<String, Object> converted = new HashMap<>();
        for (final Map.Entry<String, Object> entry : map.entrySet()) {
            final Object value = entry.getValue();
            final String key = entry.getKey().toLowerCase(Locale.ROOT);

            if (value instanceof MapRecord) {
                converted.put(key, convertMapRecord(((MapRecord) value).toMap()));
            } else if ((value instanceof Object[]) && ((Object[]) value).length > 0 && (((Object[]) value)[0] instanceof MapRecord)) {
                final Object[] elements = (Object[]) value;
                final List<Map<String, Object>> nested = new ArrayList<>(elements.length);
                for (final Object element : elements) {
                    nested.add(convertMapRecord(((MapRecord) element).toMap()));
                }
                converted.put(key, nested);
            } else if (value instanceof Timestamp) {
                final Timestamp timestamp = (Timestamp) value;
                // getTime() already contains the millisecond part of the fraction; add the
                // microseconds within that millisecond taken from the nanosecond field.
                final long microsWithinMilli = (timestamp.getNanos() / 1000) % 1000;
                converted.put(key, Long.valueOf(timestamp.getTime() * 1000 + microsWithinMilli));
            } else if (value instanceof Time) {
                final LocalTime localTime = ((Time) value).toLocalTime();
                converted.put(key, Long.valueOf(CivilTimeEncoder.encodePacked64TimeMicros(org.threeten.bp.LocalTime.of(localTime.getHour(), localTime.getMinute(), localTime.getSecond()))));
            } else if (value instanceof Date) {
                converted.put(key, Integer.valueOf((int) ((Date) value).toLocalDate().toEpochDay()));
            } else {
                converted.put(key, value);
            }
        }
        return converted;
    }
}
