package org.apache.nifi.processors.aws.kinesis.firehose;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.kinesis.KinesisProcessorUtils;
import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.firehose.FirehoseClientBuilder;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
import software.amazon.awssdk.services.firehose.model.Record;

@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. In order to send data to firehose, the firehose delivery stream name has to be specified.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
@WritesAttributes({@WritesAttribute(attribute = PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, description = "Error message on posting message to AWS Kinesis Firehose"), @WritesAttribute(attribute = PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_CODE, description = "Error code for the message when posting to AWS Kinesis Firehose"), @WritesAttribute(attribute = PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID, description = "Record id of the message posted to Kinesis Firehose")})
/* loaded from: input_file:org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.class */
public class PutKinesisFirehose extends AbstractAwsSyncProcessor<FirehoseClient, FirehoseClientBuilder> {
    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder().name("Amazon Kinesis Firehose Delivery Stream Name").description("The name of kinesis firehose delivery stream").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("Batch size for messages (1-500).").defaultValue("250").required(false).addValidator(StandardValidators.createLongValidator(1, 500, true)).sensitive(false).build();
    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder().name("Max message buffer size").description("Max message buffer").defaultValue("1 MB").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).sensitive(false).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, REGION, AWS_CREDENTIALS_PROVIDER_SERVICE, MAX_MESSAGE_BUFFER_SIZE_MB, TIMEOUT, PROXY_CONFIGURATION_SERVICE, ENDPOINT_OVERRIDE);
    public static final int MAX_MESSAGE_SIZE = 1024000;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: createClientBuilder, reason: merged with bridge method [inline-methods] */
    public FirehoseClientBuilder m10createClientBuilder(ProcessContext processContext) {
        return FirehoseClient.builder();
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) {
        List<FlowFile> filterMessagesByMaxSize = KinesisProcessorUtils.filterMessagesByMaxSize(processSession, processContext.getProperty(BATCH_SIZE).asInteger().intValue(), processContext.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue(), AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, getLogger());
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        FirehoseClient client = getClient(processContext);
        try {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (FlowFile flowFile : filterMessagesByMaxSize) {
                String value = processContext.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
                hashMap2.computeIfAbsent(value, str -> {
                    return new ArrayList();
                });
                processSession.read(flowFile, inputStream -> {
                    ((List) hashMap2.get(value)).add((Record) Record.builder().data(SdkBytes.fromInputStream(inputStream)).build());
                });
                ((List) hashMap.computeIfAbsent(value, str2 -> {
                    return new ArrayList();
                })).add(flowFile);
            }
            for (Map.Entry entry : hashMap2.entrySet()) {
                String str3 = (String) entry.getKey();
                List list = (List) entry.getValue();
                if (list.size() > 0) {
                    List requestResponses = client.putRecordBatch((PutRecordBatchRequest) PutRecordBatchRequest.builder().deliveryStreamName(str3).records(list).build()).requestResponses();
                    for (int i = 0; i < requestResponses.size(); i++) {
                        PutRecordBatchResponseEntry putRecordBatchResponseEntry = (PutRecordBatchResponseEntry) requestResponses.get(i);
                        FlowFile flowFile2 = (FlowFile) ((List) hashMap.get(str3)).get(i);
                        HashMap hashMap3 = new HashMap();
                        hashMap3.put(AWS_KINESIS_FIREHOSE_RECORD_ID, putRecordBatchResponseEntry.recordId());
                        FlowFile putAttribute = processSession.putAttribute(flowFile2, AWS_KINESIS_FIREHOSE_RECORD_ID, putRecordBatchResponseEntry.recordId());
                        if (StringUtils.isNotBlank(putRecordBatchResponseEntry.errorCode())) {
                            hashMap3.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, putRecordBatchResponseEntry.errorCode());
                            hashMap3.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, putRecordBatchResponseEntry.errorMessage());
                            arrayList.add(processSession.putAllAttributes(putAttribute, hashMap3));
                        } else {
                            arrayList2.add(processSession.putAllAttributes(putAttribute, hashMap3));
                        }
                    }
                    ((List) hashMap2.get(str3)).clear();
                    list.clear();
                }
            }
            if (arrayList.size() > 0) {
                processSession.transfer(arrayList, REL_FAILURE);
                getLogger().error("Failed to publish to kinesis firehose {}", new Object[]{arrayList});
            }
            if (arrayList2.size() > 0) {
                processSession.transfer(arrayList2, REL_SUCCESS);
                getLogger().info("Successfully published to kinesis firehose {}", new Object[]{arrayList2});
            }
        } catch (Exception e) {
            getLogger().error("Failed to publish to kinesis firehose {} with exception {}", new Object[]{filterMessagesByMaxSize, e});
            processSession.transfer(filterMessagesByMaxSize, REL_FAILURE);
            processContext.yield();
        }
    }
}
