cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/3] git commit: drain should flush system CFs too patch by jbellis; reviewed by yukim for CASSANDRA-4446
Date Mon, 14 Jan 2013 21:35:07 GMT
drain should flush system CFs too
patch by jbellis; reviewed by yukim for CASSANDRA-4446


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fac06a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fac06a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fac06a8

Branch: refs/heads/cassandra-1.2
Commit: 1fac06a848013863f47067f8ff7f769a2a08e276
Parents: ec21288
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Jan 14 15:34:52 2013 -0600
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Jan 14 15:34:52 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/Table.java        |   26 ++++++++----
 .../apache/cassandra/service/StorageService.java   |   31 +++++++++++---
 3 files changed, 43 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fac06a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64cc60c..2e8d2c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.1
+ * drain should flush system CFs too (CASSANDRA-4446)
  * add inter_dc_tcp_nodelay setting (CASSANDRA-5148)
  * re-allow wrapping ranges for start_token/end_token range pairing (CASSANDRA-5106)
  * fix validation compaction of empty rows (CASSANDRA-5136)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fac06a8/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index bb5e6ee..d923081 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -75,6 +75,13 @@ public class Table
     private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
     private final Object[] indexLocks;
     private volatile AbstractReplicationStrategy replicationStrategy;
+    public static final Function<String,Table> tableTransformer = new Function<String, Table>()
+    {
+        public Table apply(String tableName)
+        {
+            return Table.open(tableName);
+        }
+    };
 
     public static Table open(String table)
     {
@@ -456,14 +463,17 @@ public class Table
 
     public static Iterable<Table> all()
     {
-        Function<String, Table> transformer = new Function<String, Table>()
-        {
-            public Table apply(String tableName)
-            {
-                return Table.open(tableName);
-            }
-        };
-        return Iterables.transform(Schema.instance.getTables(), transformer);
+        return Iterables.transform(Schema.instance.getTables(), tableTransformer);
+    }
+
+    public static Iterable<Table> nonSystem()
+    {
+        return Iterables.transform(Schema.instance.getNonSystemTables(), tableTransformer);
+    }
+
+    public static Iterable<Table> system()
+    {
+        return Iterables.transform(Schema.systemKeyspaceNames, tableTransformer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fac06a8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 15339c4..efa7487 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3193,18 +3193,35 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         StorageProxy.instance.verifyNoHintsInProgress();
 
         setMode(Mode.DRAINING, "flushing column families", false);
-        List<ColumnFamilyStore> cfses = new ArrayList<ColumnFamilyStore>();
-        for (String tableName : Schema.instance.getNonSystemTables())
+        // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
+        totalCFs = 0;
+        for (Table table : Table.nonSystem())
+            totalCFs += table.getColumnFamilyStores().size();
+        remainingCFs = totalCFs;
+        // flush
+        List<Future<?>> flushes = new ArrayList<Future<?>>();
+        for (Table table : Table.nonSystem())
         {
-            Table table = Table.open(tableName);
-            cfses.addAll(table.getColumnFamilyStores());
+            for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
+                flushes.add(cfs.forceFlush());
         }
-        totalCFs = remainingCFs = cfses.size();
-        for (ColumnFamilyStore cfs : cfses)
+        // wait for the flushes.
+        // TODO this is a godawful way to track progress, since they flush in parallel.  a long one could
+        // thus make several short ones "instant" if we wait for them later.
+        for (Future f : flushes)
         {
-            cfs.forceBlockingFlush();
+            FBUtilities.waitOnFuture(f);
             remainingCFs--;
         }
+        // flush the system ones after all the rest are done, just in case flushing modifies any system state
+        // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny.
+        flushes.clear();
+        for (Table table : Table.system())
+        {
+            for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
+                flushes.add(cfs.forceFlush());
+        }
+        FBUtilities.waitOnFutures(flushes);
 
         ColumnFamilyStore.postFlushExecutor.shutdown();
         ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);


Mime
View raw message