nifi-commits mailing list archives

From joew...@apache.org
Subject [1/2] nifi git commit: NIFI-3366 MoveHDFS processor supports expression language for input and copy operations
Date Mon, 11 Dec 2017 13:42:01 GMT
Repository: nifi
Updated Branches:
  refs/heads/master c138987bb -> 600586d6b


NIFI-3366 MoveHDFS processor supports expression language for input and copy operations

Signed-off-by: Jeff Storck <jtswork@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3731fbee
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3731fbee
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3731fbee

Branch: refs/heads/master
Commit: 3731fbee8883885a87f5f7548f46d9190aa3045d
Parents: c138987
Author: Gray Gwizdz <ggwizdz1@ford.com>
Authored: Wed Mar 15 09:40:02 2017 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Mon Dec 11 08:41:25 2017 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/MoveHDFS.java | 527 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/hadoop/MoveHDFSTest.java    | 195 +++++++
 3 files changed, 723 insertions(+)
----------------------------------------------------------------------
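Since the commit message highlights expression language support on the input path, here is a minimal usage sketch (hypothetical, not part of this commit) showing how a FlowFile attribute can drive the "Input Directory or File" property through NiFi's TestRunner. The attribute value and paths are illustrative assumptions, and a real test would likely need the Kerberos test shim (TestableMoveHDFS) used in MoveHDFSTest below:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Hypothetical sketch: the "path" attribute supplies the input location at runtime
    Map<String, String> attributes = new HashMap<>();
    attributes.put("path", "/data/incoming");

    TestRunner runner = TestRunners.newTestRunner(new MoveHDFS());
    runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "${path}"); // the property's default value
    runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, "/data/archive");
    runner.enqueue(new byte[0], attributes); // expression language resolves against this FlowFile
    runner.run();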


http://git-wip-us.apache.org/repos/asf/nifi/blob/3731fbee/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
new file mode 100644
index 0000000..e9842b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+/**
+ * This processor renames files on HDFS.
+ */
+@Tags({ "hadoop", "HDFS", "put", "move", "filesystem", "restricted", "moveHDFS" })
+@CapabilityDescription("Rename existing files on Hadoop Distributed File System (HDFS)")
+@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
+@WritesAttributes({
+		@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
+		@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.") })
+@SeeAlso({ PutHDFS.class, GetHDFS.class })
+public class MoveHDFS extends AbstractHadoopProcessor {
+
+	// static global
+	public static final int MAX_WORKING_QUEUE_SIZE = 25000;
+	public static final String REPLACE_RESOLUTION = "replace";
+	public static final String IGNORE_RESOLUTION = "ignore";
+	public static final String FAIL_RESOLUTION = "fail";
+
+	private static final Set<Relationship> relationships;
+
+	public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+			REPLACE_RESOLUTION, "Replaces the existing file if any.");
+	public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+			"Failed rename operation stops processing and routes to success.");
+	public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+			"Failing to rename a file routes to failure.");
+
+	public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+	public static final int BUFFER_SIZE_DEFAULT = 4096;
+
+	public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+
+	// relationships
+	public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+			.description("Files that have been successfully renamed on HDFS are transferred to this relationship")
+			.build();
+
+	public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+			.description("Files that could not be renamed on HDFS are transferred to this relationship").build();
+
+	// properties
+	public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+			.name("Conflict Resolution Strategy")
+			.description(
+					"Indicates what should happen when a file with the same name already exists in the output directory")
+			.required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
+			.allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
+
+	public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
+			.name("File Filter Regex")
+			.description(
+					"A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
+							+ "Expression will be fetched, otherwise all files will be fetched")
+			.required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+	public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+			.name("Ignore Dotted Files")
+			.description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
+			.allowableValues("true", "false").defaultValue("true").build();
+
+	public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
+			.name("Input Directory or File")
+			.description("The HDFS directory from which files should be read, or a single file to read")
+			.defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+			.expressionLanguageSupported(true).build();
+
+	public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
+			.description("The HDFS directory where the files will be moved to").required(true)
+			.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
+			.build();
+
+	public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
+			.description("The operation that will be performed on the source file").required(true)
+			.allowableValues("move", "copy").defaultValue("move").build();
+
+	public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
+			.description(
+					"Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+	public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
+			.description(
+					"Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
+			.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+	static {
+		final Set<Relationship> rels = new HashSet<>();
+		rels.add(REL_SUCCESS);
+		rels.add(REL_FAILURE);
+		relationships = Collections.unmodifiableSet(rels);
+	}
+
+	// non-static global
+	protected ProcessorConfiguration processorConfig;
+	private final AtomicLong logEmptyListing = new AtomicLong(2L);
+
+	private final Lock listingLock = new ReentrantLock();
+	private final Lock queueLock = new ReentrantLock();
+
+	private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
+	private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
+
+	// methods
+	@Override
+	public Set<Relationship> getRelationships() {
+		return relationships;
+	}
+
+	@Override
+	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+		List<PropertyDescriptor> props = new ArrayList<>(properties);
+		props.add(CONFLICT_RESOLUTION);
+		props.add(INPUT_DIRECTORY_OR_FILE);
+		props.add(OUTPUT_DIRECTORY);
+		props.add(OPERATION);
+		props.add(FILE_FILTER_REGEX);
+		props.add(IGNORE_DOTTED_FILES);
+		props.add(REMOTE_OWNER);
+		props.add(REMOTE_GROUP);
+		return props;
+	}
+
+	@OnScheduled
+	public void onScheduled(ProcessContext context) throws Exception {
+		super.abstractOnScheduled(context);
+		// copy configuration values to pass them around cleanly
+		processorConfig = new ProcessorConfiguration(context);
+		// forget the state of the queue in case HDFS contents changed while
+		// this processor was turned off
+		queueLock.lock();
+		try {
+			filePathQueue.clear();
+			processing.clear();
+		} finally {
+			queueLock.unlock();
+		}
+	}
+
+	@Override
+	public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+		// MoveHDFS
+		FlowFile parentFlowFile = session.get();
+        if (parentFlowFile == null) {
+            return;
+        }
+
+        final FileSystem hdfs = getFileSystem();
+        final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(parentFlowFile).getValue();
+
+        Path inputPath = null;
+        try {
+            inputPath = new Path(filenameValue);
+            if(!hdfs.exists(inputPath)) {
+            	throw new IOException("Input Directory or File does not exist in HDFS");
+            }
+        } catch (Exception e) {
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, parentFlowFile, e});
+            parentFlowFile = session.putAttribute(parentFlowFile, "hdfs.failure.reason", e.getMessage());
+            parentFlowFile = session.penalize(parentFlowFile);
+        	session.transfer(parentFlowFile, REL_FAILURE);
+            return;
+        }
+        session.remove(parentFlowFile);
+
+		List<Path> files = new ArrayList<Path>();
+
+		try {
+			final StopWatch stopWatch = new StopWatch(true);
+			Set<Path> listedFiles = performListing(context, inputPath);
+			stopWatch.stop();
+			final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+
+			if (listedFiles != null) {
+				// place files into the work queue
+				int newItems = 0;
+				queueLock.lock();
+				try {
+					for (Path file : listedFiles) {
+						if (!filePathQueue.contains(file) && !processing.contains(file)) {
+							if (!filePathQueue.offer(file)) {
+								break;
+							}
+							newItems++;
+						}
+					}
+				} catch (Exception e) {
+					getLogger().warn("Could not add to processing queue due to {}", new Object[] { e });
+				} finally {
+					queueLock.unlock();
+				}
+				if (listedFiles.size() > 0) {
+					logEmptyListing.set(3L);
+				}
+				if (logEmptyListing.getAndDecrement() > 0) {
+					getLogger().info(
+							"Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
+							new Object[] { millis, listedFiles.size(), newItems });
+				}
+			}
+		} catch (IOException e) {
+			context.yield();
+			getLogger().warn("Error while retrieving list of files due to {}", new Object[] { e });
+			return;
+		}
+
+		// prepare to process a batch of files in the queue
+		queueLock.lock();
+		try {
+			filePathQueue.drainTo(files);
+			if (files.isEmpty()) {
+				// nothing to do!
+				context.yield();
+				return;
+			}
+		} finally {
+			queueLock.unlock();
+		}
+
+		processBatchOfFiles(files, context, session);
+
+		queueLock.lock();
+		try {
+			processing.removeAll(files);
+		} finally {
+			queueLock.unlock();
+		}
+	}
+
+	protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
+			final ProcessSession session) {
+		// process the batch of files
+		final Configuration conf = getConfiguration();
+		final FileSystem hdfs = getFileSystem();
+		final UserGroupInformation ugi = getUserGroupInformation();
+
+		if (conf == null || ugi == null) {
+			getLogger().error("Configuration or UserGroupInformation not configured properly");
+			final FlowFile failed = session.get();
+			if (failed != null) { // session.get() may return null when the input queue is empty
+				session.transfer(failed, REL_FAILURE);
+			}
+			context.yield();
+			return;
+		}
+
+		for (final Path file : files) {
+
+			ugi.doAs(new PrivilegedAction<Object>() {
+				@Override
+				public Object run() {
+					FlowFile flowFile = session.create();
+					try {
+						final String originalFilename = file.getName();
+						final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
+						final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
+						final boolean destinationExists = hdfs.exists(newFile);
+						// If destination file already exists, resolve that
+						// based on processor configuration
+						if (destinationExists) {
+							switch (processorConfig.getConflictResolution()) {
+							case REPLACE_RESOLUTION:
+								// delete the existing destination (not the source) so the rename/copy below can replace it
+								if (hdfs.delete(newFile, false)) {
+									getLogger().info("deleted {} in order to replace with the contents of {}",
+											new Object[] { newFile, file });
+								}
+								break;
+							case IGNORE_RESOLUTION:
+								session.transfer(flowFile, REL_SUCCESS);
+								getLogger().info(
+										"transferring {} to success because file with same name already exists",
+										new Object[] { flowFile });
+								return null;
+							case FAIL_RESOLUTION:
+								session.transfer(session.penalize(flowFile), REL_FAILURE);
+								getLogger().warn(
+										"penalizing {} and routing to failure because file with same name already exists",
+										new Object[] { flowFile });
+								return null;
+							default:
+								break;
+							}
+						}
+
+						// Create destination directory if it does not exist
+						try {
+							if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
+								throw new IOException(configuredRootOutputDirPath.toString()
+										+ " already exists and is not a directory");
+							}
+						} catch (FileNotFoundException fe) {
+							if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
+								throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
+							}
+							changeOwner(context, hdfs, configuredRootOutputDirPath);
+						}
+
+						boolean moved = false;
+						for (int i = 0; i < 10; i++) { // try to rename multiple times
+							if (processorConfig.getOperation().equals("move")) {
+								if (hdfs.rename(file, newFile)) {
+									moved = true;
+									break;// rename was successful
+								}
+							} else {
+								if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
+									moved = true;
+									break;// copy was successful
+								}
+							}
+							Thread.sleep(200L); // wait briefly to let whatever might cause rename failure to resolve
+						}
+						if (!moved) {
+							throw new ProcessException("Could not move file " + file + " to its final filename");
+						}
+
+						changeOwner(context, hdfs, file);
+						final String outputPath = newFile.toString();
+						final String newFilename = newFile.getName();
+						final String hdfsPath = newFile.getParent().toString();
+						flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+						flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+						final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
+								: "hdfs://" + outputPath;
+						session.getProvenanceReporter().send(flowFile, transitUri);
+						session.transfer(flowFile, REL_SUCCESS);
+
+					} catch (final Throwable t) {
+						getLogger().error("Failed to rename on HDFS due to {}", new Object[] { t });
+						session.transfer(session.penalize(flowFile), REL_FAILURE);
+						context.yield();
+					}
+					return null;
+				}
+			});
+		}
+	}
+
+	protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
+		Set<Path> listing = null;
+
+		if (listingLock.tryLock()) {
+			try {
+				final FileSystem hdfs = getFileSystem();
+				// get listing
+				listing = selectFiles(hdfs, path, null);
+			} finally {
+				listingLock.unlock();
+			}
+		}
+
+		return listing;
+	}
+
+	protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
+		try {
+			// Change owner and group of file if configured to do so
+			String owner = context.getProperty(REMOTE_OWNER).getValue();
+			String group = context.getProperty(REMOTE_GROUP).getValue();
+			if (owner != null || group != null) {
+				hdfs.setOwner(name, owner, group);
+			}
+		} catch (Exception e) {
+			getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[] { name, e });
+		}
+	}
+
+	protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
+			throws IOException {
+		if (null == filesVisited) {
+			filesVisited = new HashSet<>();
+		}
+
+		if (!hdfs.exists(inputPath)) {
+			throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
+		}
+
+		final Set<Path> files = new HashSet<>();
+
+		FileStatus inputStatus = hdfs.getFileStatus(inputPath);
+
+		if (inputStatus.isDirectory()) {
+			for (final FileStatus file : hdfs.listStatus(inputPath)) {
+				final Path canonicalFile = file.getPath();
+
+				if (!filesVisited.add(canonicalFile)) {
+					// skip files we've already seen (may be looping directory links)
+					continue;
+				}
+
+				if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
+					files.add(canonicalFile);
+
+					if (getLogger().isDebugEnabled()) {
+						getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
+					}
+				}
+			}
+		} else if (inputStatus.isFile()) {
+			files.add(inputPath);
+		}
+		return files;
+	}
+
+	protected static class ProcessorConfiguration {
+
+		final private String conflictResolution;
+		final private String operation;
+		final private Path inputRootDirPath;
+		final private Path outputRootDirPath;
+		final private Pattern fileFilterPattern;
+		final private boolean ignoreDottedFiles;
+
+		ProcessorConfiguration(final ProcessContext context) {
+			conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+			operation = context.getProperty(OPERATION).getValue();
+			final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).getValue();
+			inputRootDirPath = new Path(inputDirValue);
+			final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).getValue();
+			outputRootDirPath = new Path(outputDirValue);
+			final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
+			fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
+			ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+		}
+
+		public String getOperation() {
+			return operation;
+		}
+
+		public String getConflictResolution() {
+			return conflictResolution;
+		}
+
+		public Path getInput() {
+			return inputRootDirPath;
+		}
+
+		public Path getOutputDirectory() {
+			return outputRootDirPath;
+		}
+
+		protected PathFilter getPathFilter(final Path dir) {
+			return new PathFilter() {
+
+				@Override
+				public boolean accept(Path path) {
+					if (ignoreDottedFiles && path.getName().startsWith(".")) {
+						return false;
+					}
+					final String pathToCompare;
+					String relativePath = getPathDifference(dir, path);
+					if (relativePath.length() == 0) {
+						pathToCompare = path.getName();
+					} else {
+						pathToCompare = relativePath + Path.SEPARATOR + path.getName();
+					}
+
+					if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
+						return false;
+					}
+					return true;
+				}
+
+			};
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/3731fbee/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 165ec2c..920776a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -20,3 +20,4 @@ org.apache.nifi.processors.hadoop.inotify.GetHDFSEvents
 org.apache.nifi.processors.hadoop.ListHDFS
 org.apache.nifi.processors.hadoop.PutHDFS
 org.apache.nifi.processors.hadoop.DeleteHDFS
+org.apache.nifi.processors.hadoop.MoveHDFS
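For context, this registration file follows the standard java.util.ServiceLoader convention: each line names a Processor implementation to be discovered on the classpath. A minimal illustration of the mechanism (NiFi's actual discovery runs through its NAR class loaders, but it reads the same file format):

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    // Lists every registered Processor implementation; after this commit,
    // org.apache.nifi.processors.hadoop.MoveHDFS appears among them.
    for (Processor processor : ServiceLoader.load(Processor.class)) {
        System.out.println(processor.getClass().getName());
    }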

http://git-wip-us.apache.org/repos/asf/nifi/blob/3731fbee/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
new file mode 100644
index 0000000..6aecdd3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -0,0 +1,195 @@
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MoveHDFSTest {
+
+	private static final String OUTPUT_DIRECTORY = "src/test/resources/testdataoutput";
+	private static final String INPUT_DIRECTORY = "src/test/resources/testdata";
+	private static final String DOT_FILE_PATH = "src/test/resources/testdata/.testfordotfiles";
+	private NiFiProperties mockNiFiProperties;
+	private KerberosProperties kerberosProperties;
+
+	@Before
+	public void setup() {
+		mockNiFiProperties = mock(NiFiProperties.class);
+		when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+		kerberosProperties = new KerberosProperties(null);
+	}
+
+	@After
+	public void teardown() {
+		File outputDirectory = new File(OUTPUT_DIRECTORY);
+		if (outputDirectory.exists()) {
+			if (outputDirectory.isDirectory()) {
+				moveFilesFromOutputDirectoryToInput();
+			}
+			outputDirectory.delete();
+		}
+		removeDotFile();
+	}
+
+	private void removeDotFile() {
+		File dotFile = new File(DOT_FILE_PATH);
+		if (dotFile.exists()) {
+			dotFile.delete();
+		}
+	}
+
+	private void moveFilesFromOutputDirectoryToInput() {
+		File folder = new File(OUTPUT_DIRECTORY);
+		for (File file : folder.listFiles()) {
+			if (file.isFile()) {
+				String path = file.getAbsolutePath();
+				if(!path.endsWith(".crc")) {
+					String newPath = path.replaceAll("testdataoutput", "testdata");
+					File newFile = new File(newPath);
+					if (!newFile.exists()) {
+						file.renameTo(newFile);
+					}
+				} else {
+					file.delete();
+				}
+			}
+		}
+	}
+
+	@Test
+	public void testOutputDirectoryValidator() {
+		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+		TestRunner runner = TestRunners.newTestRunner(proc);
+		Collection<ValidationResult> results;
+		ProcessContext pc;
+
+		results = new HashSet<>();
+		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
+		runner.enqueue(new byte[0]);
+		pc = runner.getProcessContext();
+		if (pc instanceof MockProcessContext) {
+			results = ((MockProcessContext) pc).validate();
+		}
+		Assert.assertEquals(1, results.size());
+		for (ValidationResult vr : results) {
+			assertTrue(vr.toString().contains("Output Directory is required"));
+		}
+	}
+
+	@Test
+	public void testBothInputAndOutputDirectoriesAreValid() {
+		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+		TestRunner runner = TestRunners.newTestRunner(proc);
+		Collection<ValidationResult> results;
+		ProcessContext pc;
+
+		results = new HashSet<>();
+		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+		runner.enqueue(new byte[0]);
+		pc = runner.getProcessContext();
+		if (pc instanceof MockProcessContext) {
+			results = ((MockProcessContext) pc).validate();
+		}
+		Assert.assertEquals(0, results.size());
+	}
+
+	@Test
+	public void testOnScheduledShouldRunCleanly() {
+		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+		TestRunner runner = TestRunners.newTestRunner(proc);
+		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+		runner.enqueue(new byte[0]);
+		runner.setValidateExpressionUsage(false);
+		runner.run();
+		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+		Assert.assertEquals(7, flowFiles.size());
+	}
+	
+	@Test
+	public void testDotFileFilter() throws IOException {
+		createDotFile();
+		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+		TestRunner runner = TestRunners.newTestRunner(proc);
+		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+		runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
+		runner.enqueue(new byte[0]);
+		runner.setValidateExpressionUsage(false);
+		runner.run();
+		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+		Assert.assertEquals(8, flowFiles.size());
+	}
+	
+	@Test
+	public void testFileFilterRegex() {
+		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+		TestRunner runner = TestRunners.newTestRunner(proc);
+		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+		runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
+		runner.enqueue(new byte[0]);
+		runner.setValidateExpressionUsage(false);
+		runner.run();
+		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+		Assert.assertEquals(1, flowFiles.size());
+	}
+	
+	@Test
+	public void testSingleFileAsInput() {
+		MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+		TestRunner runner = TestRunners.newTestRunner(proc);
+		runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
+		runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+		runner.enqueue(new byte[0]);
+		runner.setValidateExpressionUsage(false);
+		runner.run();
+		List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+		runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+		Assert.assertEquals(1, flowFiles.size());
+	}
+
+	private void createDotFile() throws IOException {
+		File dotFile = new File(DOT_FILE_PATH);
+		dotFile.createNewFile();
+	}
+
+	private static class TestableMoveHDFS extends MoveHDFS {
+
+		private KerberosProperties testKerberosProperties;
+
+		public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
+			this.testKerberosProperties = testKerberosProperties;
+		}
+
+		@Override
+		protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
+			return testKerberosProperties;
+		}
+
+	}
+
+}
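The suite above exercises the move operation against the local filesystem; a natural companion test (not part of this commit) would assert copy semantics, i.e. that the source file survives. A sketch under the same local-filesystem assumptions as the tests above:

    @Test
    public void testCopyLeavesSourceInPlace() {
        // Hypothetical companion test: with the HDFS Operation property set to
        // "copy", the source file should still exist after a successful run.
        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
        TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
        runner.setProperty(MoveHDFS.OPERATION, "copy");
        runner.enqueue(new byte[0]);
        runner.setValidateExpressionUsage(false);
        runner.run();
        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
        assertTrue(new File(INPUT_DIRECTORY + "/randombytes-1").exists());
    }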

