cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1210748 - in /cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java src/java/org/apache/cassandra/net/MessagingService.java src/java/org/apache/cassandra/streaming/FileStreamTask.java
Date Tue, 06 Dec 2011 02:11:50 GMT
Author: jbellis
Date: Tue Dec  6 02:11:50 2011
New Revision: 1210748

URL: http://svn.apache.org/viewvc?rev=1210748&view=rev
Log:
multithreaded streaming
patch by Peter Schuller; reviewed by yukim for CASSANDRA-3494

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1210748&r1=1210747&r2=1210748&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Dec  6 02:11:50 2011
@@ -1,4 +1,5 @@
 1.1-dev
+ * multithreaded streaming (CASSANDRA-3494)
  * removed in-tree redhat spec (CASSANDRA-3567)
  * "defragment" rows for name-based queries under STCS, again (CASSANDRA-2503)
  * Recycle commitlog segments for improved performance 

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1210748&r1=1210747&r2=1210748&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Tue Dec  6 02:11:50 2011
@@ -86,9 +86,9 @@ public class DebuggableThreadPoolExecuto
         this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory);
     }
 
-    protected DebuggableThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+    public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
     {
-        super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
         allowCoreThreadTimeOut(true);
 
         // block task submissions until queue has room.

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1210748&r1=1210747&r2=1210748&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Dec  6 02:11:50 2011
@@ -27,14 +27,20 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,8 +87,22 @@ public final class MessagingService impl
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
-    /* Thread pool to handle messaging write activities */
-    private final DebuggableThreadPoolExecutor streamExecutor_;
+    /** One executor per destination InetAddress for streaming.
+     *
+     * See CASSANDRA-3494 for the background. We have streaming in place so we do not want to limit ourselves to
+     * one stream at a time for throttling reasons. But, we also do not want to just arbitrarily stream an unlimited
+     * amount of files at once because a single destination might have hundreds of files pending and it would cause a
+     * seek storm. So, transfer exactly one file per destination host. That puts a very natural rate limit on it, in
+     * addition to mapping well to the expected behavior in many cases.
+     *
+     * We will create our stream executors with a core size of 0 so that they time out and do not consume threads. This
+     * means the overhead in the degenerate case of having streamed to everyone in the ring over time as a ring changes,
+     * is not going to be a thread per node - but rather an instance per node. That's totally fine.
+     */
+    private final HashMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new HashMap<InetAddress, DebuggableThreadPoolExecutor>();
+    /** Very rarely acquired lock protecting streamExecutors. */
+    private final Lock streamExecutorsLock = new ReentrantLock();
+    private final AtomicInteger activeStreamsOutbound = new AtomicInteger(0);
 
     private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
 
@@ -137,7 +157,6 @@ public final class MessagingService impl
 
         listenGate = new SimpleCondition();
         verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class);
-        streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming", Thread.MIN_PRIORITY);
         Runnable logDropped = new Runnable()
         {
             public void run()
@@ -449,14 +468,40 @@ public final class MessagingService impl
 
     public void stream(StreamHeader header, InetAddress to)
     {
-        /* Streaming asynchronously on streamExector_ threads. */
-        streamExecutor_.execute(new FileStreamTask(header, to));
+        this.streamExecutorsLock.lock();
+        try
+        {
+            if (!streamExecutors.containsKey(to))
+            {
+                // Using a core pool size of 0 is important. See documentation of streamExecutors.
+                streamExecutors.put(to, new DebuggableThreadPoolExecutor(0, 1, 1, TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<Runnable>(),
+                        new NamedThreadFactory("Streaming to " + to)));
+            }
+            DebuggableThreadPoolExecutor executor = streamExecutors.get(to);
+
+            executor.execute(new FileStreamTask(header, to));
+        }
+        finally
+        {
+            this.streamExecutorsLock.unlock();
+        }
+    }
+
+    public void incrementActiveStreamsOutbound()
+    {
+        activeStreamsOutbound.incrementAndGet();
+    }
+
+    public void decrementActiveStreamsOutbound()
+    {
+        activeStreamsOutbound.decrementAndGet();
     }
 
     /** The count of active outbound stream tasks. */
     public int getActiveStreamsOutbound()
     {
-        return streamExecutor_.getActiveCount();
+        return activeStreamsOutbound.get();
     }
 
     public void register(ILatencySubscriber subcriber)
@@ -464,14 +509,44 @@ public final class MessagingService impl
         subscribers.add(subcriber);
     }
 
-    public void waitForStreaming() throws InterruptedException
+    public void clearCallbacksUnsafe()
     {
-        streamExecutor_.awaitTermination(24, TimeUnit.HOURS);
+        callbacks.clear();
     }
 
-    public void clearCallbacksUnsafe()
+    public void waitForStreaming() throws InterruptedException
     {
-        callbacks.clear();
+        while (true)
+        {
+            boolean stillWaiting = false;
+
+            streamExecutorsLock.lock();
+            try
+            {
+                for (DebuggableThreadPoolExecutor e : streamExecutors.values())
+                {
+                    if (!e.isTerminated())
+                    {
+                        stillWaiting = true;
+                        break;
+                    }
+                }
+            }
+            finally
+            {
+                streamExecutorsLock.unlock();
+            }
+            if (stillWaiting)
+            {
+                // Up to a second of unneeded delay is acceptable, relative to the amount of time a typical stream
+                // takes.
+                Thread.sleep(1000);
+            }
+            else
+            {
+                break;
+            }
+        }
     }
 
     public void shutdown()
@@ -490,7 +565,20 @@ public final class MessagingService impl
             throw new IOError(e);
         }
 
-        streamExecutor_.shutdown();
+        streamExecutorsLock.lock();
+        try
+        {
+            for (DebuggableThreadPoolExecutor e : streamExecutors.values())
+            {
+                e.shutdown();
+            }
+        }
+        finally
+        {
+            streamExecutorsLock.unlock();
+        }
+
+        callbacks.shutdown();
 
         logger_.info("Waiting for in-progress requests to complete");
         callbacks.shutdown();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1210748&r1=1210747&r2=1210748&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Tue Dec  6 02:11:50 2011
@@ -138,6 +138,7 @@ public class FileStreamTask extends Wrap
         // setting up data compression stream
         compressedoutput = new LZFOutputStream(output);
 
+        MessagingService.instance().incrementActiveStreamsOutbound();
         try
         {
             // stream each of the required sections of the file
@@ -170,6 +171,8 @@ public class FileStreamTask extends Wrap
         }
         finally
         {
+            MessagingService.instance().decrementActiveStreamsOutbound();
+
             // no matter what happens close file
             FileUtils.closeQuietly(file);
         }



Mime
View raw message