package org.apache.nifi.processors.graph;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;

@CapabilityDescription("This uses FlowFile records as input to perform graph mutations. Each record is associated with an individual query/mutation, and a FlowFile will be output for each successful operation. Failed records will be sent as a single FlowFile to the failure relationship.")
@DynamicProperty(name = "A dynamic property to be used as a parameter in the graph script", value = "The variable name to be set", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "Uses a record path to set a variable as a parameter in the graph script")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"graph", "gremlin", "cypher"})
@WritesAttributes({@WritesAttribute(attribute = ExecuteGraphQueryRecord.GRAPH_OPERATION_TIME, description = "The amount of time it took to execute all of the graph operations."), @WritesAttribute(attribute = ExecuteGraphQueryRecord.RECORD_COUNT, description = "The number of records unsuccessfully processed (written on FlowFiles routed to the 'failure' relationship.")})
/* loaded from: input_file:org/apache/nifi/processors/graph/ExecuteGraphQueryRecord.class */
public class ExecuteGraphQueryRecord extends AbstractGraphExecutor {
    public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder().name("client-service").displayName("Client Service").description("The graph client service for connecting to a graph database.").identifiesControllerService(GraphClientService.class).addValidator(Validator.VALID).required(true).build();
    public static final PropertyDescriptor READER_SERVICE = new PropertyDescriptor.Builder().name("reader-service").displayName("Record Reader").description("The record reader to use with this processor.").identifiesControllerService(RecordReaderFactory.class).required(true).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor WRITER_SERVICE = new PropertyDescriptor.Builder().name("writer-service").displayName("Failed Record Writer").description("The record writer to use for writing failed records.").identifiesControllerService(RecordSetWriterFactory.class).required(true).addValidator(Validator.VALID).build();
    public static final PropertyDescriptor SUBMISSION_SCRIPT = new PropertyDescriptor.Builder().name("record-script").displayName("Graph Record Script").description("Script to perform the business logic on graph, using flow file attributes and custom properties as variable-value pairs in its logic.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final Relationship SUCCESS = new Relationship.Builder().name("original").description("Original flow files that successfully interacted with graph server.").build();
    public static final Relationship FAILURE = new Relationship.Builder().name("failure").description("Flow files that fail to interact with graph server.").build();
    public static final Relationship GRAPH = new Relationship.Builder().name("response").description("The response object from the graph server.").autoTerminateDefault(true).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(CLIENT_SERVICE, READER_SERVICE, WRITER_SERVICE, SUBMISSION_SCRIPT);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(SUCCESS, FAILURE, GRAPH);
    public static final String RECORD_COUNT = "record.count";
    public static final String GRAPH_OPERATION_TIME = "graph.operations.took";
    private volatile RecordPathCache recordPathCache;
    private GraphClientService clientService;
    private RecordReaderFactory recordReaderFactory;
    private RecordSetWriterFactory recordSetWriterFactory;
    private final ObjectMapper mapper = new ObjectMapper();

    @Override // org.apache.nifi.processors.graph.AbstractGraphExecutor
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String str) {
        return new PropertyDescriptor.Builder().name(str).required(false).addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dynamic(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @Override // org.apache.nifi.processors.graph.AbstractGraphExecutor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.clientService = processContext.getProperty(CLIENT_SERVICE).asControllerService(GraphClientService.class);
        this.recordReaderFactory = processContext.getProperty(READER_SERVICE).asControllerService(RecordReaderFactory.class);
        this.recordSetWriterFactory = processContext.getProperty(WRITER_SERVICE).asControllerService(RecordSetWriterFactory.class);
        this.recordPathCache = new RecordPathCache(100);
    }

    private Object getRecordValue(Record record, RecordPath recordPath) {
        List list = (List) recordPath.evaluate(record).getSelectedFields().collect(Collectors.toList());
        if (list == null || list.isEmpty()) {
            return null;
        }
        if (list.size() != 1) {
            return list.stream().map(fieldValue -> {
                return fieldValue.getValue();
            }).collect(Collectors.toList());
        }
        Object value = ((FieldValue) list.get(0)).getValue();
        if (value != null && value.getClass().isArray()) {
            value = Arrays.asList((Object[]) value);
        }
        return value;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        String value = processContext.getProperty(SUBMISSION_SCRIPT).evaluateAttributeExpressions(flowFile).getValue();
        HashMap hashMap = new HashMap();
        processContext.getProperties().keySet().stream().filter((v0) -> {
            return v0.isDynamic();
        }).forEach(propertyDescriptor -> {
            hashMap.put(propertyDescriptor.getName(), this.recordPathCache.getCompiled(processContext.getProperty(propertyDescriptor.getName()).evaluateAttributeExpressions(flowFile).getValue()));
        });
        FlowFile create = processSession.create(flowFile);
        try {
            InputStream read = processSession.read(flowFile);
            try {
                RecordReader createRecordReader = this.recordReaderFactory.createRecordReader(flowFile, read, getLogger());
                try {
                    OutputStream write = processSession.write(create);
                    try {
                        RecordSetWriter createWriter = this.recordSetWriterFactory.createWriter(getLogger(), createRecordReader.getSchema(), write, flowFile.getAttributes());
                        try {
                            long currentTimeMillis = System.currentTimeMillis();
                            createWriter.beginRecordSet();
                            int i = 0;
                            while (true) {
                                Record nextRecord = createRecordReader.nextRecord();
                                if (nextRecord == null) {
                                    break;
                                }
                                FlowFile create2 = processSession.create(flowFile);
                                try {
                                    try {
                                        HashMap hashMap2 = new HashMap();
                                        for (String str : hashMap.keySet()) {
                                            if (!hashMap2.containsKey(str)) {
                                                hashMap2.put(str, getRecordValue(nextRecord, (RecordPath) hashMap.get(str)));
                                            }
                                        }
                                        hashMap2.putAll(flowFile.getAttributes());
                                        if (getLogger().isDebugEnabled()) {
                                            getLogger().debug("Dynamic Properties: {}", new Object[]{hashMap2});
                                        }
                                        ArrayList arrayList = new ArrayList(executeQuery(value, hashMap2));
                                        OutputStream write2 = processSession.write(create2);
                                        write2.write(this.mapper.writerWithDefaultPrettyPrinter().writeValueAsString(arrayList).getBytes(StandardCharsets.UTF_8));
                                        write2.close();
                                        processSession.transfer(create2, GRAPH);
                                        i++;
                                    } catch (Exception e) {
                                        getLogger().error("Error processing record at index {}", new Object[]{Integer.valueOf(i), e});
                                        createWriter.write(nextRecord);
                                        processSession.remove(create2);
                                        i++;
                                    }
                                } catch (Throwable th) {
                                    int i2 = i + 1;
                                    throw th;
                                }
                            }
                            long currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / 1000;
                            if (getLogger().isDebugEnabled()) {
                                getLogger().debug(String.format("Took %s seconds.\nHandled %d records", Long.valueOf(currentTimeMillis2), Integer.valueOf(i)));
                            }
                            WriteResult finishRecordSet = createWriter.finishRecordSet();
                            createWriter.flush();
                            if (createWriter != null) {
                                createWriter.close();
                            }
                            if (write != null) {
                                write.close();
                            }
                            if (createRecordReader != null) {
                                createRecordReader.close();
                            }
                            if (read != null) {
                                read.close();
                            }
                            processSession.getProvenanceReporter().send(flowFile, this.clientService.getTransitUrl(), currentTimeMillis2 * 1000);
                            if (finishRecordSet.getRecordCount() < 1) {
                                processSession.remove(create);
                                processSession.transfer(processSession.putAttribute(flowFile, GRAPH_OPERATION_TIME, String.valueOf(currentTimeMillis2)), SUCCESS);
                            } else {
                                processSession.transfer(processSession.putAttribute(create, RECORD_COUNT, String.valueOf(finishRecordSet.getRecordCount())), FAILURE);
                                processSession.remove(flowFile);
                            }
                        } catch (Throwable th2) {
                            if (createWriter != null) {
                                try {
                                    createWriter.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            }
                            throw th2;
                        }
                    } catch (Throwable th4) {
                        if (write != null) {
                            try {
                                write.close();
                            } catch (Throwable th5) {
                                th4.addSuppressed(th5);
                            }
                        }
                        throw th4;
                    }
                } catch (Throwable th6) {
                    if (createRecordReader != null) {
                        try {
                            createRecordReader.close();
                        } catch (Throwable th7) {
                            th6.addSuppressed(th7);
                        }
                    }
                    throw th6;
                }
            } finally {
            }
        } catch (Exception e2) {
            getLogger().error("Error reading records, routing input FlowFile to failure", e2);
            processSession.remove(create);
            processSession.transfer(flowFile, FAILURE);
        }
    }

    private List<Map<String, Object>> executeQuery(String str, Map<String, Object> map) {
        ObjectMapper objectMapper = new ObjectMapper();
        ArrayList arrayList = new ArrayList();
        this.clientService.executeQuery(str, map, (map2, z) -> {
            if (getLogger().isDebugEnabled()) {
                try {
                    getLogger().debug(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(map2));
                } catch (JsonProcessingException e) {
                    getLogger().error("Error converted map to JSON ", e);
                }
            }
            arrayList.add(map2);
        });
        return arrayList;
    }
}
