Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 88DF4D08D for ; Fri, 14 Sep 2012 15:16:19 +0000 (UTC) Received: (qmail 23768 invoked by uid 500); 14 Sep 2012 15:16:19 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 23714 invoked by uid 500); 14 Sep 2012 15:16:19 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 23656 invoked by uid 99); 14 Sep 2012 15:16:19 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Sep 2012 15:16:19 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B71E1369A1; Fri, 14 Sep 2012 15:16:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: eevans@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [1/5] git commit: p/4443/050_process_queued_xfers Message-Id: <20120914151618.B71E1369A1@tyr.zones.apache.org> Date: Fri, 14 Sep 2012 15:16:18 +0000 (UTC) Updated Branches: refs/heads/trunk a26eb3ef6 -> 398b1d263 p/4443/050_process_queued_xfers Read queued token ranges transfers from a column family, and initiate relocations. Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4559 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/398b1d26 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/398b1d26 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/398b1d26 Branch: refs/heads/trunk Commit: 398b1d263379a5b3ae080b3d985e00b032ca1ddf Parents: d09b47c Author: Eric Evans Authored: Fri Sep 14 10:09:14 2012 -0500 Committer: Eric Evans Committed: Fri Sep 14 10:18:08 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/config/CFMetaData.java | 5 + .../org/apache/cassandra/config/KSMetaData.java | 1 + .../apache/cassandra/cql3/UntypedResultSet.java | 6 + src/java/org/apache/cassandra/db/SystemTable.java | 1 + .../ScheduledRangeTransferExecutorService.java | 122 +++++++++++++++ .../apache/cassandra/service/StorageService.java | 14 ++- .../cassandra/service/StorageServiceMBean.java | 5 + 7 files changed, 153 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index adbd853..bd194d1 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -200,6 +200,11 @@ public final class CFMetaData + "data blob" + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0"); + public static final CFMetaData RangeXfersCf = compile(17, "CREATE TABLE " + SystemTable.RANGE_XFERS_CF + " (" + + "token_bytes blob PRIMARY KEY," + + "requested_at timestamp" + + ") WITH COMMENT='ranges requested for transfer here'"); + public enum Caching { ALL, KEYS_ONLY, ROWS_ONLY, NONE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java index 050e32f..c6df2f5 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -79,6 +79,7 @@ public final class KSMetaData public static KSMetaData systemKeyspace() { List cfDefs = Arrays.asList(CFMetaData.BatchlogCF, + CFMetaData.RangeXfersCf, CFMetaData.LocalCf, CFMetaData.PeersCf, CFMetaData.HintsCf, http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/cql3/UntypedResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java index b8cb8d4..203e4c1 100644 --- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java +++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java @@ -20,6 +20,7 @@ package org.apache.cassandra.cql3; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -124,6 +125,11 @@ public class UntypedResultSet implements Iterable return UUIDType.instance.compose(data.get(column)); } + public Date getTimestamp(String column) + { + return DateType.instance.compose(data.get(column)); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index a41dafb..7abccee 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -58,6 +58,7 @@ public class SystemTable public static final String INDEX_CF = "IndexInfo"; public static final String COUNTER_ID_CF = "NodeIdInfo"; public static final String HINTS_CF = "hints"; + public static final String RANGE_XFERS_CF = "range_xfers"; public static final String BATCHLOG_CF = "batchlog"; // see layout description in the DefsTable class header public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java new file mode 100644 index 0000000..12e7fd4 --- /dev/null +++ b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java @@ -0,0 +1,122 @@ +package org.apache.cassandra.service; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.SystemTable; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.cassandra.cql3.QueryProcessor.processInternal; + +public class ScheduledRangeTransferExecutorService +{ + private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class); + private static final int INTERVAL = 10; + private ScheduledExecutorService scheduler; + + public void setup() + { + if (DatabaseDescriptor.getNumTokens() == 1) + { + LOG.warn("Cannot start range transfer scheduler: endpoint is not virtual nodes-enabled"); + return; + } + + scheduler = Executors.newSingleThreadScheduledExecutor(new RangeTransferThreadFactory()); + scheduler.scheduleWithFixedDelay(new RangeTransfer(), 0, INTERVAL, TimeUnit.SECONDS); + LOG.info("Enabling scheduled transfers of token ranges"); + } + + public void tearDown() + { + if (scheduler == null) + { + LOG.warn("Unabled to shutdown; Scheduler never enabled"); + return; + } + + LOG.info("Shutting down range transfer scheduler"); + scheduler.shutdownNow(); + } +} + +class RangeTransfer implements Runnable +{ + private static final Logger LOG = LoggerFactory.getLogger(RangeTransfer.class); + + public void run() + { + UntypedResultSet res = processInternal("SELECT * FROM system." + SystemTable.RANGE_XFERS_CF + " LIMIT 1"); + + if (res.size() < 1) + { + LOG.debug("No queued ranges to transfer"); + return; + } + + if (!isReady()) + return; + + UntypedResultSet.Row row = res.iterator().next(); + + Date requestedAt = row.getTimestamp("requested_at"); + ByteBuffer tokenBytes = row.getBytes("token_bytes"); + Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(tokenBytes); + + LOG.info("Initiating transfer of {} (scheduled at {})", token, requestedAt.toString()); + try + { + StorageService.instance.relocateTokens(Collections.singleton(token)); + } + catch (Exception e) + { + LOG.error("Error removing {}: {}", token, e); + } + finally + { + LOG.debug("Removing queued entry for transfer of {}", token); + processInternal(String.format("DELETE FROM system.%s WHERE token_bytes = '%s'", + SystemTable.RANGE_XFERS_CF, + ByteBufferUtil.bytesToHex(tokenBytes))); + } + } + + private boolean isReady() + { + int targetTokens = DatabaseDescriptor.getNumTokens(); + int highMark = (int)Math.ceil(targetTokens + (targetTokens * .10)); + int actualTokens = StorageService.instance.getTokens().size(); + + if (actualTokens >= highMark) + { + LOG.warn("Pausing until token count stabilizes (target={}, actual={})", targetTokens, actualTokens); + return false; + } + + return true; + } +} + +class RangeTransferThreadFactory implements ThreadFactory +{ + private AtomicInteger count = new AtomicInteger(0); + + public Thread newThread(Runnable r) + { + Thread rangeXferThread = new Thread(r); + rangeXferThread.setName(String.format("ScheduledRangeXfers:%d", count.getAndIncrement())); + return rangeXferThread; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 241015f..a747e1e 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -173,6 +173,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe private static final AtomicInteger nextRepairCommand = new AtomicInteger(); + private static ScheduledRangeTransferExecutorService rangeXferExecutor = new ScheduledRangeTransferExecutorService(); + public void finishBootstrapping() { isBootstrapMode = false; @@ -2752,7 +2754,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe relocateTokens(tokens); } - private void relocateTokens(Collection srcTokens) + void relocateTokens(Collection srcTokens) { assert srcTokens != null; InetAddress localAddress = FBUtilities.getBroadcastAddress(); @@ -3505,4 +3507,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { return tracingProbability; } + + public void enableScheduledRangeXfers() + { + rangeXferExecutor.setup(); + } + + public void disableScheduledRangeXfers() + { + rangeXferExecutor.tearDown(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/398b1d26/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 375dacd..ded5298 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -457,4 +457,9 @@ public interface StorageServiceMBean * Returns the configured tracing probability. */ public double getTracingProbability(); + + /** Begin processing of queued range transfers. */ + public void enableScheduledRangeXfers(); + /** Disable processing of queued range transfers. */ + public void disableScheduledRangeXfers(); }