package org.apache.nifi.processors.azure.storage;

import com.azure.core.util.Context;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileRange;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.MultiProcessorUseCase;
import org.apache.nifi.annotation.documentation.ProcessorConfiguration;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

@CapabilityDescription("Fetch the specified file from Azure Data Lake Storage")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@WritesAttributes({@WritesAttribute(attribute = "azure.datalake.storage.statusCode", description = "The HTTP error code (if available) from the failed operation"), @WritesAttribute(attribute = "azure.datalake.storage.errorCode", description = "The Azure Data Lake Storage moniker of the failed operation"), @WritesAttribute(attribute = "azure.datalake.storage.errorMessage", description = "The Azure Data Lake Storage error message from the failed operation")})
@MultiProcessorUseCase(description = "Retrieve all files in an Azure DataLake Storage directory", keywords = {"azure", "datalake", "adls", "state", "retrieve", "fetch", "all", "stream"}, configurations = {@ProcessorConfiguration(processorClass = ListAzureDataLakeStorage.class, configuration = "The \"Filesystem Name\" property should be set to the name of the Azure Filesystem (also known as a Container) that files reside in.     If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_FILESYSTEM}`.\nConfigure the \"Directory Name\" property to specify the name of the directory in the file system.     If the flow being built is to be reused elsewhere, it's a good idea to parameterize this property by setting it to something like `#{AZURE_DIRECTORY}`.\n\nThe \"ADLS Credentials\" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem.\n\nThe 'success' Relationship of this Processor is then connected to FetchAzureDataLakeStorage.\n"), @ProcessorConfiguration(processorClass = FetchAzureDataLakeStorage.class, configuration = "\"Filesystem Name\" = \"${azure.filesystem}\"\n\"Directory Name\" = \"${azure.directory}\"\n\"File Name\" = \"${azure.filename}\"\n\nThe \"ADLS Credentials\" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem.\n")})
/* loaded from: input_file:org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.class */
public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder().name("range-start").displayName("Range Start").description("The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).build();
    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder().name("range-length").displayName("Range Length").description("The number of bytes to download from the object, starting from the Range Start. An empty value or a value that extends beyond the end of the object will read to the end of the object.").addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).build();
    public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder().name("number-of-retries").displayName("Number of Retries").description("The number of automatic retries to perform if the download fails.").addValidator(StandardValidators.createLongValidator(0, 2147483647L, true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).defaultValue("0").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(AzureStorageUtils.ADLS_CREDENTIALS_SERVICE, AzureStorageUtils.FILESYSTEM, AzureStorageUtils.DIRECTORY, AzureStorageUtils.FILE, RANGE_START, RANGE_LENGTH, NUM_RETRIES, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE);

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        long nanoTime = System.nanoTime();
        try {
            long longValue = processContext.getProperty(RANGE_START).isSet() ? processContext.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L;
            Long valueOf = processContext.getProperty(RANGE_LENGTH).isSet() ? Long.valueOf(processContext.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()) : null;
            int intValue = processContext.getProperty(NUM_RETRIES).isSet() ? processContext.getProperty(NUM_RETRIES).evaluateAttributeExpressions(flowFile).asInteger().intValue() : 0;
            FileRange fileRange = new FileRange(longValue, valueOf);
            DownloadRetryOptions downloadRetryOptions = new DownloadRetryOptions();
            downloadRetryOptions.setMaxRetryRequests(intValue);
            String evaluateFileSystemProperty = AzureStorageUtils.evaluateFileSystemProperty(AzureStorageUtils.FILESYSTEM, (PropertyContext) processContext, flowFile);
            String evaluateDirectoryProperty = AzureStorageUtils.evaluateDirectoryProperty(AzureStorageUtils.DIRECTORY, (PropertyContext) processContext, flowFile);
            String evaluateFileProperty = AzureStorageUtils.evaluateFileProperty((PropertyContext) processContext, flowFile);
            DataLakeFileClient fileClient = getStorageClient(processContext, flowFile).getFileSystemClient(evaluateFileSystemProperty).getDirectoryClient(evaluateDirectoryProperty).getFileClient(evaluateFileProperty);
            if (fileClient.getProperties().isDirectory().booleanValue()) {
                throw new ProcessException(AzureStorageUtils.FILE.getDisplayName() + " (" + evaluateFileProperty + ") points to a directory. Full path: " + fileClient.getFilePath());
            }
            FlowFile write = processSession.write(flowFile, outputStream -> {
                fileClient.readWithResponse(outputStream, fileRange, downloadRetryOptions, (DataLakeRequestConditions) null, false, (Duration) null, Context.NONE);
            });
            processSession.getProvenanceReporter().modifyContent(write);
            processSession.transfer(write, REL_SUCCESS);
            processSession.getProvenanceReporter().fetch(write, fileClient.getFileUrl(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime));
        } catch (Exception e) {
            getLogger().error("Failure to fetch file from Azure Data Lake Storage", e);
            processSession.transfer(processSession.penalize(processSession.putAttribute(flowFile, "azure.datalake.storage.errorMessage", e.getMessage())), REL_FAILURE);
        } catch (DataLakeStorageException e2) {
            getLogger().error("Failure to fetch file from Azure Data Lake Storage", e2);
            processSession.transfer(processSession.penalize(processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(flowFile, "azure.datalake.storage.statusCode", String.valueOf(e2.getStatusCode())), "azure.datalake.storage.errorCode", e2.getErrorCode()), "azure.datalake.storage.errorMessage", e2.getMessage())), REL_FAILURE);
        }
    }
}
