nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [2/2] nifi git commit: NIFI-3366 This closes #2332. Added parent/child flowfile relationship between the incoming flowfile and the files that are moved from the input directory to the output directory. Updated to allow tests to check for evaluation of properties that support expression language.
Date Mon, 11 Dec 2017 13:42:02 GMT
NIFI-3366 This closes #2332. Added parent/child flowfile relationship between the incoming flowfile and the files that are moved from the input directory to the output directory.
Updated to allow tests to check for evaluation of properties that support expression language.
Fixed bug with changeOwner attempting to operate on original file rather than the moved/copied file.
Added license header to MoveHDFSTest.java
Added test for moving a directory of files that contains a subdir, ensuring non-recursive behavior
Added to the description of the processor that it is non-recursive when a directory is used as input.
Added RAT exclude for test resource .dotfile to pom.xml

Signed-off-by: joewitt <joewitt@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/600586d6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/600586d6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/600586d6

Branch: refs/heads/master
Commit: 600586d6be1ee2b7f8fb9397623a2bc8d72df1b7
Parents: 3731fbe
Author: Jeff Storck <jtswork@gmail.com>
Authored: Thu Dec 7 15:39:22 2017 -0500
Committer: joewitt <joewitt@apache.org>
Committed: Mon Dec 11 08:41:36 2017 -0500

----------------------------------------------------------------------
 .../nifi-hdfs-processors/pom.xml                |  15 +
 .../apache/nifi/processors/hadoop/MoveHDFS.java | 880 +++++++++----------
 .../nifi/processors/hadoop/MoveHDFSTest.java    | 408 +++++----
 .../src/test/resources/testdata/.dotfile        |   0
 4 files changed, 685 insertions(+), 618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index e6b8b91..32576e1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -66,4 +66,19 @@
             <artifactId>nifi-properties</artifactId>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/testdata/.dotfile</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index e9842b7..5c889d6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -60,145 +61,141 @@ import org.apache.nifi.util.StopWatch;
 /**
  * This processor renames files on HDFS.
  */
-@Tags({ "hadoop", "HDFS", "put", "move", "filesystem", "restricted", "moveHDFS" })
-@CapabilityDescription("Rename existing files on Hadoop Distributed File System (HDFS)")
+@Tags({"hadoop", "HDFS", "put", "move", "filesystem", "restricted", "moveHDFS"})
+@CapabilityDescription("Rename existing files or a directory of files (non-recursive) on Hadoop Distributed File System (HDFS).")
 @ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
 @WritesAttributes({
-		@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
-		@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.") })
-@SeeAlso({ PutHDFS.class, GetHDFS.class })
+        @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")})
+@SeeAlso({PutHDFS.class, GetHDFS.class})
 public class MoveHDFS extends AbstractHadoopProcessor {
 
-	// static global
-	public static final int MAX_WORKING_QUEUE_SIZE = 25000;
-	public static final String REPLACE_RESOLUTION = "replace";
-	public static final String IGNORE_RESOLUTION = "ignore";
-	public static final String FAIL_RESOLUTION = "fail";
-
-	private static final Set<Relationship> relationships;
-
-	public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
-			REPLACE_RESOLUTION, "Replaces the existing file if any.");
-	public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
-			"Failed rename operation stops processing and routes to success.");
-	public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
-			"Failing to rename a file routes to failure.");
-
-	public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
-	public static final int BUFFER_SIZE_DEFAULT = 4096;
-
-	public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
-
-	// relationships
-	public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
-			.description("Files that have been successfully renamed on HDFS are transferred to this relationship")
-			.build();
-
-	public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
-			.description("Files that could not be renamed on HDFS are transferred to this relationship").build();
-
-	// properties
-	public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
-			.name("Conflict Resolution Strategy")
-			.description(
-					"Indicates what should happen when a file with the same name already exists in the output directory")
-			.required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
-			.allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
-
-	public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
-			.name("File Filter Regex")
-			.description(
-					"A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
-							+ "Expression will be fetched, otherwise all files will be fetched")
-			.required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
-
-	public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
-			.name("Ignore Dotted Files")
-			.description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
-			.allowableValues("true", "false").defaultValue("true").build();
-
-	public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
-			.name("Input Directory or File")
-			.description("The HDFS directory from which files should be read, or a single file to read")
-			.defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
-			.expressionLanguageSupported(true).build();
-
-	public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
-			.description("The HDFS directory where the files will be moved to").required(true)
-			.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
-			.build();
-
-	public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
-			.description("The operation that will be performed on the source file").required(true)
-			.allowableValues("move", "copy").defaultValue("move").build();
-
-	public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
-			.description(
-					"Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
-			.description(
-					"Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
-			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-	static {
-		final Set<Relationship> rels = new HashSet<>();
-		rels.add(REL_SUCCESS);
-		rels.add(REL_FAILURE);
-		relationships = Collections.unmodifiableSet(rels);
-	}
-
-	// non-static global
-	protected ProcessorConfiguration processorConfig;
-	private final AtomicLong logEmptyListing = new AtomicLong(2L);
-
-	private final Lock listingLock = new ReentrantLock();
-	private final Lock queueLock = new ReentrantLock();
-
-	private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
-	private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
-
-	// methods
-	@Override
-	public Set<Relationship> getRelationships() {
-		return relationships;
-	}
-
-	@Override
-	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-		List<PropertyDescriptor> props = new ArrayList<>(properties);
-		props.add(CONFLICT_RESOLUTION);
-		props.add(INPUT_DIRECTORY_OR_FILE);
-		props.add(OUTPUT_DIRECTORY);
-		props.add(OPERATION);
-		props.add(FILE_FILTER_REGEX);
-		props.add(IGNORE_DOTTED_FILES);
-		props.add(REMOTE_OWNER);
-		props.add(REMOTE_GROUP);
-		return props;
-	}
-
-	@OnScheduled
-	public void onScheduled(ProcessContext context) throws Exception {
-		super.abstractOnScheduled(context);
-		// copy configuration values to pass them around cleanly
-		processorConfig = new ProcessorConfiguration(context);
-		// forget the state of the queue in case HDFS contents changed while
-		// this processor was turned off
-		queueLock.lock();
-		try {
-			filePathQueue.clear();
-			processing.clear();
-		} finally {
-			queueLock.unlock();
-		}
-	}
-
-	@Override
-	public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-		// MoveHDFS
-		FlowFile parentFlowFile = session.get();
+    // static global
+    public static final String REPLACE_RESOLUTION = "replace";
+    public static final String IGNORE_RESOLUTION = "ignore";
+    public static final String FAIL_RESOLUTION = "fail";
+
+    private static final Set<Relationship> relationships;
+
+    public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+            REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+            "Failed rename operation stops processing and routes to success.");
+    public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+            "Failing to rename a file routes to failure.");
+
+    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+
+    // relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Files that have been successfully renamed on HDFS are transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("Files that could not be renamed on HDFS are transferred to this relationship").build();
+
+    // properties
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("Conflict Resolution Strategy")
+            .description(
+                    "Indicates what should happen when a file with the same name already exists in the output directory")
+            .required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
+            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
+
+    public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
+            .name("File Filter Regex")
+            .description(
+                    "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
+                            + "Expression will be fetched, otherwise all files will be fetched")
+            .required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+            .name("Ignore Dotted Files")
+            .description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
+            .allowableValues("true", "false").defaultValue("true").build();
+
+    public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
+            .name("Input Directory or File")
+            .description("The HDFS directory from which files should be read, or a single file to read.")
+            .defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .expressionLanguageSupported(true).build();
+
+    public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
+            .description("The HDFS directory where the files will be moved to").required(true)
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
+            .description("The operation that will be performed on the source file").required(true)
+            .allowableValues("move", "copy").defaultValue("move").build();
+
+    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
+            .description(
+                    "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
+            .description(
+                    "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    // non-static global
+    protected ProcessorConfiguration processorConfig;
+    private final AtomicLong logEmptyListing = new AtomicLong(2L);
+
+    private final Lock listingLock = new ReentrantLock();
+    private final Lock queueLock = new ReentrantLock();
+
+    private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
+
+    // methods
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(CONFLICT_RESOLUTION);
+        props.add(INPUT_DIRECTORY_OR_FILE);
+        props.add(OUTPUT_DIRECTORY);
+        props.add(OPERATION);
+        props.add(FILE_FILTER_REGEX);
+        props.add(IGNORE_DOTTED_FILES);
+        props.add(REMOTE_OWNER);
+        props.add(REMOTE_GROUP);
+        return props;
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws Exception {
+        super.abstractOnScheduled(context);
+        // copy configuration values to pass them around cleanly
+        processorConfig = new ProcessorConfiguration(context);
+        // forget the state of the queue in case HDFS contents changed while
+        // this processor was turned off
+        queueLock.lock();
+        try {
+            filePathQueue.clear();
+            processing.clear();
+        } finally {
+            queueLock.unlock();
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        // MoveHDFS
+        FlowFile parentFlowFile = session.get();
         if (parentFlowFile == null) {
             return;
         }
@@ -209,319 +206,318 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         Path inputPath = null;
         try {
             inputPath = new Path(filenameValue);
-            if(!hdfs.exists(inputPath)) {
-            	throw new IOException("Input Directory or File does not exist in HDFS");
+            if (!hdfs.exists(inputPath)) {
+                throw new IOException("Input Directory or File does not exist in HDFS");
             }
         } catch (Exception e) {
-            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, parentFlowFile, e});
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, parentFlowFile, e});
             parentFlowFile = session.putAttribute(parentFlowFile, "hdfs.failure.reason", e.getMessage());
             parentFlowFile = session.penalize(parentFlowFile);
-        	session.transfer(parentFlowFile, REL_FAILURE);
+            session.transfer(parentFlowFile, REL_FAILURE);
+            return;
+        }
+
+        List<Path> files = new ArrayList<Path>();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            Set<Path> listedFiles = performListing(context, inputPath);
+            stopWatch.stop();
+            final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+
+            if (listedFiles != null) {
+                // place files into the work queue
+                int newItems = 0;
+                queueLock.lock();
+                try {
+                    for (Path file : listedFiles) {
+                        if (!filePathQueue.contains(file) && !processing.contains(file)) {
+                            if (!filePathQueue.offer(file)) {
+                                break;
+                            }
+                            newItems++;
+                        }
+                    }
+                } catch (Exception e) {
+                    getLogger().warn("Could not add to processing queue due to {}", new Object[]{e.getMessage()}, e);
+                } finally {
+                    queueLock.unlock();
+                }
+                if (listedFiles.size() > 0) {
+                    logEmptyListing.set(3L);
+                }
+                if (logEmptyListing.getAndDecrement() > 0) {
+                    getLogger().info(
+                            "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
+                            new Object[]{millis, listedFiles.size(), newItems});
+                }
+            }
+        } catch (IOException e) {
+            context.yield();
+            getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
             return;
         }
+
+        // prepare to process a batch of files in the queue
+        queueLock.lock();
+        try {
+            filePathQueue.drainTo(files);
+            if (files.isEmpty()) {
+                // nothing to do!
+                session.remove(parentFlowFile);
+                context.yield();
+                return;
+            }
+        } finally {
+            queueLock.unlock();
+        }
+
+        processBatchOfFiles(files, context, session, parentFlowFile);
+
+        queueLock.lock();
+        try {
+            processing.removeAll(files);
+        } finally {
+            queueLock.unlock();
+        }
+
         session.remove(parentFlowFile);
+    }
+
+    protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
+                                       final ProcessSession session, FlowFile parentFlowFile) {
+        Preconditions.checkState(parentFlowFile != null, "No parent flowfile for this batch was provided");
+
+        // process the batch of files
+        final Configuration conf = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (conf == null || ugi == null) {
+            getLogger().error("Configuration or UserGroupInformation not configured properly");
+            session.transfer(parentFlowFile, REL_FAILURE);
+            context.yield();
+            return;
+        }
+
+        for (final Path file : files) {
+
+            ugi.doAs(new PrivilegedAction<Object>() {
+                @Override
+                public Object run() {
+                    FlowFile flowFile = session.create(parentFlowFile);
+                    try {
+                        final String originalFilename = file.getName();
+                        final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
+                        final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
+                        final boolean destinationExists = hdfs.exists(newFile);
+                        // If destination file already exists, resolve that
+                        // based on processor configuration
+                        if (destinationExists) {
+                            switch (processorConfig.getConflictResolution()) {
+                                case REPLACE_RESOLUTION:
+                                    if (hdfs.delete(file, false)) {
+                                        getLogger().info("deleted {} in order to replace with the contents of {}",
+                                                new Object[]{file, flowFile});
+                                    }
+                                    break;
+                                case IGNORE_RESOLUTION:
+                                    session.transfer(flowFile, REL_SUCCESS);
+                                    getLogger().info(
+                                            "transferring {} to success because file with same name already exists",
+                                            new Object[]{flowFile});
+                                    return null;
+                                case FAIL_RESOLUTION:
+                                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                                    getLogger().warn(
+                                            "penalizing {} and routing to failure because file with same name already exists",
+                                            new Object[]{flowFile});
+                                    return null;
+                                default:
+                                    break;
+                            }
+                        }
+
+                        // Create destination directory if it does not exist
+                        try {
+                            if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
+                                throw new IOException(configuredRootOutputDirPath.toString()
+                                        + " already exists and is not a directory");
+                            }
+                        } catch (FileNotFoundException fe) {
+                            if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
+                                throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
+                            }
+                            changeOwner(context, hdfs, configuredRootOutputDirPath);
+                        }
+
+                        boolean moved = false;
+                        for (int i = 0; i < 10; i++) { // try to rename multiple
+                            // times.
+                            if (processorConfig.getOperation().equals("move")) {
+                                if (hdfs.rename(file, newFile)) {
+                                    moved = true;
+                                    break;// rename was successful
+                                }
+                            } else {
+                                if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
+                                    moved = true;
+                                    break;// copy was successful
+                                }
+                            }
+                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                        }
+                        if (!moved) {
+                            throw new ProcessException("Could not move file " + file + " to its final filename");
+                        }
+
+                        changeOwner(context, hdfs, newFile);
+                        final String outputPath = newFile.toString();
+                        final String newFilename = newFile.getName();
+                        final String hdfsPath = newFile.getParent().toString();
+                        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+                        flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                        final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
+                                : "hdfs://" + outputPath;
+                        session.getProvenanceReporter().send(flowFile, transitUri);
+                        session.transfer(flowFile, REL_SUCCESS);
+
+                    } catch (final Throwable t) {
+                        getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
+                        session.transfer(session.penalize(flowFile), REL_FAILURE);
+                        context.yield();
+                    }
+                    return null;
+                }
+            });
+        }
+    }
+
+    protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
+        Set<Path> listing = null;
+
+        if (listingLock.tryLock()) {
+            try {
+                final FileSystem hdfs = getFileSystem();
+                // get listing
+                listing = selectFiles(hdfs, path, null);
+            } finally {
+                listingLock.unlock();
+            }
+        }
 
-		List<Path> files = new ArrayList<Path>();
-
-		try {
-			final StopWatch stopWatch = new StopWatch(true);
-			Set<Path> listedFiles = performListing(context, inputPath);
-			stopWatch.stop();
-			final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-
-			if (listedFiles != null) {
-				// place files into the work queue
-				int newItems = 0;
-				queueLock.lock();
-				try {
-					for (Path file : listedFiles) {
-						if (!filePathQueue.contains(file) && !processing.contains(file)) {
-							if (!filePathQueue.offer(file)) {
-								break;
-							}
-							newItems++;
-						}
-					}
-				} catch (Exception e) {
-					getLogger().warn("Could not add to processing queue due to {}", new Object[] { e });
-				} finally {
-					queueLock.unlock();
-				}
-				if (listedFiles.size() > 0) {
-					logEmptyListing.set(3L);
-				}
-				if (logEmptyListing.getAndDecrement() > 0) {
-					getLogger().info(
-							"Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
-							new Object[] { millis, listedFiles.size(), newItems });
-				}
-			}
-		} catch (IOException e) {
-			context.yield();
-			getLogger().warn("Error while retrieving list of files due to {}", new Object[] { e });
-			return;
-		}
-
-		// prepare to process a batch of files in the queue
-		queueLock.lock();
-		try {
-			filePathQueue.drainTo(files);
-			if (files.isEmpty()) {
-				// nothing to do!
-				context.yield();
-				return;
-			}
-		} finally {
-			queueLock.unlock();
-		}
-
-		processBatchOfFiles(files, context, session);
-
-		queueLock.lock();
-		try {
-			processing.removeAll(files);
-		} finally {
-			queueLock.unlock();
-		}
-	}
-
-	protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
-			final ProcessSession session) {
-		// process the batch of files
-		final Configuration conf = getConfiguration();
-		final FileSystem hdfs = getFileSystem();
-		final UserGroupInformation ugi = getUserGroupInformation();
-
-		if (conf == null || ugi == null) {
-			getLogger().error("Configuration or UserGroupInformation not configured properly");
-			session.transfer(session.get(), REL_FAILURE);
-			context.yield();
-			return;
-		}
-
-		for (final Path file : files) {
-
-			ugi.doAs(new PrivilegedAction<Object>() {
-				@Override
-				public Object run() {
-					FlowFile flowFile = session.create();
-					try {
-						final String originalFilename = file.getName();
-						final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
-						final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
-						final boolean destinationExists = hdfs.exists(newFile);
-						// If destination file already exists, resolve that
-						// based on processor configuration
-						if (destinationExists) {
-							switch (processorConfig.getConflictResolution()) {
-							case REPLACE_RESOLUTION:
-								if (hdfs.delete(file, false)) {
-									getLogger().info("deleted {} in order to replace with the contents of {}",
-											new Object[] { file, flowFile });
-								}
-								break;
-							case IGNORE_RESOLUTION:
-								session.transfer(flowFile, REL_SUCCESS);
-								getLogger().info(
-										"transferring {} to success because file with same name already exists",
-										new Object[] { flowFile });
-								return null;
-							case FAIL_RESOLUTION:
-								session.transfer(session.penalize(flowFile), REL_FAILURE);
-								getLogger().warn(
-										"penalizing {} and routing to failure because file with same name already exists",
-										new Object[] { flowFile });
-								return null;
-							default:
-								break;
-							}
-						}
-
-						// Create destination directory if it does not exist
-						try {
-							if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
-								throw new IOException(configuredRootOutputDirPath.toString()
-										+ " already exists and is not a directory");
-							}
-						} catch (FileNotFoundException fe) {
-							if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
-								throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
-							}
-							changeOwner(context, hdfs, configuredRootOutputDirPath);
-						}
-
-						boolean moved = false;
-						for (int i = 0; i < 10; i++) { // try to rename multiple
-														// times.
-							if (processorConfig.getOperation().equals("move")) {
-								if (hdfs.rename(file, newFile)) {
-									moved = true;
-									break;// rename was successful
-								}
-							} else {
-								if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
-									moved = true;
-									break;// copy was successful
-								}
-							}
-							Thread.sleep(200L);// try waiting to let whatever
-												// might cause rename failure to
-												// resolve
-						}
-						if (!moved) {
-							throw new ProcessException("Could not move file " + file + " to its final filename");
-						}
-
-						changeOwner(context, hdfs, file);
-						final String outputPath = newFile.toString();
-						final String newFilename = newFile.getName();
-						final String hdfsPath = newFile.getParent().toString();
-						flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
-						flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-						final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
-								: "hdfs://" + outputPath;
-						session.getProvenanceReporter().send(flowFile, transitUri);
-						session.transfer(flowFile, REL_SUCCESS);
-
-					} catch (final Throwable t) {
-						getLogger().error("Failed to rename on HDFS due to {}", new Object[] { t });
-						session.transfer(session.penalize(flowFile), REL_FAILURE);
-						context.yield();
-					}
-					return null;
-				}
-			});
-		}
-	}
-
-	protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
-		Set<Path> listing = null;
-
-		if (listingLock.tryLock()) {
-			try {
-				final FileSystem hdfs = getFileSystem();
-				// get listing
-				listing = selectFiles(hdfs, path, null);
-			} finally {
-				listingLock.unlock();
-			}
-		}
-
-		return listing;
-	}
-
-	protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
-		try {
-			// Change owner and group of file if configured to do so
-			String owner = context.getProperty(REMOTE_OWNER).getValue();
-			String group = context.getProperty(REMOTE_GROUP).getValue();
-			if (owner != null || group != null) {
-				hdfs.setOwner(name, owner, group);
-			}
-		} catch (Exception e) {
-			getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[] { name, e });
-		}
-	}
-
-	protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
-			throws IOException {
-		if (null == filesVisited) {
-			filesVisited = new HashSet<>();
-		}
-
-		if (!hdfs.exists(inputPath)) {
-			throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
-		}
-
-		final Set<Path> files = new HashSet<>();
-
-		FileStatus inputStatus = hdfs.getFileStatus(inputPath);
-
-		if (inputStatus.isDirectory()) {
-			for (final FileStatus file : hdfs.listStatus(inputPath)) {
-				final Path canonicalFile = file.getPath();
-
-				if (!filesVisited.add(canonicalFile)) { // skip files we've
-														// already
-					// seen (may be looping
-					// directory links)
-					continue;
-				}
-
-				if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
-					files.add(canonicalFile);
-
-					if (getLogger().isDebugEnabled()) {
-						getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
-					}
-				}
-			}
-		} else if (inputStatus.isFile()) {
-			files.add(inputPath);
-		}
-		return files;
-	}
-
-	protected static class ProcessorConfiguration {
-
-		final private String conflictResolution;
-		final private String operation;
-		final private Path inputRootDirPath;
-		final private Path outputRootDirPath;
-		final private Pattern fileFilterPattern;
-		final private boolean ignoreDottedFiles;
-
-		ProcessorConfiguration(final ProcessContext context) {
-			conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
-			operation = context.getProperty(OPERATION).getValue();
-			final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).getValue();
-			inputRootDirPath = new Path(inputDirValue);
-			final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).getValue();
-			outputRootDirPath = new Path(outputDirValue);
-			final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
-			fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
-			ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
-		}
-
-		public String getOperation() {
-			return operation;
-		}
-
-		public String getConflictResolution() {
-			return conflictResolution;
-		}
-
-		public Path getInput() {
-			return inputRootDirPath;
-		}
-
-		public Path getOutputDirectory() {
-			return outputRootDirPath;
-		}
-
-		protected PathFilter getPathFilter(final Path dir) {
-			return new PathFilter() {
-
-				@Override
-				public boolean accept(Path path) {
-					if (ignoreDottedFiles && path.getName().startsWith(".")) {
-						return false;
-					}
-					final String pathToCompare;
-					String relativePath = getPathDifference(dir, path);
-					if (relativePath.length() == 0) {
-						pathToCompare = path.getName();
-					} else {
-						pathToCompare = relativePath + Path.SEPARATOR + path.getName();
-					}
-
-					if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
-						return false;
-					}
-					return true;
-				}
-
-			};
-		}
-	}
+        return listing;
+    }
+
+    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
+        try {
+            // Change owner and group of file if configured to do so
+            String owner = context.getProperty(REMOTE_OWNER).getValue();
+            String group = context.getProperty(REMOTE_GROUP).getValue();
+            if (owner != null || group != null) {
+                hdfs.setOwner(name, owner, group);
+            }
+        } catch (Exception e) {
+            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e.getMessage()}, e);
+        }
+    }
+
+    protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
+            throws IOException {
+        if (null == filesVisited) {
+            filesVisited = new HashSet<>();
+        }
+
+        if (!hdfs.exists(inputPath)) {
+            throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
+        }
+
+        final Set<Path> files = new HashSet<>();
+
+        FileStatus inputStatus = hdfs.getFileStatus(inputPath);
+
+        if (inputStatus.isDirectory()) {
+            for (final FileStatus file : hdfs.listStatus(inputPath)) {
+                final Path canonicalFile = file.getPath();
+
+                if (!filesVisited.add(canonicalFile)) { // skip files we've already seen (may be looping directory links)
+                    continue;
+                }
+
+                if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
+                    files.add(canonicalFile);
+
+                    if (getLogger().isDebugEnabled()) {
+                        getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
+                    }
+                }
+            }
+        } else if (inputStatus.isFile()) {
+            files.add(inputPath);
+        }
+        return files;
+    }
+
+    protected static class ProcessorConfiguration {
+
+        final private String conflictResolution;
+        final private String operation;
+        final private Path inputRootDirPath;
+        final private Path outputRootDirPath;
+        final private Pattern fileFilterPattern;
+        final private boolean ignoreDottedFiles;
+
+        ProcessorConfiguration(final ProcessContext context) {
+            conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+            operation = context.getProperty(OPERATION).getValue();
+            final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions().getValue();
+            inputRootDirPath = new Path(inputDirValue);
+            final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue();
+            outputRootDirPath = new Path(outputDirValue);
+            final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
+            fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
+            ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+        }
+
+        public String getOperation() {
+            return operation;
+        }
+
+        public String getConflictResolution() {
+            return conflictResolution;
+        }
+
+        public Path getInput() {
+            return inputRootDirPath;
+        }
+
+        public Path getOutputDirectory() {
+            return outputRootDirPath;
+        }
+
+        protected PathFilter getPathFilter(final Path dir) {
+            return new PathFilter() {
+
+                @Override
+                public boolean accept(Path path) {
+                    if (ignoreDottedFiles && path.getName().startsWith(".")) {
+                        return false;
+                    }
+                    final String pathToCompare;
+                    String relativePath = getPathDifference(dir, path);
+                    if (relativePath.length() == 0) {
+                        pathToCompare = path.getName();
+                    } else {
+                        pathToCompare = relativePath + Path.SEPARATOR + path.getName();
+                    }
+
+                    if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
+                        return false;
+                    }
+                    return true;
+                }
+
+            };
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
index 6aecdd3..c55a2b1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -1,15 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.hadoop;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
+import org.apache.commons.io.FileUtils;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
@@ -23,173 +30,222 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class MoveHDFSTest {
 
-	private static final String OUTPUT_DIRECTORY = "src/test/resources/testdataoutput";
-	private static final String INPUT_DIRECTORY = "src/test/resources/testdata";
-	private static final String DOT_FILE_PATH = "src/test/resources/testdata/.testfordotfiles";
-	private NiFiProperties mockNiFiProperties;
-	private KerberosProperties kerberosProperties;
-
-	@Before
-	public void setup() {
-		mockNiFiProperties = mock(NiFiProperties.class);
-		when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
-		kerberosProperties = new KerberosProperties(null);
-	}
-
-	@After
-	public void teardown() {
-		File outputDirectory = new File(OUTPUT_DIRECTORY);
-		if (outputDirectory.exists()) {
-			if (outputDirectory.isDirectory()) {
-				moveFilesFromOutputDirectoryToInput();
-			}
-			outputDirectory.delete();
-		}
-		removeDotFile();
-	}
-
-	private void removeDotFile() {
-		File dotFile = new File(DOT_FILE_PATH);
-		if (dotFile.exists()) {
-			dotFile.delete();
-		}
-	}
-
-	private void moveFilesFromOutputDirectoryToInput() {
-		File folder = new File(OUTPUT_DIRECTORY);
-		for (File file : folder.listFiles()) {
-			if (file.isFile()) {
-				String path = file.getAbsolutePath();
-				if(!path.endsWith(".crc")) {
-					String newPath = path.replaceAll("testdataoutput", "testdata");
-					File newFile = new File(newPath);
-					if (!newFile.exists()) {
-						file.renameTo(newFile);
-					}
-				} else {
-					file.delete();
-				}
-			}
-		}
-	}
-
-	@Test
-	public void testOutputDirectoryValidator() {
-		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-		TestRunner runner = TestRunners.newTestRunner(proc);
-		Collection<ValidationResult> results;
-		ProcessContext pc;
-
-		results = new HashSet<>();
-		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
-		runner.enqueue(new byte[0]);
-		pc = runner.getProcessContext();
-		if (pc instanceof MockProcessContext) {
-			results = ((MockProcessContext) pc).validate();
-		}
-		Assert.assertEquals(1, results.size());
-		for (ValidationResult vr : results) {
-			assertTrue(vr.toString().contains("Output Directory is required"));
-		}
-	}
-
-	@Test
-	public void testBothInputAndOutputDirectoriesAreValid() {
-		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-		TestRunner runner = TestRunners.newTestRunner(proc);
-		Collection<ValidationResult> results;
-		ProcessContext pc;
-
-		results = new HashSet<>();
-		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-		runner.enqueue(new byte[0]);
-		pc = runner.getProcessContext();
-		if (pc instanceof MockProcessContext) {
-			results = ((MockProcessContext) pc).validate();
-		}
-		Assert.assertEquals(0, results.size());
-	}
-
-	@Test
-	public void testOnScheduledShouldRunCleanly() {
-		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-		TestRunner runner = TestRunners.newTestRunner(proc);
-		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-		runner.enqueue(new byte[0]);
-		runner.setValidateExpressionUsage(false);
-		runner.run();
-		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-		Assert.assertEquals(7, flowFiles.size());
-	}
-	
-	@Test
-	public void testDotFileFilter() throws IOException {
-		createDotFile();
-		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-		TestRunner runner = TestRunners.newTestRunner(proc);
-		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-		runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
-		runner.enqueue(new byte[0]);
-		runner.setValidateExpressionUsage(false);
-		runner.run();
-		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-		Assert.assertEquals(8, flowFiles.size());
-	}
-	
-	@Test
-	public void testFileFilterRegex() {
-		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-		TestRunner runner = TestRunners.newTestRunner(proc);
-		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-		runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
-		runner.enqueue(new byte[0]);
-		runner.setValidateExpressionUsage(false);
-		runner.run();
-		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-		Assert.assertEquals(1, flowFiles.size());
-	}
-	
-	@Test
-	public void testSingleFileAsInput() {
-		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-		TestRunner runner = TestRunners.newTestRunner(proc);
-		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
-		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-		runner.enqueue(new byte[0]);
-		runner.setValidateExpressionUsage(false);
-		runner.run();
-		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-		Assert.assertEquals(1, flowFiles.size());
-	}
-
-	private void createDotFile() throws IOException {
-		File dotFile = new File(DOT_FILE_PATH);
-		dotFile.createNewFile();
-	}
-
-	private static class TestableMoveHDFS extends MoveHDFS {
-
-		private KerberosProperties testKerberosProperties;
-
-		public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
-			this.testKerberosProperties = testKerberosProperties;
-		}
-
-		@Override
-		protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
-			return testKerberosProperties;
-		}
-
-	}
+    private static final String OUTPUT_DIRECTORY = "target/test-data-output";
+    private static final String TEST_DATA_DIRECTORY = "src/test/resources/testdata";
+    private static final String INPUT_DIRECTORY = "target/test-data-input";
+    private NiFiProperties mockNiFiProperties;
+    private KerberosProperties kerberosProperties;
+
+    @Before
+    public void setup() {
+        mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        kerberosProperties = new KerberosProperties(null);
+    }
+
+    @After
+    public void teardown() {
+        File inputDirectory = new File(INPUT_DIRECTORY);
+        File outputDirectory = new File(OUTPUT_DIRECTORY);
+        if (inputDirectory.exists()) {
+            Assert.assertTrue("Could not delete input directory: " + inputDirectory, FileUtils.deleteQuietly(inputDirectory));
+        }
+        if (outputDirectory.exists()) {
+            Assert.assertTrue("Could not delete output directory: " + outputDirectory, FileUtils.deleteQuietly(outputDirectory));
+        }
+    }
+
+    @Test
+    public void testOutputDirectoryValidator() {
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        Collection<ValidationResult> results;
+        ProcessContext pc;
+
+        results = new HashSet<>();
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            assertTrue(vr.toString().contains("Output Directory is required"));
+        }
+    }
+
+    @Test
+    public void testBothInputAndOutputDirectoriesAreValid() {
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        Collection<ValidationResult> results;
+        ProcessContext pc;
+
+        results = new HashSet<>();
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(0, results.size());
+    }
+
+    @Test
+    public void testOnScheduledShouldRunCleanly() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(7, flowFiles.size());
+    }
+
+    @Test
+    public void testDotFileFilterIgnore() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "true");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(7, flowFiles.size());
+        Assert.assertTrue(new File(INPUT_DIRECTORY, ".dotfile").exists());
+    }
+
+    @Test
+    public void testDotFileFilterInclude() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(8, flowFiles.size());
+    }
+
+    @Test
+    public void testFileFilterRegex() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+    }
+
+    @Test
+    public void testSingleFileAsInputCopy() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OPERATION, "copy");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertTrue(new File(INPUT_DIRECTORY, "randombytes-1").exists());
+        Assert.assertTrue(new File(OUTPUT_DIRECTORY, "randombytes-1").exists());
+    }
+
+    @Test
+    public void testSingleFileAsInputMove() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertFalse(new File(INPUT_DIRECTORY, "randombytes-1").exists());
+        Assert.assertTrue(new File(OUTPUT_DIRECTORY, "randombytes-1").exists());
+    }
+
+    @Test
+    public void testDirectoryWithSubDirectoryAsInputMove() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        File subdir = new File(INPUT_DIRECTORY, "subdir");
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), subdir);
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(7, flowFiles.size());
+        Assert.assertTrue(new File(INPUT_DIRECTORY).exists());
+        Assert.assertTrue(subdir.exists());
+    }
+
+    @Test
+    public void testEmptyInputDirectory() throws IOException {
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        Files.createDirectories(Paths.get(INPUT_DIRECTORY));
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        Assert.assertEquals(0, Files.list(Paths.get(INPUT_DIRECTORY)).count());
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(0, flowFiles.size());
+    }
+
+    private static class TestableMoveHDFS extends MoveHDFS {
+
+        private KerberosProperties testKerberosProperties;
+
+        public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
+            this.testKerberosProperties = testKerberosProperties;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
+            return testKerberosProperties;
+        }
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/.dotfile
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/.dotfile b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/.dotfile
new file mode 100644
index 0000000..e69de29


Mime
View raw message