nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [1/2] incubator-nifi git commit: NIFI-201: Supporting arbitrary HTTP headers as flow file attributes in ListenHTTP
Date Sat, 03 Jan 2015 06:52:13 GMT
Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 9e35c7da9 -> 5b4211d9a


NIFI-201: Supporting arbitrary HTTP headers as flow file attributes in ListenHTTP


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b233eaf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b233eaf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b233eaf2

Branch: refs/heads/develop
Commit: b233eaf20d36cd17de9d79a36a4fe3bafe8f99ba
Parents: ac3c3bb
Author: gresockj <jgresock@gmail.com>
Authored: Fri Dec 26 22:41:12 2014 -0500
Committer: gresockj <jgresock@gmail.com>
Committed: Sat Dec 27 05:50:31 2014 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/ListenHTTP.java   | 11 +++++++++++
 .../standard/servlets/ListenHTTPServlet.java          | 14 +++++++++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b233eaf2/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 5e7ce56..2b0b437 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@ -106,12 +106,19 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
+    public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
+		    .name("HTTP Headers to receive as Attributes (Regex)")
+		    .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes")
+		    .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+		    .required(false)
+		    .build();
 
     public static final String URI = "/contentListener";
     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder";
     public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern";
+    public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern";
     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler";
 
@@ -131,6 +138,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(AUTHORIZED_DN_PATTERN);
         descriptors.add(MAX_UNCONFIRMED_TIME);
+        descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
         this.properties = Collections.unmodifiableList(descriptors);
     }
 
@@ -236,6 +244,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor {
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler);
 
+        if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
+        	contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
+        }
         server.start();
 
         this.server = server;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b233eaf2/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
----------------------------------------------------------------------
diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index fb52b80..cae61f0 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.cert.X509Certificate;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -56,7 +57,6 @@ import org.apache.nifi.util.FlowFileUnpackager;
 import org.apache.nifi.util.FlowFileUnpackagerV1;
 import org.apache.nifi.util.FlowFileUnpackagerV2;
 import org.apache.nifi.util.FlowFileUnpackagerV3;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 
@@ -88,6 +88,7 @@ public class ListenHTTPServlet extends HttpServlet {
     private ProcessorLog logger;
     private AtomicReference<ProcessSessionFactory> sessionFactoryHolder;
     private Pattern authorizedPattern;
+    private Pattern headerPattern;
     private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap;
     private StreamThrottler streamThrottler;
 
@@ -103,6 +104,7 @@ public class ListenHTTPServlet extends HttpServlet {
         this.logger = (ProcessorLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER);
         this.sessionFactoryHolder = (AtomicReference<ProcessSessionFactory>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER);
         this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN);
+        this.headerPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_HEADER_PATTERN);
         this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP);
         this.streamThrottler = (StreamThrottler) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_STREAM_THROTTLER);
     }
@@ -242,6 +244,16 @@ public class ListenHTTPServlet extends HttpServlet {
                 if (StringUtils.isNotBlank(nameVal)) {
                     attributes.put(CoreAttributes.FILENAME.key(), nameVal);
                 }
+                
+                // put arbitrary headers on flow file
+                for(Enumeration<String> headerEnum = request.getHeaderNames(); 
+                		headerEnum.hasMoreElements(); ) {
+                	String headerName = headerEnum.nextElement();
+                	if (headerPattern != null && headerPattern.matcher(headerName).matches()) {
+	                	String headerValue = request.getHeader(headerName);
+	                	attributes.put(headerName, headerValue);
+	                }
+                }
 
                 String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
                 if (sourceSystemFlowFileIdentifier != null) {


Mime
View raw message