cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject [1/5] git commit: p/4443/050_process_queued_xfers
Date Fri, 14 Sep 2012 15:16:18 GMT
Updated Branches:
  refs/heads/trunk a26eb3ef6 -> 398b1d263


p/4443/050_process_queued_xfers

Read queued token range transfers from a column family and initiate
relocations.

Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4559


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/398b1d26
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/398b1d26
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/398b1d26

Branch: refs/heads/trunk
Commit: 398b1d263379a5b3ae080b3d985e00b032ca1ddf
Parents: d09b47c
Author: Eric Evans <eevans@apache.org>
Authored: Fri Sep 14 10:09:14 2012 -0500
Committer: Eric Evans <eevans@apache.org>
Committed: Fri Sep 14 10:18:08 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java    |    5 +
 .../org/apache/cassandra/config/KSMetaData.java    |    1 +
 .../apache/cassandra/cql3/UntypedResultSet.java    |    6 +
 src/java/org/apache/cassandra/db/SystemTable.java  |    1 +
 .../ScheduledRangeTransferExecutorService.java     |  122 +++++++++++++++
 .../apache/cassandra/service/StorageService.java   |   14 ++-
 .../cassandra/service/StorageServiceMBean.java     |    5 +
 7 files changed, 153 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index adbd853..bd194d1 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -200,6 +200,11 @@ public final class CFMetaData
                                                             + "data blob"
                                                             + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0");
 
+    public static final CFMetaData RangeXfersCf = compile(17, "CREATE TABLE " + SystemTable.RANGE_XFERS_CF + " ("
+                                                              + "token_bytes blob PRIMARY KEY,"
+                                                              + "requested_at timestamp"
+                                                              + ") WITH COMMENT='ranges requested for transfer here'");
+
     public enum Caching
     {
         ALL, KEYS_ONLY, ROWS_ONLY, NONE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/config/KSMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java
index 050e32f..c6df2f5 100644
--- a/src/java/org/apache/cassandra/config/KSMetaData.java
+++ b/src/java/org/apache/cassandra/config/KSMetaData.java
@@ -79,6 +79,7 @@ public final class KSMetaData
     public static KSMetaData systemKeyspace()
     {
         List<CFMetaData> cfDefs = Arrays.asList(CFMetaData.BatchlogCF,
+                                                CFMetaData.RangeXfersCf,
                                                 CFMetaData.LocalCf,
                                                 CFMetaData.PeersCf,
                                                 CFMetaData.HintsCf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index b8cb8d4..203e4c1 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -124,6 +125,11 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
             return UUIDType.instance.compose(data.get(column));
         }
 
+        public Date getTimestamp(String column)
+        {
+            return DateType.instance.compose(data.get(column));
+        }
+
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index a41dafb..7abccee 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -58,6 +58,7 @@ public class SystemTable
     public static final String INDEX_CF = "IndexInfo";
     public static final String COUNTER_ID_CF = "NodeIdInfo";
     public static final String HINTS_CF = "hints";
+    public static final String RANGE_XFERS_CF = "range_xfers";
     public static final String BATCHLOG_CF = "batchlog";
     // see layout description in the DefsTable class header
     public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
new file mode 100644
index 0000000..12e7fd4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java
@@ -0,0 +1,122 @@
+package org.apache.cassandra.service;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Date;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+
+public class ScheduledRangeTransferExecutorService
+{
+    private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class);
+    private static final int INTERVAL = 10;
+    private ScheduledExecutorService scheduler;
+
+    public void setup()
+    {
+        if (DatabaseDescriptor.getNumTokens() == 1)
+        {
+            LOG.warn("Cannot start range transfer scheduler: endpoint is not virtual nodes-enabled");
+            return;
+        }
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new RangeTransferThreadFactory());
+        scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, TimeUnit.SECONDS);
+        LOG.info("Enabling scheduled transfers of token ranges");
+    }
+
+    public void tearDown()
+    {
+        if (scheduler == null)
+        {
+            LOG.warn("Unabled to shutdown; Scheduler never enabled");
+            return;
+        }
+ 
+        LOG.info("Shutting down range transfer scheduler");
+        scheduler.shutdownNow();
+    }
+}
+
+class RangeTransfer implements Runnable
+{
+    private static final Logger LOG = LoggerFactory.getLogger(RangeTransfer.class);
+
+    public void run()
+    {
+        UntypedResultSet res = processInternal("SELECT * FROM system." + SystemTable.RANGE_XFERS_CF + " LIMIT 1");
+
+        if (res.size() < 1)
+        {
+            LOG.debug("No queued ranges to transfer");
+            return;
+        }
+
+        if (!isReady())
+            return;
+
+        UntypedResultSet.Row row = res.iterator().next();
+
+        Date requestedAt = row.getTimestamp("requested_at");
+        ByteBuffer tokenBytes = row.getBytes("token_bytes");
+        Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes);
+
+        LOG.info("Initiating transfer of {} (scheduled at {})", token, requestedAt.toString());
+        try
+        {
+            StorageService.instance.relocateTokens(Collections.singleton(token));
+        }
+        catch (Exception e)
+        {
+            LOG.error("Error removing {}: {}", token, e);
+        }
+        finally
+        {
+            LOG.debug("Removing queued entry for transfer of {}", token);
+            processInternal(String.format("DELETE FROM system.%s WHERE token_bytes = '%s'",
+                                          SystemTable.RANGE_XFERS_CF,
+                                          ByteBufferUtil.bytesToHex(tokenBytes)));
+        }
+    }
+
+    private boolean isReady()
+    {
+        int targetTokens = DatabaseDescriptor.getNumTokens();
+        int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10));
+        int actualTokens = StorageService.instance.getTokens().size();
+
+        if (actualTokens >= highMark)
+        {
+            LOG.warn("Pausing until token count stabilizes (target={}, actual={})", targetTokens, actualTokens);
+            return false;
+        }
+
+        return true;
+    }
+}
+
+class RangeTransferThreadFactory implements ThreadFactory
+{
+    private AtomicInteger count = new AtomicInteger(0);
+
+    public Thread newThread(Runnable r)
+    {
+        Thread rangeXferThread = new Thread(r);
+        rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", count.getAndIncrement()));
+        return rangeXferThread;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 241015f..a747e1e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -173,6 +173,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     private static final AtomicInteger nextRepairCommand = new AtomicInteger();
 
+    private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService();
+
     public void finishBootstrapping()
     {
         isBootstrapMode = false;
@@ -2752,7 +2754,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         relocateTokens(tokens);
     }
 
-    private void relocateTokens(Collection<Token> srcTokens)
+    void relocateTokens(Collection<Token> srcTokens)
     {
         assert srcTokens != null;
         InetAddress localAddress = FBUtilities.getBroadcastAddress();
@@ -3505,4 +3507,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     {
         return tracingProbability;
     }
+
+    public void enableScheduledRangeXfers()
+    {
+        rangeXferExecutor.setup();
+    }
+
+    public void disableScheduledRangeXfers()
+    {
+        rangeXferExecutor.tearDown();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 375dacd..ded5298 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -457,4 +457,9 @@ public interface StorageServiceMBean
      * Returns the configured tracing probability.
      */
     public double getTracingProbability();
+
+    /** Begin processing of queued range transfers. */
+    public void enableScheduledRangeXfers();
+    /** Disable processing of queued range transfers. */
+    public void disableScheduledRangeXfers();
 }


Mime
View raw message