package org.apache.nifi.redis.processor;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.util.RedisUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.springframework.data.redis.connection.RedisConnection;

@CapabilityDescription("Puts record field data into Redis using a specified hash value, which is determined by a RecordPath to a field in each record containing the hash value. The record fields and values are stored as key/value pairs associated by the hash value. NOTE: Neither the evaluated hash value nor any of the field values can be null. If the hash value is null, the FlowFile will be routed to failure. For each of the field values, if the value is null that field will be not set in Redis.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "redis", "hash", "record"})
@WritesAttributes({@WritesAttribute(attribute = PutRedisHashRecord.SUCCESS_RECORD_COUNT, description = "Number of records written to Redis")})
/* loaded from: input_file:org/apache/nifi/redis/processor/PutRedisHashRecord.class */
public class PutRedisHashRecord extends AbstractProcessor {
    public static final String SUCCESS_RECORD_COUNT = "redis.success.record.count";
    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor HASH_VALUE_RECORD_PATH = new PropertyDescriptor.Builder().name("hash-value-record-path").displayName("Hash Value Record Path").description("Specifies a RecordPath to evaluate against each Record in order to determine the hash value associated with all the record fields/values (see 'hset' in Redis documentation for more details). The RecordPath must point at exactly one field or an error will occur.").required(true).addValidator(new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor DATA_RECORD_PATH = new PropertyDescriptor.Builder().name("data-record-path").displayName("Data Record Path").description("This property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to Redis instead of sending the entire incoming Record. The property defaults to the root '/' which corresponds to a 'flat' record (all fields/values at the top level of  the Record.").required(true).addValidator(new RecordPathValidator()).defaultValue("/").expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("charset").displayName("Character Set").description("Specifies the character set to use when storing record field values as strings. All fields will be converted to strings using this character set before being stored in Redis.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles having all Records stored in Redis will be routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles containing Records with processing errors will be routed to this relationship").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(RECORD_READER_FACTORY, RedisUtils.REDIS_CONNECTION_POOL, HASH_VALUE_RECORD_PATH, DATA_RECORD_PATH, CHARSET);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    private volatile RedisConnectionPool redisConnectionPool;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        this.redisConnectionPool = processContext.getProperty(RedisUtils.REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
    }

    @OnStopped
    public void onStopped() {
        this.redisConnectionPool = null;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        RecordReaderFactory asControllerService = processContext.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        long j = 0;
        try {
            InputStream read = processSession.read(flowFile);
            try {
                RecordReader createRecordReader = asControllerService.createRecordReader(flowFile, read, getLogger());
                try {
                    RedisConnection connection = this.redisConnectionPool.getConnection();
                    try {
                        RecordPath compile = RecordPath.compile(processContext.getProperty(HASH_VALUE_RECORD_PATH).getValue());
                        RecordPath compile2 = RecordPath.compile(processContext.getProperty(DATA_RECORD_PATH).getValue());
                        Charset forName = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
                        while (true) {
                            Record nextRecord = createRecordReader.nextRecord();
                            if (nextRecord == null) {
                                if (connection != null) {
                                    connection.close();
                                }
                                if (createRecordReader != null) {
                                    createRecordReader.close();
                                }
                                if (read != null) {
                                    read.close();
                                }
                                processSession.transfer(processSession.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(j)), REL_SUCCESS);
                                return;
                            }
                            List list = compile.evaluate(nextRecord).getSelectedFields().distinct().toList();
                            if (list.isEmpty()) {
                                throw new ProcessException(String.format("No results found for Record [%d] Hash Value Record Path: %s", Long.valueOf(j), compile.getPath()));
                            }
                            if (list.size() > 1) {
                                throw new ProcessException(String.format("Multiple results [%d] found for Record [%d] Hash Value Record Path: %s", Integer.valueOf(list.size()), Long.valueOf(j), compile.getPath()));
                            }
                            Object value = ((FieldValue) list.getFirst()).getValue();
                            if (value == null) {
                                throw new ProcessException(String.format("Null value found for Record [%d] Hash Value Record Path: %s", Long.valueOf(j), compile.getPath()));
                            }
                            j = putDataRecordsToRedis(getDataRecords(compile2, nextRecord), connection, (String) DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), forName.name()), forName, j);
                        }
                    } catch (Throwable th) {
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (createRecordReader != null) {
                        try {
                            createRecordReader.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (MalformedRecordException e) {
            getLogger().error("Read Records failed {}", new Object[]{flowFile, e});
            processSession.transfer(processSession.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(0L)), REL_FAILURE);
        } catch (SchemaNotFoundException e2) {
            getLogger().error("Record Schema not found {}", new Object[]{flowFile, e2});
            processSession.transfer(processSession.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(0L)), REL_FAILURE);
        } catch (Exception e3) {
            getLogger().error("Put Records failed {}", new Object[]{flowFile, e3});
            processSession.transfer(processSession.putAttribute(flowFile, SUCCESS_RECORD_COUNT, String.valueOf(0L)), REL_FAILURE);
        }
    }

    private List<Record> getDataRecords(RecordPath recordPath, Record record) {
        if (recordPath == null) {
            return Collections.singletonList(record);
        }
        List list = recordPath.evaluate(record).getSelectedFields().toList();
        if (list.isEmpty()) {
            throw new ProcessException("RecordPath " + recordPath.getPath() + " evaluated against Record yielded no results.");
        }
        Iterator it = list.iterator();
        while (it.hasNext()) {
            RecordFieldType fieldType = ((FieldValue) it.next()).getField().getDataType().getFieldType();
            if (fieldType != RecordFieldType.RECORD) {
                throw new ProcessException("RecordPath " + recordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type " + String.valueOf(fieldType));
            }
        }
        ArrayList arrayList = new ArrayList(list.size());
        Iterator it2 = list.iterator();
        while (it2.hasNext()) {
            arrayList.add((Record) ((FieldValue) it2.next()).getValue());
        }
        return arrayList;
    }

    private long putDataRecordsToRedis(List<Record> list, RedisConnection redisConnection, String str, Charset charset, long j) {
        long j2 = j;
        for (Record record : list) {
            Iterator it = record.getSchema().getFields().iterator();
            while (it.hasNext()) {
                String fieldName = ((RecordField) it.next()).getFieldName();
                Object value = record.getValue(fieldName);
                if (fieldName == null || value == null) {
                    getLogger().debug("Record field missing required elements: name [{}] value [{}]", new Object[]{fieldName, value});
                } else {
                    redisConnection.hashCommands().hSet(str.getBytes(charset), fieldName.getBytes(charset), ((String) DataTypeUtils.convertType(value, RecordFieldType.STRING.getDataType(), charset.name())).getBytes(charset));
                }
            }
            j2++;
        }
        return j2;
    }
}
