package org.apache.nifi.processors.aws.kinesis.stream;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.kinesis.KinesisProcessorUtils;
import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. In order to send data to Kinesis, the stream name has to be specified.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags({"amazon", "aws", "kinesis", "put", "stream"})
@WritesAttributes({@WritesAttribute(attribute = PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE, description = "Error message on posting message to AWS Kinesis"), @WritesAttribute(attribute = PutKinesisStream.AWS_KINESIS_ERROR_CODE, description = "Error code for the message when posting to AWS Kinesis"), @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"), @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")})
@SeeAlso({ConsumeKinesisStream.class})
/* loaded from: input_file:org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.class */
public class PutKinesisStream extends AbstractAwsSyncProcessor<KinesisClient, KinesisClientBuilder> {
    public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message";
    public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code";
    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder().displayName("Amazon Kinesis Stream Partition Key").name("amazon-kinesis-stream-partition-key").description("The partition key attribute.  If it is not set, a random value is used").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("${kinesis.partition.key}").required(false).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().displayName("Message Batch Size").name("message-batch-size").description("Batch size for messages (1-500).").defaultValue("250").required(false).addValidator(StandardValidators.createLongValidator(1, 500, true)).sensitive(false).build();
    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder().name("max-message-buffer-size").displayName("Max message buffer size (MB)").description("Max message buffer size in Mega-bytes").defaultValue("1 MB").required(false).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).sensitive(false).build();
    static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder().name("kinesis-stream-name").displayName("Amazon Kinesis Stream Name").description("The name of Kinesis Stream").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(KINESIS_STREAM_NAME, REGION, AWS_CREDENTIALS_PROVIDER_SERVICE, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, TIMEOUT, PROXY_CONFIGURATION_SERVICE, ENDPOINT_OVERRIDE);
    protected Random randomPartitionKeyGenerator = new Random();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) {
        List<FlowFile> filterMessagesByMaxSize = KinesisProcessorUtils.filterMessagesByMaxSize(processSession, processContext.getProperty(BATCH_SIZE).asInteger().intValue(), processContext.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue(), AWS_KINESIS_ERROR_MESSAGE, getLogger());
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        KinesisClient client = getClient(processContext);
        try {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (FlowFile flowFile : filterMessagesByMaxSize) {
                String value = processContext.getProperty(KINESIS_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                processSession.exportTo(flowFile, byteArrayOutputStream);
                PutRecordsRequestEntry.Builder data = PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray(byteArrayOutputStream.toByteArray()));
                String value2 = processContext.getProperty(KINESIS_PARTITION_KEY).evaluateAttributeExpressions(flowFile).getValue();
                data.partitionKey(StringUtils.isBlank(value2) ? Integer.toString(this.randomPartitionKeyGenerator.nextInt()) : value2);
                ((List) hashMap.computeIfAbsent(value, str -> {
                    return new ArrayList();
                })).add(flowFile);
                ((List) hashMap2.computeIfAbsent(value, str2 -> {
                    return new ArrayList();
                })).add((PutRecordsRequestEntry) data.build());
            }
            for (Map.Entry entry : hashMap2.entrySet()) {
                String str3 = (String) entry.getKey();
                List list = (List) entry.getValue();
                if (!list.isEmpty()) {
                    List records = client.putRecords((PutRecordsRequest) PutRecordsRequest.builder().streamName(str3).records(list).build()).records();
                    for (int i = 0; i < records.size(); i++) {
                        PutRecordsResultEntry putRecordsResultEntry = (PutRecordsResultEntry) records.get(i);
                        FlowFile flowFile2 = (FlowFile) ((List) hashMap.get(str3)).get(i);
                        HashMap hashMap3 = new HashMap();
                        hashMap3.put("aws.kinesis.shard.id", putRecordsResultEntry.shardId());
                        hashMap3.put("aws.kinesis.sequence.number", putRecordsResultEntry.sequenceNumber());
                        if (StringUtils.isNotBlank(putRecordsResultEntry.errorCode())) {
                            hashMap3.put(AWS_KINESIS_ERROR_CODE, putRecordsResultEntry.errorCode());
                            hashMap3.put(AWS_KINESIS_ERROR_MESSAGE, putRecordsResultEntry.errorMessage());
                            arrayList.add(processSession.putAllAttributes(flowFile2, hashMap3));
                        } else {
                            arrayList2.add(processSession.putAllAttributes(flowFile2, hashMap3));
                        }
                    }
                }
                ((List) hashMap2.get(str3)).clear();
                list.clear();
            }
            if (!arrayList.isEmpty()) {
                processSession.transfer(arrayList, REL_FAILURE);
                getLogger().error("Failed to publish to kinesis records {}", new Object[]{arrayList});
            }
            if (!arrayList2.isEmpty()) {
                processSession.transfer(arrayList2, REL_SUCCESS);
                getLogger().debug("Successfully published to kinesis records {}", new Object[]{arrayList2});
            }
        } catch (Exception e) {
            getLogger().error("Failed to publish due to exception {} flowfiles {} ", new Object[]{e, filterMessagesByMaxSize});
            processSession.transfer(filterMessagesByMaxSize, REL_FAILURE);
            processContext.yield();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: createClientBuilder, reason: merged with bridge method [inline-methods] */
    public KinesisClientBuilder m15createClientBuilder(ProcessContext processContext) {
        return KinesisClient.builder();
    }
}
