camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/2] camel git commit: CAMEL-7884: camel-netty4-http should do a defensive copy of the netty bytebuf if the async routing engine kicks in, so any further processing can still read the stream of data. Netty http server will otherwise have closed the orig
Date Tue, 03 May 2016 12:29:04 GMT
CAMEL-7884: camel-netty4-http should do a defensive copy of the netty bytebuf if the async
routing engine kicks in, so any further processing can still read the stream of data. Netty
http server will otherwise have closed the original bytebuf.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a7ecbafc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a7ecbafc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a7ecbafc

Branch: refs/heads/camel-2.17.x
Commit: a7ecbafc02b10bfac3fb99f21bebb407dd96a645
Parents: 6da2fd4
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue May 3 14:28:14 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue May 3 14:28:54 2016 +0200

----------------------------------------------------------------------
 .../netty4/http/DefaultNettyHttpBinding.java    |  2 +
 .../http/NettyChannelBufferStreamCache.java     |  7 ++-
 ...ttyChannelBufferStreamCacheOnCompletion.java | 53 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a7ecbafc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index 552f5de..fe87f53 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -95,6 +95,8 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable
{
         } else {
             // turn the body into stream cached
             NettyChannelBufferStreamCache cache = new NettyChannelBufferStreamCache(request.content());
+            // add on completion to the cache which is needed for Camel to keep track of
the lifecycle of the cache
+            exchange.addOnCompletion(new NettyChannelBufferStreamCacheOnCompletion(cache));
             answer.setBody(cache);
         }
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/a7ecbafc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index 70635f0..f92fc60 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -32,7 +32,7 @@ import org.apache.camel.util.IOHelper;
  */
 public final class NettyChannelBufferStreamCache extends InputStream implements StreamCache
{
 
-    private final ByteBuf buffer;
+    private ByteBuf buffer;
 
     public NettyChannelBufferStreamCache(ByteBuf buffer) {
         this.buffer = buffer;
@@ -101,4 +101,9 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
     public long length() {
         return buffer.readableBytes();
     }
+
+    void defensiveCopyBuffer() {
+        // make a defensive copy of the buffer
+        this.buffer = buffer.copy();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a7ecbafc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
new file mode 100644
index 0000000..0cc51a4
--- /dev/null
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.http;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ * A {@link org.apache.camel.spi.Synchronization} to keep track of the unit of work on the
current {@link Exchange}
+ * that has the {@link NettyChannelBufferStreamCache} as message body. This cache is wrapping
the raw original
+ * Netty {@link io.netty.buffer.ByteBuf}. Because the Netty HTTP server ({@link HttpServerChannelHandler})
will
+ * close the {@link io.netty.buffer.ByteBuf} when Netty is complete processing the HttpMessage,
then any further
+ * access to the cache will cause in a buffer unreadable. In the case of Camel async routing
engine will
+ * handover the processing of the {@link Exchange} to another thread, then we need to keep
track of this event
+ * so we can do a defensive copy of the netty {@link io.netty.buffer.ByteBuf} so Camel is
able to read
+ * the content from other threads, while Netty has closed the original {@link io.netty.buffer.ByteBuf}.
+ */
+public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAdapter {
+
+    private final NettyChannelBufferStreamCache cache;
+
+    public NettyChannelBufferStreamCacheOnCompletion(NettyChannelBufferStreamCache cache)
{
+        this.cache = cache;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        // okay netty is no longer being active, so we need to signal to the cache that its
to preserve the buffer if still in need.
+        cache.defensiveCopyBuffer();
+    }
+
+    @Override
+    public boolean allowHandover() {
+        // do not allow handover, so we can do the defensive copy in the onDone method
+        return false;
+    }
+
+}


Mime
View raw message