cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobr...@apache.org
Subject cassandra git commit: Correctly close netty channels when a stream session ends
Date Fri, 29 Sep 2017 16:34:32 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e296ff063 -> ebefc96a8


Correctly close netty channels when a stream session ends

patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-13905


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebefc96a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebefc96a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebefc96a

Branch: refs/heads/trunk
Commit: ebefc96a8fe63aca5f324984f7f3147f10218643
Parents: e296ff0
Author: Jason Brown <jasedbrown@gmail.com>
Authored: Mon Sep 25 15:39:17 2017 -0700
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Fri Sep 29 09:33:08 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../async/NettyStreamingMessageSender.java      | 24 ++++++++++++--------
 .../org/apache/cassandra/utils/FBUtilities.java | 17 +++++++++++++-
 3 files changed, 31 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefc96a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ea73fcd..99b5a59 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
  * Update lz4 to 1.4.0 (CASSANDRA-13741)
  * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
  * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefc96a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index f872005..0b38760 100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -21,7 +21,9 @@ package org.apache.cassandra.streaming.async;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -106,10 +108,9 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
     private final ThreadPoolExecutor fileTransferExecutor;
 
     /**
-     * A {@link ThreadLocal} used by the threads in {@link #fileTransferExecutor} to stash references to constructed
-     * and connected {@link Channel}s.
+     * A mapping of each {@link #fileTransferExecutor} thread to a channel that can be written to (on that thread).
      */
-    private final ConcurrentMap<Thread, Channel> threadLocalChannel = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Thread, Channel> threadToChannelMap = new ConcurrentHashMap<>();
 
     /**
      * A netty channel attribute used to indicate if a channel is currently transferring a file. This is primarily used
@@ -373,12 +374,12 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
             Thread currentThread = Thread.currentThread();
             try
             {
-                Channel channel = threadLocalChannel.get(currentThread);
+                Channel channel = threadToChannelMap.get(currentThread);
                 if (channel != null)
                     return channel;
 
                 channel = createChannel();
-                threadLocalChannel.put(currentThread, channel);
+                threadToChannelMap.put(currentThread, channel);
                 return channel;
             }
             catch (Exception e)
@@ -393,10 +394,10 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
         void injectChannel(Channel channel)
         {
             Thread currentThread = Thread.currentThread();
-            if (threadLocalChannel.get(currentThread) != null)
+            if (threadToChannelMap.get(currentThread) != null)
                 throw new IllegalStateException("previous channel already set");
 
-            threadLocalChannel.put(currentThread, channel);
+            threadToChannelMap.put(currentThread, channel);
         }
 
         /**
@@ -404,7 +405,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
          */
         void unsetChannel()
         {
-            threadLocalChannel.remove(Thread.currentThread());
+            threadToChannelMap.remove(Thread.currentThread());
         }
     }
 
@@ -498,8 +499,11 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
         channelKeepAlives.stream().map(scheduledFuture -> scheduledFuture.cancel(false));
         channelKeepAlives.clear();
 
-        threadLocalChannel.values().stream().map(channel -> channel.close());
-        threadLocalChannel.clear();
+        List<Future<Void>> futures = new ArrayList<>(threadToChannelMap.size());
+        for (Channel channel : threadToChannelMap.values())
+            futures.add(channel.close());
+        FBUtilities.waitOnFutures(futures, 10 * 1000);
+        threadToChannelMap.clear();
         fileTransferExecutor.shutdownNow();
 
         if (controlMessageChannel != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefc96a/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 319512d..f45a1ab 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -383,13 +383,28 @@ public class FBUtilities
 
     public static <T> List<T> waitOnFutures(Iterable<? extends Future<? extends T>> futures)
     {
+        return waitOnFutures(futures, -1);
+    }
+
+    /**
+     * Block for a collection of futures, with an optional timeout for each future.
+     *
+     * @param futures
+     * @param ms The number of milliseconds to wait on each future. If this value is less than or equal to zero,
+     *           no timeout value will be passed to {@link Future#get()}.
+     */
+    public static <T> List<T> waitOnFutures(Iterable<? extends Future<? extends T>> futures, long ms)
+    {
         List<T> results = new ArrayList<>();
         Throwable fail = null;
         for (Future<? extends T> f : futures)
         {
             try
             {
-                results.add(f.get());
+                if (ms <= 0)
+                    results.add(f.get());
+                else
+                    results.add(f.get(ms, TimeUnit.MILLISECONDS));
             }
             catch (Throwable t)
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message