cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r1073896 [1/2] - in /cassandra/trunk: ./ contrib/ contrib/client_only/conf/ contrib/client_only/src/ contrib/stress/src/org/apache/cassandra/contrib/stress/ contrib/stress/src/org/apache/cassandra/contrib/stress/util/ debian/ interface/thri...
Date Wed, 23 Feb 2011 19:32:44 GMT
Author: gdusbabek
Date: Wed Feb 23 19:32:42 2011
New Revision: 1073896

URL: http://svn.apache.org/viewvc?rev=1073896&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/test/data/corrupt-sstables/
      - copied from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/
    cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Data.db
      - copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db
    cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Filter.db
      - copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db
    cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Index.db
      - copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db
    cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Statistics.db
      - copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db
    cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
      - copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowCacheTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ScrubTest.java
      - copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
      - copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/build.xml
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/client_only/conf/cassandra.yaml
    cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
    cassandra/trunk/debian/init
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
    cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/GuidGenerator.java
    cassandra/trunk/test/conf/cassandra.yaml
    cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7:1026516-1071868
+/cassandra/branches/cassandra-0.7:1026516-1073884
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Feb 23 19:32:42 2011
@@ -22,7 +22,21 @@
  * refactor stress.py to have only one copy of the format string 
    used for creating row keys (CASSANDRA-2108)
  * validate index names for \w+ (CASSANDRA-2196)
- * Fix Cassandra cli to respect timeout if schema does not settle (CASSANDRA-2187)
+ * Fix Cassandra cli to respect timeout if schema does not settle 
+   (CASSANDRA-2187)
+ * update memtable_throughput to be a long (CASSANDRA-2158)
+ * fix for compaction and cleanup writing old-format data into new-version 
+   sstable (CASSANDRA-2211, -2216)
+ * add nodetool scrub (CASSANDRA-2217)
+ * fix sstable2json large-row pagination (CASSANDRA-2188)
+ * fix EOFing on requests for the last bytes in a file (CASSANDRA-2213)
+ * fix BRAF performance when seeking to EOF (CASSANDRA-2218)
+ * check for memtable flush_after_mins exceeded every 10s (CASSANDRA-2183)
+ * fix cache saving on Windows (CASSANDRA-2207)
+ * add validateSchemaAgreement call + synchronization to schema
+   modification operations (CASSANDRA-2222)
+ * fix for reversed slice queries on large rows (CASSANDRA-2212)
+ * fat clients were writing local data (CASSANDRA-2223)
  * update memtable_throughput to be a long (CASSANDRA-2158)
 
 
@@ -34,6 +48,7 @@
 
 
 0.7.1
+ * refactor MessageDigest creation code. (CASSANDRA-2107)
  * buffer network stack to avoid inefficient small TCP messages while avoiding
    the nagle/delayed ack problem (CASSANDRA-1896)
  * check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Feb 23 19:32:42 2011
@@ -1,3 +1,4 @@
+<<<<<<< .working
 Whatever
 ========
 
@@ -24,6 +25,23 @@ JMX
     - By default, JMX now listens on port 7199.
 
 
+=======
+0.7.3
+=====
+
+Upgrading
+---------
+    - 0.7.1 and 0.7.2 shipped with a bug that caused incorrect row-level
+      bloom filters to be generated when compacting sstables generated
+      with earlier versions.  This would manifest in IOExceptions during
+      column name-based queries.  0.7.3 provides "nodetool scrub" to 
+      rebuild sstables with correct bloom filters, with no data lost.
+      (If your cluster was never on 0.7.0 or earlier, you don't have to
+      worry about this.)  Note that nodetool scrub will snapshot your
+      data files before rebuilding, just in case.
+
+
+>>>>>>> .merge-right.r1073884
 0.7.1
 =====
 

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Feb 23 19:32:42 2011
@@ -579,6 +579,7 @@
   <target name="test" depends="build-test" description="Execute unit tests">
     <testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000">
       <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+      <jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
     </testmacro>
     <testmacro suitename="driverunit" inputdir="${test.src.driver}" timeout="60000">
       <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1071868
+/cassandra/branches/cassandra-0.7/contrib:1026516-1073884
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/client_only/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/client_only/conf/cassandra.yaml?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/client_only/conf/cassandra.yaml (original)
+++ cassandra/trunk/contrib/client_only/conf/cassandra.yaml Wed Feb 23 19:32:42 2011
@@ -34,6 +34,8 @@ hinted_handoff_enabled: true
 # this defines the maximum amount of time a dead host will have hints
 # generated.  After it has been dead this long, hints will be dropped.
 max_hint_window_in_ms: 3600000 # one hour
+# Sleep this long after delivering each row or row fragment
+hinted_handoff_throttle_delay_in_ms: 50
 
 # authentication backend, implementing IAuthenticator; used to identify users
 authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
@@ -90,6 +92,31 @@ commitlog_sync: periodic
 # milliseconds.
 commitlog_sync_period_in_ms: 10000
 
+# emergency pressure valve: each time heap usage after a full (CMS)
+# garbage collection is above this fraction of the max, Cassandra will
+# flush the largest memtables.  
+#
+# Set to 1.0 to disable.  Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+#
+# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
+# it is most effective under light to moderate load, or read-heavy
+# workloads; under truly massive write load, it will often be too
+# little, too late.
+flush_largest_memtables_at: 0.75
+
+# emergency pressure valve #2: the first time heap usage after a full
+# (CMS) garbage collection is above this fraction of the max,
+# Cassandra will reduce cache maximum _capacity_ to the given fraction
+# of the current _size_.  Should usually be set substantially above
+# flush_largest_memtables_at, since that will have less long-term
+# impact on the system.  
+# 
+# Set to 1.0 to disable.  Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+reduce_cache_sizes_at: 0.85
+reduce_cache_capacity_to: 0.6
+
 # Addresses of hosts that are deemed contact points.
 # Cassandra nodes use this list of hosts to find each other and learn
 # the topology of the ring.  You must change this if you are running
@@ -199,6 +226,11 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
+# Track cached row keys during compaction, and re-cache their new
+# positions in the compacted sstable.  Disable if you use really large
+# key caches.
+compaction_preheat_key_cache: true
+
 # Time to wait for a reply from other nodes before failing the command 
 rpc_timeout_in_ms: 10000
 

Modified: cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java (original)
+++ cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java Wed Feb 23 19:32:42 2011
@@ -43,8 +43,8 @@ public class ClientOnlyExample
 
     private static final String KEYSPACE = "Keyspace1";
     private static final String COLUMN_FAMILY = "Standard1";
-
-    private static void testWriting() throws Exception
+    
+    private static void startClient() throws Exception
     {
         StorageService.instance.initClient();
         // sleep for a bit so that gossip can do its thing.
@@ -56,7 +56,10 @@ public class ClientOnlyExample
         {
             throw new AssertionError(ex);
         }
+    }
 
+    private static void testWriting() throws Exception
+    {
         // do some writing.
         for (int i = 0; i < 100; i++)
         {
@@ -72,22 +75,10 @@ public class ClientOnlyExample
             System.out.println("wrote key" + i);
         }
         System.out.println("Done writing.");
-        StorageService.instance.stopClient();
     }
 
     private static void testReading() throws Exception
     {
-        StorageService.instance.initClient();
-        // sleep for a bit so that gossip can do its thing.
-        try
-        {
-            Thread.sleep(10000L);
-        }
-        catch (Exception ex)
-        {
-            throw new AssertionError(ex);
-        }
-
         // do some queries.
         Collection<ByteBuffer> cols = new ArrayList<ByteBuffer>()
         {{
@@ -114,11 +105,6 @@ public class ClientOnlyExample
             else
                 System.err.println("This output indicates that nothing was read.");
         }
-
-        // no need to do this:
-        // StorageService.instance().decommission();
-        // do this instead:
-        StorageService.instance.stopClient();
     }
 
     /**
@@ -137,17 +123,26 @@ public class ClientOnlyExample
      */
     public static void main(String args[]) throws Exception
     {
-        if (args.length == 0)
-            System.out.println("run with \"read\" or \"write\".");
-        else if ("read".equalsIgnoreCase(args[0]))
+        startClient();
+        setupKeyspace(createConnection());
+        testWriting();
+        logger.info("Writing is done. Sleeping, then will try to read.");
+        try
         {
-            testReading();
+            Thread.currentThread().sleep(10000);
         }
-        else if ("write".equalsIgnoreCase(args[0]))
+        catch (InterruptedException ex) 
         {
-            setupKeyspace(createConnection());
-            testWriting();
+            throw new RuntimeException(ex);
         }
+        
+        testReading();
+        
+        // no need to do this:
+        // StorageService.instance().decommission();
+        // do this instead:
+        StorageService.instance.stopClient();
+        System.exit(0); // the only way to really stop the process.
     }
     
     /**
@@ -159,15 +154,22 @@ public class ClientOnlyExample
         CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
         cfDefList.add(columnFamily);
 
-        client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
-        int magnitude = client.describe_ring(KEYSPACE).size();
-        try
+        try 
         {
-            Thread.sleep(1000 * magnitude);
+            client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
+            int magnitude = client.describe_ring(KEYSPACE).size();
+            try
+            {
+                Thread.sleep(1000 * magnitude);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-        catch (InterruptedException e)
+        catch (InvalidRequestException probablyExists) 
         {
-            throw new RuntimeException(e);
+            logger.warn("Problem creating keyspace: " + probablyExists.getMessage());    
         }
     }
 

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Wed Feb 23 19:32:42 2011
@@ -162,7 +162,7 @@ public class Session
                 STDev = Float.parseFloat(cmd.getOptionValue("s"));
 
             if (cmd.hasOption("r"))
-                random = Boolean.parseBoolean(cmd.getOptionValue("r"));
+                random = true;
 
             if (cmd.hasOption("f"))
             {

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java Wed Feb 23 19:32:42 2011
@@ -17,18 +17,18 @@
  */
 package org.apache.cassandra.contrib.stress.util;
 
-import org.apache.cassandra.contrib.stress.Session;
-import org.apache.cassandra.contrib.stress.Stress;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.InvalidRequestException;
-
 import java.math.BigInteger;
 import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.cassandra.contrib.stress.Session;
+import org.apache.cassandra.contrib.stress.Stress;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
+
 public abstract class OperationThread extends Thread
 {
     public final int index;
@@ -125,7 +125,9 @@ public abstract class OperationThread ex
     private double nextGaussian(int mu, float sigma)
     {
         Random random = Stress.randomizer;
+
         Double currentState = nextGaussian;
+        nextGaussian = null;
 
         if (currentState == null)
         {
@@ -146,21 +148,14 @@ public abstract class OperationThread ex
      */
     private String getMD5(String input)
     {
-        try
-        {
-            MessageDigest md = MessageDigest.getInstance("MD5");
-            byte[] messageDigest = md.digest(input.getBytes());
-            StringBuilder hash = new StringBuilder(new BigInteger(1, messageDigest).toString(16));
+        MessageDigest md = FBUtilities.threadLocalMD5Digest();
+        byte[] messageDigest = md.digest(input.getBytes());
+        StringBuilder hash = new StringBuilder(new BigInteger(1, messageDigest).toString(16));
 
-            while (hash.length() < 32)
-                hash.append("0").append(hash);
+        while (hash.length() < 32)
+            hash.append("0").append(hash);
 
-            return hash.toString();
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            throw new RuntimeException(e);
-        }
+        return hash.toString();
     }
 
     /**

Modified: cassandra/trunk/debian/init
URL: http://svn.apache.org/viewvc/cassandra/trunk/debian/init?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/debian/init (original)
+++ cassandra/trunk/debian/init Wed Feb 23 19:32:42 2011
@@ -119,6 +119,8 @@ do_start()
     #   2 if daemon could not be started
     is_running && return 1
 
+    ulimit -l unlimited
+
     cassandra_home=`getent passwd cassandra | awk -F ':' '{ print $6; }'`
     cd /    # jsvc doesn't chdir() for us
 

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1073884
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1073884
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1073884
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1073884
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1073884
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java Wed Feb 23 19:32:42 2011
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.Map;
 import java.util.Properties;
 
@@ -111,16 +110,12 @@ public class SimpleAuthenticator impleme
                     authenticated = password.equals(props.getProperty(username));
                     break;
                 case MD5:
-                    authenticated = MessageDigest.isEqual(MessageDigest.getInstance("MD5").digest(password.getBytes()), FBUtilities.hexToBytes(props.getProperty(username)));
+                    authenticated = MessageDigest.isEqual(FBUtilities.threadLocalMD5Digest().digest(password.getBytes()), FBUtilities.hexToBytes(props.getProperty(username)));
                     break;
                 default:
                     throw new RuntimeException("Unknown PasswordMode " + mode);
             }
         }
-        catch (NoSuchAlgorithmException e)
-        {
-            throw new RuntimeException("You requested MD5 checking but the MD5 digest algorithm is not available: " + e.getMessage());
-        }
         catch (IOException e)
         {
             throw new RuntimeException("Authentication table file given by property " + PASSWD_FILENAME_PROPERTY + " could not be opened: " + e.getMessage());

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Wed Feb 23 19:32:42 2011
@@ -25,6 +25,7 @@ import java.nio.charset.CharacterCodingE
 import java.util.*;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
 
 import org.antlr.runtime.tree.Tree;
 import org.apache.cassandra.auth.SimpleAuthenticator;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -753,8 +755,8 @@ public class CliClient extends CliUserHe
             KsDef updatedKsDef = updateKsDefAttributes(statement, currentKsDef);
 
             String mySchemaVersion = thriftClient.system_update_keyspace(updatedKsDef);
-            validateSchemaIsSettled(mySchemaVersion);
             sessionState.out.println(mySchemaVersion);
+            validateSchemaIsSettled(mySchemaVersion);
             keyspacesMap.put(keyspaceName, thriftClient.describe_keyspace(keyspaceName));
         }
         catch (InvalidRequestException e)
@@ -2087,13 +2089,13 @@ public class CliClient extends CliUserHe
     /** validates schema is propagated to all nodes */
     private void validateSchemaIsSettled(String currentVersionId)
     {
-        Map<String, List<String>> versions;
-
-        long start = System.currentTimeMillis();
-        long limit = start + sessionState.schema_mwt;
+        sessionState.out.println("Waiting for schema agreement...");
+        Map<String, List<String>> versions = null;
 
+        long limit = System.currentTimeMillis() + sessionState.schema_mwt;
         boolean inAgreement = false;
-        while (limit - start >= 0)
+        outer:
+        while (limit - System.currentTimeMillis() >= 0 && !inAgreement)
         {
             try
             {
@@ -2105,29 +2107,23 @@ public class CliClient extends CliUserHe
                 continue;
             }
 
-            boolean currentlyInAgreement = true;
             for (String version : versions.keySet())
             {
-                if (!version.equals(currentVersionId))
-                {
-                    currentlyInAgreement = false;
-                    break; // only one disagreement is enough
-                }
-            }
-
-            if (currentlyInAgreement)
-            {
-                inAgreement = true;
-                break; // all nodes are in agreement no need to loop
+                if (!version.equals(currentVersionId) && !version.equals(StorageProxy.UNREACHABLE))
+                    continue outer;
             }
-            start = System.currentTimeMillis();
+            inAgreement = true;
         }
 
+        if (versions.containsKey(StorageProxy.UNREACHABLE))
+            sessionState.err.printf("Warning: unreachable nodes %s", Joiner.on(", ").join(versions.get(StorageProxy.UNREACHABLE)));
         if (!inAgreement)
         {
-            sessionState.err.printf("The schema has not settled in %d seconds and further migrations are ill-advised until it does.%n", sessionState.schema_mwt / 1000);
+            sessionState.err.printf("The schema has not settled in %d seconds; further migrations are ill-advised until it does.%nVersions are %s%n",
+                                    sessionState.schema_mwt / 1000, FBUtilities.toString(versions));
             System.exit(-1);
         }
+        sessionState.out.println("... schemas agree across the cluster");
     }
 
     private static class CfDefNamesComparator implements Comparator<CfDef>

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Feb 23 19:32:42 2011
@@ -212,6 +212,7 @@ public final class CFMetaData
         this.memtableThroughputInMb = memtableThroughputInMb == null
                                       ? DEFAULT_MEMTABLE_THROUGHPUT_IN_MB
                                       : memtableThroughputInMb;
+
         this.memtableOperationsInMillions = memtableOperationsInMillions == null
                                             ? DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS
                                             : memtableOperationsInMillions;
@@ -904,28 +905,22 @@ public final class CFMetaData
 
     public static void validateMemtableSettings(org.apache.cassandra.thrift.CfDef cf_def) throws ConfigurationException
     {
-        if (cf_def.isSetMemtable_flush_after_mins() && cf_def.memtable_flush_after_mins <= 0) {
-            throw new ConfigurationException("memtable_flush_after_mins cannot be non-positive");
-        }
-        if (cf_def.isSetMemtable_throughput_in_mb() && cf_def.memtable_throughput_in_mb <= 0) {
-            throw new ConfigurationException("memtable_throughput_in_mb cannot be non-positive.");
-        }
-        if (cf_def.isSetMemtable_operations_in_millions() && cf_def.memtable_operations_in_millions <= 0) {
-            throw new ConfigurationException("memtable_operations_in_millions cannot be non-positive");
-        }
+        if (cf_def.isSetMemtable_flush_after_mins())
+            DatabaseDescriptor.validateMemtableFlushPeriod(cf_def.memtable_flush_after_mins);
+        if (cf_def.isSetMemtable_throughput_in_mb())
+            DatabaseDescriptor.validateMemtableThroughput(cf_def.memtable_throughput_in_mb);
+        if (cf_def.isSetMemtable_operations_in_millions())
+            DatabaseDescriptor.validateMemtableOperations(cf_def.memtable_operations_in_millions);
     }
 
     public static void validateMemtableSettings(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException
     {
-        if (cf_def.memtable_flush_after_mins != null && cf_def.memtable_flush_after_mins <= 0) {
-            throw new ConfigurationException("memtable_flush_after_mins cannot be non-positive");
-        }
-        if (cf_def.memtable_throughput_in_mb != null && cf_def.memtable_throughput_in_mb <= 0) {
-            throw new ConfigurationException("memtable_throughput_in_mb cannot be non-positive.");
-        }
-        if (cf_def.memtable_operations_in_millions != null && cf_def.memtable_operations_in_millions <= 0) {
-            throw new ConfigurationException("memtable_operations_in_millions cannot be non-positive");
-        }
+        if (cf_def.memtable_flush_after_mins != null)
+            DatabaseDescriptor.validateMemtableFlushPeriod(cf_def.memtable_flush_after_mins);
+        if (cf_def.memtable_throughput_in_mb != null)
+            DatabaseDescriptor.validateMemtableThroughput(cf_def.memtable_throughput_in_mb);
+        if (cf_def.memtable_operations_in_millions != null)
+            DatabaseDescriptor.validateMemtableOperations(cf_def.memtable_operations_in_millions);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Feb 23 19:32:42 2011
@@ -112,7 +112,7 @@ public class DatabaseDescriptor
 
         return url;
     }
-
+    
     static
     {
         try
@@ -369,10 +369,6 @@ public class DatabaseDescriptor
                     throw new ConfigurationException("saved_caches_directory missing");
             }
 
-            /* threshold after which commit log should be rotated. */
-            if (conf.commitlog_rotation_threshold_in_mb != null)
-                CommitLog.setSegmentSize(conf.commitlog_rotation_threshold_in_mb * 1024 * 1024);
-
             // Hardcoded system tables
             KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE,
                                                    LocalStrategy.class,
@@ -918,6 +914,14 @@ public class DatabaseDescriptor
         currentIndex = (currentIndex + 1) % conf.data_file_directories.length;
         return dataFileDirectory;
     }
+    
+    /* threshold after which commit log should be rotated. */
+    public static int getCommitLogSegmentSize() 
+    {
+        return conf.commitlog_rotation_threshold_in_mb != null ?
+               conf.commitlog_rotation_threshold_in_mb * 1024 * 1024 :
+               128*1024*1024;
+    }
 
     public static String getCommitLogLocation()
     {
@@ -1201,4 +1205,24 @@ public class DatabaseDescriptor
     {
         return conf.compaction_preheat_key_cache;
     }
+
+    public static void validateMemtableThroughput(int sizeInMB) throws ConfigurationException
+    {
+        if (sizeInMB <= 0)
+            throw new ConfigurationException("memtable_throughput_in_mb must be greater than 0.");
+    }
+
+    public static void validateMemtableOperations(double operationsInMillions) throws ConfigurationException
+    {
+        if (operationsInMillions <= 0)
+            throw new ConfigurationException("memtable_operations_in_millions must be greater than 0.0.");
+        if (operationsInMillions > Long.MAX_VALUE / 1024 * 1024)
+            throw new ConfigurationException("memtable_operations_in_millions must be less than " + Long.MAX_VALUE / 1024 * 1024);
+    }
+
+    public static void validateMemtableFlushPeriod(int minutes) throws ConfigurationException
+    {
+        if (minutes <= 0)
+            throw new ConfigurationException("memtable_flush_after_mins must be greater than 0.");
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Wed Feb 23 19:32:42 2011
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
@@ -373,18 +372,9 @@ public class ColumnFamily implements ICo
 
     public static ByteBuffer digest(ColumnFamily cf)
     {
-        MessageDigest digest;
-        try
-        {
-            digest = MessageDigest.getInstance("MD5");
-        }
-        catch (NoSuchAlgorithmException e)
-        {
-            throw new AssertionError(e);
-        }
+        MessageDigest digest = FBUtilities.threadLocalMD5Digest();
         if (cf != null)
             cf.updateDigest(digest);
-
         return ByteBuffer.wrap(digest.digest());
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Feb 23 19:32:42 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.concurrent.N
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -977,6 +978,12 @@ public class ColumnFamilyStore implement
         CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
     }
 
+    public void scrub() throws ExecutionException, InterruptedException
+    {
+        snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
+        CompactionManager.instance.performScrub(ColumnFamilyStore.this);
+    }
+
     void markCompacted(Collection<SSTableReader> sstables)
     {
         ssTables.markCompacted(sstables);
@@ -1021,12 +1028,12 @@ public class ColumnFamilyStore implement
         flushable.flushAndSignal(latch, flushSorter, flushWriter);
     }
 
-    public int getMemtableColumnsCount()
+    public long getMemtableColumnsCount()
     {
         return getMemtableThreadSafe().getCurrentOperations();
     }
 
-    public int getMemtableDataSize()
+    public long getMemtableDataSize()
     {
         return getMemtableThreadSafe().getCurrentThroughput();
     }
@@ -1668,26 +1675,8 @@ public class ColumnFamilyStore implement
         return metadata.comparator;
     }
 
-    /**
-     * Take a snap shot of this columnfamily store.
-     * 
-     * @param snapshotName the name of the associated with the snapshot 
-     */
-    public void snapshot(String snapshotName)
+    private void snapshotWithoutFlush(String snapshotName)
     {
-        try
-        {
-            forceBlockingFlush();
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-
         for (SSTableReader ssTable : ssTables)
         {
             try
@@ -1710,6 +1699,30 @@ public class ColumnFamilyStore implement
         }
     }
 
+
+    /**
+     * Take a snap shot of this columnfamily store.
+     * 
+     * @param snapshotName the name of the associated with the snapshot 
+     */
+    public void snapshot(String snapshotName)
+    {
+        try
+        {
+            forceBlockingFlush();
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        snapshotWithoutFlush(snapshotName);
+    }
+
     public boolean hasUnreclaimedSpace()
     {
         return ssTables.getLiveSize() < ssTables.getTotalSize();
@@ -2012,24 +2025,20 @@ public class ColumnFamilyStore implement
     {
         return memsize.value();
     }
-    public void setMemtableThroughputInMB(int size)
+    public void setMemtableThroughputInMB(int size) throws ConfigurationException
     {
-        if (size <= 0) {
-            throw new RuntimeException("MemtableThroughputInMB must be greater than 0.");
-        }
-        this.memsize.set(size);
+        DatabaseDescriptor.validateMemtableThroughput(size);
+        memsize.set(size);
     }
 
     public double getMemtableOperationsInMillions()
     {
         return memops.value();
     }
-    public void setMemtableOperationsInMillions(double ops)
+    public void setMemtableOperationsInMillions(double ops) throws ConfigurationException
     {
-        if (ops <= 0) {
-            throw new RuntimeException("MemtableOperationsInMillions must be greater than 0.0.");
-        }
-        this.memops.set(ops);
+        DatabaseDescriptor.validateMemtableOperations(ops);
+        memops.set(ops);
     }
 
     public long estimateKeys()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed Feb 23 19:32:42 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.cassandra.config.ConfigurationException;
+
 /**
  * The MBean interface for ColumnFamilyStore
  */
@@ -38,14 +40,14 @@ public interface ColumnFamilyStoreMBean
      * 
      * @return The size in bytes.
      */
-    public int getMemtableDataSize();
+    public long getMemtableDataSize();
     
     /**
      * Returns the total number of columns present in the memtable.
      * 
      * @return The number of columns.
      */
-    public int getMemtableColumnsCount();
+    public long getMemtableColumnsCount();
     
     /**
      * Returns the number of times that a flush has resulted in the
@@ -211,10 +213,10 @@ public interface ColumnFamilyStoreMBean
     public void setMemtableFlushAfterMins(int time);
 
     public int getMemtableThroughputInMB();
-    public void setMemtableThroughputInMB(int size);
+    public void setMemtableThroughputInMB(int size) throws ConfigurationException;
 
     public double getMemtableOperationsInMillions();
-    public void setMemtableOperationsInMillions(double ops);
+    public void setMemtableOperationsInMillions(double ops) throws ConfigurationException;
 
     public long estimateKeys();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Feb 23 19:32:42 2011
@@ -41,14 +41,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.AbstractCompactedRow;
-import org.apache.cassandra.io.CompactionIterator;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -120,7 +120,7 @@ public class CompactionManager implement
                             Collections.sort(sstables);
                             int gcBefore = cfs.isIndex()
                                          ? Integer.MAX_VALUE
-                                         : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+                                         : getDefaultGcBefore(cfs);
                             return doCompaction(cfs,
                                                 sstables.subList(0, Math.min(sstables.size(), maxThreshold)),
                                                 gcBefore);
@@ -182,9 +182,31 @@ public class CompactionManager implement
         executor.submit(runnable).get();
     }
 
+    public void performScrub(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
+    {
+        Callable<Object> runnable = new Callable<Object>()
+        {
+            public Object call() throws IOException
+            {
+                compactionLock.lock();
+                try
+                {
+                    if (!cfStore.isInvalid())
+                        doScrub(cfStore);
+                    return this;
+                }
+                finally
+                {
+                    compactionLock.unlock();
+                }
+            }
+        };
+        executor.submit(runnable).get();
+    }
+
     public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
-        submitMajor(cfStore, 0, (int) (System.currentTimeMillis() / 1000) - cfStore.metadata.getGcGraceSeconds()).get();
+        submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
     }
 
     public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
@@ -257,7 +279,7 @@ public class CompactionManager implement
         }
 
         ColumnFamilyStore cfs = Table.open(ksname).getColumnFamilyStore(cfname);
-        submitUserDefined(cfs, descriptors, (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds());
+        submitUserDefined(cfs, descriptors, getDefaultGcBefore(cfs));
     }
 
     private Future<Object> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
@@ -476,6 +498,101 @@ public class CompactionManager implement
     }
 
     /**
+     * Deserialize everything in the CFS and re-serialize w/ the newest version.  Also attempts to recover
+     * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
+     * from early ByteBuffer bugs.
+     *
+     * @throws IOException
+     */
+    private void doScrub(ColumnFamilyStore cfs) throws IOException
+    {
+        assert !cfs.isIndex();
+        Table table = cfs.table;
+        Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
+
+        for (final SSTableReader sstable : cfs.getSSTables())
+        {
+            logger.info("Scrubbing " + sstable);
+
+            // Calculate the expected compacted filesize
+            String compactionFileLocation = table.getDataFileLocation(sstable.length());
+            if (compactionFileLocation == null)
+                throw new IOException("disk full");
+
+            int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
+                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+            if (logger.isDebugEnabled())
+              logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+
+            // loop through each row, deserializing to check for damage.
+            // we'll also loop through the index at the same time, using the position from the index to recover if the
+            // row header (key or data size) is corrupt. (This means our position in the index file will be one row
+            // "ahead" of the data file.)
+            final BufferedRandomAccessFile dataFile = BufferedRandomAccessFile.getUncachingReader(sstable.getFilename());
+            String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
+            BufferedRandomAccessFile indexFile = BufferedRandomAccessFile.getUncachingReader(indexFilename);
+            ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+            assert indexFile.readLong() == 0;
+
+            SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null);
+            executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile, sstable));
+
+            while (!dataFile.isEOF())
+            {
+                long rowStart = dataFile.getFilePointer();
+                DecoratedKey key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                ByteBuffer currentIndexKey = nextIndexKey;
+                nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+                long nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+
+                long dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+                long dataStart = dataFile.getFilePointer();
+
+                SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                writer.mark();
+                try
+                {
+                    writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+                }
+                catch (Exception e)
+                {
+                    logger.warn("Error reading row " + ByteBufferUtil.bytesToHex(key.key) + "(stacktrace follows)", e);
+                    writer.reset();
+                    
+                    long dataStartFromIndex = rowStart + 2 + currentIndexKey.remaining();
+                    if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+                    {
+                        logger.info(String.format("Retrying %s as key %s from row index",
+                                                  ByteBufferUtil.bytesToHex(key.key), ByteBufferUtil.bytesToHex(currentIndexKey)));
+                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+                        long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                        row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                        try
+                        {
+                            writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+                        }
+                        catch (Exception e2)
+                        {
+                            logger.info("Retry failed too.  Skipping to next row (retry's stacktrace follows)", e2);
+                            writer.reset();
+                            dataFile.seek(nextRowPositionFromIndex);
+                        }
+                    }
+                    else
+                    {
+                        logger.info("Skipping to next row");
+                        dataFile.seek(nextRowPositionFromIndex);
+                    }
+                }
+            }
+
+            SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+            logger.info("Scrub of " + sstable + " complete");
+        }
+    }
+
+    /**
      * This function goes over each file and removes the keys that the node is not responsible for
      * and only keeps keys that this node is responsible for.
      *
@@ -500,7 +617,7 @@ public class CompactionManager implement
             long totalkeysWritten = 0;
 
             int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
-                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)) / 2));
+                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
             if (logger.isDebugEnabled())
               logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
@@ -516,7 +633,7 @@ public class CompactionManager implement
                     if (Range.isTokenInRanges(row.getKey().token, ranges))
                     {
                         writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
-                        writer.append(new EchoedRow(row));
+                        writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
                         totalkeysWritten++;
                     }
                     else
@@ -569,6 +686,21 @@ public class CompactionManager implement
         }
     }
 
+    /**
+     * @return an AbstractCompactedRow implementation to write the row in question.
+     * If the data is from a current-version sstable, write it unchanged.  Otherwise,
+     * re-serialize it in the latest version.
+     */
+    private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor, boolean forceDeserialize)
+    {
+        if (descriptor.isLatestVersion && !forceDeserialize)
+            return new EchoedRow(row);
+
+        return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
+               ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize)
+               : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize);
+    }
+
     private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
             throws IOException
     {
@@ -753,11 +885,16 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
+    private static int getDefaultGcBefore(ColumnFamilyStore cfs)
+    {
+        return (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+    }
+
     private static class ValidationCompactionIterator extends CompactionIterator
     {
         public ValidationCompactionIterator(ColumnFamilyStore cfs) throws IOException
         {
-            super(cfs, cfs.getSSTables(), (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(), true);
+            super(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true);
         }
 
         @Override
@@ -966,4 +1103,38 @@ public class CompactionManager implement
             return "Cleanup of " + sstable.getColumnFamilyName();
         }
     }
+
+    private static class ScrubInfo implements ICompactionInfo
+    {
+        private final BufferedRandomAccessFile dataFile;
+        private final SSTableReader sstable;
+
+        public ScrubInfo(BufferedRandomAccessFile dataFile, SSTableReader sstable)
+        {
+            this.dataFile = dataFile;
+            this.sstable = sstable;
+        }
+
+        public long getTotalBytes()
+        {
+            try
+            {
+                return dataFile.length();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public long getBytesComplete()
+        {
+            return dataFile.getFilePointer();
+        }
+
+        public String getTaskType()
+        {
+            return "Scrub " + sstable;
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Feb 23 19:32:42 2011
@@ -255,6 +255,15 @@ public class HintedHandOffManager implem
     {
         Gossiper gossiper = Gossiper.instance;
         int waited = 0;
+        // first, wait for schema to be gossiped.
+        while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
+            Thread.sleep(1000);
+            waited += 1000;
+            if (waited > 2 * StorageService.RING_DELAY)
+                throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+        }
+        waited = 0;
+        // then wait for the correct schema version.
         while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
                 gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Feb 23 19:32:42 2011
@@ -30,12 +30,14 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
 import org.apache.cassandra.db.filter.AbstractColumnIterator;
@@ -52,23 +54,23 @@ public class Memtable implements Compara
 
     private boolean isFrozen;
 
-    private final AtomicInteger currentThroughput = new AtomicInteger(0);
-    private final AtomicInteger currentOperations = new AtomicInteger(0);
+    private final AtomicLong currentThroughput = new AtomicLong(0);
+    private final AtomicLong currentOperations = new AtomicLong(0);
 
     private final long creationTime;
     private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
     public final ColumnFamilyStore cfs;
 
-    private final int THRESHOLD;
-    private final int THRESHOLD_COUNT;
+    private final long THRESHOLD;
+    private final long THRESHOLD_COUNT;
 
     public Memtable(ColumnFamilyStore cfs)
     {
 
         this.cfs = cfs;
         creationTime = System.currentTimeMillis();
-        this.THRESHOLD = cfs.getMemtableThroughputInMB() * 1024 * 1024;
-        this.THRESHOLD_COUNT = (int) (cfs.getMemtableOperationsInMillions() * 1024 * 1024);
+        THRESHOLD = cfs.getMemtableThroughputInMB() * 1024 * 1024;
+        THRESHOLD_COUNT = (long) (cfs.getMemtableOperationsInMillions() * 1024 * 1024);
     }
 
     /**
@@ -88,12 +90,12 @@ public class Memtable implements Compara
     		return 0;
     }
 
-    public int getCurrentThroughput()
+    public long getCurrentThroughput()
     {
         return currentThroughput.get();
     }
     
-    public int getCurrentOperations()
+    public long getCurrentOperations()
     {
         return currentOperations.get();
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Feb 23 19:32:42 2011
@@ -189,6 +189,9 @@ public class RowMutation implements IMut
         Table.open(table_).apply(this, true);
     }
 
+    /**
+     * Apply without touching the commitlog. For testing.
+     */
     public void applyUnsafe() throws IOException
     {
         Table.open(table_).apply(this, false);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Feb 23 19:32:42 2011
@@ -70,15 +70,18 @@ public class Table
 
     // It is possible to call Table.open without a running daemon, so it makes sense to ensure
     // proper directories here as well as in CassandraDaemon.
-    static
+    static 
     {
-        try
-        {
-            DatabaseDescriptor.createAllDirectories();
-        }
-        catch (IOException ex)
+        if (!StorageService.instance.isClientMode()) 
         {
-            throw new IOError(ex);
+            try
+            {
+                DatabaseDescriptor.createAllDirectories();
+            }
+            catch (IOException ex)
+            {
+                throw new IOError(ex);
+            }
         }
     }
 
@@ -160,36 +163,6 @@ public class Table
     }
 
     /**
-     * Do a cleanup of keys that do not belong locally.
-     */
-    public void forceCleanup() throws IOException, ExecutionException, InterruptedException
-    {
-        if (name.equals(SYSTEM_TABLE))
-            throw new UnsupportedOperationException("Cleanup of the system table is neither necessary nor wise");
-
-        // Sort the column families in order of SSTable size, so cleanup of smaller CFs
-        // can free up space for larger ones
-        List<ColumnFamilyStore> sortedColumnFamilies = new ArrayList<ColumnFamilyStore>(columnFamilyStores.values());
-        Collections.sort(sortedColumnFamilies, new Comparator<ColumnFamilyStore>()
-        {
-            // Compare first on size and, if equal, sort by name (arbitrary & deterministic).
-            public int compare(ColumnFamilyStore cf1, ColumnFamilyStore cf2)
-            {
-                long diff = (cf1.getTotalDiskSpaceUsed() - cf2.getTotalDiskSpaceUsed());
-                if (diff > 0)
-                    return 1;
-                if (diff < 0)
-                    return -1;
-                return cf1.columnFamily.compareTo(cf2.columnFamily);
-            }
-        });
-
-        // Cleanup in sorted order to free up space for the larger ones
-        for (ColumnFamilyStore cfs : sortedColumnFamilies)
-            cfs.forceCleanup();
-    }
-
-    /**
      * Take a snapshot of the entire set of column families with a given timestamp.
      * 
      * @param clientSuppliedName the tag associated with the name of the snapshot.  This
@@ -238,16 +211,6 @@ public class Table
         }
     }
     
-    /*
-     * This method is an ADMIN operation to force compaction
-     * of all SSTables on disk. 
-     */
-    public void forceCompaction() throws IOException, ExecutionException, InterruptedException
-    {
-        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
-            CompactionManager.instance.performMajor(cfStore);
-    }
-
     /**
      * @return A list of open SSTableReaders (TODO: ensure that the caller doesn't modify these).
      */
@@ -282,7 +245,8 @@ public class Table
             try
             {
                 String keyspaceDir = dataDir + File.separator + table;
-                FileUtils.createDirectory(keyspaceDir);
+                if (!StorageService.instance.isClientMode())
+                    FileUtils.createDirectory(keyspaceDir);
     
                 // remove the deprecated streaming directory.
                 File streamingDir = new File(keyspaceDir, "stream");
@@ -301,13 +265,6 @@ public class Table
             initCf(cfm.cfId, cfm.cfName);
         }
 
-        // check 10x as often as the shortest lifetime, so we can exceed all lifetimes by 10% at most
-        int minCheckMs = Integer.MAX_VALUE;
-        for (ColumnFamilyStore cfs : columnFamilyStores.values())
-        {
-            minCheckMs = Math.min(minCheckMs, cfs.getMemtableFlushAfterMins() * 60 * 1000 / 10);
-        }
-
         Runnable runnable = new Runnable()
         {
             public void run()
@@ -318,7 +275,7 @@ public class Table
                 }
             }
         };
-        flushTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, minCheckMs, minCheckMs, TimeUnit.MILLISECONDS);
+        flushTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.SECONDS);
     }
 
     public void createReplicationStrategy(KSMetaData ksm) throws ConfigurationException
@@ -370,15 +327,7 @@ public class Table
                                                                      cfName, cfId, columnFamilyStores.get(cfId));
         columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName));
     }
-    
-    public void reloadCf(Integer cfId) throws IOException
-    {
-        ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
-        assert cfs != null;
-        unloadCf(cfs);
-        initCf(cfId, cfs.getColumnFamilyName());
-    }
-    
+
     /** basically a combined drop and add */
     public void renameCf(Integer cfId, String newName) throws IOException
     {
@@ -758,4 +707,9 @@ public class Table
         cfs.truncate().get();
         logger.debug("Truncation done.");
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(name='" + name + "')";
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java Wed Feb 23 19:32:42 2011
@@ -146,8 +146,6 @@ class IndexedSliceReader extends Abstrac
             file.readInt(); // column count
             this.mark = file.mark();
             curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
-            if (reversed && curRangeIndex == indexes.size())
-                curRangeIndex--;
         }
 
         public boolean getNextBlock() throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java Wed Feb 23 19:32:42 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.db.ColumnFamilySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +109,13 @@ public class SSTableNamesIterator extend
 
         // we can stop early if bloom filter says none of the columns actually exist -- but,
         // we can't stop before initializing the cf above, in case there's a relevant tombstone
-        cf = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
+        ColumnFamilySerializer serializer = ColumnFamily.serializer();
+        try {
+            cf = serializer.deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
+        } catch (Exception e) {
+            throw new IOException
+                (serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
+        }
 
         List<ByteBuffer> filteredColumnNames = new ArrayList<ByteBuffer>(columns.size());
         for (ByteBuffer name : columns)
@@ -153,7 +160,7 @@ public class SSTableNamesIterator extend
 
         /* get the various column ranges we have to read */
         AbstractType comparator = metadata.comparator;
-        SortedSet<IndexHelper.IndexInfo> ranges = new TreeSet<IndexHelper.IndexInfo>(IndexHelper.getComparator(comparator));
+        SortedSet<IndexHelper.IndexInfo> ranges = new TreeSet<IndexHelper.IndexInfo>(IndexHelper.getComparator(comparator, false));
         for (ByteBuffer name : filteredColumnNames)
         {
             int index = IndexHelper.indexFor(name, indexList, comparator, false);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Feb 23 19:32:42 2011
@@ -79,20 +79,16 @@ import org.apache.cassandra.utils.Wrappe
 public class CommitLog
 {
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
-    private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
-
+    
     static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
 
     public static final CommitLog instance = new CommitLog();
 
     private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
 
-    public static void setSegmentSize(int size)
-    {
-        SEGMENT_SIZE = size;
-    }
-
     private final ICommitLogExecutorService executor;
+    
+    private volatile int segmentSize = 128*1024*1024; // roll after log gets this big
 
     /**
      * param @ table - name of table for which we are maintaining
@@ -105,6 +101,7 @@ public class CommitLog
         try
         {
             DatabaseDescriptor.createAllDirectories();
+            segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
         }
         catch (IOException e)
         {
@@ -479,7 +476,7 @@ public class CommitLog
             {
                 currentSegment().write(rowMutation);
                 // roll log if necessary
-                if (currentSegment().length() >= SEGMENT_SIZE)
+                if (currentSegment().length() >= segmentSize)
                 {
                     sync();
                     segments.add(new CommitLogSegment());

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Wed Feb 23 19:32:42 2011
@@ -210,4 +210,13 @@ public class QueryFilter
     {
         return new QueryFilter(key, path, new NamesQueryFilter(column));
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(key=" + key +
+               ", path=" + path +
+               (filter == null ? "" : ", filter=" + filter) +
+               (superFilter == null ? "" : ", superFilter=" + superFilter) +
+               ")";
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java Wed Feb 23 19:32:42 2011
@@ -71,7 +71,7 @@ public class QueryPath
     @Override
     public String toString()
     {
-        return "QueryPath(" +
+        return getClass().getSimpleName() + "(" +
                "columnFamilyName='" + columnFamilyName + '\'' +
                ", superColumnName='" + superColumnName + '\'' +
                ", columnName='" + columnName + '\'' +

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java Wed Feb 23 19:32:42 2011
@@ -143,4 +143,13 @@ public class SliceQueryFilter implements
                 container.addColumn(column);
         }
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(" +
+               "start=" + start +
+               ", finish=" + finish +
+               ", reversed=" + reversed +
+               ", count=" + count + "]";
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Wed Feb 23 19:32:42 2011
@@ -1,13 +1,12 @@
 package org.apache.cassandra.db.migration;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
@@ -52,7 +51,13 @@ public class AddColumnFamily extends Mig
                                                            cfm.tableName));
         else if (!Migration.isLegalName(cfm.cfName))
             throw new ConfigurationException("Invalid column family name: " + cfm.cfName);
-        
+        for (Map.Entry<ByteBuffer, ColumnDefinition> entry : cfm.getColumn_metadata().entrySet())
+        {
+            String indexName = entry.getValue().getIndexName();
+            if (indexName != null && !Migration.isLegalName(indexName))
+                throw new ConfigurationException("Invalid index name: " + indexName);
+        }
+
         // clone ksm but include the new cf def.
         KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
         

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Wed Feb 23 19:32:42 2011
@@ -83,13 +83,16 @@ public abstract class Migration
     protected transient boolean clientMode;
     
     /** Subclasses must have a matching constructor */
-    protected Migration() { /* pass */ }
+    protected Migration() 
+    {
+        clientMode = StorageService.instance.isClientMode();
+    }
 
     Migration(UUID newVersion, UUID lastVersion)
     {
+        this();
         this.newVersion = newVersion;
         this.lastVersion = lastVersion;
-        clientMode = StorageService.instance.isClientMode();
     }
     
     // block compactions and flushing.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Wed Feb 23 19:32:42 2011
@@ -1,13 +1,13 @@
 package org.apache.cassandra.db.migration;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.migration.avro.ColumnDef;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -45,7 +45,15 @@ public class UpdateColumnFamily extends 
         KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cf_def.keyspace.toString());
         if (ksm == null)
             throw new ConfigurationException("No such keyspace: " + cf_def.keyspace.toString());
-        
+        if (cf_def.column_metadata != null)
+        {
+            for (ColumnDef entry : cf_def.column_metadata)
+            {
+                if (entry.index_name != null && !Migration.isLegalName((String) entry.index_name))
+                    throw new ConfigurationException("Invalid index name: " + entry.index_name);
+            }
+        }
+
         CFMetaData oldCfm = DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace.toString(), cf_def.name.toString()));
         
         // create a copy of the old CF meta data. Apply new settings on top of it.

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Feb 23 19:32:42 2011
@@ -861,7 +861,6 @@ public class Gossiper implements IFailur
 
     public void addLocalApplicationState(ApplicationState state, VersionedValue value)
     {
-        assert !StorageService.instance.isClientMode();
         EndpointState epState = endpointStateMap.get(FBUtilities.getLocalAddress());
         assert epState != null;
         epState.addApplicationState(state, value);

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Wed Feb 23 19:32:42 2011
@@ -59,7 +59,7 @@ public class ColumnFamilySplit extends I
     public long getLength()
     {
         // only used for sorting splits. we don't have the capability, yet.
-        return 0;
+        return Long.MAX_VALUE;
     }
 
     public String[] getLocations()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Feb 23 19:32:42 2011
@@ -134,9 +134,9 @@ implements Closeable, ICompactionInfo
         {
             logger.info(String.format("Compacting large row %s (%d bytes) incrementally",
                                       ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize));
-            return new LazilyCompactedRow(cfs, rows, major, gcBefore);
+            return new LazilyCompactedRow(cfs, rows, major, gcBefore, false);
         }
-        return new PrecompactedRow(cfs, rows, major, gcBefore);
+        return new PrecompactedRow(cfs, rows, major, gcBefore, false);
     }
 
     public void close() throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Wed Feb 23 19:32:42 2011
@@ -59,15 +59,17 @@ public class LazilyCompactedRow extends 
     private final boolean shouldPurge;
     private final int gcBefore;
     private final DataOutputBuffer headerBuffer;
+    private final boolean forceDeserialize;
     private ColumnFamily emptyColumnFamily;
     private LazyColumnIterator iter;
     private int columnCount;
     private long columnSerializedSize;
 
-    public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+    public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
     {
         super(rows.get(0).getKey());
         this.gcBefore = gcBefore;
+        this.forceDeserialize = forceDeserialize;
         this.rows = new ArrayList<SSTableIdentityIterator>(rows);
 
         Set<SSTable> sstables = new HashSet<SSTable>();
@@ -94,7 +96,7 @@ public class LazilyCompactedRow extends 
 
     public void write(DataOutput out) throws IOException
     {
-        if (rows.size() == 1 && !shouldPurge)
+        if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion && !forceDeserialize)
         {
             SSTableIdentityIterator row = rows.get(0);
             out.writeLong(row.dataSize);



Mime
View raw message