nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject [nifi] branch main updated: NIFI-8737: Fixed incorrect provenance data in HDFS processors when Directory property is inconsistent with core-site.xml
Date Fri, 25 Jun 2021 12:20:09 GMT
This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 39ffa4a  NIFI-8737: Fixed incorrect provenance data in HDFS processors when Directory property is inconsistent with core-site.xml
39ffa4a is described below

commit 39ffa4a8ac21b83f0b269828d4b162131e393903
Author: Peter Turcsanyi <turcsanyi@apache.org>
AuthorDate: Wed Jun 23 17:04:08 2021 +0200

    NIFI-8737: Fixed incorrect provenance data in HDFS processors when Directory property is inconsistent with core-site.xml
---
 .../processors/hadoop/AbstractHadoopProcessor.java | 23 +++++++++
 .../nifi/processors/hadoop/AbstractPutHDFS.java    | 25 +++++----
 .../processors/hadoop/AbstractFetchHDFSRecord.java |  2 +-
 .../processors/hadoop/AbstractPutHDFSRecord.java   |  3 +-
 .../apache/nifi/processors/hadoop/DeleteHDFS.java  |  2 +-
 .../apache/nifi/processors/hadoop/FetchHDFS.java   |  2 +-
 .../org/apache/nifi/processors/hadoop/GetHDFS.java |  4 +-
 .../nifi/processors/hadoop/GetHDFSFileInfo.java    |  3 +-
 .../apache/nifi/processors/hadoop/ListHDFS.java    |  4 +-
 .../apache/nifi/processors/hadoop/MoveHDFS.java    | 21 ++++----
 .../org/apache/nifi/processors/hadoop/PutHDFS.java | 12 ++---
 .../nifi/processors/hadoop/AbstractHadoopTest.java | 59 ++++++++++++++++++++++
 12 files changed, 117 insertions(+), 43 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 1cce996..2838a38 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -34,6 +34,7 @@ import org.apache.nifi.components.resource.ResourceCardinality;
 import org.apache.nifi.components.resource.ResourceReferences;
 import org.apache.nifi.components.resource.ResourceType;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
@@ -606,4 +607,26 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor
{
         }
     }
 
+    protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property)
{
+        return getNormalizedPath(context, property, null);
+    }
+
+    protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property,
FlowFile flowFile) {
+        final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
+        final Path path = new Path(propertyValue);
+        final URI uri = path.toUri();
+
+        final URI fileSystemUri = getFileSystem().getUri();
+
+        if (uri.getScheme() != null) {
+            if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority()))
{
+                getLogger().warn("The filesystem component of the URI configured in the '{}'
property ({}) does not match the filesystem URI from the Hadoop configuration file ({}) "
+
+                        "and will be ignored.", property.getDisplayName(), uri, fileSystemUri);
+            }
+
+            return new Path(uri.getPath());
+        } else {
+            return path;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
index 44fae65..b86b7dc 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
@@ -100,13 +100,12 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor
{
                 Path tempDotCopyFile = null;
                 FlowFile putFlowFile = flowFile;
                 try {
-                    final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
-                    final Path configuredRootDirPath = new Path(dirValue);
+                    final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
 
                     final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
-                    final long blockSize = getBlockSize(context, session, putFlowFile);
+                    final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
                     final int bufferSize = getBufferSize(context, session, putFlowFile);
-                    final short replication = getReplication(context, session, putFlowFile);
+                    final short replication = getReplication(context, session, putFlowFile,
dirPath);
 
                     final CompressionCodec codec = getCompressionCodec(context, configuration);
 
@@ -114,19 +113,19 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor
{
                             ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
                             : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
 
-                    final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
-                    final Path copyFile = new Path(configuredRootDirPath, filename);
+                    final Path tempCopyFile = new Path(dirPath, "." + filename);
+                    final Path copyFile = new Path(dirPath, filename);
 
                     // Create destination directory if it does not exist
                     try {
-                        if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
-                            throw new IOException(configuredRootDirPath.toString() + " already
exists and is not a directory");
+                        if (!hdfs.getFileStatus(dirPath).isDirectory()) {
+                            throw new IOException(dirPath.toString() + " already exists and
is not a directory");
                         }
                     } catch (FileNotFoundException fe) {
-                        if (!hdfs.mkdirs(configuredRootDirPath)) {
-                            throw new IOException(configuredRootDirPath.toString() + " could
not be created");
+                        if (!hdfs.mkdirs(dirPath)) {
+                            throw new IOException(dirPath.toString() + " could not be created");
                         }
-                        changeOwner(context, hdfs, configuredRootDirPath, flowFile);
+                        changeOwner(context, hdfs, dirPath, flowFile);
                     }
 
                     final boolean destinationExists = hdfs.exists(copyFile);
@@ -274,7 +273,7 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor
{
     /**
      * Returns with the expected block size.
      */
-    protected abstract long getBlockSize(final ProcessContext context, final ProcessSession
session, final FlowFile flowFile);
+    protected abstract long getBlockSize(final ProcessContext context, final ProcessSession
session, final FlowFile flowFile, final Path dirPath);
 
     /**
      * Returns with the expected buffer size.
@@ -284,7 +283,7 @@ public abstract class AbstractPutHDFS extends AbstractHadoopProcessor
{
     /**
      * Returns with the expected replication factor.
      */
-    protected abstract short getReplication(final ProcessContext context, final ProcessSession
session, final FlowFile flowFile);
+    protected abstract short getReplication(final ProcessContext context, final ProcessSession
session, final FlowFile flowFile, final Path dirPath);
 
     /**
      * Returns if file system should ignore locality.
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 7248e8f..33e762f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -179,7 +179,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor
{
             FlowFile child = null;
             final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(originalFlowFile).getValue();
             try {
-                final Path path = new Path(filenameValue);
+                final Path path = getNormalizedPath(context, FILENAME, originalFlowFile);
                 final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
                 final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index 5ee54e3..a595128 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -273,10 +273,9 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor
{
             FlowFile putFlowFile = flowFile;
             try {
                 final String filenameValue = putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
// TODO codec extension
-                final String directoryValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
 
                 // create the directory if it doesn't exist
-                final Path directoryPath = new Path(directoryValue);
+                final Path directoryPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
                 createDirectory(fileSystem, directoryPath, remoteOwner, remoteGroup);
 
                 // write to tempFile first and on success rename to destFile
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 8788085..9296507 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -145,7 +145,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
         // We need a FlowFile to report provenance correctly.
         final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();
 
-        final String fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(finalFlowFile).getValue();
+        final String fileOrDirectoryName = getNormalizedPath(context, FILE_OR_DIRECTORY,
finalFlowFile).toString();
 
         final FileSystem fileSystem = getFileSystem();
         final UserGroupInformation ugi = getUserGroupInformation();
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 26fd382..b60ee5b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -125,7 +125,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
 
         final Path path;
         try {
-            path = new Path(filenameValue);
+            path = getNormalizedPath(context, FILENAME, flowFile);
         } catch (IllegalArgumentException e) {
             getLogger().error("Failed to retrieve content from {} for {} due to {}; routing
to failure", new Object[] {filenameValue, flowFile, e});
             flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index ba53377..3f08da0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -345,7 +345,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
                 BUFFER_SIZE_DEFAULT);
-        final Path rootDir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+        final Path rootDir = getNormalizedPath(context, DIRECTORY);
 
         final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
         final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
@@ -427,7 +427,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock())
{
             try {
                 final FileSystem hdfs = getFileSystem();
-                final Path directoryPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+                final Path directoryPath = getNormalizedPath(context, DIRECTORY);
 
                 if (!hdfs.exists(directoryPath)) {
                     context.yield();
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
index f2864f0..8383732 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
@@ -582,7 +582,8 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
      */
     protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, FlowFile ff)
{
         HDFSFileInfoRequest req = new HDFSFileInfoRequest();
-        req.setFullPath(context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue());
+        String fullPath = getNormalizedPath(context, FULL_PATH, ff).toString();
+        req.setFullPath(fullPath);
         req.setRecursive(context.getProperty(RECURSE_SUBDIRS).asBoolean());
 
         PropertyValue pv;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 02209e3..583b0b8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -402,8 +402,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
         lastRunTimestamp = now;
 
-        final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
-
         // Ensure that we are using the latest listing information before we try to perform
a listing of HDFS files.
         try {
             final StateMap stateMap = session.getState(Scope.CLUSTER);
@@ -443,7 +441,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         final Set<FileStatus> statuses;
         try {
-            final Path rootPath = new Path(directory);
+            final Path rootPath = getNormalizedPath(context, DIRECTORY);
             statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context),
fileFilterMode);
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
         } catch (final IOException | IllegalArgumentException e) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 76a03d0..acb4c85 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.hadoop;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -245,7 +246,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
 
         Path inputPath;
         try {
-            inputPath = new Path(filenameValue);
+            inputPath = getNormalizedPath(context, INPUT_DIRECTORY_OR_FILE, flowFile);
             if (!hdfs.exists(inputPath)) {
                 throw new IOException("Input Directory or File does not exist in HDFS");
             }
@@ -348,9 +349,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                     FlowFile flowFile = session.create(parentFlowFile);
                     try {
                         final String originalFilename = file.getName();
-                        final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions(parentFlowFile).getValue();
-                        final Path configuredRootOutputDirPath = new Path(outputDirValue);
-                        final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
+                        final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY,
parentFlowFile);
+                        final Path newFile = new Path(outputDirPath, originalFilename);
                         final boolean destinationExists = hdfs.exists(newFile);
                         // If destination file already exists, resolve that
                         // based on processor configuration
@@ -382,15 +382,15 @@ public class MoveHDFS extends AbstractHadoopProcessor {
 
                         // Create destination directory if it does not exist
                         try {
-                            if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory())
{
-                                throw new IOException(configuredRootOutputDirPath.toString()
+                            if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
+                                throw new IOException(outputDirPath.toString()
                                         + " already exists and is not a directory");
                             }
                         } catch (FileNotFoundException fe) {
-                            if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
-                                throw new IOException(configuredRootOutputDirPath.toString()
+ " could not be created");
+                            if (!hdfs.mkdirs(outputDirPath)) {
+                                throw new IOException(outputDirPath.toString() + " could
not be created");
                             }
-                            changeOwner(context, hdfs, configuredRootOutputDirPath);
+                            changeOwner(context, hdfs, outputDirPath);
                         }
 
                         boolean moved = false;
@@ -419,8 +419,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                         final String hdfsPath = newFile.getParent().toString();
                         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(),
newFilename);
                         flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE,
hdfsPath);
-                        final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/"
+ outputPath
-                                : "hdfs://" + outputPath;
+                        final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath,
"/");
                         session.getProvenanceReporter().send(flowFile, transitUri);
                         session.transfer(flowFile, REL_SUCCESS);
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index fef0805..00942e3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -189,11 +189,9 @@ public class PutHDFS extends AbstractPutHDFS {
     }
 
     @Override
-    protected long getBlockSize(final ProcessContext context, final ProcessSession session,
final FlowFile flowFile) {
-        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-        final Path configuredRootDirPath = new Path(dirValue);
+    protected long getBlockSize(final ProcessContext context, final ProcessSession session,
final FlowFile flowFile, Path dirPath) {
         final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
-        return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(configuredRootDirPath);
+        return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath);
     }
 
     @Override
@@ -203,12 +201,10 @@ public class PutHDFS extends AbstractPutHDFS {
     }
 
     @Override
-    protected short getReplication(final ProcessContext context, final ProcessSession session,
final FlowFile flowFile) {
-        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-        final Path configuredRootDirPath = new Path(dirValue);
+    protected short getReplication(final ProcessContext context, final ProcessSession session,
final FlowFile flowFile, Path dirPath) {
         final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
         return replicationProp != null ? replicationProp.shortValue() : getFileSystem()
-                .getDefaultReplication(configuredRootDirPath);
+                .getDefaultReplication(dirPath);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index d60ba7f..0cd069e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.resource.FileResourceReference;
@@ -39,11 +41,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -207,4 +212,58 @@ public class AbstractHadoopTest {
         runner.setProperty(kerberosProperties.getKerberosKeytab(), temporaryFile.getAbsolutePath());
         runner.assertValid();
     }
+
+    @Test
+    public void testGetNormalizedPathWithoutFileSystem() throws URISyntaxException {
+        AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container1@storageaccount1");
+        TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "/dir1");
+
+        Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+        assertEquals("/dir1", path.toString());
+        assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+    }
+
+    @Test
+    public void testGetNormalizedPathWithCorrectFileSystem() throws URISyntaxException {
+        AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container2@storageaccount2");
+        TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "abfs://container2@storageaccount2/dir2");
+
+        Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+        assertEquals("/dir2", path.toString());
+        assertTrue(runner.getLogger().getWarnMessages().isEmpty());
+    }
+
+    @Test
+    public void testGetNormalizedPathWithIncorrectFileSystem() throws URISyntaxException
{
+        AbstractHadoopProcessor processor = initProcessorForTestGetNormalizedPath("abfs://container3@storageaccount3");
+        TestRunner runner = initTestRunnerForTestGetNormalizedPath(processor, "abfs://container*@storageaccount*/dir3");
+
+        Path path = processor.getNormalizedPath(runner.getProcessContext(), AbstractHadoopProcessor.DIRECTORY);
+
+        assertEquals("/dir3", path.toString());
+        assertFalse(runner.getLogger().getWarnMessages().isEmpty());
+    }
+
+    private AbstractHadoopProcessor initProcessorForTestGetNormalizedPath(String fileSystemUri)
throws URISyntaxException {
+        final FileSystem fileSystem = mock(FileSystem.class);
+        when(fileSystem.getUri()).thenReturn(new URI(fileSystemUri));
+
+        final PutHDFS processor = new PutHDFS() {
+            @Override
+            protected FileSystem getFileSystem() {
+                return fileSystem;
+            }
+        };
+
+        return processor;
+    }
+
+    private TestRunner initTestRunnerForTestGetNormalizedPath(AbstractHadoopProcessor processor,
String directory) throws URISyntaxException {
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractHadoopProcessor.DIRECTORY, directory);
+
+        return runner;
+    }
 }

Mime
View raw message