cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r1036898 - in /cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Table.java
Date Fri, 19 Nov 2010 15:28:51 GMT
Author: gdusbabek
Date: Fri Nov 19 15:28:51 2010
New Revision: 1036898

URL: http://svn.apache.org/viewvc?rev=1036898&view=rev
Log:
make addIndex asynchronous and race proof. patch by jbellis, reviewed by gdusbabek. CASSANDRA-1715

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1036898&r1=1036897&r2=1036898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Nov 19 15:28:51 2010
@@ -113,7 +113,7 @@ public class ColumnFamilyStore implement
     /* active memtable associated with this ColumnFamilyStore. */
     private Memtable memtable;
 
-    private final SortedMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
+    private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
 
     // TODO binarymemtable ops are not threadsafe (do they need to be?)
     private AtomicReference<BinaryMemtable> binaryMemtable;
@@ -174,37 +174,25 @@ public class ColumnFamilyStore implement
         ssTables.updateCacheSizes();
         
         // figure out what needs to be added and dropped.
-        final Set<ByteBuffer> indexesToDrop = new HashSet<ByteBuffer>();
-        final Set<ColumnDefinition> indexesToAdd = new HashSet<ColumnDefinition>();
-        
-        for (ColumnDefinition cdef : metadata.getColumn_metadata().values())
-            if (!indexedColumns.containsKey(cdef.name))
-                indexesToAdd.add(cdef);
-        for (ByteBuffer indexName : indexedColumns.keySet())
-            if (!metadata.getColumn_metadata().containsKey(indexName))
-                indexesToDrop.add(indexName);
         // future: if/when we have modifiable settings for secondary indexes, they'll need to be handled here.
-        
-        final Runnable indexMaintenance = new Runnable() 
+        for (ByteBuffer indexName : indexedColumns.keySet())
         {
-            public void run() 
+            if (!metadata.getColumn_metadata().containsKey(indexName))
             {
-                // drop indexes no longer needed.
-                for (ByteBuffer indexName : indexesToDrop)
+                ColumnFamilyStore indexCfs = indexedColumns.remove(indexName);
+                if (indexCfs == null)
                 {
-                    ColumnFamilyStore indexCfs = indexedColumns.remove(indexName);
-                    assert indexCfs != null;
-                    SystemTable.setIndexRemoved(metadata.tableName, metadata.cfName);
-                    indexCfs.removeAllSSTables();
-                }
-                // add new indexes.
-                for (ColumnDefinition info : indexesToAdd)
-                    if (info.getIndexType() != null)
-                        addIndex(info);        
+                    logger.debug("index {} already removed; ignoring", FBUtilities.bytesToHex(indexName));
+                    continue;
+                }
+                SystemTable.setIndexRemoved(metadata.tableName, metadata.cfName);
+                indexCfs.removeAllSSTables();
             }
-        };
-        // reset the memtable with new settings.
-        maybeSwitchMemtable(memtable, true, indexMaintenance);
+        }
+
+        for (ColumnDefinition cdef : metadata.getColumn_metadata().values())
+            if (!indexedColumns.containsKey(cdef.name) && cdef.getIndexType() != null)
+                addIndex(cdef);
     }
 
     private ColumnFamilyStore(Table table, String columnFamilyName, IPartitioner partitioner, int generation, CFMetaData metadata)
@@ -310,6 +298,8 @@ public class ColumnFamilyStore implement
     public void addIndex(final ColumnDefinition info)
     {
         assert info.getIndexType() != null;
+
+        // create the index CFS
         IPartitioner rowPartitioner = StorageService.getPartitioner();
         AbstractType columnComparator = (rowPartitioner instanceof OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
                                         ? BytesType.instance
@@ -319,28 +309,42 @@ public class ColumnFamilyStore implement
                                                                                  indexedCfMetadata.cfName,
                                                                                  new LocalPartitioner(metadata.getColumn_metadata().get(info.name).validator),
                                                                                  indexedCfMetadata);
-        // record that the column is supposed to be indexed, before we start building it
-        // (so we don't omit indexing writes that happen during build process)
-        indexedColumns.put(info.name, indexedCfs);
-        if (!SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
+
+        // link in indexedColumns.  this means that writes will add new data to the index immediately,
+        // so we don't have to lock everything while we do the build.  it's up to the operator to wait
+        // until the index is actually built before using in queries.
+        if (indexedColumns.putIfAbsent(info.name, indexedCfs) != null)
+            return;
+
+        // if we're just linking in the index to indexedColumns on an already-built index post-restart, we're done
+        if (SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
+            return;
+
+        // build it asynchronously; addIndex gets called by CFS open and schema update, neither of which
+        // we want to block for a long period.  (actual build is serialized on CompactionManager.)
+        Runnable runnable = new Runnable()
         {
-            logger.info("Creating index {}.{}", table, indexedCfMetadata.cfName);
-            try
-            {
-                forceBlockingFlush();
-            }
-            catch (ExecutionException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (InterruptedException e)
+            public void run()
             {
-                throw new AssertionError(e);
+                logger.info("Creating index {}.{}", table, indexedCfMetadata.cfName);
+                try
+                {
+                    forceBlockingFlush();
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+                buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name));
+                logger.info("Index {} complete", indexedCfMetadata.cfName);
+                SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
             }
-            buildSecondaryIndexes(getSSTables(), FBUtilities.singleton(info.name));
-            logger.info("Index {} complete", indexedCfMetadata.cfName);
-            SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
-        }
+        };
+        new Thread(runnable, "Create index " + indexedCfMetadata.cfName).start();
     }
 
     public void buildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<ByteBuffer> columns)
@@ -608,7 +612,7 @@ public class ColumnFamilyStore implement
     }
 
     /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already.  threadsafe. */
-    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog, final Runnable postFlush)
+    Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog)
     {
         /*
          * If we can get the writelock, that means no new updates can come in and
@@ -663,8 +667,6 @@ public class ColumnFamilyStore implement
                         // the log header with "you can discard anything written before the context" is not valid
                         CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx);
                     }
-                    if (postFlush != null)
-                        postFlush.run();
                 }
             });
         }
@@ -696,7 +698,7 @@ public class ColumnFamilyStore implement
         if (memtable.isClean())
             return null;
 
-        return maybeSwitchMemtable(memtable, true, null);
+        return maybeSwitchMemtable(memtable, true);
     }
 
     public void forceBlockingFlush() throws ExecutionException, InterruptedException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1036898&r1=1036897&r2=1036898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Nov 19 15:28:51 2010
@@ -448,7 +448,7 @@ public class Table
         // flush memtables that got filled up outside the readlock (maybeSwitchMemtable acquires writeLock).
         // usually mTF will be empty and this will be a no-op.
         for (Memtable memtable : memtablesToFlush)
-            memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog, null);
+            memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog);
     }
 
     private static List<Memtable> addFullMemtable(List<Memtable> memtablesToFlush, Memtable fullMemtable)
@@ -594,7 +594,7 @@ public class Table
 
                 // during index build, we do flush index memtables separately from master; otherwise we could OOM
                 for (Memtable memtable : memtablesToFlush)
-                    memtable.cfs.maybeSwitchMemtable(memtable, false, null);
+                    memtable.cfs.maybeSwitchMemtable(memtable, false);
             }
 
             try



Mime
View raw message