Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EAAFCD7C1 for ; Mon, 27 Aug 2012 22:19:36 +0000 (UTC) Received: (qmail 98063 invoked by uid 500); 27 Aug 2012 22:19:36 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 98041 invoked by uid 500); 27 Aug 2012 22:19:36 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 98032 invoked by uid 99); 27 Aug 2012 22:19:36 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Aug 2012 22:19:36 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 74396214A3; Mon, 27 Aug 2012 22:19:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: parallelize row cache loading patch by jbellis; reviewed by vijay for CASSANDRA-4282 Message-Id: <20120827221936.74396214A3@tyr.zones.apache.org> Date: Mon, 27 Aug 2012 22:19:36 +0000 (UTC) Updated Branches: refs/heads/trunk e1ee63602 -> 08b309191 parallelize row cache loading patch by jbellis; reviewed by vijay for CASSANDRA-4282 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/08b30919 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/08b30919 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/08b30919 Branch: refs/heads/trunk Commit: 08b3091914e99cc3c4ebded66184bb0882e5dd8a Parents: e1ee636 Author: 
Jonathan Ellis Authored: Tue Aug 21 20:33:29 2012 -0500 Committer: Jonathan Ellis Committed: Mon Aug 27 17:19:08 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 15 +++++++-- .../cassandra/config/DatabaseDescriptor.java | 2 +- .../org/apache/cassandra/service/CacheService.java | 25 ++++++++++---- 4 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2a10d98..ffd9485 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-dev + * parallelize row cache loading (CASSANDRA-4282) * Make compaction, flush JBOD-aware (CASSANDRA-4292) * run local range scans on the read stage (CASSANDRA-3687) * clean up ioexceptions (CASSANDRA-2116) http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 7ce2beb..2e86672 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -100,6 +100,8 @@ public class AutoSavingCache extends InstrumentingCache extends InstrumentingCache extends InstrumentingCache>> futures = new ArrayList>>(); while (in.available() > 0) { - Pair entry = cacheLoader.deserialize(in, cfs); + futures.add(cacheLoader.deserialize(in, cfs)); + count++; + } + + for (Future> future : futures) + { + Pair entry = future.get(); // Key cache entry can return null, if the SSTable doesn't exist. 
if (entry == null) continue; put(entry.left, entry.right); - count++; } } catch (Exception e) @@ -314,7 +323,7 @@ public class AutoSavingCache extends InstrumentingCache deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; + Future> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; @Deprecated void load(Set buffer, ColumnFamilyStore cfs); http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 1c389b4..0cb1d1b 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1044,7 +1044,7 @@ public class DatabaseDescriptor public static File getSerializedCachePath(String ksName, String cfName, CacheService.CacheType cacheType, String version) { - return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType + ((version != null) ? "-" + version + ".db" : "")); + return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType + (version == null ? 
"" : "-" + version + ".db")); } public static int getDynamicUpdateInterval() http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 8446b8d..25a38ef 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -26,14 +26,19 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.util.concurrent.Futures; + import org.apache.cassandra.cache.*; import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; @@ -320,12 +325,18 @@ public class CacheService implements CacheServiceMBean ByteBufferUtil.writeWithLength(key.key, out); } - public Pair deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException + public Future> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException { - ByteBuffer buffer = ByteBufferUtil.readWithLength(in); - DecoratedKey key = cfs.partitioner.decorateKey(buffer); - ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.columnFamily)), Integer.MIN_VALUE, true); - return new Pair(new RowCacheKey(cfs.metadata.cfId, key), data); + final ByteBuffer buffer = ByteBufferUtil.readWithLength(in); + return 
StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>() + { + public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception + { + DecoratedKey key = cfs.partitioner.decorateKey(buffer); + ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.columnFamily)), Integer.MIN_VALUE, true); + return new Pair<RowCacheKey, IRowCacheEntry>(new RowCacheKey(cfs.metadata.cfId, key), data); + } + }); } public void load(Set<ByteBuffer> buffers, ColumnFamilyStore cfs) @@ -355,7 +366,7 @@ public class CacheService implements CacheServiceMBean RowIndexEntry.serializer.serialize(entry, out); } - public Pair<KeyCacheKey, RowIndexEntry> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException + public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException { ByteBuffer key = ByteBufferUtil.readWithLength(input); int generation = input.readInt(); @@ -370,7 +381,7 @@ public class CacheService implements CacheServiceMBean entry = RowIndexEntry.serializer.deserialize(input, reader.descriptor.version); else entry = reader.getPosition(reader.partitioner.decorateKey(key), Operator.EQ); - return new Pair<KeyCacheKey, RowIndexEntry>(new KeyCacheKey(reader.descriptor, key), entry); + return Futures.immediateFuture(Pair.create(new KeyCacheKey(reader.descriptor, key), entry)); } private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)