package org.apache.nifi.processors.jolt;

import com.bazaarvoice.jolt.ContextualTransform;
import com.bazaarvoice.jolt.JoltTransform;
import com.bazaarvoice.jolt.Transform;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription("Applies a JOLT specification to each record in the FlowFile payload. A new FlowFile is created with transformed content and is routed to the 'success' relationship. If the transform fails, the original FlowFile is routed to the 'failure' relationship.")
@SupportsBatching
@WritesAttributes({@WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), @WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate")})
@RequiresInstanceClassLoading
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"})
@SideEffectFree
/* loaded from: input_file:org/apache/nifi/processors/jolt/JoltTransformRecord.class */
public class JoltTransformRecord extends AbstractJoltTransform {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("jolt-record-record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("jolt-record-record-writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship").build();
    static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be sent to this relationship").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(getCommonPropertyDescriptors().stream(), Stream.of((Object[]) new PropertyDescriptor[]{RECORD_READER, RECORD_WRITER})).toList();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_ORIGINAL);

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        OutputStream write;
        RecordSetWriter createWriter;
        FlowFile putAllAttributes;
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = getLogger();
        StopWatch stopWatch = new StopWatch(true);
        RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory asControllerService2 = processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        try {
            InputStream read = processSession.read(flowFile);
            try {
                RecordReader createRecordReader = asControllerService.createRecordReader(flowFile, read, getLogger());
                try {
                    RecordSchema schema = asControllerService2.getSchema(flowFile.getAttributes(), createRecordReader.getSchema());
                    HashMap hashMap = new HashMap();
                    FlowFile create = processSession.create(flowFile);
                    Record nextRecord = createRecordReader.nextRecord();
                    if (nextRecord == null) {
                        write = processSession.write(create);
                        try {
                            createWriter = asControllerService2.createWriter(getLogger(), schema, write, create);
                            try {
                                createWriter.beginRecordSet();
                                WriteResult finishRecordSet = createWriter.finishRecordSet();
                                hashMap.put("record.count", String.valueOf(finishRecordSet.getRecordCount()));
                                hashMap.put(CoreAttributes.MIME_TYPE.key(), createWriter.getMimeType());
                                hashMap.putAll(finishRecordSet.getAttributes());
                                if (createWriter != null) {
                                    createWriter.close();
                                }
                                if (write != null) {
                                    write.close();
                                }
                                putAllAttributes = processSession.putAllAttributes(create, hashMap);
                                logger.info("{} had no Records to transform", new Object[]{flowFile});
                            } catch (Throwable th) {
                                throw th;
                            }
                        } finally {
                        }
                    } else {
                        JoltTransform transform = getTransform(processContext, flowFile);
                        List<Record> transform2 = transform(nextRecord, transform);
                        if (transform2.isEmpty()) {
                            throw new ProcessException("Error transforming the first record");
                        }
                        Record record = (Record) transform2.getFirst();
                        if (record == null) {
                            throw new ProcessException("Error transforming the first record");
                        }
                        RecordSchema schema2 = asControllerService2.getSchema(flowFile.getAttributes(), record.getSchema());
                        write = processSession.write(create);
                        try {
                            createWriter = asControllerService2.createWriter(getLogger(), schema2, write, create);
                            try {
                                createWriter.beginRecordSet();
                                createWriter.write(record);
                                for (int i = 1; i < transform2.size(); i++) {
                                    Record record2 = transform2.get(i);
                                    if (record2 == null) {
                                        throw new ProcessException("Error transforming the first record");
                                    }
                                    createWriter.write(record2);
                                }
                                while (true) {
                                    Record nextRecord2 = createRecordReader.nextRecord();
                                    if (nextRecord2 == null) {
                                        break;
                                    }
                                    Iterator<Record> it = transform(nextRecord2, transform).iterator();
                                    while (it.hasNext()) {
                                        createWriter.write(it.next());
                                    }
                                }
                                WriteResult finishRecordSet2 = createWriter.finishRecordSet();
                                try {
                                    createWriter.close();
                                } catch (IOException e) {
                                    getLogger().warn("Failed to close Writer for {}", new Object[]{create});
                                }
                                hashMap.put("record.count", String.valueOf(finishRecordSet2.getRecordCount()));
                                hashMap.put(CoreAttributes.MIME_TYPE.key(), createWriter.getMimeType());
                                hashMap.putAll(finishRecordSet2.getAttributes());
                                if (createWriter != null) {
                                    createWriter.close();
                                }
                                if (write != null) {
                                    write.close();
                                }
                                String value = processContext.getProperty(JOLT_TRANSFORM).getValue();
                                putAllAttributes = processSession.putAllAttributes(create, hashMap);
                                processSession.getProvenanceReporter().modifyContent(putAllAttributes, "Modified With " + value, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                                logger.debug("Transform completed {}", new Object[]{flowFile});
                            } finally {
                                if (createWriter != null) {
                                    try {
                                        createWriter.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                            }
                        } finally {
                        }
                    }
                    if (createRecordReader != null) {
                        createRecordReader.close();
                    }
                    if (read != null) {
                        read.close();
                    }
                    if (putAllAttributes != null) {
                        processSession.transfer(putAllAttributes, REL_SUCCESS);
                    }
                    processSession.transfer(flowFile, REL_ORIGINAL);
                } catch (Throwable th3) {
                    if (createRecordReader != null) {
                        try {
                            createRecordReader.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Exception e2) {
            logger.error("Transform failed for {}", new Object[]{flowFile, e2});
            processSession.transfer(flowFile, REL_FAILURE);
            if (0 != 0) {
                processSession.remove((FlowFile) null);
            }
        }
    }

    private List<Record> transform(Record record, JoltTransform joltTransform) {
        Object normalizeRecordObjects = normalizeRecordObjects(transform(joltTransform, (Map) normalizeJoltObjects((Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())))));
        ArrayList arrayList = new ArrayList();
        if (normalizeRecordObjects == null) {
            return arrayList;
        }
        if (normalizeRecordObjects instanceof Object[]) {
            for (Object obj : (Object[]) normalizeRecordObjects) {
                if (obj != null) {
                    arrayList.add(DataTypeUtils.toRecord(obj, "r"));
                }
            }
        } else {
            arrayList.add(DataTypeUtils.toRecord(normalizeRecordObjects, "r"));
        }
        return arrayList;
    }

    protected static Object transform(JoltTransform joltTransform, Object obj) {
        return joltTransform instanceof ContextualTransform ? ((ContextualTransform) joltTransform).transform(obj, Collections.emptyMap()) : ((Transform) joltTransform).transform(obj);
    }

    protected static Object normalizeJoltObjects(Object obj) {
        if (!(obj instanceof Map)) {
            return obj instanceof Object[] ? Arrays.stream((Object[]) obj).map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()) : obj instanceof Collection ? ((Collection) obj).stream().map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()) : obj;
        }
        Map map = (Map) obj;
        map.forEach((str, obj2) -> {
            map.put(str, normalizeJoltObjects(obj2));
        });
        return map;
    }

    protected static Object normalizeRecordObjects(Object obj) {
        if (obj instanceof Map) {
            Map map = (Map) obj;
            map.forEach((str, obj2) -> {
                map.put(str, normalizeRecordObjects(obj2));
            });
            return map;
        }
        if (obj instanceof List) {
            List list = (List) obj;
            Object[] objArr = new Object[list.size()];
            for (int i = 0; i < objArr.length; i++) {
                objArr[i] = normalizeRecordObjects(list.get(i));
            }
            return objArr;
        }
        if (!(obj instanceof Collection)) {
            return obj;
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = ((Collection) obj).iterator();
        while (it.hasNext()) {
            arrayList.add(normalizeRecordObjects(it.next()));
        }
        return arrayList;
    }
}
