package org.apache.nifi.processors.hadoop;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosUser;
import org.ietf.jgss.GSSException;

@RequiresInstanceClassLoading(cloneAncestorResources = true)
/* loaded from: input_file:org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.class */
public abstract class AbstractHadoopProcessor extends AbstractProcessor implements ClassloaderIsolationKeyProvider {
    private static final String NORMALIZE_ERROR_WITH_PROPERTY = "The filesystem component of the URI configured in the '{}' property ({}) does not match the filesystem URI from the Hadoop configuration file ({}) and will be ignored.";
    private static final String NORMALIZE_ERROR_WITHOUT_PROPERTY = "The filesystem component of the URI configured ({}) does not match the filesystem URI from the Hadoop configuration file ({}) and will be ignored.";
    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
    public static final String HADOOP_FILE_URL_ATTRIBUTE = "hadoop.file.url";
    protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = "target.dir.created";
    final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
    private static final String DENY_LFS_ACCESS = "NIFI_HDFS_DENY_LOCAL_FILE_SYSTEM_ACCESS";
    private static final String DENY_LFS_EXPLANATION = String.format("LFS Access Denied according to Environment Variable [%s]", DENY_LFS_ACCESS);
    private static final Pattern LOCAL_FILE_SYSTEM_URI = Pattern.compile("^file:.*");
    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources").description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration. To use swebhdfs, see 'Additional Details' section of PutHDFS's documentation.").required(false).identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, new ResourceType[0]).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder().name("Directory").description("The HDFS directory from which files should be read").required(true).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true).allowableValues(CompressionType.allowableValues()).defaultValue(CompressionType.NONE.toString()).build();
    public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder().name("Additional Classpath Resources").description("A comma-separated list of paths to files and/or directories that will be added to the classpath and used for loading native libraries. When specifying a directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.").required(false).identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, new ResourceType[]{ResourceType.DIRECTORY}).dynamicallyModifiesClasspath(true).build();
    public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder().name("kerberos-user-service").displayName("Kerberos User Service").description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos").identifiesControllerService(KerberosUserService.class).required(false).build();
    private static final Object RESOURCES_LOCK = new Object();
    private static final HdfsResources EMPTY_HDFS_RESOURCES = new HdfsResources(null, null, null, null);
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(HADOOP_CONFIGURATION_RESOURCES, KERBEROS_USER_SERVICE, ADDITIONAL_CLASSPATH_RESOURCES);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/nifi/processors/hadoop/AbstractHadoopProcessor$ValidationResources.class */
    public static class ValidationResources {
        private final List<String> configLocations;
        private final Configuration configuration;

        public ValidationResources(List<String> list, Configuration configuration) {
            this.configLocations = list;
            this.configuration = configuration;
        }

        public List<String> getConfigLocations() {
            return this.configLocations;
        }

        public Configuration getConfiguration() {
            return this.configuration;
        }
    }

    protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        this.hdfsResources.set(EMPTY_HDFS_RESOURCES);
    }

    public void migrateProperties(PropertyConfiguration propertyConfiguration) {
        propertyConfiguration.removeProperty("Kerberos Principal");
        propertyConfiguration.removeProperty("Kerberos Password");
        propertyConfiguration.removeProperty("Kerberos Keytab");
        propertyConfiguration.removeProperty("kerberos-credentials-service");
        propertyConfiguration.removeProperty("Kerberos Relogin Period");
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.nifi.processors.hadoop.AbstractHadoopProcessor$1ClassloaderIsolationKeyBuilder] */
    public String getClassloaderIsolationKey(PropertyContext propertyContext) {
        ?? r0 = new Object(this) { // from class: org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.1ClassloaderIsolationKeyBuilder
            private static final String SEPARATOR = "__";
            private final StringBuilder sb = new StringBuilder();

            void add(String str) {
                if (str != null) {
                    if (this.sb.length() > 0) {
                        this.sb.append(SEPARATOR);
                    }
                    this.sb.append(str);
                }
            }

            String build() {
                if (this.sb.length() > 0) {
                    return this.sb.toString();
                }
                return null;
            }
        };
        r0.add(propertyContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue());
        r0.add(propertyContext.getProperty(ADDITIONAL_CLASSPATH_RESOURCES).getValue());
        try {
            KerberosUserService asControllerService = propertyContext.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
            if (asControllerService != null) {
                r0.add(asControllerService.createKerberosUser().getPrincipal());
            }
        } catch (IllegalStateException e) {
        }
        return r0.build();
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList arrayList = new ArrayList();
        List<String> configLocations = getConfigLocations(validationContext);
        if (configLocations.isEmpty()) {
            return arrayList;
        }
        try {
            arrayList.addAll(validateFileSystem(getHadoopConfigurationForValidation(configLocations)));
        } catch (IOException e) {
            arrayList.add(new ValidationResult.Builder().valid(false).subject("Hadoop Configuration Resources").explanation("Could not load Hadoop Configuration resources due to: " + String.valueOf(e)).build());
        }
        return arrayList;
    }

    protected Collection<ValidationResult> validateFileSystem(Configuration configuration) {
        ArrayList arrayList = new ArrayList();
        if (isFileSystemAccessDenied(FileSystem.getDefaultUri(configuration))) {
            arrayList.add(new ValidationResult.Builder().valid(false).subject("Hadoop File System").explanation(DENY_LFS_EXPLANATION).build());
        }
        return arrayList;
    }

    protected Configuration getHadoopConfigurationForValidation(List<String> list) throws IOException {
        ValidationResources validationResources = this.validationResourceHolder.get();
        if (validationResources == null || !list.equals(validationResources.getConfigLocations())) {
            getLogger().debug("Reloading validation resources");
            ExtendedConfiguration extendedConfiguration = new ExtendedConfiguration(getLogger());
            extendedConfiguration.setClassLoader(Thread.currentThread().getContextClassLoader());
            validationResources = new ValidationResources(list, getConfigurationFromResources(extendedConfiguration, list));
            this.validationResourceHolder.set(validationResources);
        }
        return validationResources.getConfiguration();
    }

    @OnScheduled
    public final void abstractOnScheduled(ProcessContext processContext) throws IOException {
        try {
            if (this.hdfsResources.get().getConfiguration() == null) {
                this.hdfsResources.set(resetHDFSResources(getConfigLocations(processContext), processContext));
            }
        } catch (Exception e) {
            getLogger().error("HDFS Configuration failed", e);
            this.hdfsResources.set(EMPTY_HDFS_RESOURCES);
            throw e;
        }
    }

    protected List<String> getConfigLocations(PropertyContext propertyContext) {
        return propertyContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources().asLocations();
    }

    @OnStopped
    public final void abstractOnStopped() {
        HdfsResources hdfsResources = this.hdfsResources.get();
        if (hdfsResources != null) {
            HDFSResourceHelper.closeFileSystem(hdfsResources.getFileSystem());
        }
        this.hdfsResources.set(EMPTY_HDFS_RESOURCES);
    }

    private static Configuration getConfigurationFromResources(Configuration configuration, List<String> list) throws IOException {
        boolean z = !list.isEmpty();
        if (!z) {
            String configuration2 = configuration.toString();
            String[] split = configuration2.substring(configuration2.indexOf(":") + 1).split(",");
            int length = split.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                String str = split[i];
                if (!str.contains("default") && configuration.getResource(str.trim()) != null) {
                    z = true;
                    break;
                }
                i++;
            }
        } else {
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                configuration.addResource(new Path(it.next().trim()));
            }
        }
        if (z) {
            return configuration;
        }
        throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
    }

    HdfsResources resetHDFSResources(List<String> list, ProcessContext processContext) throws IOException {
        UserGroupInformation loginSimple;
        KerberosUser kerberosUser;
        FileSystem fileSystemAsUser;
        ExtendedConfiguration extendedConfiguration = new ExtendedConfiguration(getLogger());
        extendedConfiguration.setClassLoader(Thread.currentThread().getContextClassLoader());
        getConfigurationFromResources(extendedConfiguration, list);
        preProcessConfiguration(extendedConfiguration, processContext);
        checkHdfsUriForTimeout(extendedConfiguration);
        extendedConfiguration.set(String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(extendedConfiguration).getScheme()), "true");
        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(extendedConfiguration)) {
                kerberosUser = getKerberosUser(processContext);
                loginSimple = SecurityUtil.getUgiForKerberosUser(extendedConfiguration, kerberosUser);
            } else {
                extendedConfiguration.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                extendedConfiguration.set(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION, "simple");
                loginSimple = SecurityUtil.loginSimple(extendedConfiguration);
                kerberosUser = null;
            }
            fileSystemAsUser = getFileSystemAsUser(extendedConfiguration, loginSimple);
        }
        getLogger().debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{loginSimple, kerberosUser});
        Path workingDirectory = fileSystemAsUser.getWorkingDirectory();
        getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}", new Object[]{workingDirectory, Long.valueOf(fileSystemAsUser.getDefaultBlockSize(workingDirectory)), Short.valueOf(fileSystemAsUser.getDefaultReplication(workingDirectory)), extendedConfiguration.toString()});
        return new HdfsResources(extendedConfiguration, fileSystemAsUser, loginSimple, kerberosUser);
    }

    private KerberosUser getKerberosUser(ProcessContext processContext) {
        KerberosUserService asControllerService = processContext.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
        if (asControllerService != null) {
            return asControllerService.createKerberosUser();
        }
        throw new IllegalStateException("Unable to authenticate with Kerberos, no keytab or password was provided");
    }

    protected void preProcessConfiguration(Configuration configuration, ProcessContext processContext) {
    }

    protected FileSystem getFileSystem(Configuration configuration) throws IOException {
        return FileSystem.get(configuration);
    }

    protected FileSystem getFileSystemAsUser(Configuration configuration, UserGroupInformation userGroupInformation) throws IOException {
        try {
            return (FileSystem) userGroupInformation.doAs(() -> {
                return FileSystem.get(configuration);
            });
        } catch (InterruptedException e) {
            throw new IOException("Unable to create file system: " + e.getMessage());
        }
    }

    protected void checkHdfsUriForTimeout(Configuration configuration) throws IOException {
        URI defaultUri = FileSystem.getDefaultUri(configuration);
        String authority = defaultUri.getAuthority();
        int port = defaultUri.getPort();
        if (authority == null || authority.isEmpty() || port < 0) {
            return;
        }
        InetSocketAddress createSocketAddr = NetUtils.createSocketAddr(authority, port);
        Socket socket = null;
        try {
            socket = NetUtils.getDefaultSocketFactory(configuration).createSocket();
            NetUtils.connect(socket, createSocketAddr, 1000);
            IOUtils.closeQuietly(socket);
        } catch (Throwable th) {
            IOUtils.closeQuietly(socket);
            throw th;
        }
    }

    protected CompressionCodec getCompressionCodec(ProcessContext processContext, Configuration configuration) {
        CompressionCodec compressionCodec = null;
        if (processContext.getProperty(COMPRESSION_CODEC).isSet()) {
            compressionCodec = new CompressionCodecFactory(configuration).getCodecByClassName(CompressionType.valueOf(processContext.getProperty(COMPRESSION_CODEC).getValue()).toString());
        }
        return compressionCodec;
    }

    public static String getPathDifference(Path path, Path path2) {
        int depth = path2.depth() - path.depth();
        if (depth <= 1) {
            return "";
        }
        String name = path.getName();
        Path parent = path2.getParent();
        StringBuilder sb = new StringBuilder();
        sb.append(parent.getName());
        for (int i = depth - 3; i >= 0; i--) {
            parent = parent.getParent();
            String name2 = parent.getName();
            if (name2.equals(name) && parent.toString().endsWith(path.toString())) {
                break;
            }
            sb.insert(0, "/").insert(0, name2);
        }
        return sb.toString();
    }

    protected Configuration getConfiguration() {
        return this.hdfsResources.get().getConfiguration();
    }

    protected FileSystem getFileSystem() {
        return this.hdfsResources.get().getFileSystem();
    }

    protected UserGroupInformation getUserGroupInformation() {
        getLogger().trace("getting UGI instance");
        SecurityUtil.checkTGTAndRelogin(getLogger(), this.hdfsResources.get().getKerberosUser());
        return this.hdfsResources.get().getUserGroupInformation();
    }

    boolean isLocalFileSystemAccessDenied() {
        return Boolean.parseBoolean(System.getenv(DENY_LFS_ACCESS));
    }

    protected boolean isFileSystemAccessDenied(URI uri) {
        return isLocalFileSystemAccessDenied() ? LOCAL_FILE_SYSTEM_URI.matcher(uri.toString()).matches() : false;
    }

    protected Path getNormalizedPath(ProcessContext processContext, PropertyDescriptor propertyDescriptor) {
        return getNormalizedPath(processContext, propertyDescriptor, null);
    }

    protected Path getNormalizedPath(String str) {
        return getNormalizedPath(str, Optional.empty());
    }

    protected Path getNormalizedPath(ProcessContext processContext, PropertyDescriptor propertyDescriptor, FlowFile flowFile) {
        return getNormalizedPath(processContext.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue(), Optional.of(propertyDescriptor.getDisplayName()));
    }

    private Path getNormalizedPath(String str, Optional<String> optional) {
        String str2;
        URI uri = new Path(str).toUri();
        URI uri2 = getFileSystem().getUri();
        if (uri.getScheme() != null) {
            if (!uri.getScheme().equals(uri2.getScheme()) || (uri.getAuthority() != null && !uri.getAuthority().equals(uri2.getAuthority()))) {
                if (optional.isPresent()) {
                    getLogger().warn(NORMALIZE_ERROR_WITH_PROPERTY, new Object[]{optional, uri, uri2});
                } else {
                    getLogger().warn(NORMALIZE_ERROR_WITHOUT_PROPERTY, new Object[]{uri, uri2});
                }
            }
            str2 = uri.getPath();
        } else {
            str2 = str;
        }
        return new Path(str2.replaceAll("/+", "/"));
    }

    protected <T extends Throwable> Optional<T> findCause(Throwable th, Class<T> cls, Predicate<T> predicate) {
        Stream stream = Throwables.getCausalChain(th).stream();
        Objects.requireNonNull(cls);
        Stream filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Objects.requireNonNull(cls);
        return filter.map((v1) -> {
            return r1.cast(v1);
        }).filter(predicate).findFirst();
    }

    protected boolean handleAuthErrors(Throwable th, ProcessSession processSession, ProcessContext processContext, BiConsumer<ProcessSession, ProcessContext> biConsumer) {
        Optional findCause = findCause(th, GSSException.class, gSSException -> {
            return 13 == gSSException.getMajor();
        });
        if (!findCause.isPresent()) {
            return false;
        }
        getLogger().error("An error occurred while connecting to HDFS. Rolling back session and, and resetting HDFS resources", (Throwable) findCause.get());
        try {
            this.hdfsResources.set(resetHDFSResources(getConfigLocations(processContext), processContext));
        } catch (IOException e) {
            getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.");
        }
        biConsumer.accept(processSession, processContext);
        return true;
    }
}
