nifi-commits mailing list archives

From marka...@apache.org
Subject [67/79] [abbrv] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure
Date Thu, 22 Jan 2015 17:04:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 0000000,20ac738..0610d8f
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@@ -1,0 -1,563 +1,562 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.hadoop;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
 -
+ import org.apache.commons.io.IOUtils;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.PathFilter;
+ 
+ /**
+  * This processor reads files from HDFS into NiFi FlowFiles.
+  */
+ @TriggerWhenEmpty
+ @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
+ @CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles")
+ public class GetHDFS extends AbstractHadoopProcessor {
+ 
+     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+     public static final int BUFFER_SIZE_DEFAULT = 4096;
+     public static final int MAX_WORKING_QUEUE_SIZE = 25000;
+ 
+     // relationships
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+             .name("success")
+             .description("All files retrieved from HDFS are transferred to this relationship")
+             .build();
+ 
+     public static final Relationship REL_PASSTHROUGH = new Relationship.Builder()
+             .name("passthrough")
+             .description(
+                     "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship")
+             .build();
+ 
+     // properties
+     public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+             .name(DIRECTORY_PROP_NAME)
+             .description("The HDFS directory from which files should be read")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
+             .name("Recurse Subdirectories")
+             .description("Indicates whether to pull files from subdirectories of the HDFS directory")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
+             .name("Keep Source File")
+             .description("Determines whether to delete the file from HDFS after it has been successfully transferred")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
+             .name("File Filter Regex")
+             .description(
+                     "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched")
+             .required(false)
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder()
+             .name("Filter Match Name Only")
+             .description(
+                     "If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+             .name("Ignore Dotted Files")
+             .description("If true, files whose names begin with a dot (\".\") will be ignored")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
+             .name("Minimum File Age")
+             .description(
+                     "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
+             .required(true)
+             .addValidator(
+                     StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+             .defaultValue("0 sec")
+             .build();
+ 
+     public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
+             .name("Maximum File Age")
+             .description(
+                     "The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
+             .required(false)
+             .addValidator(
+                     StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+             .build();
+ 
+     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+             .name("Batch Size")
+             .description("The maximum number of files to pull in each iteration, based on run schedule.")
+             .required(true)
+             .defaultValue("100")
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
+             .name("Polling Interval")
+             .description("Indicates how long to wait between performing directory listings")
+             .required(true)
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("0 sec")
+             .build();
+ 
+     public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
+             .name("IO Buffer Size")
+             .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+ 
+     private static final Set<Relationship> relationships;
+     protected static final List<PropertyDescriptor> localProperties;
+ 
+     static {
+         final Set<Relationship> rels = new HashSet<>();
+         rels.add(REL_SUCCESS);
+         rels.add(REL_PASSTHROUGH);
+         relationships = Collections.unmodifiableSet(rels);
+ 
+         List<PropertyDescriptor> props = new ArrayList<>(properties);
+         props.add(DIRECTORY);
+         props.add(RECURSE_SUBDIRS);
+         props.add(KEEP_SOURCE_FILE);
+         props.add(FILE_FILTER_REGEX);
+         props.add(FILTER_MATCH_NAME_ONLY);
+         props.add(IGNORE_DOTTED_FILES);
+         props.add(MIN_AGE);
+         props.add(MAX_AGE);
+         props.add(POLLING_INTERVAL);
+         props.add(BATCH_SIZE);
+         props.add(BUFFER_SIZE);
+         localProperties = Collections.unmodifiableList(props);
+     }
+ 
+     protected ProcessorConfiguration processorConfig;
+     private final AtomicLong logEmptyListing = new AtomicLong(2L);
+ 
+     private final AtomicLong lastPollTime = new AtomicLong(0L);
+     private final Lock listingLock = new ReentrantLock();
+     private final Lock queueLock = new ReentrantLock();
+ 
+     private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>(MAX_WORKING_QUEUE_SIZE);
+     private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return localProperties;
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(ValidationContext context) {
+         final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
+ 
+         final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+         final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
+         final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
+         if (minimumAge > maximumAge) {
+             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
+                     .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
+         }
+ 
+         return problems;
+     }
+ 
+     @OnScheduled
+     public void onScheduled(ProcessContext context) throws IOException {
+         abstractOnScheduled(context);
+         // copy configuration values to pass them around cleanly
+         processorConfig = new ProcessorConfiguration(context);
+         FileSystem fs = hdfsResources.get().getValue();
+         Path dir = new Path(context.getProperty(DIRECTORY).getValue());
+         if (!fs.exists(dir)) {
+             throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
+         }
+ 
+         // forget the state of the queue in case HDFS contents changed while this processor was turned off
+         queueLock.lock();
+         try {
+             filePathQueue.clear();
+             processing.clear();
+         } finally {
+             queueLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ 
+         int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+         final List<Path> files = new ArrayList<>(batchSize);
+         List<FlowFile> inputFlowFiles = session.get(10);
+         for (FlowFile ff : inputFlowFiles) {
+             session.transfer(ff, REL_PASSTHROUGH);
+         }
+ 
+         // retrieve new file names from HDFS and place them into work queue
+         if (filePathQueue.size() < MAX_WORKING_QUEUE_SIZE / 2) {
+             try {
+                 final StopWatch stopWatch = new StopWatch(true);
+                 Set<Path> listedFiles = performListing(context);
+                 stopWatch.stop();
+                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ 
+                 if (listedFiles != null) {
+                     // place files into the work queue
+                     int newItems = 0;
+                     queueLock.lock();
+                     try {
+                         for (Path file : listedFiles) {
+                             if (!filePathQueue.contains(file) && !processing.contains(file)) {
+                                 if (!filePathQueue.offer(file)) {
+                                     break;
+                                 }
+                                 newItems++;
+                             }
+                         }
+                     } catch (Exception e) {
+                         getLogger().warn("Could not add to processing queue due to {}", new Object[]{e});
+                     } finally {
+                         queueLock.unlock();
+                     }
+                     if (listedFiles.size() > 0) {
+                         logEmptyListing.set(3L);
+                     }
+                     if (logEmptyListing.getAndDecrement() > 0) {
+                         getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
+                                 new Object[]{millis, listedFiles.size(), newItems});
+                     }
+                 }
+             } catch (IOException e) {
+                 context.yield();
+                 getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
+                 return;
+             }
+         }
+ 
+         // prepare to process a batch of files in the queue
+         queueLock.lock();
+         try {
+             filePathQueue.drainTo(files, batchSize);
+             if (files.isEmpty()) {
+                 // nothing to do!
+                 context.yield();
+                 return;
+             }
+             processing.addAll(files);
+         } finally {
+             queueLock.unlock();
+         }
+ 
+         processBatchOfFiles(files, context, session);
+ 
+         queueLock.lock();
+         try {
+             processing.removeAll(files);
+         } finally {
+             queueLock.unlock();
+         }
+     }
+ 
+     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
+         // process the batch of files
+         FSDataInputStream stream = null;
+         Configuration conf = hdfsResources.get().getKey();
+         FileSystem hdfs = hdfsResources.get().getValue();
+         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
+                 BUFFER_SIZE_DEFAULT);
+         final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
+         for (final Path file : files) {
+             try {
+                 if (!hdfs.exists(file)) {
+                     continue; // if file is no longer there then move on
+                 }
+                 final String filename = file.getName();
+                 final String relativePath = getPathDifference(rootDir, file);
+ 
+                 stream = hdfs.open(file, bufferSize);
+                 FlowFile flowFile = session.create();
+ 
+                 final StopWatch stopWatch = new StopWatch(true);
+                 flowFile = session.importFrom(stream, flowFile);
+                 stopWatch.stop();
+                 final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
+                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ 
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename);
+ 
+                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
+                     getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
+                             new Object[]{file});
+                     session.remove(flowFile);
+                     continue;
+                 }
+ 
+                 final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+                 session.getProvenanceReporter().receive(flowFile, transitUri);
+                 session.transfer(flowFile, REL_SUCCESS);
+                 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
+                         new Object[]{flowFile, file, millis, dataRate});
+                 session.commit();
+             } catch (final Throwable t) {
+                 getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
+                 session.rollback();
+                 context.yield();
+             } finally {
+                 IOUtils.closeQuietly(stream);
+                 stream = null;
+             }
+         }
+     }
+ 
+     /**
+      * Do a listing of HDFS if the POLLING_INTERVAL has lapsed.
+      *
+      * Will return null if POLLING_INTERVAL has not lapsed. Will return an empty
+      * set if no files were found on HDFS that matched the configured filters.
+      * @param context the current process context
+      * @return the set of paths found, or null if the polling interval has not yet lapsed
+      * @throws java.io.IOException if listing the HDFS directory fails
+      */
+     protected Set<Path> performListing(final ProcessContext context) throws IOException {
+ 
+         final long pollingIntervalMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
+         Set<Path> listing = null;
+ 
+         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
+             try {
+                 final FileSystem hdfs = hdfsResources.get().getValue();
+                 // get listing
+                 listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null);
+                 lastPollTime.set(System.currentTimeMillis());
+             } finally {
+                 listingLock.unlock();
+             }
+         }
+ 
+         return listing;
+     }
+ 
+     /**
+      * Poll HDFS for files to process that match the configured file filters.
+      * @param hdfs the file system to poll
+      * @param dir the directory to search
+      * @param filesVisited the paths already seen, used to guard against looping directory links; may be null
+      * @return the set of files beneath the directory that match the configured filters
+      * @throws java.io.IOException if a directory listing fails
+      */
+     protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException {
+         if (null == filesVisited) {
+             filesVisited = new HashSet<>();
+         }
+ 
+         if (!hdfs.exists(dir)) {
+             throw new IOException("Selection directory " + dir.toString() + " doesn't appear to exist!");
+         }
+ 
+         final Set<Path> files = new HashSet<>();
+ 
+         for (final FileStatus file : hdfs.listStatus(dir)) {
+             if (files.size() >= MAX_WORKING_QUEUE_SIZE) {
+                 // no need to make the files set larger than what we would queue anyway
+                 break;
+             }
+ 
+             final Path canonicalFile = file.getPath();
+ 
+             if (!filesVisited.add(canonicalFile)) { // skip files we've already seen (may be looping directory links)
+                 continue;
+             }
+ 
+             if (file.isDirectory() && processorConfig.getRecurseSubdirs()) {
+                 files.addAll(selectFiles(hdfs, canonicalFile, filesVisited));
+ 
+             } else if (!file.isDirectory() && processorConfig.getPathFilter().accept(canonicalFile)) {
+                 final long fileAge = System.currentTimeMillis() - file.getModificationTime();
+                 if (processorConfig.getMinimumAge() < fileAge && fileAge < processorConfig.getMaximumAge()) {
+                     files.add(canonicalFile);
+ 
+                     if (getLogger().isDebugEnabled()) {
+                         getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
+                     }
+ 
+                 }
+             }
+         }
+         return files;
+     }
+ 
+     /**
+      * Returns the relative path of the child that does not include the filename
+      * or the root path.
+      * @param root the configured root directory
+      * @param child the path of a file beneath the root
+      * @return the intermediate directory portion of the child path, or an empty string if there is none
+      */
+     public static String getPathDifference(final Path root, final Path child) {
+         final int depthDiff = child.depth() - root.depth();
+         if (depthDiff <= 1) {
+             return "".intern();
+         }
+         String lastRoot = root.getName();
+         Path childsParent = child.getParent();
+         final StringBuilder builder = new StringBuilder();
+         builder.append(childsParent.getName());
+         for (int i = (depthDiff - 3); i >= 0; i--) {
+             childsParent = childsParent.getParent();
+             String name = childsParent.getName();
+             if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
+                 break;
+             }
+             builder.insert(0, Path.SEPARATOR).insert(0, name);
+         }
+         return builder.toString();
+     }
+ 
+     /**
+      * Holder for a snapshot in time of some processor properties that are
+      * passed around.
+      */
+     protected static class ProcessorConfiguration {
+ 
+         final private Path configuredRootDirPath;
+         final private Pattern fileFilterPattern;
+         final private boolean ignoreDottedFiles;
+         final private boolean filterMatchBasenameOnly;
+         final private long minimumAge;
+         final private long maximumAge;
+         final private boolean recurseSubdirs;
+         final private PathFilter pathFilter;
+ 
+         ProcessorConfiguration(final ProcessContext context) {
+             configuredRootDirPath = new Path(context.getProperty(DIRECTORY).getValue());
+             ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+             final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
+             fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
+             filterMatchBasenameOnly = context.getProperty(FILTER_MATCH_NAME_ONLY).asBoolean();
+             final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+             minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
+             final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+             maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
+             recurseSubdirs = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+             pathFilter = new PathFilter() {
+ 
+                 @Override
+                 public boolean accept(Path path) {
+                     if (ignoreDottedFiles && path.getName().startsWith(".")) {
+                         return false;
+                     }
+                     final String pathToCompare;
+                     if (filterMatchBasenameOnly) {
+                         pathToCompare = path.getName();
+                     } else {
+                         // figure out portion of path that does not include the provided root dir.
+                         String relativePath = getPathDifference(configuredRootDirPath, path);
+                         if (relativePath.length() == 0) {
+                             pathToCompare = path.getName();
+                         } else {
+                             pathToCompare = relativePath + Path.SEPARATOR + path.getName();
+                         }
+                     }
+ 
+                     if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
+                         return false;
+                     }
+                     return true;
+                 }
+ 
+             };
+         }
+ 
+         public Path getConfiguredRootDirPath() {
+             return configuredRootDirPath;
+         }
+ 
+         protected long getMinimumAge() {
+             return minimumAge;
+         }
+ 
+         protected long getMaximumAge() {
+             return maximumAge;
+         }
+ 
+         public boolean getRecurseSubdirs() {
+             return recurseSubdirs;
+         }
+ 
+         protected PathFilter getPathFilter() {
+             return pathFilter;
+         }
+     }
+ }
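
GetHDFS.getPathDifference() above computes the subdirectory portion between the configured root and a listed file, which processBatchOfFiles() stores in the flow file's path attribute. A minimal standalone sketch of that behavior, assuming hadoop-common and the nifi-hdfs-processors classes are on the classpath and using made-up example paths:

import org.apache.hadoop.fs.Path;
import org.apache.nifi.processors.hadoop.GetHDFS;

public class PathDifferenceExample {
    public static void main(String[] args) {
        final Path root = new Path("/data/in");

        // Intermediate directories only, excluding the root and the filename:
        // prints "2015/01/22"
        System.out.println(GetHDFS.getPathDifference(root, new Path("/data/in/2015/01/22/file.txt")));

        // A file directly under the root has no intermediate directories: prints ""
        System.out.println(GetHDFS.getPathDifference(root, new Path("/data/in/file.txt")));
    }
}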

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 0000000,5581430..ec8b5e6
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@@ -1,0 -1,146 +1,145 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.hadoop;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
+ import org.apache.nifi.util.StopWatch;
+ import org.apache.nifi.util.Tuple;
 -
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ 
+ /**
+  * This processor is used to pull files from HDFS. The files being pulled in
+  * MUST be SequenceFile formatted files. The processor creates a flow file for
+  * each key/value entry in the ingested SequenceFile. The created flow file's
+  * content depends on the value of the optional configuration property FlowFile
+  * Content. Currently, there are two choices: VALUE ONLY and KEY VALUE PAIR.
+  * With the former, only the SequenceFile value element is written to the flow
+  * file contents. With the latter, the SequenceFile key and value are written to
+  * the flow file contents as serialized objects; the format is key length (int),
+  * key(String), value length(int), value(bytes). The default is VALUE ONLY.
+  * <p>
+  * NOTE: This processor loads the entire value entry into memory. While the size
+  * limit for a value entry is 2GB, this will cause memory problems if there are
+  * too many concurrent tasks and the data being ingested is large.
+  *
+  */
+ @TriggerWhenEmpty
+ @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "sequence file"})
+ @CapabilityDescription("Fetch sequence files from Hadoop Distributed File System (HDFS) into FlowFiles")
+ public class GetHDFSSequenceFile extends GetHDFS {
+ 
+     static final String VALUE_ONLY = "VALUE ONLY";
+ 
+     static final PropertyDescriptor FLOWFILE_CONTENT = new PropertyDescriptor.Builder()
+             .name("FlowFile Content")
+             .description("Indicate if the content is to be both the key and value of the Sequence File, or just the value.")
+             .allowableValues(VALUE_ONLY, "KEY VALUE PAIR")
+             .defaultValue(VALUE_ONLY)
+             .required(true)
+             .build();
+ 
+     static final List<PropertyDescriptor> props;
+ 
+     static {
+         List<PropertyDescriptor> someProps = new ArrayList<>(localProperties);
+         someProps.add(FLOWFILE_CONTENT);
+         props = Collections.unmodifiableList(someProps);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return props;
+     }
+ 
+     @Override
+     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
+         final Tuple<Configuration, FileSystem> hadoopResources = hdfsResources.get();
+         final Configuration conf = hadoopResources.getKey();
+         final FileSystem hdfs = hadoopResources.getValue();
+         final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue();
+         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+         if (bufferSizeProp != null) {
+             int bufferSize = bufferSizeProp.intValue();
+             conf.setInt(BUFFER_SIZE_KEY, bufferSize);
+         }
+         ProcessorLog logger = getLogger();
+         final SequenceFileReader<Set<FlowFile>> reader;
+         if (flowFileContentValue.equalsIgnoreCase(VALUE_ONLY)) {
+             reader = new ValueReader(session);
+         } else {
+             reader = new KeyValueReader(session);
+         }
+         Set<FlowFile> flowFiles = Collections.emptySet();
+         for (final Path file : files) {
+             if (!this.isScheduled()) {
+                 break; // This processor should stop running immediately.
+             }
+ 
+             final StopWatch stopWatch = new StopWatch(false);
+             try {
+                 stopWatch.start();
+                 if (!hdfs.exists(file)) {
+                     continue; // If file is no longer here move on.
+                 }
+                 logger.debug("Reading file");
+                 flowFiles = reader.readSequenceFile(file, conf, hdfs);
+                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
+                     logger.warn("Unable to delete path " + file.toString() + " from HDFS.  Will likely be picked up over and over...");
+                 }
+             } catch (Throwable t) {
+                 logger.error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
+                 session.rollback();
+                 context.yield();
+             } finally {
+                 stopWatch.stop();
+                 long totalSize = 0;
+                 for (FlowFile flowFile : flowFiles) {
+                     totalSize += flowFile.getSize();
+                     session.getProvenanceReporter().receive(flowFile, file.toString());
+                 }
+                 if (totalSize > 0) {
+                     final String dataRate = stopWatch.calculateDataRate(totalSize);
+                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                     logger.info("Created {} flowFiles from SequenceFile {}. Ingested in {} milliseconds at a rate of {}", new Object[]{
+                         flowFiles.size(), file.toUri().toASCIIString(), millis, dataRate});
+                     logger.info("Transferred flowFiles {}  to success", new Object[]{flowFiles});
+                     session.transfer(flowFiles, REL_SUCCESS);
+                 }
+             }
+         }
+ 
+     }
+ 
+ }
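
The class javadoc above defines the KEY VALUE PAIR content layout as key length (int), key (String), value length (int), value (bytes). A sketch of how a downstream consumer might parse flow file content in that layout; the assumption that the key String is stored as its UTF-8 bytes, prefixed by the stated key length, is mine and should be checked against KeyValueReader:

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class KeyValueContentParser {

    // Sketch only: assumes "key(String)" means length-prefixed UTF-8 bytes;
    // verify against KeyValueReader before relying on this.
    public static void parse(InputStream flowFileContent) throws IOException {
        try (DataInputStream in = new DataInputStream(flowFileContent)) {
            final int keyLength = in.readInt();           // key length (int)
            final byte[] keyBytes = new byte[keyLength];
            in.readFully(keyBytes);                       // key (String)
            final String key = new String(keyBytes, StandardCharsets.UTF_8);

            final int valueLength = in.readInt();         // value length (int)
            final byte[] value = new byte[valueLength];
            in.readFully(value);                          // value (bytes)

            System.out.println("key=" + key + ", value size=" + value.length + " bytes");
        }
    }
}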

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 0000000,e84b575..9a5aa74
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@@ -1,0 -1,403 +1,403 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.hadoop;
+ 
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ import org.apache.nifi.util.Tuple;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.ipc.RemoteException;
+ 
+ /**
+  * This processor copies FlowFiles to HDFS.
+  */
+ @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
+ @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
+ public class PutHDFS extends AbstractHadoopProcessor {
+ 
+     public static final String REPLACE_RESOLUTION = "replace";
+     public static final String IGNORE_RESOLUTION = "ignore";
+     public static final String FAIL_RESOLUTION = "fail";
+ 
+     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+     public static final int BUFFER_SIZE_DEFAULT = 4096;
+ 
+     // relationships
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+             .name("success")
+             .description("Files that have been successfully written to HDFS are transferred to this relationship")
+             .build();
+ 
+     public static final Relationship REL_FAILURE = new Relationship.Builder()
+             .name("failure")
+             .description(
+                     "Files that could not be written to HDFS for some reason are transferred to this relationship")
+             .build();
+ 
+     // properties
+     public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+             .name(DIRECTORY_PROP_NAME)
+             .description("The parent HDFS directory to which files should be written")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+ 
+     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+             .name("Conflict Resolution Strategy")
+             .description("Indicates what should happen when a file with the same name already exists in the output directory")
+             .required(true)
+             .defaultValue(FAIL_RESOLUTION)
+             .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION)
+             .build();
+ 
+     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
+             .name("Block Size")
+             .description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
+             .name("IO Buffer Size")
+             .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder()
+             .name("Replication")
+             .description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration")
+             .addValidator(createPositiveShortValidator())
+             .build();
+ 
+     public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
+             .name("Permissions umask")
+             .description(
+                     "A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode")
+             .addValidator(createUmaskValidator())
+             .build();
+ 
+     public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
+             .name("Remote Owner")
+             .description(
+                     "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
+             .name("Remote Group")
+             .description(
+                     "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+ 
+     private static final Set<Relationship> relationships;
+     private static final List<PropertyDescriptor> localProperties;
+ 
+     static {
+         final Set<Relationship> rels = new HashSet<>();
+         rels.add(REL_SUCCESS);
+         rels.add(REL_FAILURE);
+         relationships = Collections.unmodifiableSet(rels);
+ 
+         List<PropertyDescriptor> props = new ArrayList<>(properties);
+         props.add(DIRECTORY);
+         props.add(CONFLICT_RESOLUTION);
+         props.add(BLOCK_SIZE);
+         props.add(BUFFER_SIZE);
+         props.add(REPLICATION_FACTOR);
+         props.add(UMASK);
+         props.add(REMOTE_OWNER);
+         props.add(REMOTE_GROUP);
+         localProperties = Collections.unmodifiableList(props);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return localProperties;
+     }
+ 
+     @OnScheduled
+     public void onScheduled(ProcessContext context) throws Exception {
+         super.abstractOnScheduled(context);
+ 
+         // Set umask once, to avoid thread safety issues doing it in onTrigger
+         final PropertyValue umaskProp = context.getProperty(UMASK);
+         final short dfsUmask;
+         if (umaskProp.isSet()) {
+             dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
+         } else {
+             dfsUmask = FsPermission.DEFAULT_UMASK;
+         }
+         final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
+         final Configuration conf = resources.getKey();
+         FsPermission.setUMask(conf, new FsPermission(dfsUmask));
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
+         if (resources == null || resources.getKey() == null || resources.getValue() == null) {
+             getLogger().error("HDFS not configured properly");
+             session.transfer(flowFile, REL_FAILURE);
+             context.yield();
+             return;
+         }
+         final Configuration conf = resources.getKey();
+         final FileSystem hdfs = resources.getValue();
+ 
+         final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile)
+                 .getValue());
+         final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+ 
+         final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+         final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+ 
+         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+         final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
+                 BUFFER_SIZE_DEFAULT);
+ 
+         final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
+         final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
+                 .getDefaultReplication(configuredRootDirPath);
+ 
+         Path tempDotCopyFile = null;
+         try {
+             final Path tempCopyFile;
+             final Path copyFile;
+ 
+             tempCopyFile = new Path(configuredRootDirPath, "." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+             copyFile = new Path(configuredRootDirPath, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ 
+             // Create destination directory if it does not exist
+             try {
+                 if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) {
+                     throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
+                 }
+             } catch (FileNotFoundException fe) {
+                 if (!hdfs.mkdirs(configuredRootDirPath)) {
+                     throw new IOException(configuredRootDirPath.toString() + " could not be created");
+                 }
+                 changeOwner(context, hdfs, configuredRootDirPath);
+             }
+ 
+             // If destination file already exists, resolve that based on processor configuration
+             if (hdfs.exists(copyFile)) {
+                 switch (conflictResponse) {
+                     case REPLACE_RESOLUTION:
+                         if (hdfs.delete(copyFile, false)) {
+                             getLogger().info("deleted {} in order to replace with the contents of {}",
+                                     new Object[]{copyFile, flowFile});
+                         }
+                         break;
+                     case IGNORE_RESOLUTION:
+                         session.transfer(flowFile, REL_SUCCESS);
+                         getLogger().info("transferring {} to success because file with same name already exists",
+                                 new Object[]{flowFile});
+                         return;
+                     case FAIL_RESOLUTION:
+                         flowFile = session.penalize(flowFile);
+                         session.transfer(flowFile, REL_FAILURE);
+                         getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
+                                 new Object[]{flowFile});
+                         return;
+                     default:
+                         break;
+                 }
+             }
+ 
+             // Write FlowFile to temp file on HDFS
+             final StopWatch stopWatch = new StopWatch(true);
+             session.read(flowFile, new InputStreamCallback() {
+ 
+                 @Override
+                 public void process(InputStream in) throws IOException {
+                     FSDataOutputStream fos = null;
+                     Path createdFile = null;
+                     try {
+                         fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                         createdFile = tempCopyFile;
+                         BufferedInputStream bis = new BufferedInputStream(in);
+                         StreamUtils.copy(bis, fos);
+                         bis = null;
+                         fos.flush();
+                     } finally {
+                         try {
+                             if (fos != null) {
+                                 fos.close();
+                             }
+                         } catch (RemoteException re) {
+                             // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+                             if (createdFile != null) {
+                                 try {
+                                     hdfs.delete(createdFile, false);
+                                 } catch (Throwable ignore) {
+                                 }
+                             }
+                             throw re;
+                         } catch (Throwable ignore) {
+                         }
+                         fos = null;
+                     }
+                 }
+ 
+             });
+             stopWatch.stop();
+             final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
+             final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+             tempDotCopyFile = tempCopyFile;
+ 
+             boolean renamed = false;
+             for (int i = 0; i < 10; i++) { // try to rename multiple times.
+                 if (hdfs.rename(tempCopyFile, copyFile)) {
+                     renamed = true;
+                     break;// rename was successful
+                 }
+                 Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+             }
+             if (!renamed) {
+                 hdfs.delete(tempCopyFile, false);
+                 throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+                         + " to its final filename");
+             }
+ 
+             changeOwner(context, hdfs, copyFile);
+ 
+             getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
+                     new Object[]{flowFile, copyFile, millis, dataRate});
+ 
+             final String filename = copyFile.toString();
+             final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+             session.getProvenanceReporter().send(flowFile, transitUri);
+             session.transfer(flowFile, REL_SUCCESS);
+ 
+         } catch (final Throwable t) {
+             if (tempDotCopyFile != null) {
+                 try {
+                     hdfs.delete(tempDotCopyFile, false);
+                 } catch (Exception e) {
+                     getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
+                 }
+             }
+             getLogger().error("Failed to write to HDFS due to {}", t);
+             session.rollback();
+             context.yield();
+         }
+     }
+ 
+     protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
+         try {
+             // Change owner and group of file if configured to do so
+             String owner = context.getProperty(REMOTE_OWNER).getValue();
+             String group = context.getProperty(REMOTE_GROUP).getValue();
+             if (owner != null || group != null) {
+                 hdfs.setOwner(name, owner, group);
+             }
+         } catch (Exception e) {
+             getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
+         }
+     }
+ 
+     /*
+      * Validates that a property is a valid short number greater than 0.
+      */
+     static Validator createPositiveShortValidator() {
+         return new Validator() {
+             @Override
+             public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+                 String reason = null;
+                 try {
+                     final short shortVal = Short.parseShort(value);
+                     if (shortVal <= 0) {
+                         reason = "short integer must be greater than zero";
+                     }
+                 } catch (final NumberFormatException e) {
+                     reason = "[" + value + "] is not a valid short integer";
+                 }
+                 return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
+                         .build();
+             }
+         };
+     }
+ 
+     /*
+      * Validates that a property is a valid umask, i.e. a short octal number that is not negative.
+      */
+     static Validator createUmaskValidator() {
+         return new Validator() {
+             @Override
+             public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+                 String reason = null;
+                 try {
+                     final short shortVal = Short.parseShort(value, 8);
+                     if (shortVal < 0) {
+                         reason = "octal umask [" + value + "] cannot be negative";
+                     } else if (shortVal > 511) {
+                         // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked
+                         reason = "octal umask [" + value + "] is not a valid umask";
+                     }
+                 } catch (final NumberFormatException e) {
+                     reason = "[" + value + "] is not a valid short octal number";
+                 }
+                 return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
+                         .build();
+             }
+         };
+     }
+ 
+ }
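
createUmaskValidator() above accepts an octal string whose value fits in the nine rwxrwxrwx permission bits, i.e. 0 through 0777 (511 decimal). A small standalone sketch of the same check with made-up inputs:

public class UmaskCheck {

    // Mirrors the octal-umask rule used by PutHDFS.createUmaskValidator():
    // parse as base-8 and require the result to fit in nine permission bits.
    static String explainUmask(String value) {
        try {
            final short octal = Short.parseShort(value, 8); // e.g. "022" -> 18 decimal
            if (octal < 0) {
                return "octal umask [" + value + "] cannot be negative";
            }
            if (octal > 511) { // 511 decimal == 0777 octal
                return "octal umask [" + value + "] is not a valid umask";
            }
            return null; // valid
        } catch (NumberFormatException e) {
            return "[" + value + "] is not a valid short octal number";
        }
    }

    public static void main(String[] args) {
        System.out.println(explainUmask("022"));  // null -> valid (files default to rw-r--r--)
        System.out.println(explainUmask("1777")); // invalid: exceeds the nine-bit range
        System.out.println(explainUmask("9"));    // invalid: '9' is not an octal digit
    }
}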

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 0000000,f202e29..5383e9d
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@@ -1,0 -1,330 +1,330 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.kafka;
+ 
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import kafka.consumer.Consumer;
+ import kafka.consumer.ConsumerConfig;
+ import kafka.consumer.ConsumerIterator;
+ import kafka.consumer.KafkaStream;
+ import kafka.javaapi.consumer.ConsumerConnector;
+ import kafka.message.MessageAndMetadata;
+ 
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
++import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.OnUnscheduled;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @SupportsBatching
+ @CapabilityDescription("Fetches messages from Apache Kafka")
+ @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
+ public class GetKafka extends AbstractProcessor {
+     public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
+         .name("ZooKeeper Connection String")
+         .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188")
+         .required(true)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .build();
+     public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+         .name("Topic Name")
+         .description("The Kafka Topic to pull messages from")
+         .required(true)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .build();
+     public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
+         .name("Zookeeper Commit Frequency")
+         .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost")
+         .required(true)
+         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .defaultValue("60 secs")
+         .build();
+     public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
+         .name("ZooKeeper Communications Timeout")
+         .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
+         .required(true)
+         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .defaultValue("30 secs")
+         .build();
+     public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
+         .name("Kafka Communications Timeout")
+         .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+         .required(true)
+         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .defaultValue("30 secs")
+         .build();
+     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+         .name("Batch Size")
+         .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be "
+                 + "concatenated together with the <Message Demarcator> string placed between the content of each message. "
+                 + "If the messages from Kafka should not be concatenated together, leave this value at 1.")
+         .required(true)
+         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .defaultValue("1")
+         .build();
+     public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+         .name("Message Demarcator")
+         .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> "
+                 + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, "
+                 + "this value will be placed in between them.")
+         .required(true)
+         .addValidator(Validator.VALID)  // accept anything as a demarcator, including empty string
+         .expressionLanguageSupported(false)
+         .defaultValue("\\n")
+         .build();
+     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+         .name("Client Name")
+         .description("Client Name to use when communicating with Kafka")
+         .required(true)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .build();
+ 
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+         .name("success")
+         .description("All FlowFiles that are created are routed to this relationship")
+         .build();
+ 
+     
+     private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
+     private volatile ConsumerConnector consumer;
+ 
+     final Lock interruptionLock = new ReentrantLock();
+     // guarded by interruptionLock
+     private final Set<Thread> interruptableThreads = new HashSet<>();
+     
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     	final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
+     		.fromPropertyDescriptor(CLIENT_NAME)
+     		.defaultValue("NiFi-" + getIdentifier())
+     		.build();
+     	
+         final List<PropertyDescriptor> props = new ArrayList<>();
+         props.add(ZOOKEEPER_CONNECTION_STRING);
+         props.add(TOPIC);
+         props.add(ZOOKEEPER_COMMIT_DELAY);
+         props.add(BATCH_SIZE);
+         props.add(MESSAGE_DEMARCATOR);
+         props.add(clientNameWithDefault);
+         props.add(KAFKA_TIMEOUT);
+         props.add(ZOOKEEPER_TIMEOUT);
+         return props;
+     }
+     
+     @Override
+     public Set<Relationship> getRelationships() {
+         final Set<Relationship> relationships = new HashSet<>(1);
+         relationships.add(REL_SUCCESS);
+         return relationships;
+     }
+     
+     @OnScheduled
+     public void createConsumers(final ProcessContext context) {
+     	final String topic = context.getProperty(TOPIC).getValue();
+     	
+     	final Map<String, Integer> topicCountMap = new HashMap<>(1);
+     	topicCountMap.put(topic, context.getMaxConcurrentTasks());
+     	
+     	final Properties props = new Properties();
+     	props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); 
+     	props.setProperty("group.id", getIdentifier());
+     	props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+     	props.setProperty("auto.commit.enable", "true"); // just be explicit
+     	props.setProperty("auto.offset.reset", "smallest");
+     	
+     	final ConsumerConfig consumerConfig = new ConsumerConfig(props);
+     	consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+     	
+     	final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+     	final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+     	
+     	this.streamIterators.clear();
+     	
+     	for ( final KafkaStream<byte[], byte[]> stream : streams ) {
+     		streamIterators.add(stream.iterator());
+     	}
+     }
+     
+     @OnStopped
+     public void shutdownConsumer() {
+     	if ( consumer != null ) {
+     		try {
+     			consumer.commitOffsets();
+     		} finally {
+     			consumer.shutdown();
+     		}
+     	}
+     }
+     
+     @OnUnscheduled
+     public void interruptIterators() {
+     	// Kafka doesn't provide a non-blocking API for pulling messages. We can, however,
+     	// interrupt the Threads. We do this when the Processor is stopped so that we have the
+     	// ability to shut down the Processor.
+     	interruptionLock.lock();
+     	try {
+     		for ( final Thread t : interruptableThreads ) {
+     			t.interrupt();
+     		}
+     		
+     		interruptableThreads.clear();
+     	} finally {
+     		interruptionLock.unlock();
+     	}
+     }
+     
+     protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+         return streamIterators.poll();
+     }
+     
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+     	ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
+     	if ( iterator == null ) {
+     		return;
+     	}
+     	
+     	final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+     	final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+     	final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
+     	final String topic = context.getProperty(TOPIC).getValue();
+     	
+     	FlowFile flowFile = null;
+     	try {
+     	    // add the current thread to the Set of those to be interrupted if processor stopped.
+     		interruptionLock.lock();
+     		try {
+     			interruptableThreads.add(Thread.currentThread());
+     		} finally {
+     			interruptionLock.unlock();
+     		}
+     		
+     		final long start = System.nanoTime();
+     		flowFile = session.create();
+     		
+     		final Map<String, String> attributes = new HashMap<>();
+             attributes.put("kafka.topic", topic);
+ 
+             int numMessages = 0;
+     		for (int msgCount = 0; msgCount < batchSize; msgCount++) {
+     		    // if the processor is stopped, iterator.hasNext() will throw an Exception.
+     		    // In this case, we just break out of the loop.
+     		    try {
+         		    if ( !iterator.hasNext() ) {
+         		        break;
+         		    }
+     		    } catch (final Exception e) {
+     		        break;
+     		    }
+     		    
+         		final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+         		if ( mam == null ) {
+         			return;
+         		}
+         		
+         		final byte[] key = mam.key();
+         		
+         		if ( batchSize == 1 ) {
+         		    // the kafka.key, kafka.offset, and kafka.partition attributes are added only
+         		    // for a batch size of 1.
+         		    if ( key != null ) {
+         		        attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+         		    }
+         		    
+             		attributes.put("kafka.offset", String.valueOf(mam.offset()));
+             		attributes.put("kafka.partition", String.valueOf(mam.partition()));
+         		}
+         		
+         		// add the message to the FlowFile's contents
+         		final boolean firstMessage = (msgCount == 0);
+         		flowFile = session.append(flowFile, new OutputStreamCallback() {
+     				@Override
+     				public void process(final OutputStream out) throws IOException {
+     				    if ( !firstMessage ) {
+     				        out.write(demarcatorBytes);
+     				    }
+     					out.write(mam.message());
+     				}
+         		});
+         		numMessages++;
+     		}
+     		
+     		// If we received no messages, remove the FlowFile. Otherwise, send to success.
+     		if ( flowFile.getSize() == 0L ) {
+     		    session.remove(flowFile);
+     		} else {
+         		flowFile = session.putAllAttributes(flowFile, attributes);
+         		final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+         		session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + numMessages + " Kafka messages", millis);
+         		getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
+         		session.transfer(flowFile, REL_SUCCESS);
+     		}
+     	} catch (final Exception e) {
+     		getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
+     		if ( flowFile != null ) {
+     			session.remove(flowFile);
+     		}
+     	} finally {
+     	    // Remove the current thread from the Set of Threads to interrupt.
+     		interruptionLock.lock();
+     		try {
+     			interruptableThreads.remove(Thread.currentThread());
+     		} finally {
+     			interruptionLock.unlock();
+     		}
+     		
+     		// Add the iterator back to the queue
+     		if ( iterator != null ) {
+     			streamIterators.offer(iterator);
+     		}
+     	}
+     }
+ 	
+ }
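
For context on the batching logic in onTrigger() above: when <Batch Size> is greater than 1, the configured <Message Demarcator> is first expanded so that the escape sequences \n, \r, and \t become real control characters, and the resulting bytes are then written between consecutive messages (the default value \n becomes a single newline byte). Below is a minimal, standalone sketch of that expansion; the class and method names are illustrative only and are not part of this commit.

    import java.nio.charset.StandardCharsets;

    // Illustrative sketch only -- not part of this commit. It reproduces the
    // demarcator expansion performed in onTrigger(): the escape sequences \n, \r
    // and \t typed into the <Message Demarcator> property become real control
    // characters before being appended between batched messages.
    public class DemarcatorSketch {

        static byte[] demarcatorBytes(final String configuredValue) {
            final String expanded = configuredValue
                    .replace("\\n", "\n")
                    .replace("\\r", "\r")
                    .replace("\\t", "\t");
            return expanded.getBytes(StandardCharsets.UTF_8);
        }

        public static void main(final String[] args) {
            // The default property value is the two-character string \n, which
            // expands to a single newline byte (0x0A).
            final byte[] demarcator = demarcatorBytes("\\n");
            System.out.println(demarcator.length + " byte(s), first = 0x"
                    + Integer.toHexString(demarcator[0]));
        }
    }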

