cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject git commit: parallelize row cache loading patch by jbellis; reviewed by vijay for CASSANDRA-4282
Date Mon, 27 Aug 2012 22:19:36 GMT
Updated Branches:
  refs/heads/trunk e1ee63602 -> 08b309191


parallelize row cache loading
patch by jbellis; reviewed by vijay for CASSANDRA-4282


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/08b30919
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/08b30919
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/08b30919

Branch: refs/heads/trunk
Commit: 08b3091914e99cc3c4ebded66184bb0882e5dd8a
Parents: e1ee636
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Tue Aug 21 20:33:29 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Aug 27 17:19:08 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |   15 +++++++--
 .../cassandra/config/DatabaseDescriptor.java       |    2 +-
 .../org/apache/cassandra/service/CacheService.java |   25 ++++++++++----
 4 files changed, 32 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2a10d98..ffd9485 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * parallelize row cache loading (CASSANDRA-4282)
  * Make compaction, flush JBOD-aware (CASSANDRA-4292)
  * run local range scans on the read stage (CASSANDRA-3687)
  * clean up ioexceptions (CASSANDRA-2116)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7ce2beb..2e86672 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -100,6 +100,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     {
         int count = 0;
         long start = System.currentTimeMillis();
+
+        // old cache format that only saves keys
         File path = getCachePath(cfs.table.name, cfs.columnFamily, null);
         if (path.exists())
         {
@@ -127,6 +129,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
         }
 
+        // modern format, allows both key and value (so key cache load can be purely sequential)
         path = getCachePath(cfs.table.name, cfs.columnFamily, CURRENT_VERSION);
         if (path.exists())
         {
@@ -135,14 +138,20 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             {
                 logger.info(String.format("reading saved cache %s", path));
                 in = new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+                List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                 while (in.available() > 0)
                 {
-                    Pair<K, V> entry = cacheLoader.deserialize(in, cfs);
+                    futures.add(cacheLoader.deserialize(in, cfs));
+                    count++;
+                }
+
+                for (Future<Pair<K, V>> future : futures)
+                {
+                    Pair<K, V> entry = future.get();
                     // Key cache entry can return null, if the SSTable doesn't exist.
                     if (entry == null)
                         continue;
                     put(entry.left, entry.right);
-                    count++;
                 }
             }
             catch (Exception e)
@@ -314,7 +323,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     {
         void serialize(K key, DataOutput out) throws IOException;
 
-        Pair<K, V> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException;
+        Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException;
 
         @Deprecated
         void load(Set<ByteBuffer> buffer, ColumnFamilyStore cfs);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1c389b4..0cb1d1b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1044,7 +1044,7 @@ public class DatabaseDescriptor
 
     public static File getSerializedCachePath(String ksName, String cfName, CacheService.CacheType cacheType, String version)
     {
-        return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType + ((version != null) ? "-" + version + ".db" : ""));
+        return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType + (version == null ? "" : "-" + version + ".db"));
     }
 
     public static int getDynamicUpdateInterval()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 8446b8d..25a38ef 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -26,14 +26,19 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.util.concurrent.Futures;
+
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -320,12 +325,18 @@ public class CacheService implements CacheServiceMBean
             ByteBufferUtil.writeWithLength(key.key, out);
         }
 
-        public Pair<RowCacheKey, IRowCacheEntry> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException
+        public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
-            ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
-            DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-            ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.columnFamily)), Integer.MIN_VALUE, true);
-            return new Pair<RowCacheKey, IRowCacheEntry>(new RowCacheKey(cfs.metadata.cfId, key), data);
+            final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+            return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
+            {
+                public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
+                {
+                    DecoratedKey key = cfs.partitioner.decorateKey(buffer);
+                    ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.columnFamily)), Integer.MIN_VALUE, true);
+                    return new Pair<RowCacheKey, IRowCacheEntry>(new RowCacheKey(cfs.metadata.cfId, key), data);
+                }
+            });
         }
 
         public void load(Set<ByteBuffer> buffers, ColumnFamilyStore cfs)
@@ -355,7 +366,7 @@ public class CacheService implements CacheServiceMBean
             RowIndexEntry.serializer.serialize(entry, out);
         }
 
-        public Pair<KeyCacheKey, RowIndexEntry> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
+        public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
         {
             ByteBuffer key = ByteBufferUtil.readWithLength(input);
             int generation = input.readInt();
@@ -370,7 +381,7 @@ public class CacheService implements CacheServiceMBean
                 entry = RowIndexEntry.serializer.deserialize(input, reader.descriptor.version);
             else
                 entry = reader.getPosition(reader.partitioner.decorateKey(key), Operator.EQ);
-            return new Pair<KeyCacheKey, RowIndexEntry>(new KeyCacheKey(reader.descriptor, key), entry);
+            return Futures.immediateFuture(Pair.create(new KeyCacheKey(reader.descriptor, key), entry));
         }
 
         private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)


Mime
View raw message