cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject cxf git commit: [CXF-6232] Refactor CXF's Atmosphere based WebSocket transport for more flexible and extensible handling
Date Thu, 19 Mar 2015 15:30:20 GMT
Repository: cxf
Updated Branches:
  refs/heads/3.0.x-fixes 450788a57 -> 426dbdc11


[CXF-6232] Refactor CXF's Atmosphere based WebSocket transport for more flexible and extensible
handling


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/426dbdc1
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/426dbdc1
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/426dbdc1

Branch: refs/heads/3.0.x-fixes
Commit: 426dbdc11a70a32a92c581054df0abbf5cf098a7
Parents: 450788a
Author: Akitoshi Yoshida <ay@apache.org>
Authored: Thu Mar 19 16:02:42 2015 +0100
Committer: Akitoshi Yoshida <ay@apache.org>
Committed: Thu Mar 19 16:29:46 2015 +0100

----------------------------------------------------------------------
 .../cxf/transport/websocket/WebSocketUtils.java |  25 +++--
 .../atmosphere/DefaultProtocolInterceptor.java  | 105 +++++++++++++++----
 .../DefaultProtocolInterceptorTest.java         |  91 ++++++++++++++++
 3 files changed, 193 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/426dbdc1/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index 5dbb930..adb621d 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -50,19 +50,24 @@ public final class WebSocketUtils {
      * - charset other than utf-8. (although i would have preferred iso-8859-1 ;-)
      * 
      * @param in the input stream
+     * @param req true if the input stream includes the request line
      * @return a map of name value pairs.
      * @throws IOException
      */
-    public static Map<String, String> readHeaders(InputStream in) throws IOException
{
+    public static Map<String, String> readHeaders(InputStream in, boolean req) throws
IOException {
         Map<String, String> headers = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
-        // read the request line
-        String line = readLine(in);
-        int del = line.indexOf(' ');
-        if (del < 0) {
-            throw new IOException("invalid request: " + line);
+        String line;
+        int del;
+        if (req) {
+            // read the request line
+            line = readLine(in);
+            del = line.indexOf(' ');
+            if (del < 0) {
+                throw new IOException("invalid request: " + line);
+            }
+            headers.put(METHOD_KEY, line.substring(0, del).trim());
+            headers.put(URI_KEY, line.substring(del + 1).trim());
         }
-        headers.put(METHOD_KEY, line.substring(0, del).trim());
-        headers.put(URI_KEY, line.substring(del + 1).trim());
         
         // read headers
         while ((line = readLine(in)) != null) {
@@ -79,6 +84,10 @@ public final class WebSocketUtils {
         return headers;
     }
 
+    public static Map<String, String> readHeaders(InputStream in) throws IOException
{
+        return readHeaders(in, true);
+    }
+
 
     /**
      * Read a line terminated by '\n' optionally preceded by '\r' from the 

http://git-wip-us.apache.org/repos/asf/cxf/blob/426dbdc1/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
index 7c4c6e5..2bc83b6 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.regex.Pattern;
 
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletResponse;
@@ -39,6 +40,7 @@ import org.atmosphere.cpr.Action;
 import org.atmosphere.cpr.AsyncIOInterceptor;
 import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
 import org.atmosphere.cpr.AsyncIOWriter;
+import org.atmosphere.cpr.AtmosphereConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
 import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
 import org.atmosphere.cpr.AtmosphereInterceptorWriter;
@@ -48,6 +50,7 @@ import org.atmosphere.cpr.AtmosphereResponse;
 import org.atmosphere.cpr.FrameworkConfig;
 
 /**
+ * DefaultProtocolInterceptor provides CXF's default WebSocket protocol for the Atmosphere-based transport.
  * 
  */
 @AtmosphereInterceptorService
@@ -59,6 +62,44 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter
{
 
     private final AsyncIOInterceptor interceptor = new Interceptor();
 
+    private Pattern includedheaders;
+    private Pattern excludedheaders;
+
+    @Override
+    public void configure(AtmosphereConfig config) {
+        super.configure(config);
+        String p = config.getInitParameter("org.apache.cxf.transport.websocket.atmosphere.transport.includedheaders");
+        if (p != null) {
+            includedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+        p = config.getInitParameter("org.apache.cxf.transport.websocket.atmosphere.transport.excludedheaders");
+        if (p != null) {
+            excludedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+    }
+
+    public DefaultProtocolInterceptor includedheaders(String p) {
+        if (p != null) {
+            this.includedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+        return this;
+    }
+
+    public void setIncludedheaders(Pattern includedheaders) {
+        this.includedheaders = includedheaders;
+    }
+
+    public DefaultProtocolInterceptor excludedheaders(String p) {
+        if (p != null) {
+            this.excludedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+        return this;
+    }
+
+    public void setExcludedheaders(Pattern excludedheaders) {
+        this.excludedheaders = excludedheaders;
+    }
+
     @Override
     public Action inspect(final AtmosphereResource r) {
         LOG.log(Level.FINE, "inspect");
@@ -130,7 +171,15 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter
{
         }
     }
 
-    private static AtmosphereRequest createAtmosphereRequest(AtmosphereRequest r, byte[]
data) throws IOException {
+    /**
+     * Creates a virtual request using the specified parent request and the actual data.
+     * 
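+     * @param r the parent request
+     * @param data the actual data of the virtual request
+     * @return the virtual request
+     * @throws IOException if reading the request data fails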
+     */
+    protected AtmosphereRequest createAtmosphereRequest(AtmosphereRequest r, byte[] data)
throws IOException {
         AtmosphereRequest.Builder b = new AtmosphereRequest.Builder();
         ByteArrayInputStream in = new ByteArrayInputStream(data);
         Map<String, String> hdrs = WebSocketUtils.readHeaders(in);
@@ -161,6 +210,40 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter
{
         return b.build();
     }
 
+    /**
+     * Creates the response data based on the specified payload.
+     * 
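+     * @param response the response whose request, status, content type, and headers are used
+     * @param payload the response payload; may be null or empty
+     * @param parent true to include the status code, the content type, and the matching headers
+     * @return the response data built from the headers and the payload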
+     */
+    protected byte[] createResponse(AtmosphereResponse response, byte[] payload, boolean
parent) {
+        AtmosphereRequest request = response.request();
+        String refid = (String)request.getAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
+
+        Map<String, String> headers = new HashMap<String, String>();
+        if (refid != null) {
+            response.addHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+            headers.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+        }
+        if (parent) {
+            // include the status code and content-type and those matched headers
+            headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
+            if (payload != null && payload.length > 0) {
+                headers.put("Content-Type",  response.getContentType());
+            }
+            for (Map.Entry<String, String> hv : response.headers().entrySet()) {
+                if (!"Content-Type".equalsIgnoreCase(hv.getKey()) 
+                    && includedheaders != null && includedheaders.matcher(hv.getKey()).matches()
+                    && !(excludedheaders != null && excludedheaders.matcher(hv.getKey()).matches()))
{
+                    headers.put(hv.getKey(), hv.getValue());
+                }
+            }
+        }
+        return WebSocketUtils.buildResponse(headers, payload, 0, payload == null ? 0 : payload.length);
+    }
+
     private final class Interceptor extends AsyncIOInterceptorAdapter {
 
         @Override
@@ -188,26 +271,8 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter
{
         }
     }
 
-    private static byte[] createResponse(AtmosphereResponse response, byte[] payload, boolean
parent) {
-        AtmosphereRequest request = response.request();
-        String refid = (String)request.getAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
-
-        Map<String, String> headers = new HashMap<String, String>();
-        if (refid != null) {
-            response.addHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
-            headers.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
-        }
-        if (parent) {
-            headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
-            if (payload != null && payload.length > 0) {
-                headers.put("Content-Type",  response.getContentType());
-            }
-        }
-        return WebSocketUtils.buildResponse(headers, payload, 0, payload == null ? 0 : payload.length);
-    }
-
     // a workaround to flush the header data upon close when no write operation occurs  
-    private static class WrappedAtmosphereResponse extends AtmosphereResponse {
+    private class WrappedAtmosphereResponse extends AtmosphereResponse {
         public WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req)
{
             super((HttpServletResponse)resp.getResponse(), resp.getAsyncIOWriter(), req,
resp.isDestroyable());
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/426dbdc1/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java
b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java
new file mode 100644
index 0000000..e6b70af
--- /dev/null
+++ b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class DefaultProtocolInterceptorTest extends Assert {
+
+    @Test
+    public void testCreateResponseWithHeadersFiltering() throws Exception {
+        DefaultProtocolInterceptor dpi = new DefaultProtocolInterceptor();
+        AtmosphereRequest request = AtmosphereRequest.newInstance();
+        AtmosphereResponse response = AtmosphereResponse.newInstance();
+        response.request(request);
+        String payload = "hello cxf";
+        String contentType = "text/plain";
+        response.headers().put("Content-Type", contentType);
+
+        byte[] transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType}, 
+                          payload, transformed);
+
+        response.headers().put("X-fruit", "peach");
+        response.headers().put("X-vegetable", "tomato");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType}, 
+                          payload, transformed);
+
+        dpi.includedheaders("X-f.*");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType, "X-Fruit", "peach"},

+                          payload, transformed);
+
+        dpi.includedheaders("X-.*");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType, "X-Fruit", "peach", "X-vegetable",
"tomato"}, 
+                          payload, transformed);
+
+        dpi.excludedheaders(".*able");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType, "X-Fruit", "peach"},

+                          payload, transformed);
+    }
+
+    private void verifyTransformed(String code, String[] headers, String body, byte[] transformed)
throws Exception {
+        InputStream in = new ByteArrayInputStream(transformed);
+        String c = WebSocketUtils.readLine(in);
+        Map<String, String> hs = WebSocketUtils.readHeaders(in, false);
+        byte[] b = WebSocketUtils.readBody(in);
+        assertEquals(code, c);
+        assertEquals(headers.length >> 1, hs.size());
+        for (int i = 0; i < headers.length; i += 2) {
+            assertEquals(headers[i + 1], hs.get(headers[i]));
+        }
+        assertEquals(body, new String(b));
+    }
+}


Mime
View raw message