cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1080310 - in /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra: db/ColumnFamilyStore.java service/StorageService.java
Date Thu, 10 Mar 2011 18:38:03 GMT
Author: jbellis
Date: Thu Mar 10 18:38:03 2011
New Revision: 1080310

URL: http://svn.apache.org/viewvc?rev=1080310&view=rev
Log:
include secondary index CFs when deciding which CFs to flush under memory pressure
patch by jbellis; tested by Matt Conway for CASSANDRA-2295

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1080310&r1=1080309&r2=1080310&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Mar 10 18:38:03 2011
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
 import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -334,7 +335,6 @@ public class ColumnFamilyStore implement
         {
             public void run()
             {
-                logger.info("Creating index {}.{}", table, indexedCfMetadata.cfName);
                 try
                 {
                     forceBlockingFlush();
@@ -348,7 +348,6 @@ public class ColumnFamilyStore implement
                     throw new AssertionError(e);
                 }
                 buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name));
-                logger.info("Index {} complete", indexedCfMetadata.cfName);
                 SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
             }
         };
@@ -357,7 +356,8 @@ public class ColumnFamilyStore implement
 
     public void buildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns)
     {
-        logger.debug("Submitting index build to compactionmanager");
+        logger.info(String.format("Submitting index build of %s for data in %s",
+                                  metadata.comparator.getString(columns), StringUtils.join(sstables, ", ")));
         Table.IndexBuilder builder = table.createIndexBuilder(this, columns, new ReducingKeyIterator(sstables));
         Future future = CompactionManager.instance.submitIndexBuild(this, builder);
         try
@@ -374,6 +374,7 @@ public class ColumnFamilyStore implement
         {
             throw new RuntimeException(e);
         }
+        logger.info("Index build of " + metadata.comparator.getString(columns) + " complete");
     }
 
     // called when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations.
@@ -684,26 +685,31 @@ public class ColumnFamilyStore implement
         try
         {
             if (oldMemtable.isFrozen())
+            {
+                logger.debug("memtable is already frozen; another thread must be flushing it");
                 return null;
+            }
 
             boolean isDropped = isIndex()
                               ? DatabaseDescriptor.getCFMetaData(table.name, getParentColumnfamily()) == null
                               : DatabaseDescriptor.getCFMetaData(metadata.cfId) == null;
             if (isDropped)
-                return null; // column family was dropped. no point in flushing.
+            {
+                logger.debug("column family was dropped; no point in flushing");
+                return null;
+            }
 
             assert memtable == oldMemtable;
             memtable.freeze();
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance.getContext() : null;
-            logger.info("switching in a fresh Memtable for " + columnFamily + " at " + ctx);
 
             // submit the memtable for any indexed sub-cfses, and our own.
             List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>(indexedColumns.size());
-            icc.add(this);
-            for (ColumnFamilyStore indexCfs : indexedColumns.values())
+            // don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not
+            for (ColumnFamilyStore cfs : Iterables.concat(Collections.singleton(this), indexedColumns.values()))
             {
-                if (!indexCfs.memtable.isClean())
-                    icc.add(indexCfs);
+                if (!cfs.memtable.isClean())
+                    icc.add(cfs);
             }
             final CountDownLatch latch = new CountDownLatch(icc.size());
             for (ColumnFamilyStore cfs : icc)
@@ -711,6 +717,10 @@ public class ColumnFamilyStore implement
                 submitFlush(cfs.memtable, latch);
                 cfs.memtable = new Memtable(cfs);
             }
+            // we marked our memtable as frozen as part of the concurrency control,
+            // so even if there was nothing to flush we need to switch it out
+            if (!icc.contains(this))
+                memtable = new Memtable(this);
 
             // when all the memtables have been written, including for indexes, mark the flush in the commitlog header.
             // a second executor makes sure the onMemtableFlushes get called in the right order,
@@ -754,8 +764,17 @@ public class ColumnFamilyStore implement
 
     public Future<?> forceFlush()
     {
-        if (memtable.isClean())
+        // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
+        // we want flushLargestMemtables to flush the 2ary index ones too.
+        boolean clean = true;
+        for (ColumnFamilyStore cfs : Iterables.concat(Collections.singleton(this), getIndexColumnFamilyStores()))
+            clean &= cfs.memtable.isClean();
+
+        if (clean)
+        {
+            logger.debug("forceFlush requested but everything is clean");
             return null;
+        }
 
         return maybeSwitchMemtable(memtable, true);
     }
@@ -1937,6 +1956,11 @@ public class ColumnFamilyStore implement
         return indexedColumns.get(column);
     }
 
+    public Collection<ColumnFamilyStore> getIndexColumnFamilyStores()
+    {
+        return indexedColumns.values();
+    }
+
     public ColumnFamily newIndexedColumnFamily(ByteBuffer column)
     {
         return ColumnFamily.create(indexedColumns.get(column).metadata);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1080310&r1=1080309&r2=1080310&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Thu Mar 10 18:38:03 2011
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -2195,14 +2196,28 @@ public class StorageService implements I
         ColumnFamilyStore largestByThroughput = null;
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            if (largestByOps == null || cfs.getMemtableColumnsCount() > largestByOps.getMemtableColumnsCount())
+            long ops = 0;
+            long throughput = 0;
+            for (ColumnFamilyStore subordinate : Iterables.concat(Collections.singleton(cfs), cfs.getIndexColumnFamilyStores()))
+            {
+                ops += subordinate.getMemtableColumnsCount();
+                throughput = subordinate.getMemtableThroughputInMB();
+            }
+
+            if (ops > 0 && (largestByOps == null || ops > largestByOps.getMemtableColumnsCount()))
+            {
+                logger_.debug(ops + " total ops in " + cfs);
                 largestByOps = cfs;
-            if (largestByThroughput == null || cfs.getMemtableThroughputInMB() > largestByThroughput.getMemtableThroughputInMB())
+            }
+            if (throughput > 0 && (largestByThroughput == null || throughput > largestByThroughput.getMemtableThroughputInMB()))
+            {
+                logger_.debug(throughput + " total throughput in " + cfs);
                 largestByThroughput = cfs;
+            }
         }
         if (largestByOps == null)
         {
-            logger_.error("Unable to reduce heap usage since there are no column families defined");
+            logger_.info("Unable to reduce heap usage since there are no dirty column families");
             return;
         }
 



Mime
View raw message