package org.apache.nifi.processors.parquet;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parquet.stream.NifiParquetInputFile;
import org.apache.nifi.parquet.utils.ParquetAttribute;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;

@CapabilityDescription("The processor generates N flow files from the input, and adds attributes with the offsets required to read the group of rows in the FlowFile's content. Can be used to increase the overall efficiency of processing extremely large Parquet files.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"parquet", "split", "partition", "break apart", "efficient processing", "load balance", "cluster"})
@WritesAttributes({@WritesAttribute(attribute = ParquetAttribute.RECORD_OFFSET, description = "Sets the index of first record of the parquet file."), @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.")})
@ReadsAttributes({@ReadsAttribute(attribute = ParquetAttribute.RECORD_OFFSET, description = "Gets the index of first record in the input."), @ReadsAttribute(attribute = "record.count", description = "Gets the number of records in the input."), @ReadsAttribute(attribute = ParquetAttribute.FILE_RANGE_START_OFFSET, description = "Gets the start offset of the selected row group in the parquet file."), @ReadsAttribute(attribute = ParquetAttribute.FILE_RANGE_END_OFFSET, description = "Gets the end offset of the selected row group in the parquet file.")})
@SideEffectFree
/* loaded from: input_file:org/apache/nifi/processors/parquet/CalculateParquetOffsets.class */
public class CalculateParquetOffsets extends AbstractProcessor {
    static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new PropertyDescriptor.Builder().name("Records Per Split").description("Specifies how many records should be covered in each FlowFile").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new PropertyDescriptor.Builder().name("Zero Content Output").description("Whether to do, or do not copy the content of input FlowFile.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles, with special attributes that represent a chunk of the input file.").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(PROP_RECORDS_PER_SPLIT, PROP_ZERO_CONTENT_OUTPUT);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        long longValue = processContext.getProperty(PROP_RECORDS_PER_SPLIT).evaluateAttributeExpressions(flowFile).asLong().longValue();
        boolean booleanValue = processContext.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean().booleanValue();
        long longValue2 = ((Long) Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_OFFSET)).map(Long::valueOf).orElse(0L)).longValue();
        long longValue3 = ((Long) Optional.ofNullable(flowFile.getAttribute("record.count")).map(Long::valueOf).orElseGet(() -> {
            return Long.valueOf(getRecordCount(processSession, flowFile) - longValue2);
        })).longValue();
        processSession.transfer(getPartitions(processSession, flowFile, longValue, longValue3, longValue2, booleanValue), REL_SUCCESS);
        processSession.adjustCounter("Records Split", longValue3, false);
        processSession.adjustCounter("Partitions Created", r0.size(), false);
        if (booleanValue) {
            processSession.remove(flowFile);
        }
    }

    private long getRecordCount(ProcessSession processSession, FlowFile flowFile) {
        ParquetReadOptions build = ParquetReadOptions.builder().withRange(((Long) Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET)).map(Long::parseLong).orElse(0L)).longValue(), ((Long) Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET)).map(Long::parseLong).orElse(Long.MAX_VALUE)).longValue()).build();
        try {
            InputStream read = processSession.read(flowFile);
            try {
                ParquetFileReader parquetFileReader = new ParquetFileReader(new NifiParquetInputFile(read, flowFile.getSize()), build);
                try {
                    long recordCount = parquetFileReader.getRecordCount();
                    parquetFileReader.close();
                    if (read != null) {
                        read.close();
                    }
                    return recordCount;
                } catch (Throwable th) {
                    try {
                        parquetFileReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (IOException e) {
            throw new ProcessException(e);
        }
    }

    private List<FlowFile> getPartitions(ProcessSession processSession, FlowFile flowFile, long j, long j2, long j3, boolean z) {
        long j4 = (j2 / j) + (j2 % j > 0 ? 1 : 0);
        ArrayList arrayList = new ArrayList((int) Math.min(2147483647L, j4));
        long j5 = 0;
        while (true) {
            long j6 = j5;
            if (j6 >= j4) {
                return arrayList;
            }
            long j7 = j6 * j;
            arrayList.add(processSession.putAllAttributes(z ? processSession.create(flowFile) : j6 == 0 ? flowFile : processSession.clone(flowFile), Map.of(ParquetAttribute.RECORD_OFFSET, Long.toString(j3 + j7), "record.count", Long.toString(Math.min(j, j2 - j7)))));
            j5 = j6 + 1;
        }
    }
}
