nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marka...@apache.org
Subject [4/5] incubator-nifi git commit: NIFI-221: Finished initial implementation of http procs
Date Sun, 01 Mar 2015 20:14:24 GMT
NIFI-221: Finished initial implementation of http procs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7c990541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7c990541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7c990541

Branch: refs/heads/develop
Commit: 7c99054183428b0a696cdf16be46267089f9faf4
Parents: c53b0f9
Author: Mark Payne <markap14@hotmail.com>
Authored: Sun Mar 1 14:31:26 2015 -0500
Committer: Mark Payne <markap14@hotmail.com>
Committed: Sun Mar 1 14:31:26 2015 -0500

----------------------------------------------------------------------
 .../processors/standard/HandleHttpRequest.java  |  66 +++++--
 .../standard/TestHandleHttpRequest.java         | 127 ++++++++++++++
 .../standard/TestHandleHttpResponse.java        | 172 +++++++++++++++++++
 3 files changed, 353 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c990541/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 98b5fdd..e72df7d 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -17,8 +17,10 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLDecoder;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -73,6 +75,7 @@ import com.sun.jersey.api.client.ClientResponse.Status;
 @CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request,
creates a FlowFile and transfers to 'success'. This Processor is designed to be used in conjunction
with the HandleHttpResponse Processor in order to create a Web Service")
 public class HandleHttpRequest extends AbstractProcessor {
     public static final String HTTP_CONTEXT_ID = "http.context.identifier";
+    private static final Pattern URL_QUERY_PARAM_DELIMITER = Pattern.compile("&");
     
     // Allowable values for client auth
     public static final AllowableValue CLIENT_NONE = new AllowableValue("No Authentication",
"Processor will not authenticate clients. Anyone can communicate with this Processor anonymously");
@@ -107,6 +110,13 @@ public class HandleHttpRequest extends AbstractProcessor {
         .required(false)
         .identifiesControllerService(SSLContextService.class)
         .build();
+    public static final PropertyDescriptor URL_CHARACTER_SET = new PropertyDescriptor.Builder()
+        .name("URL Character Set")
+        .description("The character set to use for decoding URL parameters")
+        .required(true)
+        .defaultValue("UTF-8")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .build();
     public static final PropertyDescriptor PATH_REGEX = new PropertyDescriptor.Builder()
         .name("Allowed Paths")
         .description("A Regular Expression that specifies the valid HTTP Paths that are allowed
in the incoming URL Requests. If this value is specified and the path of the HTTP Requests
does not match this Regular Expression, the Processor will respond with a 404: NotFound")
@@ -189,6 +199,7 @@ public class HandleHttpRequest extends AbstractProcessor {
         descriptors.add(SSL_CONTEXT);
         descriptors.add(HTTP_CONTEXT_MAP);
         descriptors.add(PATH_REGEX);
+        descriptors.add(URL_CHARACTER_SET);
         descriptors.add(ALLOW_GET);
         descriptors.add(ALLOW_POST);
         descriptors.add(ALLOW_PUT);
@@ -360,7 +371,7 @@ public class HandleHttpRequest extends AbstractProcessor {
     protected int getPort() {
         for ( final Connector connector : server.getConnectors() ) {
             if ( connector instanceof ServerConnector ) {
-                return ((ServerConnector) connector).getPort();
+                return ((ServerConnector) connector).getLocalPort();
             }
         }
         
@@ -421,19 +432,50 @@ public class HandleHttpRequest extends AbstractProcessor {
             return;
         }
         
+        final String charset = context.getProperty(URL_CHARACTER_SET).getValue();
+        
         final String contextIdentifier = UUID.randomUUID().toString();
         final Map<String, String> attributes = new HashMap<>();
-        putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
-        putAttribute(attributes, "mime.type", request.getContentType());
-        putAttribute(attributes, "http.servlet.path", request.getServletPath());
-        putAttribute(attributes, "http.context.path", request.getContextPath());
-        putAttribute(attributes, "http.method", request.getMethod());
-        putAttribute(attributes, "http.query.string", request.getQueryString());
-        putAttribute(attributes, "http.remote.host", request.getRemoteHost());
-        putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
-        putAttribute(attributes, "http.remote.user", request.getRemoteUser());
-        putAttribute(attributes, "http.request.uri", request.getRequestURI());
-        putAttribute(attributes, "http.auth.type", request.getAuthType());
+        try {
+            putAttribute(attributes, HTTP_CONTEXT_ID, contextIdentifier);
+            putAttribute(attributes, "mime.type", request.getContentType());
+            putAttribute(attributes, "http.servlet.path", request.getServletPath());
+            putAttribute(attributes, "http.context.path", request.getContextPath());
+            putAttribute(attributes, "http.method", request.getMethod());
+            if ( request.getQueryString() != null ) {
+                putAttribute(attributes, "http.query.string", URLDecoder.decode(request.getQueryString(),
charset));
+            }
+            putAttribute(attributes, "http.remote.host", request.getRemoteHost());
+            putAttribute(attributes, "http.remote.addr", request.getRemoteAddr());
+            putAttribute(attributes, "http.remote.user", request.getRemoteUser());
+            putAttribute(attributes, "http.request.uri", request.getRequestURI());
+            putAttribute(attributes, "http.auth.type", request.getAuthType());
+            
+            final String queryString = request.getQueryString();
+            if ( queryString != null ) {
+                final String[] params = URL_QUERY_PARAM_DELIMITER.split(queryString);
+                for ( final String keyValueString : params ) {
+                    final int indexOf = keyValueString.indexOf("=");
+                    if ( indexOf < 0 ) {
+                        // no =, then it's just a key with no value
+                        attributes.put("http.query.param." + URLDecoder.decode(keyValueString,
charset), "");
+                    } else {
+                        final String key = keyValueString.substring(0, indexOf);
+                        final String value;
+                        
+                        if ( indexOf == keyValueString.length() - 1 ) {
+                            value = "";
+                        } else {
+                            value = keyValueString.substring(indexOf + 1);
+                        }
+                        
+                        attributes.put("http.query.param." + URLDecoder.decode(key, charset),
URLDecoder.decode(value, charset));
+                    }
+                }
+            }
+        } catch (final UnsupportedEncodingException uee) {
+            throw new ProcessException("Invalid character encoding", uee);  // won't happen
because charset has been validated
+        }
         
         final Enumeration<String> headerNames = request.getHeaderNames();
         while ( headerNames.hasMoreElements() ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c990541/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
new file mode 100644
index 0000000..85f35e2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpRequest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHandleHttpRequest {
+
+    @Test
+    public void testRequestAddedToService() throws InitializationException, MalformedURLException,
IOException {
+        final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
+        runner.setProperty(HandleHttpRequest.PORT, "0");
+        
+        final MockHttpContextMap contextMap = new MockHttpContextMap();
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
+        
+        // trigger processor to stop but not shutdown.
+        runner.run(1, false);
+        try {
+            final Thread httpThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        final int port = ((HandleHttpRequest) runner.getProcessor()).getPort();
+                        final HttpURLConnection connection = (HttpURLConnection) new URL("http://localhost:"
+ port + "/my/path?query=true&value1=value1&value2=&value3&value4=apple=orange").openConnection();
+                        connection.setDoOutput(false);
+                        connection.setRequestMethod("GET");
+                        connection.setRequestProperty("header1", "value1");
+                        connection.setRequestProperty("header2", "");
+                        connection.setRequestProperty("header3", "apple=orange");
+                        connection.setConnectTimeout(3000);
+                        connection.setReadTimeout(3000);
+                        
+                        StreamUtils.copy(connection.getInputStream(), new NullOutputStream());
+                    } catch (final Throwable t) {
+                        t.printStackTrace();
+                        Assert.fail(t.toString());
+                    }
+                }
+            });
+            httpThread.start();
+            
+            try { Thread.sleep(100L); } catch (final InterruptedException ie) {}
+            
+            // process the request.
+            runner.run(1, false);
+            
+            runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
+            assertEquals(1, contextMap.size());
+            
+            final MockFlowFile mff = runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).get(0);
+            mff.assertAttributeEquals("http.query.param.query", "true");
+            mff.assertAttributeEquals("http.query.param.value1", "value1");
+            mff.assertAttributeEquals("http.query.param.value2", "");
+            mff.assertAttributeEquals("http.query.param.value3", "");
+            mff.assertAttributeEquals("http.query.param.value4", "apple=orange");
+            mff.assertAttributeEquals("http.headers.header1", "value1");
+            mff.assertAttributeEquals("http.headers.header3", "apple=orange");
+        } finally {
+            // shut down the server
+            runner.run(1, true);
+        }
+    }
+    
+    
+    private static class MockHttpContextMap extends AbstractControllerService implements
HttpContextMap {
+        private final ConcurrentMap<String, HttpServletResponse> responseMap = new
ConcurrentHashMap<>();
+        
+        @Override
+        public boolean register(String identifier, HttpServletRequest request, HttpServletResponse
response, AsyncContext context) {
+            responseMap.put(identifier, response);
+            return true;
+        }
+
+        @Override
+        public HttpServletResponse getResponse(String identifier) {
+            return responseMap.get(identifier);
+        }
+
+        @Override
+        public void complete(String identifier) {
+            responseMap.remove(identifier);
+        }
+        
+        public int size() {
+            return responseMap.size();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7c990541/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
new file mode 100644
index 0000000..7b41809
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.http.HttpContextMap;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestHandleHttpResponse {
+
+    @Test
+    public void testEnsureCompleted() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
+        
+        final MockHttpContextMap contextMap = new MockHttpContextMap("my-id");
+        runner.addControllerService("http-context-map", contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+        runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
+        runner.setProperty("my-attr", "${my-attr}");
+        runner.setProperty("no-valid-attr", "${no-valid-attr}");
+        
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(HandleHttpResponse.HTTP_CONTEXT_ID, "my-id");
+        attributes.put("my-attr", "hello");
+        attributes.put("status.code", "201");
+        
+        runner.enqueue("hello".getBytes(), attributes);
+        
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
+        
+        assertEquals("hello", contextMap.baos.toString());
+        assertEquals("hello", contextMap.headersSent.get("my-attr"));
+        assertNull(contextMap.headersSent.get("no-valid-attr"));
+        assertEquals(201, contextMap.statusCode);
+        assertEquals(1, contextMap.getCompletionCount());
+        assertTrue(contextMap.headersWithNoValue.isEmpty());
+    }
+    
+    
+    private static class MockHttpContextMap extends AbstractControllerService implements
HttpContextMap {
+        private final String id;
+        private final AtomicInteger completedCount = new AtomicInteger(0);
+        private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
+        private volatile int statusCode = -1;
+        
+        private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
+        
+        public MockHttpContextMap(final String expectedIdentifier) {
+            this.id = expectedIdentifier;
+        }
+        
+        @Override
+        public boolean register(String identifier, HttpServletRequest request, HttpServletResponse
response, AsyncContext context) {
+            return true;
+        }
+
+        @Override
+        public HttpServletResponse getResponse(final String identifier) {
+            if ( !id.equals(identifier) ) {
+                Assert.fail("attempting to respond to wrong request; should have been " +
id + " but was " + identifier);
+            }
+            
+            try {
+                final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+                Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream()
{
+                    @Override
+                    public boolean isReady() { return true; }
+
+                    @Override
+                    public void setWriteListener(WriteListener writeListener) {}
+
+                    @Override
+                    public void write(int b) throws IOException { baos.write(b); }
+                    
+                    @Override
+                    public void write(byte[] b) throws IOException { baos.write(b); }
+                    
+                    @Override
+                    public void write(byte[] b, int off, int len) throws IOException { baos.write(b,
off, len); }
+                });
+                
+                
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) throws Throwable
{
+                        final String key = invocation.getArgumentAt(0, String.class);
+                        final String value = invocation.getArgumentAt(1, String.class);
+                        if ( value == null ) {
+                            headersWithNoValue.add(key);
+                        } else {
+                            headersSent.put(key, value);
+                        }
+                        
+                        return null;
+                    }
+                }).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class));
+                
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) throws Throwable
{
+                        statusCode = invocation.getArgumentAt(0, int.class);
+                        return null;
+                    }
+                }).when(response).setStatus(Mockito.anyInt());
+                
+                return response;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                Assert.fail(e.toString());
+                return null;
+            }
+        }
+
+        @Override
+        public void complete(final String identifier) {
+            if ( !id.equals(identifier) ) {
+                Assert.fail("attempting to respond to wrong request; should have been " +
id + " but was " + identifier);
+            }
+            
+            completedCount.incrementAndGet();
+        }
+        
+        public int getCompletionCount() {
+            return completedCount.get();
+        }
+    }
+}


Mime
View raw message