cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r921961 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ test/unit/org/apache/cassandra/db/
Date Thu, 11 Mar 2010 17:53:10 GMT
Author: gdusbabek
Date: Thu Mar 11 17:53:09 2010
New Revision: 921961

URL: http://svn.apache.org/viewvc?rev=921961&view=rev
Log:
drop a cf on a single node. Patch by Gary Dusbabek, reviewed by Jonthan Ellis. CASSANDRA-840

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/cache/AbstractCache.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/cache/AbstractCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/cache/AbstractCache.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/cache/AbstractCache.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/cache/AbstractCache.java Thu Mar
11 17:53:09 2010
@@ -32,8 +32,11 @@ public class AbstractCache
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            String mbeanName = "org.apache.cassandra.db:type=Caches,keyspace=" + table +
",cache=" + name;
-            mbs.registerMBean(cache, new ObjectName(mbeanName));
+            ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=Caches,keyspace="
+ table + ",cache=" + name);
+            // unregister any previous, as this may be a replacement.
+            if (mbs.isRegistered(mbeanName))
+                mbs.unregisterMBean(mbeanName);
+            mbs.registerMBean(cache, mbeanName);
         }
         catch (Exception e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu Mar
11 17:53:09 2010
@@ -86,6 +86,13 @@ public final class CFMetaData
         this(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize,
keyCacheSize, nextId());
         cfIdMap.put(new Pair<String, String>(tableName, cfName), cfId);
     }
+    
+    /** used for evicting cf data out of static tracking collections. */
+    public static void purge(CFMetaData cfm)
+    {
+        cfIdMap.remove(new Pair<String, String>(cfm.tableName, cfm.cfName));
+        currentCfNames.remove(cfm.cfId);
+    }
 
     // a quick and dirty pretty printer for describing the column family...
     public String pretty()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Thu Mar 11 17:53:09 2010
@@ -1082,7 +1082,7 @@ public class DatabaseDescriptor
         assert tableName != null;
         CFMetaData cfmd = getCFMetaData(tableName, cfName);
         if (cfmd == null)
-            throw new NullPointerException("Unknown ColumnFamily " + cfName + " in keyspace
" + tableName);
+            throw new IllegalArgumentException("Unknown ColumnFamily " + cfName + " in keyspace
" + tableName);
         return cfmd.comparator;
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu
Mar 11 17:53:09 2010
@@ -221,8 +221,10 @@ public class ColumnFamilyStore implement
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            String mbeanName = "org.apache.cassandra.db:type=ColumnFamilyStores,keyspace="
+ table + ",columnfamily=" + columnFamily;
-            mbs.registerMBean(cfs, new ObjectName(mbeanName));
+            ObjectName mbeanName = new ObjectName("org.apache.cassandra.db:type=ColumnFamilyStores,keyspace="
+ table + ",columnfamily=" + columnFamily);
+            if (mbs.isRegistered(mbeanName))
+                mbs.unregisterMBean(mbeanName);
+            mbs.registerMBean(cfs, mbeanName);
         }
         catch (Exception e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Thu
Mar 11 17:53:09 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.io.FileFilter;
 import java.io.IOException;
 import java.io.File;
 import java.lang.management.ManagementFactory;
@@ -27,6 +28,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import javax.management.*;
 
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
@@ -73,6 +76,50 @@ public class CompactionManager implement
     private CompactionExecutor executor = new CompactionExecutor();
     private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore,
Integer>();
 
+    /** cleans up data files for CFs that have been dropped. */
+    public Future submitGraveyardCleanup()
+    {
+        Callable c =  new Callable()
+        {
+            public Object call() throws Exception
+            {
+                logger.debug("Cleaning up abandoned column families...");
+                ColumnFamily dropped = SystemTable.getDroppedCFs();
+                Collection<IColumn> successes = new ArrayList<IColumn>();
+                for (IColumn col : dropped.getSortedColumns())
+                {
+                    if (!col.isMarkedForDelete())
+                    {
+                        final String[] parts = new String(col.name()).split("-");
+                        // table-cfname-cfid
+                        for (String dataDir : DatabaseDescriptor.getAllDataFileLocationsForTable(parts[0]))
+                        {
+                            File dir = new File(dataDir);
+                            if (dir.exists())
+                            {
+                                File[] dbFiles = dir.listFiles(new FileFilter()
+                                {
+                                    public boolean accept(File pathname)
+                                    {
+                                        return pathname.getName().startsWith(parts[1] + "-")
&& pathname.exists();
+                                    }
+                                });
+                                for (File f : dbFiles)
+                                {
+                                    FileUtils.deleteWithConfirm(f);
+                                }
+                            }
+                        }
+                        successes.add(col);
+                    }
+                }
+                SystemTable.deleteDroppedCfMarkers(successes);
+                return null;
+            }
+        };
+        return executor.submit(c);
+    }
+    
     /**
      * Call this whenever a compaction might be needed on the given columnfamily.
      * It's okay to over-call (within reason) since the compactions are single-threaded,

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java Thu Mar 11 17:53:09
2010
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 public class DefsTable
 {
@@ -80,6 +81,66 @@ public class DefsTable
         }
     }
 
+    /**
+     * drop a column family. blockOnDeletion was added to make testing simpler.
+     */
+    public static synchronized void drop(CFMetaData cfm, boolean blockOnDeletion) throws
IOException, ConfigurationException
+    {
+        Table.openLock.lock();
+        try
+        {
+            KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cfm.tableName);
+            if (ksm == null)
+                throw new ConfigurationException("Keyspace does not already exist.");
+            else if (!ksm.cfMetaData().containsKey(cfm.cfName))
+                throw new ConfigurationException("CF is not defined in that keyspace.");
+            
+            // clone ksm but do not include the new def
+            List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values());
+            newCfs.remove(cfm);
+            assert newCfs.size() == ksm.cfMetaData().size() - 1;
+            ksm = new KSMetaData(ksm.name, ksm.strategyClass, ksm.replicationFactor, ksm.snitch,
newCfs.toArray(new CFMetaData[newCfs.size()]));
+            
+            // store it.
+            UUID newVersion = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+            RowMutation rm = new RowMutation(Table.DEFINITIONS, newVersion.toString());
+            rm.add(new QueryPath(SCHEMA_CF, null, ksm.name.getBytes()), KSMetaData.serialize(ksm),
System.currentTimeMillis());
+            rm.apply();
+            
+            // reinitialize the table.
+            CFMetaData.purge(cfm);
+            DatabaseDescriptor.setTableDefinition(ksm, newVersion);
+            Table.reinitialize(ksm.name);
+            
+            // indicate that some files need to be deleted (eventually)
+            SystemTable.markForRemoval(cfm);
+            
+            // we don't really need a new segment, but let's force it to be consistent with
other operations.
+            CommitLog.instance().forceNewSegment();
+        }
+        finally
+        {
+            Table.openLock.unlock();
+        }
+        
+        if (blockOnDeletion)
+        {
+            // notify the compaction manager that it needs to clean up the dropped cf files.
+            try
+            {
+                CompactionManager.instance.submitGraveyardCleanup().get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     public static synchronized void dumpToStorage(UUID version) throws IOException
     {
         String versionKey = version.toString();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Thu Mar 11
17:53:09 2010
@@ -22,7 +22,10 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.io.IOError;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.service.StorageService;
@@ -35,6 +38,7 @@ import org.apache.cassandra.db.marshal.B
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 import java.net.InetAddress;
+import java.util.Collection;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -44,6 +48,7 @@ public class SystemTable
     public static final String STATUS_CF = "LocationInfo"; // keep the old CF string for
backwards-compatibility
     private static final String LOCATION_KEY = "L";
     private static final String BOOTSTRAP_KEY = "Bootstrap";
+    private static final String GRAVEYARD_KEY = "Graveyard";
     private static final byte[] BOOTSTRAP = utf8("B");
     private static final byte[] TOKEN = utf8("Token");
     private static final byte[] GENERATION = utf8("Generation");
@@ -204,6 +209,38 @@ public class SystemTable
         }
     }
 
+    public static ColumnFamily getDroppedCFs() throws IOException
+    {
+        ColumnFamilyStore cfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(SystemTable.STATUS_CF);
+        return cfs.getColumnFamily(new SliceQueryFilter(SystemTable.GRAVEYARD_KEY, new QueryPath(STATUS_CF),
"".getBytes(), "".getBytes(), false, 100));
+    }
+    
+    public static void deleteDroppedCfMarkers(Collection<IColumn> cols) throws IOException
+    {
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, GRAVEYARD_KEY);
+        long now = System.currentTimeMillis();
+        for (IColumn col : cols)
+            rm.delete(new QueryPath(STATUS_CF, null, col.name()), now);
+        rm.apply();
+    }
+    
+    /** when a cf is dropped, it needs to be marked so its files get deleted at some point.
*/
+    public static void markForRemoval(CFMetaData cfm)
+    {
+        ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
+        cf.addColumn(new Column((cfm.tableName + "-" + cfm.cfName + "-" + cfm.cfId).getBytes(),
ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis()));
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, GRAVEYARD_KEY);
+        rm.add(cf);
+        try
+        {
+            rm.apply();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static class StorageMetadata
     {
         private Token token;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu
Mar 11 17:53:09 2010
@@ -22,6 +22,7 @@ import com.google.common.collect.HashMul
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
@@ -237,10 +238,14 @@ public class CommitLog
                 {
                     public void runMayThrow() throws IOException
                     {
-                        /* remove column families that have already been flushed before applying
the rest */
+                        KSMetaData ksm = DatabaseDescriptor.getTableDefinition(table.name);
+                        
+                        /* remove a) column families that have already been flushed before
applying the rest, and
+                        *  b) column families that have been dropped. */
                         for (ColumnFamily columnFamily : columnFamilies)
                         {
-                            int id = cfIdMap.get(new Pair<String, String>(table.name,
columnFamily.name()));
+                            Pair<String, String> key = new Pair<String, String>(table.name,
columnFamily.name()); 
+                            int id = cfIdMap.get(key);
                             if (!clHeader.isDirty(id) || entryLocation < clHeader.getPosition(id))
                             {
                                 rm.removeColumnFamily(columnFamily);

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=921961&r1=921960&r2=921961&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Thu Mar 11 17:53:09
2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.config.CFMetaData;
 import static org.apache.cassandra.config.DatabaseDescriptor.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,6 +33,8 @@ import org.apache.cassandra.utils.UUIDGe
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -40,7 +43,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
-public class DefsTest
+public class DefsTest extends CleanupHelper
 {
     @Before
     public void setup()
@@ -95,5 +98,63 @@ public class DefsTest
         assert Arrays.equals("value0".getBytes(), col.value());
     }
 
+    @Test
+    public void removeCf() throws IOException, ConfigurationException, ExecutionException,
InterruptedException
+    {
+        // sanity
+        final KSMetaData ks = DatabaseDescriptor.getTableDefinition("Keyspace1");
+        assert ks != null;
+        final CFMetaData cfm = ks.cfMetaData().get("Standard1");
+        assert cfm != null;
+        
+        // write some data, force a flush, then verify that files exist on disk.
+        RowMutation rm = new RowMutation(ks.name, "key0");
+        for (int i = 0; i < 100; i++)
+            rm.add(new QueryPath(cfm.cfName, null, ("col" + i).getBytes()), "anyvalue".getBytes(),
1L);
+        rm.apply();
+        ColumnFamilyStore store = Table.open(cfm.tableName).getColumnFamilyStore(cfm.cfName);
+        assert store != null;
+        store.forceBlockingFlush();
+        store.getFlushPath();
+        assert getFiles(cfm.tableName, cfm.cfName).size() > 0;
+        
+        DefsTable.drop(cfm, true);
+        
+        assert !DatabaseDescriptor.getTableDefinition(ks.name).cfMetaData().containsKey(cfm.cfName);
+        
+        // any write should fail.
+        rm = new RowMutation(ks.name, "key0");
+        try
+        {
+            rm.add(new QueryPath("Standard1", null, "col0".getBytes()), "value0".getBytes(),
1L);
+            rm.apply();
+            assert false : "This mutation should have failed since the CF no longer exists.";
+        }
+        catch (Throwable th)
+        {
+            assert th instanceof IllegalArgumentException;
+        }
+        
+        // verify that the files are gone.
+        assert getFiles(cfm.tableName, cfm.cfName).size() == 0;
+    }
     
+    private static Collection<File> getFiles(String table, final String cf)
+    {
+        List<File> found = new ArrayList<File>();
+        for (String path : DatabaseDescriptor.getAllDataFileLocationsForTable(table))
+        {
+            File[] dbFiles = new File(path).listFiles(new FileFilter()
+            {
+                public boolean accept(File pathname)
+                {
+                    return pathname.getName().startsWith(cf + "-") && pathname.getName().endsWith(".db")
&& pathname.exists();
+                            
+                }
+            });
+            for (File f : dbFiles)
+                found.add(f);
+        }
+        return found;
+    }
 }



Mime
View raw message