cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject svn commit: r1158642 - in /cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilyStore.java db/ColumnFamilyStoreMBean.java service/StorageService.java service/StorageServiceMBean.java tools/NodeCmd.java tools/NodeProbe.java
Date Wed, 17 Aug 2011 11:46:55 GMT
Author: xedin
Date: Wed Aug 17 11:46:55 2011
New Revision: 1158642

URL: http://svn.apache.org/viewvc?rev=1158642&view=rev
Log:
Add 'load new SSTables' functionality to JMX and corresponding "refresh" command to the nodetool
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-2991

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1158642&r1=1158641&r2=1158642&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Aug 17 11:46:55 2011
@@ -235,22 +235,10 @@ public class ColumnFamilyStore implement
         List<SSTableReader> sstables = new ArrayList<SSTableReader>();
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table.name, columnFamilyName, false, false).entrySet())
         {
-            SSTableReader sstable;
-            try
-            {
-                sstable = SSTableReader.open(sstableFiles.getKey(), sstableFiles.getValue(), savedKeys, data, metadata, this.partitioner);
-            }
-            catch (FileNotFoundException ex)
-            {
-                logger.error("Missing sstable component in " + sstableFiles + "; skipped because of " + ex.getMessage());
-                continue;
-            }
-            catch (IOException ex)
-            {
-                logger.error("Corrupt sstable " + sstableFiles + "; skipped", ex);
-                continue;
-            }
-            sstables.add(sstable);
+            SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, data, metadata, partitioner);
+
+            if (reader != null) // if == null, logger errors where already fired
+                sstables.add(reader);
         }
         data.addSSTables(sstables);
 
@@ -512,6 +500,97 @@ public class ColumnFamilyStore implement
     }
 
     /**
+     * See #{@code StorageService.loadNewSSTables(String, String)} for more info
+     *
+     * @param ksName The keyspace name
+     * @param cfName The columnFamily name
+     */
+    public static synchronized void loadNewSSTables(String ksName, String cfName)
+    {
+        /** ks/cf existence checks will be done by open and getCFS methods for us */
+        Table table = Table.open(ksName);
+        table.getColumnFamilyStore(cfName).loadNewSSTables();
+    }
+
+    /**
+     * #{@inheritDoc}
+     */
+    public synchronized void loadNewSSTables()
+    {
+        logger.info("Loading new SSTables for " + table.name + "/" + columnFamily + "...");
+
+        // current view over ColumnFamilyStore
+        DataTracker.View view = data.getView();
+        // descriptors of currently registered SSTables
+        Set<Descriptor> currentDescriptors = new HashSet<Descriptor>();
+        // going to hold new SSTable view of the CFS containing old and new SSTables
+        Set<SSTableReader> sstables = new HashSet<SSTableReader>();
+        Set<DecoratedKey> savedKeys = keyCache.readSaved();
+        // get the max generation number, to prevent generation conflicts
+        int generation = 0;
+
+        for (SSTableReader reader : view.sstables)
+        {
+            sstables.add(reader); // first of all, add old SSTables
+            currentDescriptors.add(reader.descriptor);
+
+            if (reader.descriptor.generation > generation)
+                generation = reader.descriptor.generation;
+        }
+
+        SSTableReader reader;
+        // set to true if we have at least one new SSTable to load
+        boolean atLeastOneNew = false;
+
+        for (Map.Entry<Descriptor, Set<Component>> rawSSTable : files(table.name, columnFamily, false, false).entrySet())
+        {
+            Descriptor descriptor = rawSSTable.getKey();
+
+            if (currentDescriptors.contains(descriptor))
+                continue; // old (initialized) SSTable found, skipping
+
+            if (!descriptor.cfname.equals(columnFamily))
+                continue;
+
+            if (descriptor.isFromTheFuture())
+                throw new RuntimeException(String.format("Can't open sstables from the future! Current version %s, found file: %s",
+                                                         Descriptor.CURRENT_VERSION,
+                                                         descriptor));
+
+            logger.info("Initializing new SSTable {}", rawSSTable);
+            reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, partitioner);
+
+            if (reader == null)
+                continue; // something wrong with SSTable, skipping
+
+            sstables.add(reader);
+
+            if (descriptor.generation > generation)
+                generation = descriptor.generation;
+
+            if (!atLeastOneNew) // set flag only once
+                atLeastOneNew = true;
+        }
+
+        if (!atLeastOneNew)
+        {
+            logger.info("No new SSTables where found for " + table.name + "/" + columnFamily);
+            return;
+        }
+
+        logger.info("Loading new SSTable Set for " + table.name + "/" + columnFamily + ": " + sstables);
+        data.addSSTables(sstables); // this will call updateCacheSizes() for us
+
+        logger.info("Requesting a full secondary index re-build for " + table.name + "/" + columnFamily);
+        indexManager.buildSecondaryIndexes(sstables, indexManager.getIndexedColumns());
+
+        logger.info("Setting up new generation: " + generation);
+        fileIndexGenerator.set(generation);
+
+        logger.info("Done loading load new SSTables for " + table.name + "/" + columnFamily);
+    }
+
+    /**
      * @return the name of the column family
      */
     public String getColumnFamilyName()
@@ -1892,4 +1971,28 @@ public class ColumnFamilyStore implement
     {
        return indexManager.getBuiltIndexes();
     }
+
+    private static SSTableReader openSSTableReader(Map.Entry<Descriptor, Set<Component>> rawSSTable,
+                                                   Set<DecoratedKey> savedKeys,
+                                                   DataTracker tracker,
+                                                   CFMetaData metadata,
+                                                   IPartitioner partitioner)
+    {
+        SSTableReader reader = null;
+
+        try
+        {
+            reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner);
+        }
+        catch (FileNotFoundException ex)
+        {
+            logger.error("Missing sstable component in " + rawSSTable + "; skipped because of " + ex.getMessage());
+        }
+        catch (IOException ex)
+        {
+            logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex);
+        }
+
+        return reader;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1158642&r1=1158641&r2=1158642&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed Aug 17 11:46:55 2011
@@ -234,4 +234,10 @@ public interface ColumnFamilyStoreMBean
 
     public int getRowCacheKeysToSave();
     public void setRowCacheKeysToSave(int keysToSave);
+
+    /**
+     * Scan through Keyspace/ColumnFamily's data directory
+     * determine which SSTables should be loaded and load them
+     */
+    public void loadNewSSTables();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1158642&r1=1158641&r2=1158642&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Aug 17 11:46:55 2011
@@ -2608,4 +2608,12 @@ public class StorageService implements I
     {
         SSTableDeletingTask.rescheduleFailedTasks();
     }
+
+    /**
+     * #{@inheritDoc}
+     */
+    public void loadNewSSTables(String ksName, String cfName)
+    {
+        ColumnFamilyStore.loadNewSSTables(ksName, cfName);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1158642&r1=1158641&r2=1158642&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Aug 17 11:46:55 2011
@@ -323,4 +323,12 @@ public interface StorageServiceMBean
     public void bulkLoad(String directory);
 
     public void rescheduleFailedDeletions();
+
+    /**
+     * Load new SSTables to the given keyspace/columnFamily
+     *
+     * @param ksName The parent keyspace name
+     * @param cfName The ColumnFamily name where SSTables belong
+     */
+    public void loadNewSSTables(String ksName, String cfName);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1158642&r1=1158641&r2=1158642&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Wed Aug 17 11:46:55 2011
@@ -79,7 +79,8 @@ public class NodeCmd
         DECOMMISSION, MOVE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB,
         SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
         COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE,
-        DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT, GETENDPOINTS
+        DISABLETHRIFT, ENABLETHRIFT, STATUSTHRIFT, JOIN, SETCOMPACTIONTHROUGHPUT, GETENDPOINTS,
+        REFRESH
     }
 
     
@@ -125,6 +126,7 @@ public class NodeCmd
         addCmdHelp(header, "invalidaterowcache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family");
         addCmdHelp(header, "getcompactionthreshold <keyspace> <cfname>", "Print min and max compaction thresholds for a given column family");
         addCmdHelp(header, "cfhistograms <keyspace> <cfname>", "Print statistic histograms for a given column family");
+        addCmdHelp(header, "refresh <keyspace> <cf-name>", "Load newly placed SSTables to the system without restart.");
 
         // Three args
         addCmdHelp(header, "getendpoints <keyspace> <cf> <key>", "Print the end points that owns the key");
@@ -699,6 +701,11 @@ public class NodeCmd
                 nodeCmd.printEndPoints(arguments[0], arguments[1], arguments[2], System.out);
                 break;
 
+            case REFRESH:
+                if (arguments.length != 2) { badUse("load_new_sstables requires ks and cf args"); }
+                probe.loadNewSSTables(arguments[0], arguments[1]);
+                break;
+
             default :
                 throw new RuntimeException("Unreachable code.");
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1158642&r1=1158641&r2=1158642&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Aug 17 11:46:55 2011
@@ -591,6 +591,11 @@ public class NodeProbe
     {
         return msProxy.getDroppedMessages();
     }
+
+    public void loadNewSSTables(String ksName, String cfName)
+    {
+        ssProxy.loadNewSSTables(ksName, cfName);
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>



Mime
View raw message