package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

@CapabilityDescription("Counts the number of Records in a record set, optionally counting the number of elements per category, where the categories are defined by user-defined properties.")
@DynamicProperty(name = "The name of the category. For example, sport", value = "The RecordPath that points to the value of the category. For example /sport", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Specifies a category that should be counted. For example, if the property name is 'sport' and the value is '/sport', the processor will count how many records have a value of 'soccer' for the /sport field, how many have a value of 'baseball' for the /sport, and so on. These counts be added as attributes named recordStats.sport.soccer, recordStats.sport.baseball.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"record", "stats", "metrics"})
@WritesAttributes({@WritesAttribute(attribute = "record.count", description = "A count of the records in the record set in the FlowFile."), @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.count", description = "A count of the records that contain a value for the user defined property."), @WritesAttribute(attribute = "recordStats.<User Defined Property Name>.<value>.count", description = "Each value discovered for the user defined property will have its own count attribute. Total number of top N value counts to be added is defined by the limit configuration.")})
/* loaded from: input_file:org/apache/nifi/processors/standard/CalculateRecordStats.class */
public class CalculateRecordStats extends AbstractProcessor {
    static final String RECORD_COUNT_ATTR = "record.count";
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-stats-reader").displayName("Record Reader").description("A record reader to use for reading the records.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder().name("record-stats-limit").description("Limit the number of individual stats that are returned for each record path to the top N results.").required(true).defaultValue("10").addValidator(StandardValidators.INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RECORD_READER, LIMIT);
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are successfully processed, are routed to this Relationship.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile cannot be processed for any reason, it is routed to this Relationship.").build();
    static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    private RecordPathCache cache;

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().name(str).displayName(str).dynamic(true).addValidator(new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @OnScheduled
    public void onEnabled(ProcessContext processContext) {
        this.cache = new RecordPathCache(25);
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        try {
            flowFile = processSession.putAllAttributes(flowFile, calculateStats(flowFile, getRecordPaths(processContext, flowFile), processContext, processSession));
            processSession.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("Failed to process stats for {}", new Object[]{flowFile, e});
            processSession.transfer(flowFile, REL_FAILURE);
        }
    }

    protected Map<String, RecordPath> getRecordPaths(ProcessContext processContext, FlowFile flowFile) {
        return (Map) processContext.getProperties().keySet().stream().filter((v0) -> {
            return v0.isDynamic();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, propertyDescriptor -> {
            return this.cache.getCompiled(processContext.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue());
        }));
    }

    protected Map<String, String> calculateStats(FlowFile flowFile, Map<String, RecordPath> map, ProcessContext processContext, ProcessSession processSession) throws IOException, SchemaNotFoundException, MalformedRecordException {
        InputStream read = processSession.read(flowFile);
        try {
            RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            int intValue = processContext.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger().intValue();
            RecordReader createRecordReader = asControllerService.createRecordReader(flowFile, read, getLogger());
            Map<String, Integer> hashMap = new HashMap<>();
            int i = 0;
            LinkedHashSet linkedHashSet = new LinkedHashSet();
            while (true) {
                Record nextRecord = createRecordReader.nextRecord();
                if (nextRecord == null) {
                    break;
                }
                for (Map.Entry<String, RecordPath> entry : map.entrySet()) {
                    entry.getValue().evaluate(nextRecord).getSelectedFields().forEach(fieldValue -> {
                        Object value = fieldValue.getValue();
                        String obj = value == null ? "<null>" : value.toString();
                        String str = "recordStats." + ((String) entry.getKey());
                        String str2 = str + "." + obj;
                        int intValue2 = ((Integer) hashMap.getOrDefault(str2, 0)).intValue();
                        int intValue3 = ((Integer) hashMap.getOrDefault(str, 0)).intValue();
                        hashMap.put(str2, Integer.valueOf(intValue2 + 1));
                        if (value != null) {
                            hashMap.put(str, Integer.valueOf(intValue3 + 1));
                        }
                        linkedHashSet.add(str);
                    });
                }
                i++;
            }
            Map<String, Integer> filterBySize = filterBySize(hashMap, intValue, linkedHashSet);
            filterBySize.put("record.count", Integer.valueOf(i));
            Map<String, String> map2 = (Map) filterBySize.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry2 -> {
                return ((Integer) entry2.getValue()).toString();
            }));
            if (read != null) {
                read.close();
            }
            return map2;
        } catch (Throwable th) {
            if (read != null) {
                try {
                    read.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected Map<String, Integer> filterBySize(Map<String, Integer> map, int i, Collection<String> collection) {
        if (map.size() <= i) {
            return map;
        }
        ArrayList arrayList = new ArrayList(((Map) map.entrySet().stream().filter(entry -> {
            return !collection.contains(entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).entrySet());
        arrayList.sort(Map.Entry.comparingByValue());
        List<Map.Entry> subList = arrayList.reversed().subList(0, i);
        HashMap hashMap = new HashMap();
        map.forEach((str, num) -> {
            if (collection.contains(str)) {
                hashMap.put(str, num);
            }
        });
        for (Map.Entry entry2 : subList) {
            hashMap.put((String) entry2.getKey(), (Integer) entry2.getValue());
        }
        return hashMap;
    }
}
