Repository: nifi
Updated Branches:
refs/heads/master c138987bb -> 600586d6b
NIFI-3366 MoveHDFS processor supports expression language for input and copy operations
Signed-off-by: Jeff Storck <jtswork@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3731fbee
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3731fbee
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3731fbee
Branch: refs/heads/master
Commit: 3731fbee8883885a87f5f7548f46d9190aa3045d
Parents: c138987
Author: Gray Gwizdz <ggwizdz1@ford.com>
Authored: Wed Mar 15 09:40:02 2017 -0400
Committer: joewitt <joewitt@apache.org>
Committed: Mon Dec 11 08:41:25 2017 -0500
----------------------------------------------------------------------
.../apache/nifi/processors/hadoop/MoveHDFS.java | 527 +++++++++++++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../nifi/processors/hadoop/MoveHDFSTest.java | 195 +++++++
3 files changed, 723 insertions(+)
----------------------------------------------------------------------
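For quick reference, the new processor is exercised through the standard NiFi TestRunner (see MoveHDFSTest.java below). A minimal sketch of the expression-language support this commit adds for the input path might look like the following; the attribute name "hdfs.source.dir" and the paths are hypothetical and used only for illustration, and a real test would also stub out the Kerberos properties the way the TestableMoveHDFS class in the test file does:

    // sketch only, not part of the commit; follows the TestRunner pattern from MoveHDFSTest.java
    TestRunner runner = TestRunners.newTestRunner(new MoveHDFS());
    // "Input Directory or File" defaults to ${path} and is evaluated against the incoming
    // FlowFile in onTrigger, so the source location can come from a FlowFile attribute
    runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "${hdfs.source.dir}");
    runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, "/data/archive");
    Map<String, String> attributes = new HashMap<>();
    attributes.put("hdfs.source.dir", "/data/incoming");
    runner.enqueue(new byte[0], attributes);
    runner.run();
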
http://git-wip-us.apache.org/repos/asf/nifi/blob/3731fbee/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
new file mode 100644
index 0000000..e9842b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+/**
+ * This processor renames files on HDFS.
+ */
+@Tags({ "hadoop", "HDFS", "put", "move", "filesystem", "restricted", "moveHDFS" })
+@CapabilityDescription("Rename existing files on Hadoop Distributed File System (HDFS)")
+@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.") })
+@SeeAlso({ PutHDFS.class, GetHDFS.class })
+public class MoveHDFS extends AbstractHadoopProcessor {
+
+ // static global
+ public static final int MAX_WORKING_QUEUE_SIZE = 25000;
+ public static final String REPLACE_RESOLUTION = "replace";
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String FAIL_RESOLUTION = "fail";
+
+ private static final Set<Relationship> relationships;
+
+ public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+ REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+ "Failed rename operation stops processing and routes to success.");
+    public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+ "Failing to rename a file routes to failure.");
+
+ public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+ public static final int BUFFER_SIZE_DEFAULT = 4096;
+
+ public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+
+ // relationships
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Files that have been successfully renamed on HDFS are transferred to this relationship")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("Files that could not be renamed on HDFS are transferred to this relationship").build();
+
+ // properties
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+ .name("Conflict Resolution Strategy")
+ .description(
+                    "Indicates what should happen when a file with the same name already exists in the output directory")
+ .required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
+ .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
+
+ public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
+ .name("File Filter Regex")
+ .description(
+                    "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
+ + "Expression will be fetched, otherwise all files will be fetched")
+ .required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+ public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+ .name("Ignore Dotted Files")
+ .description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
+ .allowableValues("true", "false").defaultValue("true").build();
+
+ public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
+ .name("Input Directory or File")
+            .description("The HDFS directory from which files should be read, or a single file to read")
+ .defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .expressionLanguageSupported(true).build();
+
+    public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
+ .description("The HDFS directory where the files will be moved to").required(true)
+ .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
+ .build();
+
+    public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
+ .description("The operation that will be performed on the source file").required(true)
+ .allowableValues("move", "copy").defaultValue("move").build();
+
+    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
+ .description(
+                    "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
+ .description(
+                    "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ static {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(rels);
+ }
+
+ // non-static global
+ protected ProcessorConfiguration processorConfig;
+ private final AtomicLong logEmptyListing = new AtomicLong(2L);
+
+ private final Lock listingLock = new ReentrantLock();
+ private final Lock queueLock = new ReentrantLock();
+
+ private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
+ private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
+
+ // methods
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ List<PropertyDescriptor> props = new ArrayList<>(properties);
+ props.add(CONFLICT_RESOLUTION);
+ props.add(INPUT_DIRECTORY_OR_FILE);
+ props.add(OUTPUT_DIRECTORY);
+ props.add(OPERATION);
+ props.add(FILE_FILTER_REGEX);
+ props.add(IGNORE_DOTTED_FILES);
+ props.add(REMOTE_OWNER);
+ props.add(REMOTE_GROUP);
+ return props;
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws Exception {
+ super.abstractOnScheduled(context);
+ // copy configuration values to pass them around cleanly
+ processorConfig = new ProcessorConfiguration(context);
+ // forget the state of the queue in case HDFS contents changed while
+ // this processor was turned off
+ queueLock.lock();
+ try {
+ filePathQueue.clear();
+ processing.clear();
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ // MoveHDFS
+ FlowFile parentFlowFile = session.get();
+ if (parentFlowFile == null) {
+ return;
+ }
+
+ final FileSystem hdfs = getFileSystem();
+ final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(parentFlowFile).getValue();
+
+ Path inputPath = null;
+ try {
+ inputPath = new Path(filenameValue);
+ if(!hdfs.exists(inputPath)) {
+ throw new IOException("Input Directory or File does not exist in HDFS");
+ }
+ } catch (Exception e) {
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, parentFlowFile, e});
+            parentFlowFile = session.putAttribute(parentFlowFile, "hdfs.failure.reason", e.getMessage());
+ parentFlowFile = session.penalize(parentFlowFile);
+ session.transfer(parentFlowFile, REL_FAILURE);
+ return;
+ }
+ session.remove(parentFlowFile);
+
+ List<Path> files = new ArrayList<Path>();
+
+ try {
+ final StopWatch stopWatch = new StopWatch(true);
+ Set<Path> listedFiles = performListing(context, inputPath);
+ stopWatch.stop();
+ final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+
+ if (listedFiles != null) {
+ // place files into the work queue
+ int newItems = 0;
+ queueLock.lock();
+ try {
+ for (Path file : listedFiles) {
+ if (!filePathQueue.contains(file) && !processing.contains(file)) {
+ if (!filePathQueue.offer(file)) {
+ break;
+ }
+ newItems++;
+ }
+ }
+ } catch (Exception e) {
+ getLogger().warn("Could not add to processing queue due to {}", new Object[] { e });
+ } finally {
+ queueLock.unlock();
+ }
+ if (listedFiles.size() > 0) {
+ logEmptyListing.set(3L);
+ }
+ if (logEmptyListing.getAndDecrement() > 0) {
+ getLogger().info(
+                            "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
+ new Object[] { millis, listedFiles.size(), newItems });
+ }
+ }
+ } catch (IOException e) {
+ context.yield();
+ getLogger().warn("Error while retrieving list of files due to {}", new Object[] { e });
+ return;
+ }
+
+ // prepare to process a batch of files in the queue
+ queueLock.lock();
+ try {
+ filePathQueue.drainTo(files);
+ if (files.isEmpty()) {
+ // nothing to do!
+ context.yield();
+ return;
+ }
+ } finally {
+ queueLock.unlock();
+ }
+
+ processBatchOfFiles(files, context, session);
+
+ queueLock.lock();
+ try {
+ processing.removeAll(files);
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
+ final ProcessSession session) {
+ // process the batch of files
+ final Configuration conf = getConfiguration();
+ final FileSystem hdfs = getFileSystem();
+ final UserGroupInformation ugi = getUserGroupInformation();
+
+ if (conf == null || ugi == null) {
+ getLogger().error("Configuration or UserGroupInformation not configured properly");
+ session.transfer(session.get(), REL_FAILURE);
+ context.yield();
+ return;
+ }
+
+ for (final Path file : files) {
+
+ ugi.doAs(new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ FlowFile flowFile = session.create();
+ try {
+ final String originalFilename = file.getName();
+ final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
+ final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
+ final boolean destinationExists = hdfs.exists(newFile);
+ // If destination file already exists, resolve that
+ // based on processor configuration
+ if (destinationExists) {
+ switch (processorConfig.getConflictResolution()) {
+ case REPLACE_RESOLUTION:
+                            if (hdfs.delete(newFile, false)) {
+                                getLogger().info("deleted {} in order to replace with the contents of {}",
+                                        new Object[] { newFile, flowFile });
+ }
+ break;
+ case IGNORE_RESOLUTION:
+ session.transfer(flowFile, REL_SUCCESS);
+ getLogger().info(
+ "transferring {} to success because file with same name already exists",
+ new Object[] { flowFile });
+ return null;
+ case FAIL_RESOLUTION:
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ getLogger().warn(
+ "penalizing {} and routing to failure because file with same name already exists",
+ new Object[] { flowFile });
+ return null;
+ default:
+ break;
+ }
+ }
+
+ // Create destination directory if it does not exist
+ try {
+ if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
+ throw new IOException(configuredRootOutputDirPath.toString()
+ + " already exists and is not a directory");
+ }
+ } catch (FileNotFoundException fe) {
+ if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
+ throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
+ }
+ changeOwner(context, hdfs, configuredRootOutputDirPath);
+ }
+
+ boolean moved = false;
+ for (int i = 0; i < 10; i++) { // try to rename multiple
+ // times.
+ if (processorConfig.getOperation().equals("move")) {
+ if (hdfs.rename(file, newFile)) {
+ moved = true;
+ break;// rename was successful
+ }
+ } else {
+ if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
+ moved = true;
+ break;// copy was successful
+ }
+ }
+ Thread.sleep(200L);// try waiting to let whatever
+ // might cause rename failure to
+ // resolve
+ }
+ if (!moved) {
+ throw new ProcessException("Could not move file " + file + " to its final filename");
+ }
+
+ changeOwner(context, hdfs, file);
+ final String outputPath = newFile.toString();
+ final String newFilename = newFile.getName();
+ final String hdfsPath = newFile.getParent().toString();
+ flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+ flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+ final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
+ : "hdfs://" + outputPath;
+ session.getProvenanceReporter().send(flowFile, transitUri);
+ session.transfer(flowFile, REL_SUCCESS);
+
+ } catch (final Throwable t) {
+ getLogger().error("Failed to rename on HDFS due to {}", new Object[] { t });
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ context.yield();
+ }
+ return null;
+ }
+ });
+ }
+ }
+
+    protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
+ Set<Path> listing = null;
+
+ if (listingLock.tryLock()) {
+ try {
+ final FileSystem hdfs = getFileSystem();
+ // get listing
+ listing = selectFiles(hdfs, path, null);
+ } finally {
+ listingLock.unlock();
+ }
+ }
+
+ return listing;
+ }
+
+    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
+ try {
+ // Change owner and group of file if configured to do so
+ String owner = context.getProperty(REMOTE_OWNER).getValue();
+ String group = context.getProperty(REMOTE_GROUP).getValue();
+ if (owner != null || group != null) {
+ hdfs.setOwner(name, owner, group);
+ }
+ } catch (Exception e) {
+            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[] { name, e });
+ }
+ }
+
+    protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
+ throws IOException {
+ if (null == filesVisited) {
+ filesVisited = new HashSet<>();
+ }
+
+ if (!hdfs.exists(inputPath)) {
+            throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
+ }
+
+ final Set<Path> files = new HashSet<>();
+
+ FileStatus inputStatus = hdfs.getFileStatus(inputPath);
+
+ if (inputStatus.isDirectory()) {
+ for (final FileStatus file : hdfs.listStatus(inputPath)) {
+ final Path canonicalFile = file.getPath();
+
+ if (!filesVisited.add(canonicalFile)) { // skip files we've
+ // already
+ // seen (may be looping
+ // directory links)
+ continue;
+ }
+
+                if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
+ files.add(canonicalFile);
+
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
+ }
+ }
+ }
+ } else if (inputStatus.isFile()) {
+ files.add(inputPath);
+ }
+ return files;
+ }
+
+ protected static class ProcessorConfiguration {
+
+ final private String conflictResolution;
+ final private String operation;
+ final private Path inputRootDirPath;
+ final private Path outputRootDirPath;
+ final private Pattern fileFilterPattern;
+ final private boolean ignoreDottedFiles;
+
+ ProcessorConfiguration(final ProcessContext context) {
+ conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+ operation = context.getProperty(OPERATION).getValue();
+ final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).getValue();
+ inputRootDirPath = new Path(inputDirValue);
+ final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).getValue();
+ outputRootDirPath = new Path(outputDirValue);
+ final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
+ fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
+ ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+ }
+
+ public String getOperation() {
+ return operation;
+ }
+
+ public String getConflictResolution() {
+ return conflictResolution;
+ }
+
+ public Path getInput() {
+ return inputRootDirPath;
+ }
+
+ public Path getOutputDirectory() {
+ return outputRootDirPath;
+ }
+
+ protected PathFilter getPathFilter(final Path dir) {
+ return new PathFilter() {
+
+ @Override
+ public boolean accept(Path path) {
+ if (ignoreDottedFiles && path.getName().startsWith(".")) {
+ return false;
+ }
+ final String pathToCompare;
+ String relativePath = getPathDifference(dir, path);
+ if (relativePath.length() == 0) {
+ pathToCompare = path.getName();
+ } else {
+ pathToCompare = relativePath + Path.SEPARATOR + path.getName();
+ }
+
+                if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
+ return false;
+ }
+ return true;
+ }
+
+ };
+ }
+ }
+}
\ No newline at end of file
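The copy support referenced in the commit subject is driven by the "HDFS Operation" and "Conflict Resolution Strategy" properties defined above. A hedged configuration sketch, again using the TestRunner pattern from the test class below (paths are made up, and the run would only succeed if they exist on the local filesystem the tests use):

    // sketch only, not part of the commit
    TestRunner runner = TestRunners.newTestRunner(new MoveHDFS());
    runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/data/incoming");
    runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, "/data/archive");
    // "copy" leaves the source file in place; "move" (the default) renames it away
    runner.setProperty(MoveHDFS.OPERATION, "copy");
    // route to failure when a file with the same name already exists in the output directory
    runner.setProperty(MoveHDFS.CONFLICT_RESOLUTION, MoveHDFS.FAIL_RESOLUTION);
    runner.enqueue(new byte[0]);
    runner.run();
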
http://git-wip-us.apache.org/repos/asf/nifi/blob/3731fbee/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 165ec2c..920776a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -20,3 +20,4 @@ org.apache.nifi.processors.hadoop.inotify.GetHDFSEvents
org.apache.nifi.processors.hadoop.ListHDFS
org.apache.nifi.processors.hadoop.PutHDFS
org.apache.nifi.processors.hadoop.DeleteHDFS
+org.apache.nifi.processors.hadoop.MoveHDFS
http://git-wip-us.apache.org/repos/asf/nifi/blob/3731fbee/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
new file mode 100644
index 0000000..6aecdd3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -0,0 +1,195 @@
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MoveHDFSTest {
+
+ private static final String OUTPUT_DIRECTORY = "src/test/resources/testdataoutput";
+ private static final String INPUT_DIRECTORY = "src/test/resources/testdata";
+ private static final String DOT_FILE_PATH = "src/test/resources/testdata/.testfordotfiles";
+ private NiFiProperties mockNiFiProperties;
+ private KerberosProperties kerberosProperties;
+
+ @Before
+ public void setup() {
+ mockNiFiProperties = mock(NiFiProperties.class);
+ when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+ kerberosProperties = new KerberosProperties(null);
+ }
+
+ @After
+ public void teardown() {
+ File outputDirectory = new File(OUTPUT_DIRECTORY);
+ if (outputDirectory.exists()) {
+ if (outputDirectory.isDirectory()) {
+ moveFilesFromOutputDirectoryToInput();
+ }
+ outputDirectory.delete();
+ }
+ removeDotFile();
+ }
+
+ private void removeDotFile() {
+ File dotFile = new File(DOT_FILE_PATH);
+ if (dotFile.exists()) {
+ dotFile.delete();
+ }
+ }
+
+ private void moveFilesFromOutputDirectoryToInput() {
+ File folder = new File(OUTPUT_DIRECTORY);
+ for (File file : folder.listFiles()) {
+ if (file.isFile()) {
+ String path = file.getAbsolutePath();
+ if(!path.endsWith(".crc")) {
+ String newPath = path.replaceAll("testdataoutput", "testdata");
+ File newFile = new File(newPath);
+ if (!newFile.exists()) {
+ file.renameTo(newFile);
+ }
+ } else {
+ file.delete();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testOutputDirectoryValidator() {
+ MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ Collection<ValidationResult> results;
+ ProcessContext pc;
+
+ results = new HashSet<>();
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
+ runner.enqueue(new byte[0]);
+ pc = runner.getProcessContext();
+ if (pc instanceof MockProcessContext) {
+ results = ((MockProcessContext) pc).validate();
+ }
+ Assert.assertEquals(1, results.size());
+ for (ValidationResult vr : results) {
+ assertTrue(vr.toString().contains("Output Directory is required"));
+ }
+ }
+
+ @Test
+ public void testBothInputAndOutputDirectoriesAreValid() {
+ MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ Collection<ValidationResult> results;
+ ProcessContext pc;
+
+ results = new HashSet<>();
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+ runner.enqueue(new byte[0]);
+ pc = runner.getProcessContext();
+ if (pc instanceof MockProcessContext) {
+ results = ((MockProcessContext) pc).validate();
+ }
+ Assert.assertEquals(0, results.size());
+ }
+
+ @Test
+ public void testOnScheduledShouldRunCleanly() {
+ MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+ runner.enqueue(new byte[0]);
+ runner.setValidateExpressionUsage(false);
+ runner.run();
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+ runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+ Assert.assertEquals(7, flowFiles.size());
+ }
+
+ @Test
+ public void testDotFileFilter() throws IOException {
+ createDotFile();
+ MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
+ runner.enqueue(new byte[0]);
+ runner.setValidateExpressionUsage(false);
+ runner.run();
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+ runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+ Assert.assertEquals(8, flowFiles.size());
+ }
+
+ @Test
+ public void testFileFilterRegex() {
+ MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
+ runner.enqueue(new byte[0]);
+ runner.setValidateExpressionUsage(false);
+ runner.run();
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+ runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+ Assert.assertEquals(1, flowFiles.size());
+ }
+
+ @Test
+ public void testSingleFileAsInput() {
+ MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
+ runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+ runner.enqueue(new byte[0]);
+ runner.setValidateExpressionUsage(false);
+ runner.run();
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+ runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+ Assert.assertEquals(1, flowFiles.size());
+ }
+
+ private void createDotFile() throws IOException {
+ File dotFile = new File(DOT_FILE_PATH);
+ dotFile.createNewFile();
+ }
+
+ private static class TestableMoveHDFS extends MoveHDFS {
+
+ private KerberosProperties testKerberosProperties;
+
+ public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
+ this.testKerberosProperties = testKerberosProperties;
+ }
+
+ @Override
+ protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
+ return testKerberosProperties;
+ }
+
+ }
+
+}