cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [2/6] git commit: Split out outgoing stream throughput within a DC and inter-DC patch by Vijay and benedict for CASSANDRA-6596
Date Tue, 01 Jul 2014 03:49:11 GMT
Split out outgoing stream throughput within a DC and inter-DC
patch by Vijay and benedict for CASSANDRA-6596


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4112a7fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4112a7fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4112a7fa

Branch: refs/heads/cassandra-2.1.0
Commit: 4112a7fa279b340fb5f0f28f68ec0bf43f1479bf
Parents: 6396a35
Author: vparthasarathy <vijay2win@gmail.com>
Authored: Sat Feb 15 12:25:14 2014 -0800
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Jun 30 20:47:58 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  6 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    | 10 ++++
 .../cassandra/streaming/StreamManager.java      | 53 +++++++++++++++-----
 .../cassandra/streaming/StreamWriter.java       |  5 +-
 6 files changed, 62 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00fcce7..ff8a7a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
  * Make sure high level sstables get compacted (CASSANDRA-7414)
  * Fix AssertionError when using empty clustering columns and static columns
    (CASSANDRA-7455)
+ * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596)
 
 
 2.0.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ea4d955..f067635 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -480,6 +480,12 @@ compaction_preheat_key_cache: true
 # When unset, the default is 200 Mbps or 25 MB/s.
 # stream_throughput_outbound_megabits_per_sec: 200
 
+# Throttles all streaming file transfer between the datacenters,
+# this setting allows users to throttle inter dc stream throughput in addition
+# to throttling all network stream traffic as configured with
+# stream_throughput_outbound_megabits_per_sec
+# inter_dc_stream_throughput_outbound_megabits_per_sec:
+
 # How long the coordinator should wait for read operations to complete
 read_request_timeout_in_ms: 5000
 # How long the coordinator should wait for seq or index scans to complete

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 7a3185a..aab5025 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -129,6 +129,7 @@ public class Config
     public Integer max_streaming_retries = 3;
 
     public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
+    public volatile Integer inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
 
     public String[] data_file_directories;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3905eba..badd975 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -914,6 +914,16 @@ public class DatabaseDescriptor
         conf.stream_throughput_outbound_megabits_per_sec = value;
     }
 
+    public static int getInterDCStreamThroughputOutboundMegabitsPerSec()
+    {
+        return conf.inter_dc_stream_throughput_outbound_megabits_per_sec;
+    }
+
+    public static void setInterDCStreamThroughputOutboundMegabitsPerSec(int value)
+    {
+        conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value;
+    }
+
     public static String[] getAllDataFileLocations()
     {
         return conf.data_file_directories;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 3fe6179..366f3ff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -32,8 +33,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.RateLimiter;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
 import org.apache.cassandra.streaming.management.StreamStateCompositeData;
@@ -47,25 +48,53 @@ public class StreamManager implements StreamManagerMBean
 {
     public static final StreamManager instance = new StreamManager();
 
-    private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
-
     /**
      * Gets streaming rate limiter.
      * When stream_throughput_outbound_megabits_per_sec is 0, this returns rate limiter
      * with the rate of Double.MAX_VALUE bytes per second.
      * Rate unit is bytes per sec.
      *
-     * @return RateLimiter with rate limit set
+     * @return StreamRateLimiter with rate limit set based on peer location.
      */
-    public static RateLimiter getRateLimiter()
+    public static StreamRateLimiter getRateLimiter(InetAddress peer)
     {
-        double currentThroughput = (((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec())
* 1024 * 1024 ) / 8;
-        // if throughput is set to 0, throttling is disabled
-        if (currentThroughput == 0)
-            currentThroughput = Double.MAX_VALUE;
-        if (limiter.getRate() != currentThroughput)
-            limiter.setRate(currentThroughput);
-        return limiter;
+        return new StreamRateLimiter(peer);
+    }
+
+    public static class StreamRateLimiter
+    {
+        private static final double ONE_MEGA_BIT = 1024 * 1024 * 8;
+        private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
+        private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
+        private final boolean isLocalDC;
+
+        public StreamRateLimiter(InetAddress peer)
+        {
+            double throughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec())
* ONE_MEGA_BIT;
+            mayUpdateThroughput(throughput, limiter);
+
+            double interDCThroughput = ((double) DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec())
* ONE_MEGA_BIT;
+            mayUpdateThroughput(interDCThroughput, interDCLimiter);
+
+            isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
+                        DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
+        }
+
+        private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
+        {
+            // if throughput is set to 0, throttling is disabled
+            if (limit == 0)
+                limit = Double.MAX_VALUE;
+            if (rateLimiter.getRate() != limit)
+                rateLimiter.setRate(limit);
+        }
+
+        public void acquire(int toTransfer)
+        {
+            limiter.acquire(toTransfer);
+            if (!isLocalDC)
+                interDCLimiter.acquire(toTransfer);
+        }
     }
 
     private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4112a7fa/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 5609f20..5a5163f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -24,7 +24,6 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 
-import com.google.common.util.concurrent.RateLimiter;
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -33,6 +32,7 @@ import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -44,7 +44,7 @@ public class StreamWriter
 
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
-    protected final RateLimiter limiter = StreamManager.getRateLimiter();
+    protected final StreamRateLimiter limiter;
     protected final StreamSession session;
 
     private OutputStream compressedOutput;
@@ -57,6 +57,7 @@ public class StreamWriter
         this.session = session;
         this.sstable = sstable;
         this.sections = sections;
+        this.limiter =  StreamManager.getRateLimiter(session.peer);
     }
 
     /**


Mime
View raw message