cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1043262 - in /cassandra/trunk: ./ conf/ contrib/cassandra_browser/ contrib/maven/ contrib/mutex/ contrib/property_snitch/ debian/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/cli/ src/java/org/apache...
Date Wed, 08 Dec 2010 01:08:33 GMT
Author: jbellis
Date: Wed Dec  8 01:08:32 2010
New Revision: 1043262

URL: http://svn.apache.org/viewvc?rev=1043262&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
      - copied unchanged from r1043200, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Removed:
    cassandra/trunk/contrib/cassandra_browser/
    cassandra/trunk/contrib/maven/
    cassandra/trunk/contrib/mutex/
    cassandra/trunk/contrib/property_snitch/
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/build.xml
    cassandra/trunk/conf/cassandra-env.sh
    cassandra/trunk/debian/changelog
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyType.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec  8 01:08:32 2010
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1041242
-/cassandra/branches/cassandra-0.7:1026517-1041243
+/cassandra/branches/cassandra-0.7:1026517-1043201
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Dec  8 01:08:32 2010
@@ -1,4 +1,13 @@
-dev
+0.8-dev
+ * avoid double RowMutation serialization on write path (CASSANDRA-1800)
+
+
+0.7-dev
+ * expose getNaturalEndpoints in StorageServiceMBean taking byte[]
+   key; RMI cannot serialize ByteBuffer (CASSANDRA-1833)
+
+
+0.7.0-rc2
  * fix live-column-count of slice ranges including tombstoned supercolumn 
    with live subcolumn (CASSANDRA-1591)
  * rename o.a.c.internal.AntientropyStage -> AntiEntropyStage,
@@ -23,7 +32,17 @@ dev
  * close file handle used for post-flush truncate (CASSANDRA-1790)
  * various code cleanup (CASSANDRA-1793, -1794, -1795)
  * fix range queries against wrapped range (CASSANDRA-1781)
- * avoid double RowMutation serialization on write path (CASSANDRA-1800)
+ * fix consistencylevel calculations for NetworkTopologyStrategy
+   (CASSANDRA-1804)
+ * cli support index type enum names (CASSANDRA-1810)
+ * improved validation of column_metadata (CASSANDRA-1813)
+ * reads at ConsistencyLevel > 1 throw UnavailableException
+   immediately if insufficient live nodes exist (CASSANDRA-1803)
+ * copy bytebuffers for local writes to avoid retaining the entire
+   Thrift frame (CASSANDRA-1801)
+ * fix NPE adding index to column w/o prior metadata (CASSANDRA-1764)
+ * reduce fat client timeout (CASSANDRA-1730)
+ * fix botched merge of CASSANDRA-1316
 
 
 0.7.0-rc1

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Dec  8 01:08:32 2010
@@ -47,7 +47,7 @@
     <property name="test.unit.src" value="${test.dir}/unit"/>
     <property name="test.long.src" value="${test.dir}/long"/>
     <property name="dist.dir" value="${build.dir}/dist"/>
-    <property name="base.version" value="0.7.0-beta3"/>
+    <property name="base.version" value="0.7.0-rc2"/>
     <condition property="version" value="${base.version}">
       <isset property="release"/>
     </condition>

Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Wed Dec  8 01:08:32 2010
@@ -78,7 +78,7 @@ if [ "`uname`" = "Linux" ] ; then
     JVM_OPTS="$JVM_OPTS -Xss128k"
 fi
 
-# GC tuning options.
+# GC tuning options
 JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC" 
 JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC" 
 JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled" 
@@ -87,6 +87,14 @@ JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThres
 JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
 JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"
 
+# GC logging options -- uncomment to enable
+# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
+# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
+# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc.log"
+
 # Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See 
 # http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
 # comment out this entry to enable IPv6 support).

Modified: cassandra/trunk/debian/changelog
URL: http://svn.apache.org/viewvc/cassandra/trunk/debian/changelog?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/debian/changelog (original)
+++ cassandra/trunk/debian/changelog Wed Dec  8 01:08:32 2010
@@ -1,3 +1,9 @@
+cassandra (0.7.0~rc2) unstable; urgency=low
+
+  * Release candidate release.
+
+ -- Eric Evans <eevans@apache.org>  Mon, 06 Dec 2010 11:19:40 -0600
+
 cassandra (0.7.0~rc1) unstable; urgency=low
 
   * Release candidate release.

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec  8 01:08:32 2010
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1041242
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1041243
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1043201
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec  8 01:08:32 2010
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1041242
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1041243
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1043201
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec  8 01:08:32 2010
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1041242
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1041243
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1043201
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec  8 01:08:32 2010
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1041242
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1041243
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1043201
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec  8 01:08:32 2010
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1041242
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1041243
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1043201
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Wed Dec  8 01:08:32 2010
@@ -468,7 +468,7 @@ public class CliClient extends CliUserHe
             return;
 
         IndexClause clause = new IndexClause();
-        String columnFamily = statement.getChild(0).getText();
+        String columnFamily = CliCompiler.getColumnFamily(statement, keyspacesMap.get(keySpace).cf_defs);
         // ^(CONDITIONS ^(CONDITION $column $value) ...)
         Tree conditions = statement.getChild(1);
         
@@ -1414,20 +1414,28 @@ public class CliClient extends CliUserHe
      */
     private IndexType getIndexTypeFromString(String indexTypeAsString)
     {
-        Integer indexTypeId;
         IndexType indexType;
 
-        try {
-            indexTypeId = new Integer(indexTypeAsString);
+        try
+        {
+            indexType = IndexType.findByValue(new Integer(indexTypeAsString));
         }
-        catch (NumberFormatException e) {
-            throw new RuntimeException("Could not convert " + indexTypeAsString + " into Integer.");
+        catch (NumberFormatException e)
+        {
+            try
+            {
+                // if this is not an integer let's try to get IndexType by name
+                indexType = IndexType.valueOf(indexTypeAsString);
+            }
+            catch (IllegalArgumentException ie)
+            {
+                throw new RuntimeException("IndexType '" + indexTypeAsString + "' is unsupported.");
+            }
         }
 
-        indexType = IndexType.findByValue(indexTypeId);
-
-        if (indexType == null) {
-            throw new RuntimeException(indexTypeAsString + " is unsupported.");
+        if (indexType == null)
+        {
+            throw new RuntimeException("IndexType '" + indexTypeAsString + "' is unsupported.");
         }
 
         return indexType;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Dec  8 01:08:32 2010
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.marshal.T
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.commons.lang.builder.EqualsBuilder;
@@ -751,7 +752,7 @@ public final class CFMetaData
             org.apache.cassandra.avro.ColumnDef tcd = new org.apache.cassandra.avro.ColumnDef();
             tcd.index_name = cd.getIndexName();
             tcd.index_type = org.apache.cassandra.avro.IndexType.valueOf(cd.getIndexType().name());
-            tcd.name = cd.name;
+            tcd.name = ByteBufferUtil.clone(cd.name);
             tcd.validation_class = cd.validator.getClass().getName();
             column_meta.add(tcd);
         }
@@ -786,7 +787,7 @@ public final class CFMetaData
         for (org.apache.cassandra.thrift.ColumnDef cdef : def.getColumn_metadata())
         {
             org.apache.cassandra.avro.ColumnDef tdef = new org.apache.cassandra.avro.ColumnDef();
-            tdef.name = cdef.BufferForName();
+            tdef.name = ByteBufferUtil.clone(cdef.BufferForName());
             tdef.validation_class = cdef.getValidation_class();
             tdef.index_name = cdef.getIndex_name();
             tdef.index_type = cdef.getIndex_type() == null ? null : org.apache.cassandra.avro.IndexType.valueOf(cdef.getIndex_type().name());

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java Wed Dec  8 01:08:32 2010
@@ -31,6 +31,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.IndexType;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class ColumnDefinition {
@@ -103,8 +104,7 @@ public class ColumnDefinition {
 
     public static ColumnDefinition fromColumnDef(ColumnDef thriftColumnDef) throws ConfigurationException
     {
-        validateIndexType(thriftColumnDef);
-        return new ColumnDefinition(thriftColumnDef.name, thriftColumnDef.validation_class, thriftColumnDef.index_type, thriftColumnDef.index_name);
+        return new ColumnDefinition(ByteBufferUtil.clone(thriftColumnDef.name), thriftColumnDef.validation_class, thriftColumnDef.index_type, thriftColumnDef.index_name);
     }
     
     public static ColumnDefinition fromColumnDef(org.apache.cassandra.avro.ColumnDef avroColumnDef) throws ConfigurationException
@@ -123,10 +123,7 @@ public class ColumnDefinition {
 
         Map<ByteBuffer, ColumnDefinition> cds = new TreeMap<ByteBuffer, ColumnDefinition>();
         for (ColumnDef thriftColumnDef : thriftDefs)
-        {
-            validateIndexType(thriftColumnDef);
-            cds.put(thriftColumnDef.name, fromColumnDef(thriftColumnDef));
-        }
+            cds.put(ByteBufferUtil.clone(thriftColumnDef.name), fromColumnDef(thriftColumnDef));
 
         return Collections.unmodifiableMap(cds);
     }
@@ -146,12 +143,6 @@ public class ColumnDefinition {
         return Collections.unmodifiableMap(cds);
     }
 
-    public static void validateIndexType(org.apache.cassandra.thrift.ColumnDef thriftColumnDef) throws ConfigurationException
-    {
-        if ((thriftColumnDef.index_name != null) && (thriftColumnDef.index_type == null))
-            throw new ConfigurationException("index_name cannot be set if index_type is not also set");
-    }
-
     public static void validateIndexType(org.apache.cassandra.avro.ColumnDef avroColumnDef) throws ConfigurationException
     {
         if ((avroColumnDef.index_name != null) && (avroColumnDef.index_type == null))

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Dec  8 01:08:32 2010
@@ -816,11 +816,6 @@ public class    DatabaseDescriptor
         return conf.rpc_port;
     }
 
-    public static int getReplicationFactor(String table)
-    {
-        return tables.get(table).replicationFactor;
-    }
-
     public static long getRpcTimeout()
     {
         return conf.rpc_timeout_in_ms;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Dec  8 01:08:32 2010
@@ -210,6 +210,12 @@ public class Column implements IColumn
         return result;
     }
 
+    @Override
+    public IColumn deepCopy()
+    {
+        return new Column(ByteBufferUtil.clone(name), ByteBufferUtil.clone(value), timestamp);
+    }
+    
     public String getString(AbstractType comparator)
     {
         StringBuilder sb = new StringBuilder();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyType.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyType.java Wed Dec  8 01:08:32 2010
@@ -25,11 +25,12 @@ public enum ColumnFamilyType
     Standard,
     Super;
 
-    public final static ColumnFamilyType create(String name)
+    public static ColumnFamilyType create(String name)
     {
         try
         {
-            return name == null ? null : ColumnFamilyType.valueOf(name);
+            // TODO thrift optional parameter in CfDef is leaking down here which it shouldn't
+            return name == null ? ColumnFamilyType.Standard : ColumnFamilyType.valueOf(name);
         }
         catch (IllegalArgumentException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java Wed Dec  8 01:08:32 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,4 +56,10 @@ public class DeletedColumn extends Colum
     {
        return value.getInt(value.position()+value.arrayOffset()	);
     }
+    
+    @Override
+    public IColumn deepCopy()
+    {
+        return new DeletedColumn(ByteBufferUtil.clone(name), ByteBufferUtil.clone(value), timestamp);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java Wed Dec  8 01:08:32 2010
@@ -24,6 +24,7 @@ import java.security.MessageDigest;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.log4j.Logger;
 
 /**
@@ -105,6 +106,12 @@ public class ExpiringColumn extends Colu
     }
 
     @Override
+    public IColumn deepCopy()
+    {
+        return new ExpiringColumn(ByteBufferUtil.clone(name), ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
+    }
+    
+    @Override
     public String getString(AbstractType comparator)
     {
         StringBuilder sb = new StringBuilder();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Dec  8 01:08:32 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -122,7 +123,7 @@ public class HintedHandOffManager
             rm.add(cf);
             Message message = rm.makeRowMutationMessage();
             IWriteResponseHandler responseHandler =  WriteResponseHandler.create(endpoint);
-            MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
+            MessagingService.instance.sendRR(message, Arrays.asList(endpoint), responseHandler);
             try
             {
                 responseHandler.get();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Wed Dec  8 01:08:32 2010
@@ -46,6 +46,9 @@ public interface IColumn
     public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
     public String getString(AbstractType comparator);
 
+    /** clones the column, making copies of any underlying byte buffers */
+    IColumn deepCopy();
+
     /**
      * For a simple column, live == !isMarkedForDelete.
      * For a supercolumn, live means it has at least one subcolumn whose timestamp is greater than the

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Dec  8 01:08:32 2010
@@ -41,6 +41,7 @@ import org.apache.cassandra.service.Stor
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.Deletion;
 import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang.StringUtils;
 
@@ -317,6 +318,21 @@ public class RowMutation
         rm.preserializedBuffer = raw;
         return rm;
     }
+
+    public RowMutation deepCopy()
+    {
+        RowMutation rm = new RowMutation(table_, ByteBufferUtil.clone(key_));
+
+        for (Map.Entry<Integer, ColumnFamily> e : modifications_.entrySet())
+        {
+            ColumnFamily cf = e.getValue().cloneMeShallow();
+            for (Map.Entry<ByteBuffer, IColumn> ce : e.getValue().getColumnsMap().entrySet())
+                cf.addColumn(ce.getValue().deepCopy());
+            rm.modifications_.put(e.getKey(), cf);
+        }
+
+        return rm;
+    }
 }
 
 class RowMutationSerializer implements ICompactSerializer<RowMutation>

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Wed Dec  8 01:08:32 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -31,6 +32,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -286,6 +288,20 @@ public class SuperColumn implements ICol
         this.localDeletionTime.set(localDeleteTime);
         this.markedForDeleteAt.set(timestamp);
     }
+    
+    public IColumn deepCopy()
+    {
+        SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator());
+        sc.localDeletionTime = localDeletionTime;
+        sc.markedForDeleteAt = markedForDeleteAt;
+        
+        for(Map.Entry<ByteBuffer, IColumn> c : columns_.entrySet())
+        {
+            sc.addColumn(c.getValue().deepCopy());
+        }
+        
+        return sc;
+    }
 
     public IColumn reconcile(IColumn c)
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Dec  8 01:08:32 2010
@@ -125,10 +125,9 @@ public abstract class AbstractReplicatio
         return WriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistencyLevel, table);
     }
 
-    // instance method so test subclasses can override it
-    int getReplicationFactor()
+    public int getReplicationFactor()
     {
-       return DatabaseDescriptor.getReplicationFactor(table);
+        return DatabaseDescriptor.getTableDefinition(table).replicationFactor;
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Dec  8 01:08:32 2010
@@ -29,9 +29,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -226,7 +224,7 @@ public class MessagingService implements
      * @return an reference to an IAsyncResult which can be queried for the
      * response
      */
-    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
+    public String sendRR(Message message, Collection<InetAddress> to, IAsyncCallback cb)
     {
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
@@ -273,18 +271,16 @@ public class MessagingService implements
      *           suggest that a timeout occured to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
+    public String sendRR(Message[] messages, List<InetAddress> to, IAsyncCallback cb)
     {
-        if ( messages.length != to.length )
-        {
+        if (messages.length != to.size())
             throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
-        }
         String groupId = GuidGenerator.guid();
         addCallback(cb, groupId);
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
-            sendOneWay(messages[i], to[i]);
+            sendOneWay(messages[i], to.get(i));
         }
         return groupId;
     } 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Wed Dec  8 01:08:32 2010
@@ -156,7 +156,6 @@ class ConsistencyChecker implements Runn
 
 	static class DataRepairHandler implements IAsyncCallback
 	{
-		private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
 		private final ReadResponseResolver readResponseResolver_;
 		private final int majority_;
 		
@@ -167,7 +166,6 @@ class ConsistencyChecker implements Runn
             // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
             Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
-            responses_.add(fakeMessage);
             readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
         }
 
@@ -176,15 +174,14 @@ class ConsistencyChecker implements Runn
 		{
 			if (logger_.isDebugEnabled())
 			  logger_.debug("Received response in DataRepairHandler : " + message.toString());
-			responses_.add(message);
             readResponseResolver_.preprocess(message);
-            if (responses_.size() == majority_)
+            if (readResponseResolver_.getMessageCount() == majority_)
             {
                 Runnable runnable = new WrappedRunnable()
                 {
                     public void runMayThrow() throws IOException, DigestMismatchException
                     {
-                        readResponseResolver_.resolve(responses_);
+                        readResponseResolver_.resolve();
                     }
                 };
                 // give remaining replicas until timeout to reply and get added to responses_

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Wed Dec  8 01:08:32 2010
@@ -21,6 +21,9 @@ package org.apache.cassandra.service;
  */
 
 
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -29,6 +32,7 @@ import org.apache.cassandra.locator.IEnd
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -49,14 +53,14 @@ public class DatacenterQuorumResponseHan
     @Override
     public void response(Message message)
     {
-        responses.add(message); // we'll go ahead and resolve a reply from anyone, even if it's not from this dc
+        resolver.preprocess(message);
 
         int n;
         n = localdc.equals(snitch.getDatacenter(message.getFrom())) 
                 ? localResponses.decrementAndGet()
                 : localResponses.get();
 
-        if (n == 0 && responseResolver.isDataPresent(responses))
+        if (n == 0 && resolver.isDataPresent())
         {
             condition.signal();
         }
@@ -68,4 +72,18 @@ public class DatacenterQuorumResponseHan
         NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
 		return (stategy.getReplicationFactor(localdc) / 2) + 1;
 	}
+    /** Throws UnavailableException unless at least blockfor of the given endpoints are in the local datacenter. */
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        int localEndpoints = 0;
+        for (InetAddress endpoint : endpoints)
+        {
+            if (localdc.equals(snitch.getDatacenter(endpoint)))
+                localEndpoints++;
+        }
+        // endpoints in other datacenters are not counted toward blockfor
+        if(localEndpoints < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Wed Dec  8 01:08:32 2010
@@ -34,8 +34,10 @@ public interface IResponseResolver<T> {
 	 * repairs . Hence you need to derive a response resolver based on your
 	 * needs from this interface.
 	 */
-	public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
-	public boolean isDataPresent(Collection<Message> responses);
+	public T resolve() throws DigestMismatchException, IOException;
+	public boolean isDataPresent();
 
     public void preprocess(Message message);
+    public Iterable<Message> getMessages();
+    public int getMessageCount();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Wed Dec  8 01:08:32 2010
@@ -18,19 +18,20 @@
 
 package org.apache.cassandra.service;
 
+import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.io.IOException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,19 +39,20 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = LoggerFactory.getLogger( QuorumResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
-    protected IResponseResolver<T> responseResolver;
+    protected final IResponseResolver<T> resolver;
     private final long startTime;
-    protected int blockfor;
+    protected final int blockfor;
     
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public QuorumResponseHandler(IResponseResolver<T> responseResolver, ConsistencyLevel consistencyLevel, String table)
+    public QuorumResponseHandler(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
     {
         this.blockfor = determineBlockFor(consistencyLevel, table);
-        this.responseResolver = responseResolver;
+        this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
+
+        logger.debug("QuorumResponseHandler blocking for {} responses", blockfor);
     }
     
     public T get() throws TimeoutException, DigestMismatchException, IOException
@@ -71,35 +73,31 @@ public class QuorumResponseHandler<T> im
             if (!success)
             {
                 StringBuilder sb = new StringBuilder("");
-                for (Message message : responses)
+                for (Message message : resolver.getMessages())
                 {
                     sb.append(message.getFrom());
                 }
-                throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
+                throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " .");
             }
         }
         finally
         {
-            for (Message response : responses)
+            for (Message response : resolver.getMessages())
             {
                 MessagingService.removeRegisteredCallback(response.getMessageId());
             }
         }
 
-        return responseResolver.resolve(responses);
+        return resolver.resolve();
     }
     
     public void response(Message message)
     {
-        responses.add(message);
-        responseResolver.preprocess(message);
-        if (responses.size() < blockfor) {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() < blockfor)
             return;
-        }
-        if (responseResolver.isDataPresent(responses))
-        {
+        if (resolver.isDataPresent())
             condition.signal();
-        }
     }
     
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
@@ -110,11 +108,17 @@ public class QuorumResponseHandler<T> im
             case ANY:
                 return 1;
             case QUORUM:
-                return (DatabaseDescriptor.getReplicationFactor(table) / 2) + 1;
+                return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
             case ALL:
-                return DatabaseDescriptor.getReplicationFactor(table);
+                return Table.open(table).getReplicationStrategy().getReplicationFactor();
             default:
-                throw new UnsupportedOperationException("invalid consistency level: " + table.toString());
+                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel);
         }
     }
+    /** Throws UnavailableException when fewer than blockfor endpoints are supplied; only the collection's size is checked here. */
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        if (endpoints.size() < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Wed Dec  8 01:08:32 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class RangeSliceResponseResolver 
     private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
     private final String table;
     private final List<InetAddress> sources;
+    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
@@ -53,7 +55,7 @@ public class RangeSliceResponseResolver 
         this.table = table;
     }
 
-    public List<Row> resolve(Collection<Message> responses) throws DigestMismatchException, IOException
+    public List<Row> resolve() throws DigestMismatchException, IOException
     {
         CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
         {
@@ -110,11 +112,12 @@ public class RangeSliceResponseResolver 
 
     public void preprocess(Message message)
     {
+        responses.add(message);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
     {
-        return responses.size() >= sources.size();
+        return !responses.isEmpty();
     }
 
     private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>>
@@ -134,4 +137,14 @@ public class RangeSliceResponseResolver 
             return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(), source) : endOfData();
         }
     }
+    /** Returns the responses collection that preprocess() appends to; this is the backing collection itself, not a copy. */
+    public Iterable<Message> getMessages()
+    {
+        return responses;
+    }
+    /** Returns the number of responses recorded so far by preprocess(). */
+    public int getMessageCount()
+    {
+        return responses.size();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Dec  8 01:08:32 2010
@@ -58,14 +58,14 @@ public class ReadResponseResolver implem
       * repair request should be scheduled.
       *
       */
-	public Row resolve(Collection<Message> responses) throws DigestMismatchException, IOException
+	public Row resolve() throws DigestMismatchException, IOException
     {
         if (logger_.isDebugEnabled())
-            logger_.debug("resolving " + responses.size() + " responses");
+            logger_.debug("resolving " + results.size() + " responses");
 
         long startTime = System.currentTimeMillis();
-		List<ColumnFamily> versions = new ArrayList<ColumnFamily>(responses.size());
-		List<InetAddress> endpoints = new ArrayList<InetAddress>(responses.size());
+		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+		List<InetAddress> endpoints = new ArrayList<InetAddress>();
 		DecoratedKey key = null;
 		ByteBuffer digest = FBUtilities.EMPTY_BYTE_BUFFER;
 		boolean isDigestQuery = false;
@@ -76,11 +76,10 @@ public class ReadResponseResolver implem
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-		for (Message message : responses)
-		{
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived after quorum already achieved
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            Message message = entry.getKey();
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -187,6 +186,8 @@ public class ReadResponseResolver implem
         try
         {
             ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            if (logger_.isDebugEnabled())
+                logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
             results.put(message, result);
         }
         catch (IOException e)
@@ -201,16 +202,23 @@ public class ReadResponseResolver implem
         results.put(message, result);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
 	{
-        for (Message message : responses)
+        for (ReadResponse result : results.values())
         {
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived concurrently
             if (!result.isDigestQuery())
                 return true;
         }
         return false;
     }
+    /** Returns the messages that currently have an entry in results (the map's key set, not a copy). */
+    public Iterable<Message> getMessages()
+    {
+        return results.keySet();
+    }
+    /** Returns the number of message/response pairs currently held in results. */
+    public int getMessageCount()
+    {
+        return results.size();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec  8 01:08:32 2010
@@ -201,7 +201,7 @@ public class StorageProxy implements Sto
         {
             public void runMayThrow() throws IOException
             {
-                rm.apply();
+                rm.deepCopy().apply();
                 responseHandler.response(null);
             }
         };
@@ -315,7 +315,7 @@ public class StorageProxy implements Sto
     private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-        List<InetAddress[]> commandEndpoints = new ArrayList<InetAddress[]>();
+        List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
 
         // send out read requests
@@ -328,25 +328,25 @@ public class StorageProxy implements Sto
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
             InetAddress dataPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
-            List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
 
-            InetAddress[] endpoints = new InetAddress[endpointList.size()];
-            Message messages[] = new Message[endpointList.size()];
+            AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
+            QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
+            handler.assureSufficientLiveNodes(endpoints);
+
+            Message messages[] = new Message[endpoints.size()];
             // data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
             int n = 0;
-            for (InetAddress endpoint : endpointList)
+            for (InetAddress endpoint : endpoints)
             {
                 Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
-                endpoints[n] = endpoint;
                 messages[n++] = m;
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-            QuorumResponseHandler<Row> quorumResponseHandler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
-            MessagingService.instance.sendRR(messages, endpoints, quorumResponseHandler);
-            quorumResponseHandlers.add(quorumResponseHandler);
+            MessagingService.instance.sendRR(messages, endpoints, handler);
+            quorumResponseHandlers.add(handler);
             commandEndpoints.add(endpoints);
         }
 
@@ -370,14 +370,14 @@ public class StorageProxy implements Sto
             catch (DigestMismatchException ex)
             {
                 AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-                QuorumResponseHandler<Row> qrhRepair = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), ConsistencyLevel.QUORUM);
+                QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
                 Message messageRepair = command.makeReadMessage();
-                MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(i), qrhRepair);
+                MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(i), handler);
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-                repairResponseHandlers.add(qrhRepair);
+                repairResponseHandlers.add(handler);
             }
         }
 
@@ -499,7 +499,7 @@ public class StorageProxy implements Sto
         final Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
         // an empty message acts as a request to the SchemaCheckVerbHandler.
-        MessagingService.instance.sendRR(msg, liveHosts.toArray(new InetAddress[]{}), new IAsyncCallback() 
+        MessagingService.instance.sendRR(msg, liveHosts, new IAsyncCallback()
         {
             public void response(Message msg)
             {
@@ -774,7 +774,7 @@ public class StorageProxy implements Sto
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         Truncation truncation = new Truncation(keyspace, cfname);
         Message message = truncation.makeTruncationMessage();
-        MessagingService.instance.sendRR(message, allEndpoints.toArray(new InetAddress[]{}), responseHandler);
+        MessagingService.instance.sendRR(message, allEndpoints, responseHandler);
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Dec  8 01:08:32 2010
@@ -1738,7 +1738,7 @@ public class StorageService implements I
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
             // if the replication factor is 1 the data is lost so we shouldn't wait for confirmation
-            if (DatabaseDescriptor.getReplicationFactor(table) == 1)
+            if (Table.open(table).getReplicationStrategy().getReplicationFactor() == 1)
                 continue;
 
             // get all ranges that change ownership (that is, a node needs

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Wed Dec  8 01:08:32 2010
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.Atomi
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
@@ -93,9 +94,9 @@ public class WriteResponseHandler extend
         }
         // at most one node per range can bootstrap at a time, and these will be added to the write until
         // bootstrap finishes (at which point we no longer need to write to the old ones).
-        assert 1 <= blockFor && blockFor <= 2 * DatabaseDescriptor.getReplicationFactor(table)
+        assert 1 <= blockFor && blockFor <= 2 * Table.open(table).getReplicationStrategy().getReplicationFactor()
             : String.format("invalid response count %d for replication factor %d",
-                            blockFor, DatabaseDescriptor.getReplicationFactor(table));
+                            blockFor, Table.open(table).getReplicationStrategy().getReplicationFactor());
         return blockFor;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Dec  8 01:08:32 2010
@@ -138,6 +138,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e) 
         {
+            logger.debug("... timed out");
         	throw new TimedOutException();
         }
         catch (IOException e)
@@ -260,8 +261,7 @@ public class CassandraServer implements 
     public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("get_slice");
+        logger.debug("get_slice");
         
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
         return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
@@ -270,8 +270,7 @@ public class CassandraServer implements 
     public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("multiget_slice");
+        logger.debug("multiget_slice");
 
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
 
@@ -309,8 +308,7 @@ public class CassandraServer implements 
     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("get");
+        logger.debug("get");
         
         state().hasColumnFamilyAccess(column_path.column_family, Permission.READ);
         String keyspace = state().getKeyspace();
@@ -338,8 +336,7 @@ public class CassandraServer implements 
     public int get_count(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("get_count");
+        logger.debug("get_count");
 
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
 
@@ -349,8 +346,7 @@ public class CassandraServer implements 
     public Map<ByteBuffer, Integer> multiget_count(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("multiget_count");
+        logger.debug("multiget_count");
 
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
         String keyspace = state().getKeyspace();
@@ -367,8 +363,7 @@ public class CassandraServer implements 
     public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("insert");
+        logger.debug("insert");
 
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
 
@@ -391,8 +386,7 @@ public class CassandraServer implements 
     public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("batch_mutate");
+        logger.debug("batch_mutate");
         
         List<String> cfamsSeen = new ArrayList<String>();
 
@@ -428,8 +422,7 @@ public class CassandraServer implements 
     public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("remove");
+        logger.debug("remove");
 
         state().hasColumnFamilyAccess(column_path.column_family, Permission.WRITE);
 
@@ -450,11 +443,12 @@ public class CassandraServer implements 
 
             try
             {
-              StorageProxy.mutate(mutations, consistency_level);
+                StorageProxy.mutate(mutations, consistency_level);
             }
             catch (TimeoutException e)
             {
-              throw new TimedOutException();
+                logger.debug("... timed out");
+                throw new TimedOutException();
             }
         }
         finally
@@ -482,8 +476,7 @@ public class CassandraServer implements 
     public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TException, TimedOutException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("range_slice");
+        logger.debug("range_slice");
 
         String keyspace = state().getKeyspace();
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
@@ -521,6 +514,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
         	throw new TimedOutException();
         }
         catch (IOException e)
@@ -546,8 +540,7 @@ public class CassandraServer implements 
 
     public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("scan");
+        logger.debug("scan");
 
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
         String keyspace = state().getKeyspace();
@@ -566,6 +559,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
             throw new TimedOutException();
         }
         return thriftifyKeySlices(rows, column_parent, column_predicate);
@@ -703,6 +697,7 @@ public class CassandraServer implements 
 
     public String system_add_column_family(CfDef cf_def) throws InvalidRequestException, TException
     {
+        logger.debug("add_column_family");
         state().hasColumnFamilyListAccess(Permission.WRITE);
         ThriftValidation.validateCfDef(cf_def);
         try
@@ -726,6 +721,7 @@ public class CassandraServer implements 
 
     public String system_drop_column_family(String column_family) throws InvalidRequestException, TException
     {
+        logger.debug("drop_column_family");
         state().hasColumnFamilyListAccess(Permission.WRITE);
         
         try
@@ -749,6 +745,7 @@ public class CassandraServer implements 
 
     public String system_add_keyspace(KsDef ks_def) throws InvalidRequestException, TException
     {
+        logger.debug("add_keyspace");
         state().hasKeyspaceListAccess(Permission.WRITE);
         
         // generate a meaningful error if the user setup keyspace and/or column definition incorrectly
@@ -765,6 +762,7 @@ public class CassandraServer implements 
             Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(ks_def.cf_defs.size());
             for (CfDef cfDef : ks_def.cf_defs)
             {
+                ThriftValidation.validateCfDef(cfDef);
                 cfDefs.add(convertToCFMetaData(cfDef));
             }
 
@@ -792,6 +790,7 @@ public class CassandraServer implements 
     
     public String system_drop_keyspace(String keyspace) throws InvalidRequestException, TException
     {
+        logger.debug("drop_keyspace");
         state().hasKeyspaceListAccess(Permission.WRITE);
         
         try
@@ -816,6 +815,7 @@ public class CassandraServer implements 
     /** update an existing keyspace, but do not allow column family modifications. */
     public String system_update_keyspace(KsDef ks_def) throws InvalidRequestException, TException
     {
+        logger.debug("update_keyspace");
         state().hasKeyspaceListAccess(Permission.WRITE);
 
         ThriftValidation.validateTable(ks_def.name);
@@ -848,6 +848,7 @@ public class CassandraServer implements 
 
     public String system_update_column_family(CfDef cf_def) throws InvalidRequestException, TException
     {
+        logger.debug("update_column_family");
         state().hasColumnFamilyListAccess(Permission.WRITE);
         
         if (cf_def.keyspace == null || cf_def.name == null)

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Wed Dec  8 01:08:32 2010
@@ -383,14 +383,20 @@ public class ThriftValidation
     {
         try
         {
+            ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
+            if (cfType == null)
+                throw new InvalidRequestException("invalid column type " + cf_def.column_type);
+
             DatabaseDescriptor.getComparator(cf_def.comparator_type);
             DatabaseDescriptor.getComparator(cf_def.subcomparator_type);
             DatabaseDescriptor.getComparator(cf_def.default_validation_class);
+            if (cfType != ColumnFamilyType.Super && cf_def.subcomparator_type != null)
+                throw new InvalidRequestException("subcomparator_type is invalid for standard columns");
 
             if (cf_def.column_metadata == null)
                 return;
 
-            AbstractType comparator = cf_def.subcomparator_type == null
+            AbstractType comparator = cfType == ColumnFamilyType.Standard
                                     ? DatabaseDescriptor.getComparator(cf_def.comparator_type)
                                     : DatabaseDescriptor.getComparator(cf_def.subcomparator_type);
             for (ColumnDef c : cf_def.column_metadata)
@@ -406,6 +412,12 @@ public class ThriftValidation
                     throw new InvalidRequestException(String.format("Column name %s is not valid for comparator %s",
                                                                     FBUtilities.bytesToHex(c.name), cf_def.comparator_type));
                 }
+
+                if ((c.index_name != null) && (c.index_type == null))
+                    throw new ConfigurationException("index_name cannot be set without index_type");
+
+                if (cfType == ColumnFamilyType.Super && c.index_type != null)
+                    throw new InvalidRequestException("Secondary indexes are not supported on supercolumns");
             }
         }
         catch (ConfigurationException e)

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Wed Dec  8 01:08:32 2010
@@ -60,8 +60,8 @@ import java.nio.charset.Charset;
  * }
  *
  */
-public class ByteBufferUtil {
-
+public class ByteBufferUtil
+{
     public static int compareUnsigned(ByteBuffer o1, ByteBuffer o2)
     {
         return FBUtilities.compareUnsigned(o1.array(), o2.array(), o1.arrayOffset()+o1.position(), o2.arrayOffset()+o2.position(), o1.limit()+o1.arrayOffset(), o2.limit()+o2.arrayOffset());
@@ -98,4 +98,14 @@ public class ByteBufferUtil {
            throw new RuntimeException(e);
         } 
     }
+    
+    public static ByteBuffer clone(ByteBuffer o)
+    {
+        ByteBuffer clone = ByteBuffer.allocate(o.remaining());
+        o.mark();
+        clone.put(o);
+        o.reset();
+        clone.flip();
+        return clone;
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java Wed Dec  8 01:08:32 2010
@@ -36,11 +36,13 @@ public class CliTest extends CleanupHelp
     // please add new statements here so they could be auto-runned by this test.
     private String[] statements = {
         "use TestKeySpace;",
-        "create column family CF1 with comparator=UTF8Type and column_metadata=[{ column_name:world, validation_class:IntegerType, index_type:0, index_name:IdxName }, { column_name:world2, validation_class:LongType, index_type:0, index_name:LongIdxName}];",
+        "create column family CF1 with comparator=UTF8Type and column_metadata=[{ column_name:world, validation_class:IntegerType, index_type:0, index_name:IdxName }, { column_name:world2, validation_class:LongType, index_type:KEYS, index_name:LongIdxName}];",
         "set CF1[hello][world] = 123848374878933948398384;",
         "get CF1[hello][world];",
         "set CF1[hello][world2] = 15;",
         "get CF1 where world2 = long(15);",
+        "get cF1 where world2 = long(15);",
+        "get Cf1 where world2 = long(15);",
         "set CF1['hello'][time_spent_uuid] = timeuuid(a8098c1a-f86e-11da-bd1a-00112444be1e);",
         "create column family CF2 with comparator=IntegerType;",
         "set CF2['key'][98349387493847748398334] = 'some text';",

Modified: cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Wed Dec  8 01:08:32 2010
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
@@ -146,7 +147,7 @@ public class BootStrapperTest extends Cl
         final int[] clusterSizes = new int[] { 1, 3, 5, 10, 100};
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
+            int replicationFactor = Table.open(table).getReplicationStrategy().getReplicationFactor();
             for (int clusterSize : clusterSizes)
                 if (clusterSize >= replicationFactor)
                     testSourceTargetComputation(table, clusterSize, replicationFactor);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java Wed Dec  8 01:08:32 2010
@@ -30,11 +30,12 @@ import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import org.apache.cassandra.config.DatabaseDescriptor;
+
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.dht.BigIntegerToken;
 import org.apache.cassandra.dht.Token;
 
-public class OldNetworkTopologyStrategyTest
+public class OldNetworkTopologyStrategyTest extends SchemaLoader
 {
     private List<Token> endpointTokens;
     private List<Token> keyTokens;
@@ -71,7 +72,7 @@ public class OldNetworkTopologyStrategyT
         expectedResults.put("25", buildResult("254.0.0.4", "254.0.0.1", "254.0.0.2"));
         expectedResults.put("35", buildResult("254.0.0.1", "254.0.0.2", "254.0.0.3"));
 
-        runTestForReplicatedTables(strategy);
+        testGetEndpoints(strategy, keyTokens.toArray(new Token[0]));
     }
 
     /**
@@ -96,7 +97,7 @@ public class OldNetworkTopologyStrategyT
         expectedResults.put("25", buildResult("254.0.0.4", "254.1.0.3", "254.0.0.1"));
         expectedResults.put("35", buildResult("254.0.0.1", "254.1.0.3", "254.0.0.2"));
 
-        runTestForReplicatedTables(strategy);
+        testGetEndpoints(strategy, keyTokens.toArray(new Token[0]));
     }
 
     /**
@@ -122,16 +123,7 @@ public class OldNetworkTopologyStrategyT
         expectedResults.put("25", buildResult("254.1.0.4", "254.0.0.1", "254.0.0.2"));
         expectedResults.put("35", buildResult("254.0.0.1", "254.0.1.3", "254.1.0.4"));
 
-        runTestForReplicatedTables(strategy);
-    }
-
-    private void runTestForReplicatedTables(AbstractReplicationStrategy strategy) throws UnknownHostException
-    {
-        for (String table : DatabaseDescriptor.getNonSystemTables())
-        {
-            if (DatabaseDescriptor.getReplicationFactor(table) == 3)
-                testGetEndpoints(strategy, keyTokens.toArray(new Token[0]), table);
-        }
+        testGetEndpoints(strategy, keyTokens.toArray(new Token[0]));
     }
 
     private ArrayList<InetAddress> buildResult(String... addresses) throws UnknownHostException
@@ -156,7 +148,7 @@ public class OldNetworkTopologyStrategyT
         tmd.updateNormalToken(endpointToken, ep);
     }
 
-    private void testGetEndpoints(AbstractReplicationStrategy strategy, Token[] keyTokens, String table) throws UnknownHostException
+    private void testGetEndpoints(AbstractReplicationStrategy strategy, Token[] keyTokens) throws UnknownHostException
     {
         for (Token keyToken : keyTokens)
         {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java Wed Dec  8 01:08:32 2010
@@ -94,7 +94,7 @@ public class SimpleStrategyTest extends 
             for (int i = 0; i < keyTokens.length; i++)
             {
                 List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i]);
-                assertEquals(DatabaseDescriptor.getReplicationFactor(table), endpoints.size());
+                assertEquals(strategy.getReplicationFactor(), endpoints.size());
                 List<InetAddress> correctEndpoints = new ArrayList<InetAddress>();
                 for (int j = 0; j < endpoints.size(); j++)
                     correctEndpoints.add(hosts.get((i + j + 1) % hosts.size()));
@@ -140,7 +140,7 @@ public class SimpleStrategyTest extends 
 
             StorageService.calculatePendingRanges(strategy, table);
 
-            int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
+            int replicationFactor = strategy.getReplicationFactor();
 
             for (int i = 0; i < keyTokens.length; i++)
             {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Wed Dec  8 01:08:32 2010
@@ -171,7 +171,7 @@ public class AntiEntropyServiceTest exte
     public void testGetNeighborsPlusOne() throws Throwable
     {
         // generate rf+1 nodes, and ensure that all nodes are returned
-        Set<InetAddress> expected = addTokens(1 + DatabaseDescriptor.getReplicationFactor(tablename));
+        Set<InetAddress> expected = addTokens(1 + Table.open(tablename).getReplicationStrategy().getReplicationFactor());
         expected.remove(FBUtilities.getLocalAddress());
         assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
     }
@@ -182,7 +182,7 @@ public class AntiEntropyServiceTest exte
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
         // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
-        addTokens(2 * DatabaseDescriptor.getReplicationFactor(tablename));
+        addTokens(2 * Table.open(tablename).getReplicationStrategy().getReplicationFactor());
         AbstractReplicationStrategy ars = Table.open(tablename).getReplicationStrategy();
         Set<InetAddress> expected = new HashSet<InetAddress>();
         for (Range replicaRange : ars.getAddressRanges().get(FBUtilities.getLocalAddress()))

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=1043262&r1=1043261&r2=1043262&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Wed Dec  8 01:08:32 2010
@@ -92,7 +92,7 @@ public class MoveTest extends CleanupHel
             strategy = getStrategy(table, tmd);
             for (Token token : keyTokens)
             {
-                int replicationFactor = DatabaseDescriptor.getReplicationFactor(table);
+                int replicationFactor = strategy.getReplicationFactor();
 
                 HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, strategy.calculateNaturalEndpoints(token, tmd)));
                 HashSet<InetAddress> expected = new HashSet<InetAddress>();
@@ -217,7 +217,7 @@ public class MoveTest extends CleanupHel
             }
 
             // just to be sure that things still work according to the old tests, run them:
-            if (DatabaseDescriptor.getReplicationFactor(table) != 3)
+            if (strategy.getReplicationFactor() != 3)
                 continue;
             // tokens 5, 15 and 25 should go three nodes
             for (int i=0; i<3; ++i)
@@ -334,7 +334,7 @@ public class MoveTest extends CleanupHel
                 assertTrue(expectedEndpoints.get(table).get(keyTokens.get(i)).containsAll(endpoints));
             }
 
-            if (DatabaseDescriptor.getReplicationFactor(table) != 3)
+            if (strategy.getReplicationFactor() != 3)
                 continue;
             // leave this stuff in to guarantee the old tests work the way they were supposed to.
             // tokens 5, 15 and 25 should go three nodes



Mime
View raw message