cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject git commit: Avoid flushing on request threads. causes netty to over cache buffers and OOM
Date Fri, 15 Aug 2014 21:15:57 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1.0 8137fce52 -> f2d60577c


Avoid flushing on request threads. causes netty to over cache buffers and OOM

patch by benedict; reviewed by tjake for (CASSANDRA-7743)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f2d60577
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f2d60577
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f2d60577

Branch: refs/heads/cassandra-2.1.0
Commit: f2d60577c3944affc8c3beefb07556c005cab185
Parents: 8137fce
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Thu Aug 14 13:39:09 2014 +0700
Committer: Jake Luciani <jake@apache.org>
Committed: Fri Aug 15 17:15:36 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/transport/Message.java | 23 +++++++++++---------
 2 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2d60577/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8714265..eeb115f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc6
+ * Fix OOM issue from netty caching over time (CASSANDRA-7743)
  * json2sstable couldn't import JSON for CQL table (CASSANDRA-7477)
  * Invalidate all caches on table drop (CASSANDRA-7561)
  * Skip strict endpoint selection for ranges if RF == nodes (CASSANRA-7765)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2d60577/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 9a89454..163fffb 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -128,7 +128,7 @@ public abstract class Message
     public final Type type;
     protected Connection connection;
     private int streamId;
-    private Frame sourceFrame = null;
+    private Frame sourceFrame;
 
     protected Message(Type type)
     {
@@ -321,10 +321,12 @@ public abstract class Message
         private static class FlushItem
         {
             final ChannelHandlerContext ctx;
-            final Response response;
-            private FlushItem(ChannelHandlerContext ctx, Response response)
+            final Object response;
+            final Frame sourceFrame;
+            private FlushItem(ChannelHandlerContext ctx, Object response, Frame sourceFrame)
             {
                 this.ctx = ctx;
+                this.sourceFrame = sourceFrame;
                 this.response = response;
             }
         }
@@ -369,7 +371,7 @@ public abstract class Message
                     for (ChannelHandlerContext channel : channels)
                         channel.flush();
                     for (FlushItem item : flushed)
-                        item.response.getSourceFrame().release();
+                        item.sourceFrame.release();
 
                     channels.clear();
                     flushed.clear();
@@ -420,20 +422,21 @@ public abstract class Message
                 response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
                 response.attach(connection);
-                response.setSourceFrame(request.getSourceFrame());
                 connection.applyStateTransition(request.type, response.type);
             }
             catch (Throwable ex)
             {
-                request.getSourceFrame().release();
-                // Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID.
-                ctx.writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()), ctx.voidPromise());
+                flush(new FlushItem(ctx, ErrorMessage.fromException(ex).setStreamId(request.getStreamId()), request.getSourceFrame()));
                 return;
             }
 
             logger.debug("Responding: {}, v={}", response, connection.getVersion());
+            flush(new FlushItem(ctx, response, request.getSourceFrame()));
+        }
 
-            EventLoop loop = ctx.channel().eventLoop();
+        private void flush(FlushItem item)
+        {
+            EventLoop loop = item.ctx.channel().eventLoop();
             Flusher flusher = flusherLookup.get(loop);
             if (flusher == null)
             {
@@ -442,7 +445,7 @@ public abstract class Message
                     flusher = alt;
             }
 
-            flusher.queued.add(new FlushItem(ctx, response));
+            flusher.queued.add(item);
             flusher.start();
         }
 


Mime
View raw message