From: markap14@apache.org
To: commits@nifi.incubator.apache.org
Reply-To: dev@nifi.incubator.apache.org
Mailing-List: contact commits-help@nifi.incubator.apache.org; run by ezmlm
Date: Thu, 22 Jan 2015 17:04:56 -0000
Message-Id: <76239fe4f8874351a56616308443935e@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [67/79] [abbrv] incubator-nifi git commit: NIFI-6: Rebase from develop to include renaming of directory structure

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 0000000,20ac738..0610d8f
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@@ -1,0 -1,563 +1,562 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.hadoop; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; + import java.util.regex.Pattern; + ++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.Tags; -import org.apache.nifi.processor.annotation.TriggerWhenEmpty; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.StopWatch; - + import org.apache.commons.io.IOUtils; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FSDataInputStream; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.PathFilter; + + /** + * This processor reads files from HDFS into NiFi FlowFiles. 
+ */ + @TriggerWhenEmpty + @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) + @CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles") + public class GetHDFS extends AbstractHadoopProcessor { + + public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; + public static final int BUFFER_SIZE_DEFAULT = 4096; + public static final int MAX_WORKING_QUEUE_SIZE = 25000; + + // relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All files retrieved from HDFS are transferred to this relationship") + .build(); + + public static final Relationship REL_PASSTHROUGH = new Relationship.Builder() + .name("passthrough") + .description( + "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship") + .build(); + + // properties + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name(DIRECTORY_PROP_NAME) + .description("The HDFS directory from which files should be read") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() + .name("Recurse Subdirectories") + .description("Indicates whether to pull files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder() + .name("Keep Source File") + .description("Determines whether to delete the file from HDFS after it has been successfully transferred") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder() + .name("File Filter Regex") + .description( + "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + + public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder() + .name("Filter Match Name Only") + .description( + "If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() + .name("Ignore Dotted Files") + .description("If true, files whose names begin with a dot (\".\") will be ignored") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() + .name("Minimum File Age") + .description( + "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored") + .required(true) + .addValidator( + StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .defaultValue("0 sec") + .build(); + + public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() + .name("Maximum File Age") 
+ .description( + "The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored") + .required(false) + .addValidator( + StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of files to pull in each iteration, based on run schedule.") + .required(true) + .defaultValue("100") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder() + .name("Polling Interval") + .description("Indicates how long to wait between performing directory listings") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 sec") + .build(); + + public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("IO Buffer Size") + .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + private static final Set relationships; + protected static final List localProperties; + + static { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_PASSTHROUGH); + relationships = Collections.unmodifiableSet(rels); + + List props = new ArrayList<>(properties); + props.add(DIRECTORY); + props.add(RECURSE_SUBDIRS); + props.add(KEEP_SOURCE_FILE); + props.add(FILE_FILTER_REGEX); + props.add(FILTER_MATCH_NAME_ONLY); + props.add(IGNORE_DOTTED_FILES); + props.add(MIN_AGE); + props.add(MAX_AGE); + props.add(POLLING_INTERVAL); + props.add(BATCH_SIZE); + props.add(BUFFER_SIZE); + localProperties = Collections.unmodifiableList(props); + } + + protected ProcessorConfiguration processorConfig; + private final AtomicLong logEmptyListing = new AtomicLong(2L); + + private final AtomicLong lastPollTime = new AtomicLong(0L); + private final Lock listingLock = new ReentrantLock(); + private final Lock queueLock = new ReentrantLock(); + + private final BlockingQueue filePathQueue = new LinkedBlockingQueue<>(MAX_WORKING_QUEUE_SIZE); + private final BlockingQueue processing = new LinkedBlockingQueue<>(); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return localProperties; + } + + @Override + protected Collection customValidate(ValidationContext context) { + final List problems = new ArrayList<>(super.customValidate(context)); + + final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp; + final long maximumAge = (maxAgeProp == null) ? 
Long.MAX_VALUE : maxAgeProp; + if (minimumAge > maximumAge) { + problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration") + .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build()); + } + + return problems; + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws IOException { + abstractOnScheduled(context); + // copy configuration values to pass them around cleanly + processorConfig = new ProcessorConfiguration(context); + FileSystem fs = hdfsResources.get().getValue(); + Path dir = new Path(context.getProperty(DIRECTORY).getValue()); + if (!fs.exists(dir)) { + throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist."); + } + + // forget the state of the queue in case HDFS contents changed while this processor was turned off + queueLock.lock(); + try { + filePathQueue.clear(); + processing.clear(); + } finally { + queueLock.unlock(); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final List files = new ArrayList<>(batchSize); + List inputFlowFiles = session.get(10); + for (FlowFile ff : inputFlowFiles) { + session.transfer(ff, REL_PASSTHROUGH); + } + + // retrieve new file names from HDFS and place them into work queue + if (filePathQueue.size() < MAX_WORKING_QUEUE_SIZE / 2) { + try { + final StopWatch stopWatch = new StopWatch(true); + Set listedFiles = performListing(context); + stopWatch.stop(); + final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + + if (listedFiles != null) { + // place files into the work queue + int newItems = 0; + queueLock.lock(); + try { + for (Path file : listedFiles) { + if (!filePathQueue.contains(file) && !processing.contains(file)) { + if (!filePathQueue.offer(file)) { + break; + } + newItems++; + } + } + } catch (Exception e) { + getLogger().warn("Could not add to processing queue due to {}", new Object[]{e}); + } finally { + queueLock.unlock(); + } + if (listedFiles.size() > 0) { + logEmptyListing.set(3L); + } + if (logEmptyListing.getAndDecrement() > 0) { + getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", + new Object[]{millis, listedFiles.size(), newItems}); + } + } + } catch (IOException e) { + context.yield(); + getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e}); + return; + } + } + + // prepare to process a batch of files in the queue + queueLock.lock(); + try { + filePathQueue.drainTo(files, batchSize); + if (files.isEmpty()) { + // nothing to do! + context.yield(); + return; + } + processing.addAll(files); + } finally { + queueLock.unlock(); + } + + processBatchOfFiles(files, context, session); + + queueLock.lock(); + try { + processing.removeAll(files); + } finally { + queueLock.unlock(); + } + } + + protected void processBatchOfFiles(final List files, final ProcessContext context, final ProcessSession session) { + // process the batch of files + FSDataInputStream stream = null; + Configuration conf = hdfsResources.get().getKey(); + FileSystem hdfs = hdfsResources.get().getValue(); + final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); + final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); + int bufferSize = bufferSizeProp != null ? 
bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, + BUFFER_SIZE_DEFAULT); + final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue()); + for (final Path file : files) { + try { + if (!hdfs.exists(file)) { + continue; // if file is no longer there then move on + } + final String filename = file.getName(); + final String relativePath = getPathDifference(rootDir, file); + + stream = hdfs.open(file, bufferSize); + FlowFile flowFile = session.create(); + + final StopWatch stopWatch = new StopWatch(true); + flowFile = session.importFrom(stream, flowFile); + stopWatch.stop(); + final String dataRate = stopWatch.calculateDataRate(flowFile.getSize()); + final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + + flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath); + flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename); + + if (!keepSourceFiles && !hdfs.delete(file, false)) { + getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", + new Object[]{file}); + session.remove(flowFile); + continue; + } + + final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename; + session.getProvenanceReporter().receive(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", + new Object[]{flowFile, file, millis, dataRate}); + session.commit(); + } catch (final Throwable t) { + getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t}); + session.rollback(); + context.yield(); + } finally { + IOUtils.closeQuietly(stream); + stream = null; + } + } + } + + /** + * Do a listing of HDFS if the POLLING_INTERVAL has lapsed. + * + * Will return null if POLLING_INTERVAL has not lapsed. Will return an empty + * set if no files were found on HDFS that matched the configured filters. + * @param context + * @return + * @throws java.io.IOException + */ + protected Set performListing(final ProcessContext context) throws IOException { + + final long pollingIntervalMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); + final long nextPollTime = lastPollTime.get() + pollingIntervalMillis; + Set listing = null; + + if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) { + try { + final FileSystem hdfs = hdfsResources.get().getValue(); + // get listing + listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null); + lastPollTime.set(System.currentTimeMillis()); + } finally { + listingLock.unlock(); + } + } + + return listing; + } + + /** + * Poll HDFS for files to process that match the configured file filters. 
+ * @param hdfs + * @param dir + * @param filesVisited + * @return + * @throws java.io.IOException + */ + protected Set selectFiles(final FileSystem hdfs, final Path dir, Set filesVisited) throws IOException { + if (null == filesVisited) { + filesVisited = new HashSet<>(); + } + + if (!hdfs.exists(dir)) { + throw new IOException("Selection directory " + dir.toString() + " doesn't appear to exist!"); + } + + final Set files = new HashSet<>(); + + for (final FileStatus file : hdfs.listStatus(dir)) { + if (files.size() >= MAX_WORKING_QUEUE_SIZE) { + // no need to make the files set larger than what we would queue anyway + break; + } + + final Path canonicalFile = file.getPath(); + + if (!filesVisited.add(canonicalFile)) { // skip files we've already seen (may be looping directory links) + continue; + } + + if (file.isDirectory() && processorConfig.getRecurseSubdirs()) { + files.addAll(selectFiles(hdfs, canonicalFile, filesVisited)); + + } else if (!file.isDirectory() && processorConfig.getPathFilter().accept(canonicalFile)) { + final long fileAge = System.currentTimeMillis() - file.getModificationTime(); + if (processorConfig.getMinimumAge() < fileAge && fileAge < processorConfig.getMaximumAge()) { + files.add(canonicalFile); + + if (getLogger().isDebugEnabled()) { + getLogger().debug(this + " selected file at path: " + canonicalFile.toString()); + } + + } + } + } + return files; + } + + /** + * Returns the relative path of the child that does not include the filename + * or the root path. + * @param root + * @param child + * @return + */ + public static String getPathDifference(final Path root, final Path child) { + final int depthDiff = child.depth() - root.depth(); + if (depthDiff <= 1) { + return "".intern(); + } + String lastRoot = root.getName(); + Path childsParent = child.getParent(); + final StringBuilder builder = new StringBuilder(); + builder.append(childsParent.getName()); + for (int i = (depthDiff - 3); i >= 0; i--) { + childsParent = childsParent.getParent(); + String name = childsParent.getName(); + if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) { + break; + } + builder.insert(0, Path.SEPARATOR).insert(0, name); + } + return builder.toString(); + } + + /** + * Holder for a snapshot in time of some processor properties that are + * passed around. + */ + protected static class ProcessorConfiguration { + + final private Path configuredRootDirPath; + final private Pattern fileFilterPattern; + final private boolean ignoreDottedFiles; + final private boolean filterMatchBasenameOnly; + final private long minimumAge; + final private long maximumAge; + final private boolean recurseSubdirs; + final private PathFilter pathFilter; + + ProcessorConfiguration(final ProcessContext context) { + configuredRootDirPath = new Path(context.getProperty(DIRECTORY).getValue()); + ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean(); + final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue(); + fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex); + filterMatchBasenameOnly = context.getProperty(FILTER_MATCH_NAME_ONLY).asBoolean(); + final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + minimumAge = (minAgeProp == null) ? 0L : minAgeProp; + final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + maximumAge = (maxAgeProp == null) ? 
Long.MAX_VALUE : maxAgeProp; + recurseSubdirs = context.getProperty(RECURSE_SUBDIRS).asBoolean(); + pathFilter = new PathFilter() { + + @Override + public boolean accept(Path path) { + if (ignoreDottedFiles && path.getName().startsWith(".")) { + return false; + } + final String pathToCompare; + if (filterMatchBasenameOnly) { + pathToCompare = path.getName(); + } else { + // figure out portion of path that does not include the provided root dir. + String relativePath = getPathDifference(configuredRootDirPath, path); + if (relativePath.length() == 0) { + pathToCompare = path.getName(); + } else { + pathToCompare = relativePath + Path.SEPARATOR + path.getName(); + } + } + + if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) { + return false; + } + return true; + } + + }; + } + + public Path getConfiguredRootDirPath() { + return configuredRootDirPath; + } + + protected long getMinimumAge() { + return minimumAge; + } + + protected long getMaximumAge() { + return maximumAge; + } + + public boolean getRecurseSubdirs() { + return recurseSubdirs; + } + + protected PathFilter getPathFilter() { + return pathFilter; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java index 0000000,5581430..ec8b5e6 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java @@@ -1,0 -1,146 +1,145 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.hadoop; + + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; + import java.util.Set; + import java.util.concurrent.TimeUnit; + ++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.Tags; -import org.apache.nifi.processor.annotation.TriggerWhenEmpty; + import org.apache.nifi.processors.hadoop.util.SequenceFileReader; + import org.apache.nifi.util.StopWatch; + import org.apache.nifi.util.Tuple; - + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + + /** + * This processor is used to pull files from HDFS. The files being pulled in + * MUST be SequenceFile formatted files. The processor creates a flow file for + * each key/value entry in the ingested SequenceFile. The created flow file's + * content depends on the value of the optional configuration property FlowFile + * Content. Currently, there are two choices: VALUE ONLY and KEY VALUE PAIR. + * With the prior, only the SequenceFile value element is written to the flow + * file contents. With the latter, the SequenceFile key and value are written to + * the flow file contents as serialized objects; the format is key length (int), + * key(String), value length(int), value(bytes). The default is VALUE ONLY. + *

+ * NOTE: This processor loads the entire value entry into memory. While the size + * limit for a value entry is 2GB, this will cause memory problems if there are + * too many concurrent tasks and the data being ingested is large. + * + */ + @TriggerWhenEmpty + @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "sequence file"}) + @CapabilityDescription("Fetch sequence files from Hadoop Distributed File System (HDFS) into FlowFiles") + public class GetHDFSSequenceFile extends GetHDFS { + + static final String VALUE_ONLY = "VALUE ONLY"; + + static final PropertyDescriptor FLOWFILE_CONTENT = new PropertyDescriptor.Builder() + .name("FlowFile Content") + .description("Indicate if the content is to be both the key and value of the Sequence File, or just the value.") + .allowableValues(VALUE_ONLY, "KEY VALUE PAIR") + .defaultValue(VALUE_ONLY) + .required(true) + .build(); + + static final List props; + + static { + List someProps = new ArrayList<>(localProperties); + someProps.add(FLOWFILE_CONTENT); + props = Collections.unmodifiableList(someProps); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return props; + } + + @Override + protected void processBatchOfFiles(final List files, final ProcessContext context, final ProcessSession session) { + final Tuple hadoopResources = hdfsResources.get(); + final Configuration conf = hadoopResources.getKey(); + final FileSystem hdfs = hadoopResources.getValue(); + final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue(); + final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); + final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); + if (bufferSizeProp != null) { + int bufferSize = bufferSizeProp.intValue(); + conf.setInt(BUFFER_SIZE_KEY, bufferSize); + } + ProcessorLog logger = getLogger(); + final SequenceFileReader> reader; + if (flowFileContentValue.equalsIgnoreCase(VALUE_ONLY)) { + reader = new ValueReader(session); + } else { + reader = new KeyValueReader(session); + } + Set flowFiles = Collections.emptySet(); + for (final Path file : files) { + if (!this.isScheduled()) { + break; // This processor should stop running immediately. + } + + final StopWatch stopWatch = new StopWatch(false); + try { + stopWatch.start(); + if (!hdfs.exists(file)) { + continue; // If file is no longer here move on. + } + logger.debug("Reading file"); + flowFiles = reader.readSequenceFile(file, conf, hdfs); + if (!keepSourceFiles && !hdfs.delete(file, false)) { + logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over..."); + } + } catch (Throwable t) { + logger.error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t}); + session.rollback(); + context.yield(); + } finally { + stopWatch.stop(); + long totalSize = 0; + for (FlowFile flowFile : flowFiles) { + totalSize += flowFile.getSize(); + session.getProvenanceReporter().receive(flowFile, file.toString()); + } + if (totalSize > 0) { + final String dataRate = stopWatch.calculateDataRate(totalSize); + final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + logger.info("Created {} flowFiles from SequenceFile {}. 
Ingested in {} milliseconds at a rate of {}", new Object[]{ + flowFiles.size(), file.toUri().toASCIIString(), millis, dataRate}); + logger.info("Transferred flowFiles {} to success", new Object[]{flowFiles}); + session.transfer(flowFiles, REL_SUCCESS); + } + } + } + + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 0000000,e84b575..9a5aa74 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@@ -1,0 -1,403 +1,403 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.hadoop; + + import java.io.FileNotFoundException; + import java.io.IOException; + import java.io.InputStream; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import java.util.concurrent.TimeUnit; + ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.PropertyValue; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.StreamUtils; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.StopWatch; + import org.apache.nifi.util.Tuple; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FSDataOutputStream; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.permission.FsPermission; + import org.apache.hadoop.ipc.RemoteException; + + /** + * This processor copies FlowFiles to HDFS. 
+ */ + @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"}) + @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)") + public class PutHDFS extends AbstractHadoopProcessor { + + public static final String REPLACE_RESOLUTION = "replace"; + public static final String IGNORE_RESOLUTION = "ignore"; + public static final String FAIL_RESOLUTION = "fail"; + + public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; + public static final int BUFFER_SIZE_DEFAULT = 4096; + + // relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Files that have been successfully written to HDFS are transferred to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "Files that could not be written to HDFS for some reason are transferred to this relationship") + .build(); + + // properties + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name(DIRECTORY_PROP_NAME) + .description("The parent HDFS directory to which files should be written") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("Conflict Resolution Strategy") + .description("Indicates what should happen when a file with the same name already exists in the output directory") + .required(true) + .defaultValue(FAIL_RESOLUTION) + .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION) + .build(); + + public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder() + .name("Block Size") + .description("Size of each block as written to HDFS. This overrides the Hadoop Configuration") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("IO Buffer Size") + .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor REPLICATION_FACTOR = new PropertyDescriptor.Builder() + .name("Replication") + .description("Number of times that HDFS will replicate each file. This overrides the Hadoop Configuration") + .addValidator(createPositiveShortValidator()) + .build(); + + public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder() + .name("Permissions umask") + .description( + "A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode") + .addValidator(createUmaskValidator()) + .build(); + + public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder() + .name("Remote Owner") + .description( + "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder() + .name("Remote Group") + .description( + "Changes the group of the HDFS file to this value after it is written. 
This only works if NiFi is running as a user that has HDFS super user privilege to change group") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final Set relationships; + private static final List localProperties; + + static { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + + List props = new ArrayList<>(properties); + props.add(DIRECTORY); + props.add(CONFLICT_RESOLUTION); + props.add(BLOCK_SIZE); + props.add(BUFFER_SIZE); + props.add(REPLICATION_FACTOR); + props.add(UMASK); + props.add(REMOTE_OWNER); + props.add(REMOTE_GROUP); + localProperties = Collections.unmodifiableList(props); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return localProperties; + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws Exception { + super.abstractOnScheduled(context); + + // Set umask once, to avoid thread safety issues doing it in onTrigger + final PropertyValue umaskProp = context.getProperty(UMASK); + final short dfsUmask; + if (umaskProp.isSet()) { + dfsUmask = Short.parseShort(umaskProp.getValue(), 8); + } else { + dfsUmask = FsPermission.DEFAULT_UMASK; + } + final Tuple resources = hdfsResources.get(); + final Configuration conf = resources.getKey(); + FsPermission.setUMask(conf, new FsPermission(dfsUmask)); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Tuple resources = hdfsResources.get(); + if (resources == null || resources.getKey() == null || resources.getValue() == null) { + getLogger().error("HDFS not configured properly"); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + final Configuration conf = resources.getKey(); + final FileSystem hdfs = resources.getValue(); + + final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile) + .getValue()); + final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); + + final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B); + final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath); + + final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B); + final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, + BUFFER_SIZE_DEFAULT); + + final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger(); + final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs + .getDefaultReplication(configuredRootDirPath); + + Path tempDotCopyFile = null; + try { + final Path tempCopyFile; + final Path copyFile; + + tempCopyFile = new Path(configuredRootDirPath, "." 
+ flowFile.getAttribute(CoreAttributes.FILENAME.key())); + copyFile = new Path(configuredRootDirPath, flowFile.getAttribute(CoreAttributes.FILENAME.key())); + + // Create destination directory if it does not exist + try { + if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) { + throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory"); + } + } catch (FileNotFoundException fe) { + if (!hdfs.mkdirs(configuredRootDirPath)) { + throw new IOException(configuredRootDirPath.toString() + " could not be created"); + } + changeOwner(context, hdfs, configuredRootDirPath); + } + + // If destination file already exists, resolve that based on processor configuration + if (hdfs.exists(copyFile)) { + switch (conflictResponse) { + case REPLACE_RESOLUTION: + if (hdfs.delete(copyFile, false)) { + getLogger().info("deleted {} in order to replace with the contents of {}", + new Object[]{copyFile, flowFile}); + } + break; + case IGNORE_RESOLUTION: + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("transferring {} to success because file with same name already exists", + new Object[]{flowFile}); + return; + case FAIL_RESOLUTION: + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + getLogger().warn("penalizing {} and routing to failure because file with same name already exists", + new Object[]{flowFile}); + return; + default: + break; + } + } + + // Write FlowFile to temp file on HDFS + final StopWatch stopWatch = new StopWatch(true); + session.read(flowFile, new InputStreamCallback() { + + @Override + public void process(InputStream in) throws IOException { + FSDataOutputStream fos = null; + Path createdFile = null; + try { + fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize); + createdFile = tempCopyFile; + BufferedInputStream bis = new BufferedInputStream(in); + StreamUtils.copy(bis, fos); + bis = null; + fos.flush(); + } finally { + try { + if (fos != null) { + fos.close(); + } + } catch (RemoteException re) { + // when talking to remote HDFS clusters, we don't notice problems until fos.close() + if (createdFile != null) { + try { + hdfs.delete(createdFile, false); + } catch (Throwable ignore) { + } + } + throw re; + } catch (Throwable ignore) { + } + fos = null; + } + } + + }); + stopWatch.stop(); + final String dataRate = stopWatch.calculateDataRate(flowFile.getSize()); + final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + tempDotCopyFile = tempCopyFile; + + boolean renamed = false; + for (int i = 0; i < 10; i++) { // try to rename multiple times. + if (hdfs.rename(tempCopyFile, copyFile)) { + renamed = true; + break;// rename was successful + } + Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve + } + if (!renamed) { + hdfs.delete(tempCopyFile, false); + throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile + + " to its final filename"); + } + + changeOwner(context, hdfs, copyFile); + + getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", + new Object[]{flowFile, copyFile, millis, dataRate}); + + final String filename = copyFile.toString(); + final String transitUri = (filename.startsWith("/")) ? 
"hdfs:/" + filename : "hdfs://" + filename; + session.getProvenanceReporter().send(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); + + } catch (final Throwable t) { + if (tempDotCopyFile != null) { + try { + hdfs.delete(tempDotCopyFile, false); + } catch (Exception e) { + getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e}); + } + } + getLogger().error("Failed to write to HDFS due to {}", t); + session.rollback(); + context.yield(); + } + } + + protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) { + try { + // Change owner and group of file if configured to do so + String owner = context.getProperty(REMOTE_OWNER).getValue(); + String group = context.getProperty(REMOTE_GROUP).getValue(); + if (owner != null || group != null) { + hdfs.setOwner(name, owner, group); + } + } catch (Exception e) { + getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e}); + } + } + + /* + * Validates that a property is a valid short number greater than 0. + */ + static Validator createPositiveShortValidator() { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + final short shortVal = Short.parseShort(value); + if (shortVal <= 0) { + reason = "short integer must be greater than zero"; + } + } catch (final NumberFormatException e) { + reason = "[" + value + "] is not a valid short integer"; + } + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null) + .build(); + } + }; + } + + /* + * Validates that a property is a valid umask, i.e. a short octal number that is not negative. + */ + static Validator createUmaskValidator() { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + String reason = null; + try { + final short shortVal = Short.parseShort(value, 8); + if (shortVal < 0) { + reason = "octal umask [" + value + "] cannot be negative"; + } else if (shortVal > 511) { + // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked + reason = "octal umask [" + value + "] is not a valid umask"; + } + } catch (final NumberFormatException e) { + reason = "[" + value + "] is not a valid short octal number"; + } + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null) + .build(); + } + }; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 0000000,f202e29..5383e9d mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@@ -1,0 -1,330 +1,330 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.kafka; + + import java.io.IOException; + import java.io.OutputStream; + import java.nio.charset.StandardCharsets; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Properties; + import java.util.Set; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; + + import kafka.consumer.Consumer; + import kafka.consumer.ConsumerConfig; + import kafka.consumer.ConsumerIterator; + import kafka.consumer.KafkaStream; + import kafka.javaapi.consumer.ConsumerConnector; + import kafka.message.MessageAndMetadata; + ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.lifecycle.OnStopped; ++import org.apache.nifi.annotation.lifecycle.OnUnscheduled; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.Validator; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnStopped; -import org.apache.nifi.processor.annotation.OnUnscheduled; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + + @SupportsBatching + @CapabilityDescription("Fetches messages from Apache Kafka") + @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) + public class GetKafka extends AbstractProcessor { + public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() + .name("ZooKeeper Connection String") + .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of : combinations. 
For example, host1:2181,host2:2181,host3:2188") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("The Kafka Topic to pull messages from") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() + .name("Zookeeper Commit Frequency") + .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("60 secs") + .build(); + public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder() + .name("ZooKeeper Communications Timeout") + .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); + public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder() + .name("Kafka Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be " + + "concatenated together with the string placed between the content of each message. " + + "If the messages from Kafka should not be concatenated together, leave this value at 1.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .build(); + public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the " + + "property is set to 1, this value is ignored. 
Otherwise, for each two subsequent messages in the batch, " + + "this value will be placed in between them.") + .required(true) + .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string + .expressionLanguageSupported(false) + .defaultValue("\\n") + .build(); + public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are created are routed to this relationship") + .build(); + + + private final BlockingQueue> streamIterators = new LinkedBlockingQueue<>(); + private volatile ConsumerConnector consumer; + + final Lock interruptionLock = new ReentrantLock(); + // guarded by interruptionLock + private final Set interruptableThreads = new HashSet<>(); + + @Override + protected List getSupportedPropertyDescriptors() { + final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(CLIENT_NAME) + .defaultValue("NiFi-" + getIdentifier()) + .build(); + + final List props = new ArrayList<>(); + props.add(ZOOKEEPER_CONNECTION_STRING); + props.add(TOPIC); + props.add(ZOOKEEPER_COMMIT_DELAY); + props.add(BATCH_SIZE); + props.add(MESSAGE_DEMARCATOR); + props.add(clientNameWithDefault); + props.add(KAFKA_TIMEOUT); + props.add(ZOOKEEPER_TIMEOUT); + return props; + } + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(1); + relationships.add(REL_SUCCESS); + return relationships; + } + + @OnScheduled + public void createConsumers(final ProcessContext context) { + final String topic = context.getProperty(TOPIC).getValue(); + + final Map topicCountMap = new HashMap<>(1); + topicCountMap.put(topic, context.getMaxConcurrentTasks()); + + final Properties props = new Properties(); + props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); + props.setProperty("group.id", getIdentifier()); + props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); + props.setProperty("auto.commit.enable", "true"); // just be explicit + props.setProperty("auto.offset.reset", "smallest"); + + final ConsumerConfig consumerConfig = new ConsumerConfig(props); + consumer = Consumer.createJavaConsumerConnector(consumerConfig); + + final Map>> consumerMap = consumer.createMessageStreams(topicCountMap); + final List> streams = consumerMap.get(topic); + + this.streamIterators.clear(); + + for ( final KafkaStream stream : streams ) { + streamIterators.add(stream.iterator()); + } + } + + @OnStopped + public void shutdownConsumer() { + if ( consumer != null ) { + try { + consumer.commitOffsets(); + } finally { + consumer.shutdown(); + } + } + } + + @OnUnscheduled + public void interruptIterators() { + // Kafka doesn't provide a non-blocking API for pulling messages. We can, however, + // interrupt the Threads. We do this when the Processor is stopped so that we have the + // ability to shutdown the Processor. 
+ interruptionLock.lock(); + try { + for ( final Thread t : interruptableThreads ) { + t.interrupt(); + } + + interruptableThreads.clear(); + } finally { + interruptionLock.unlock(); + } + } + + protected ConsumerIterator getStreamIterator() { + return streamIterators.poll(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + ConsumerIterator iterator = getStreamIterator(); + if ( iterator == null ) { + return; + } + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + final String topic = context.getProperty(TOPIC).getValue(); + + FlowFile flowFile = null; + try { + // add the current thread to the Set of those to be interrupted if processor stopped. + interruptionLock.lock(); + try { + interruptableThreads.add(Thread.currentThread()); + } finally { + interruptionLock.unlock(); + } + + final long start = System.nanoTime(); + flowFile = session.create(); + + final Map attributes = new HashMap<>(); + attributes.put("kafka.topic", topic); + + int numMessages = 0; + for (int msgCount = 0; msgCount < batchSize; msgCount++) { + // if the processor is stopped, iterator.hasNext() will throw an Exception. + // In this case, we just break out of the loop. + try { + if ( !iterator.hasNext() ) { + break; + } + } catch (final Exception e) { + break; + } + + final MessageAndMetadata mam = iterator.next(); + if ( mam == null ) { + return; + } + + final byte[] key = mam.key(); + + if ( batchSize == 1 ) { + // the kafka.key, kafka.offset, and kafka.partition attributes are added only + // for a batch size of 1. + if ( key != null ) { + attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); + } + + attributes.put("kafka.offset", String.valueOf(mam.offset())); + attributes.put("kafka.partition", String.valueOf(mam.partition())); + } + + // add the message to the FlowFile's contents + final boolean firstMessage = (msgCount == 0); + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if ( !firstMessage ) { + out.write(demarcatorBytes); + } + out.write(mam.message()); + } + }); + numMessages++; + } + + // If we received no messages, remove the FlowFile. Otherwise, send to success. + if ( flowFile.getSize() == 0L ) { + session.remove(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + numMessages + " Kafka messages", millis); + getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis}); + session.transfer(flowFile, REL_SUCCESS); + } + } catch (final Exception e) { + getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e}); + if ( flowFile != null ) { + session.remove(flowFile); + } + } finally { + // Remove the current thread from the Set of Threads to interrupt. 
+ interruptionLock.lock(); + try { + interruptableThreads.remove(Thread.currentThread()); + } finally { + interruptionLock.unlock(); + } + + // Add the iterator back to the queue + if ( iterator != null ) { + streamIterators.offer(iterator); + } + } + } + + }
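
The common thread across the four hunks above is the relocation of the processor annotations out of org.apache.nifi.processor.annotation and into the new org.apache.nifi.annotation.* packages (behavior, documentation, lifecycle). Shown here for GetHDFS as it appears in the diff; GetHDFSSequenceFile, PutHDFS, and GetKafka follow the same pattern for their respective annotations:

    // before this commit (removed)
    import org.apache.nifi.processor.annotation.CapabilityDescription;
    import org.apache.nifi.processor.annotation.OnScheduled;
    import org.apache.nifi.processor.annotation.Tags;
    import org.apache.nifi.processor.annotation.TriggerWhenEmpty;

    // after this commit (added)
    import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    import org.apache.nifi.annotation.documentation.CapabilityDescription;
    import org.apache.nifi.annotation.documentation.Tags;
    import org.apache.nifi.annotation.lifecycle.OnScheduled;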
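
The Minimum/Maximum File Age cross-check in GetHDFS#customValidate can be exercised without a live HDFS. The following is a minimal sketch, not part of this commit: it assumes the nifi-mock TestRunner utilities are on the test classpath, and the directory value is hypothetical. It only drives property validation, so no Hadoop connection is attempted:

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Illustrative sketch only -- not a test shipped with this commit.
    public class GetHDFSAgeValidationSketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
            runner.setProperty(GetHDFS.DIRECTORY, "/data/in"); // hypothetical HDFS path
            // Minimum File Age greater than Maximum File Age: customValidate adds a
            // ValidationResult with valid=false, so the configuration is rejected.
            runner.setProperty(GetHDFS.MIN_AGE, "1 hour");
            runner.setProperty(GetHDFS.MAX_AGE, "30 sec");
            runner.assertNotValid();
        }
    }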