cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [3/3] Move consistency level to the protocol level
Date Tue, 16 Oct 2012 16:00:44 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index 1810548..8721462 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "19.34.0";
+  public static final String VERSION = "19.35.0";
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
index 0a53222..8c793c2 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
@@ -461,8 +461,6 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB
 
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
-      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-      __isset_bit_vector = new BitSet(1);
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index fe44b54..fb5681b 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -259,11 +259,6 @@ public final class CFMetaData
 
     public volatile CompressionParameters compressionParameters;
 
-    // Default consistency levels for CQL3. The default for those values is ONE,
-    // but we keep the internal default to null as it help handling thrift compatibility
-    private volatile ConsistencyLevel readConsistencyLevel;
-    private volatile ConsistencyLevel writeConsistencyLevel;
-
     // Processed infos used by CQL. This can be fully reconstructed from the CFMedata,
     // so it's not saved on disk. It is however costlyish to recreate for each query
     // so we cache it here (and update on each relevant CFMetadata change)
@@ -287,8 +282,6 @@ public final class CFMetaData
     public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
     public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;}
     public CFMetaData caching(Caching prop) {caching = prop; return this;}
-    public CFMetaData defaultReadCL(ConsistencyLevel prop) {readConsistencyLevel = prop; return this;}
-    public CFMetaData defaultWriteCL(ConsistencyLevel prop) {writeConsistencyLevel = prop; return this;}
 
     public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
     {
@@ -454,9 +447,7 @@ public final class CFMetaData
                       .compactionStrategyOptions(oldCFMD.compactionStrategyOptions)
                       .compressionParameters(oldCFMD.compressionParameters)
                       .bloomFilterFpChance(oldCFMD.bloomFilterFpChance)
-                      .caching(oldCFMD.caching)
-                      .defaultReadCL(oldCFMD.readConsistencyLevel)
-                      .defaultWriteCL(oldCFMD.writeConsistencyLevel);
+                      .caching(oldCFMD.caching);
     }
 
     /**
@@ -542,16 +533,6 @@ public final class CFMetaData
         return valueAlias;
     }
 
-    public ConsistencyLevel getReadConsistencyLevel()
-    {
-        return readConsistencyLevel == null ? ConsistencyLevel.ONE : readConsistencyLevel;
-    }
-
-    public ConsistencyLevel getWriteConsistencyLevel()
-    {
-        return writeConsistencyLevel == null ? ConsistencyLevel.ONE : writeConsistencyLevel;
-    }
-
     public CompressionParameters compressionParameters()
     {
         return compressionParameters;
@@ -614,8 +595,6 @@ public final class CFMetaData
             .append(compressionParameters, rhs.compressionParameters)
             .append(bloomFilterFpChance, rhs.bloomFilterFpChance)
             .append(caching, rhs.caching)
-            .append(readConsistencyLevel, rhs.readConsistencyLevel)
-            .append(writeConsistencyLevel, rhs.writeConsistencyLevel)
             .isEquals();
     }
 
@@ -646,8 +625,6 @@ public final class CFMetaData
             .append(compressionParameters)
             .append(bloomFilterFpChance)
             .append(caching)
-            .append(readConsistencyLevel)
-            .append(writeConsistencyLevel)
             .toHashCode();
     }
 
@@ -836,10 +813,6 @@ public final class CFMetaData
         }
         if (cfm.valueAlias != null)
             valueAlias = cfm.valueAlias;
-        if (cfm.readConsistencyLevel != null)
-            readConsistencyLevel = cfm.readConsistencyLevel;
-        if (cfm.writeConsistencyLevel != null)
-            writeConsistencyLevel = cfm.writeConsistencyLevel;
 
         bloomFilterFpChance = cfm.bloomFilterFpChance;
         caching = cfm.caching;
@@ -1331,10 +1304,6 @@ public final class CFMetaData
                                         : Column.create(valueAlias, timestamp, cfName, "value_alias"));
         cf.addColumn(Column.create(json(aliasesAsStrings(columnAliases)), timestamp, cfName, "column_aliases"));
         cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
-        cf.addColumn(readConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_read_consistency")
-                                                  : Column.create(readConsistencyLevel.toString(), timestamp, cfName, "default_read_consistency"));
-        cf.addColumn(writeConsistencyLevel == null ? DeletedColumn.create(ldt, timestamp, cfName, "default_write_consistency")
-                                                   : Column.create(writeConsistencyLevel.toString(), timestamp, cfName, "default_write_consistency"));
     }
 
     // Package protected for use by tests
@@ -1379,10 +1348,6 @@ public final class CFMetaData
             if (result.has("value_alias"))
                 cfm.valueAlias(result.getBytes("value_alias"));
             cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options")));
-            if (result.has("default_read_consistency"))
-                cfm.defaultReadCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_read_consistency")));
-            if (result.has("default_write_consistency"))
-                cfm.defaultWriteCL(Enum.valueOf(ConsistencyLevel.class, result.getString("default_write_consistency")));
 
             return cfm;
         }
@@ -1546,8 +1511,6 @@ public final class CFMetaData
             .append("compressionOptions", compressionParameters.asThriftOptions())
             .append("bloomFilterFpChance", bloomFilterFpChance)
             .append("caching", caching)
-            .append("readConsistencyLevel", readConsistencyLevel)
-            .append("writeConsistencyLevel", writeConsistencyLevel)
             .toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 00fd30b..6733d0d 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.db.ConsistencyLevel;
  */
 public class Attributes
 {
-    public ConsistencyLevel cLevel;
     public Long timestamp;
     public int timeToLive;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index cb78db0..53a5bc6 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -48,8 +48,6 @@ public class CFPropDefs extends PropertyDefinitions
     public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_CACHING = "caching";
     public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
-    public static final String KW_DEFAULT_R_CONSISTENCY = "default_read_consistency";
-    public static final String KW_DEFAULT_W_CONSISTENCY = "default_write_consistency";
 
     public static final String KW_COMPACTION = "compaction";
     public static final String KW_COMPRESSION = "compression";
@@ -70,8 +68,6 @@ public class CFPropDefs extends PropertyDefinitions
         keywords.add(KW_BF_FP_CHANCE);
         keywords.add(KW_COMPACTION);
         keywords.add(KW_COMPRESSION);
-        keywords.add(KW_DEFAULT_W_CONSISTENCY);
-        keywords.add(KW_DEFAULT_R_CONSISTENCY);
 
         obsoleteKeywords.add("compaction_strategy_class");
         obsoleteKeywords.add("compaction_strategy_options");
@@ -143,26 +139,6 @@ public class CFPropDefs extends PropertyDefinitions
 
         if (!getCompressionOptions().isEmpty())
             cfm.compressionParameters(CompressionParameters.create(getCompressionOptions()));
-
-        try
-        {
-            ConsistencyLevel readCL = getConsistencyLevel(KW_DEFAULT_R_CONSISTENCY);
-            if (readCL != null)
-            {
-                readCL.validateForRead(cfm.ksName);
-                cfm.defaultReadCL(readCL);
-            }
-            ConsistencyLevel writeCL = getConsistencyLevel(KW_DEFAULT_W_CONSISTENCY);
-            if (writeCL != null)
-            {
-                writeCL.validateForWrite(cfm.ksName);
-                cfm.defaultWriteCL(writeCL);
-            }
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new ConfigurationException(e.getMessage(), e.getCause());
-        }
     }
 
     public ConsistencyLevel getConsistencyLevel(String key) throws ConfigurationException, SyntaxException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 00fb406..90883e6 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.exceptions.*;
@@ -53,7 +54,7 @@ public interface CQLStatement
      * @param variables the values for bounded variables. The implementation
      * can assume that each bound term have a corresponding value.
      */
-    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Variante of execute used for internal query against the system tables, and thus only query the local node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 320e934..76f2509 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -42,7 +42,6 @@ options {
     import org.apache.cassandra.exceptions.InvalidRequestException;
     import org.apache.cassandra.exceptions.SyntaxException;
     import org.apache.cassandra.utils.Pair;
-    import org.apache.cassandra.db.ConsistencyLevel;
 }
 
 @members {
@@ -189,19 +188,16 @@ useStatement returns [UseStatement stmt]
 selectStatement returns [SelectStatement.RawStatement expr]
     @init {
         boolean isCount = false;
-        ConsistencyLevel cLevel = null;
         int limit = 10000;
         Map<ColumnIdentifier, Boolean> orderings = new LinkedHashMap<ColumnIdentifier, Boolean>();
     }
     : K_SELECT ( sclause=selectClause | (K_COUNT '(' sclause=selectCountClause ')' { isCount = true; }) )
       K_FROM cf=columnFamilyName
-      ( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase()); } )?
       ( K_WHERE wclause=whereClause )?
       ( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )?
       ( K_LIMIT rows=INTEGER { limit = Integer.parseInt($rows.text); } )?
       {
-          SelectStatement.Parameters params = new SelectStatement.Parameters(cLevel,
-                                                                             limit,
+          SelectStatement.Parameters params = new SelectStatement.Parameters(limit,
                                                                              orderings,
                                                                              isCount);
           $expr = new SelectStatement.RawStatement(cf, params, sclause, wclause);
@@ -240,9 +236,8 @@ orderByClause[Map<ColumnIdentifier, Boolean> orderings]
 /**
  * INSERT INTO <CF> (<column>, <column>, <column>, ...)
  * VALUES (<value>, <value>, <value>, ...)
- * USING CONSISTENCY <level> AND TIMESTAMP <long>;
+ * USING TIMESTAMP <long>;
  *
- * Consistency level is set to ONE by default
  */
 insertStatement returns [UpdateStatement expr]
     @init {
@@ -269,8 +264,7 @@ usingClauseDelete[Attributes attrs]
     ;
 
 usingClauseDeleteObjective[Attributes attrs]
-    : K_CONSISTENCY K_LEVEL  { attrs.cLevel = ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase()); }
-    | K_TIMESTAMP ts=INTEGER { attrs.timestamp = Long.valueOf($ts.text); }
+    : K_TIMESTAMP ts=INTEGER { attrs.timestamp = Long.valueOf($ts.text); }
     ;
 
 usingClauseObjective[Attributes attrs]
@@ -727,7 +721,6 @@ collection_type returns [ParsedType pt]
 
 unreserved_keyword returns [String str]
     : k=( K_KEY
-        | K_CONSISTENCY
         | K_CLUSTERING
         | K_LEVEL
         | K_COUNT
@@ -755,7 +748,6 @@ K_UPDATE:      U P D A T E;
 K_WITH:        W I T H;
 K_LIMIT:       L I M I T;
 K_USING:       U S I N G;
-K_CONSISTENCY: C O N S I S T E N C Y;
 K_LEVEL:       ( O N E
                | Q U O R U M
                | A L L

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 58a8f44..ce9dfec 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -122,20 +122,20 @@ public class QueryProcessor
         }
     }
 
-    private static ResultMessage processStatement(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables)
+    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, ClientState clientState, List<ByteBuffer> variables)
     throws RequestExecutionException, RequestValidationException
     {
         statement.checkAccess(clientState);
         statement.validate(clientState);
-        ResultMessage result = statement.execute(clientState, variables);
+        ResultMessage result = statement.execute(cl, clientState, variables);
         return result == null ? ResultMessage.Void.instance() : result;
     }
 
-    public static ResultMessage process(String queryString, ClientState clientState)
+    public static ResultMessage process(String queryString, ConsistencyLevel cl, ClientState clientState)
     throws RequestExecutionException, RequestValidationException
     {
         logger.trace("CQL QUERY: {}", queryString);
-        return processStatement(getStatement(queryString, clientState).statement, clientState, Collections.<ByteBuffer>emptyList());
+        return processStatement(getStatement(queryString, clientState).statement, cl, clientState, Collections.<ByteBuffer>emptyList());
     }
 
     public static UntypedResultSet processInternal(String query)
@@ -210,7 +210,7 @@ public class QueryProcessor
         }
     }
 
-    public static ResultMessage processPrepared(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables)
+    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, ClientState clientState, List<ByteBuffer> variables)
     throws RequestExecutionException, RequestValidationException
     {
         // Check to see if there are any bound variables to verify
@@ -228,7 +228,7 @@ public class QueryProcessor
                     logger.trace("[{}] '{}'", i+1, variables.get(i));
         }
 
-        return processStatement(statement, clientState, variables);
+        return processStatement(statement, cl, clientState, variables);
     }
 
     private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 92c708b..41f4f76 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.IMutation;
@@ -78,49 +79,28 @@ public class BatchStatement extends ModificationStatement
         }
     }
 
-    @Override
-    public ConsistencyLevel getConsistencyLevel()
-    {
-        // We have validated that either the consistency is set, or all statements have the same default CL (see validate())
-        return isSetConsistencyLevel()
-             ? super.getConsistencyLevel()
-             : (statements.isEmpty() ? ConsistencyLevel.ONE : statements.get(0).getConsistencyLevel());
-    }
-
     public void validate(ClientState state) throws InvalidRequestException
     {
         if (getTimeToLive() != 0)
             throw new InvalidRequestException("Global TTL on the BATCH statement is not supported.");
 
-        ConsistencyLevel cLevel = null;
         for (ModificationStatement statement : statements)
         {
-            if (statement.isSetConsistencyLevel())
-                throw new InvalidRequestException("Consistency level must be set on the BATCH, not individual statements");
-
             if (isSetTimestamp() && statement.isSetTimestamp())
                 throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
 
             if (statement.getTimeToLive() < 0)
                 throw new InvalidRequestException("A TTL must be greater or equal to 0");
-
-            if (isSetConsistencyLevel())
-            {
-                getConsistencyLevel().validateForWrite(statement.keyspace());
-            }
-            else
-            {
-                // If no consistency is set for the batch, we need all the CF in the batch to have the same default consistency level,
-                // otherwise the batch is invalid (i.e. the user must explicitely set the CL)
-                ConsistencyLevel stmtCL = statement.getConsistencyLevel();
-                if (cLevel != null && cLevel != stmtCL)
-                    throw new InvalidRequestException("The tables involved in the BATCH have different default write consistency, you must explicitely set the BATCH consitency level with USING CONSISTENCY");
-                cLevel = stmtCL;
-            }
         }
     }
 
-    public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+    {
+        for (ModificationStatement statement : statements)
+            statement.validateConsistency(cl);
+    }
+
+    public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
@@ -130,7 +110,7 @@ public class BatchStatement extends ModificationStatement
                 statement.setTimestamp(getTimestamp(clientState));
 
             // Group mutation together, otherwise they won't get applied atomically
-            for (IMutation m : statement.getMutations(clientState, variables, local))
+            for (IMutation m : statement.getMutations(clientState, variables, local, cl))
             {
                 if (m instanceof CounterMutation && type != Type.COUNTER)
                     throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
@@ -169,6 +149,6 @@ public class BatchStatement extends ModificationStatement
 
     public String toString()
     {
-        return String.format("BatchStatement(type=%s, statements=%s, consistency=%s)", type, statements, getConsistencyLevel());
+        return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 86af858..c7d32b1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.operations.ListOperation;
 import org.apache.cassandra.cql3.operations.MapOperation;
 import org.apache.cassandra.cql3.operations.Operation;
 import org.apache.cassandra.cql3.operations.SetOperation;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DeletionInfo;
 import org.apache.cassandra.db.RowMutation;
@@ -60,7 +61,15 @@ public class DeleteStatement extends ModificationStatement
         this.toRemove = new ArrayList<Pair<CFDefinition.Name, Term>>(columns.size());
     }
 
-    public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+    {
+        if (type == Type.COUNTER)
+            cl.validateCounterForWrite(cfDef.cfm);
+        else
+            cl.validateForWrite(cfDef.cfm.ksName);
+    }
+
+    public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         // keys
@@ -90,7 +99,7 @@ public class DeleteStatement extends ModificationStatement
             }
         }
 
-        Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null;
+        Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
 
         Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size());
         UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1);
@@ -228,10 +237,9 @@ public class DeleteStatement extends ModificationStatement
 
     public String toString()
     {
-        return String.format("DeleteStatement(name=%s, columns=%s, consistency=%s keys=%s)",
+        return String.format("DeleteStatement(name=%s, columns=%s, keys=%s)",
                              cfName,
                              columns,
-                             getConsistencyLevel(),
                              whereClause);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 9f335a7..6e92ec2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -51,19 +51,17 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     protected Type type;
 
-    private final ConsistencyLevel cLevel;
     private Long timestamp;
     private final int timeToLive;
 
     public ModificationStatement(CFName name, Attributes attrs)
     {
-        this(name, attrs.cLevel, attrs.timestamp, attrs.timeToLive);
+        this(name, attrs.timestamp, attrs.timeToLive);
     }
 
-    public ModificationStatement(CFName name, ConsistencyLevel cLevel, Long timestamp, int timeToLive)
+    public ModificationStatement(CFName name, Long timestamp, int timeToLive)
     {
         super(name);
-        this.cLevel = cLevel;
         this.timestamp = timestamp;
         this.timeToLive = timeToLive;
     }
@@ -80,14 +78,18 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
         if (timeToLive > ExpiringColumn.MAX_TTL)
             throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
-
-        getConsistencyLevel().validateForWrite(keyspace());
     }
 
-    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException;
+
+    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
     {
-        Collection<? extends IMutation> mutations = getMutations(state, variables, false);
-        ConsistencyLevel cl = getConsistencyLevel();
+        if (cl == null)
+            throw new InvalidRequestException("Invalid empty consistency level");
+
+        validateConsistency(cl);
+
+        Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl);
 
         // The type should have been set by now or we have a bug
         assert type != null;
@@ -111,33 +113,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
         return null;
     }
 
-
     public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true))
+        for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null))
             mutation.apply();
         return null;
     }
 
-    public ConsistencyLevel getConsistencyLevel()
-    {
-        if (cLevel != null)
-            return cLevel;
-
-        CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
-        return cfm == null ? ConsistencyLevel.ONE : cfm.getWriteConsistencyLevel();
-    }
-
-    /**
-     * True if an explicit consistency level was parsed from the statement.
-     *
-     * @return true if a consistency was parsed, false otherwise.
-     */
-    public boolean isSetConsistencyLevel()
-    {
-        return cLevel != null;
-    }
-
     public long getTimestamp(ClientState clientState)
     {
         return timestamp == null ? clientState.getTimestamp() : timestamp;
@@ -158,9 +140,18 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
         return timeToLive;
     }
 
-    protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, CompositeType composite, boolean local)
+    protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, CompositeType composite, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
+        try
+        {
+            cl.validateForRead(keyspace());
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
+        }
+
         List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
         for (ByteBuffer key : keys)
         {
@@ -177,7 +168,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
         {
             List<Row> rows = local
                            ? SelectStatement.readLocally(keyspace(), commands)
-                           : StorageProxy.read(commands, getConsistencyLevel());
+                           : StorageProxy.read(commands, cl);
 
             Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
             for (Row row : rows)
@@ -212,7 +203,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException;
 
     public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
index d3d1c9f..48c2d03 100644
--- a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
@@ -17,7 +17,12 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
@@ -40,6 +45,13 @@ public abstract class PermissionAlteringStatement extends ParsedStatement implem
     public void validate(ClientState state)
     {}
 
+    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
+    {
+        return execute(state, variables); // cl is not used here: this simply forwards to the two-argument execute declared below
+    }
+
+    public abstract ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException;
+
     public ResultMessage executeInternal(ClientState state)
     {
         // executeInternal is for local query only, thus altering permission doesn't make sense and is not supported

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index e9d4450..ef0f0a1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -21,18 +21,15 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.cql3.CQLStatement;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CFName;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.transport.messages.ResultMessage;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
@@ -77,7 +74,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
     public void validate(ClientState state) throws RequestValidationException
     {}
 
-    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestValidationException
     {
         announceMigration();
         String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index ec50ef9..226d004 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -118,13 +118,18 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
     {
+        if (cl == null)
+            throw new InvalidRequestException("Invalid empty consistency level");
+
+        cl.validateForRead(keyspace());
+
         try
         {
             List<Row> rows = isKeyRange
-                           ? StorageProxy.getRangeSlice(getRangeCommand(variables), getConsistencyLevel())
-                           : StorageProxy.read(getSliceCommands(variables), getConsistencyLevel());
+                           ? StorageProxy.getRangeSlice(getRangeCommand(variables), cl)
+                           : StorageProxy.read(getSliceCommands(variables), cl);
 
             return processResults(rows, variables);
         }
@@ -322,11 +327,6 @@ public class SelectStatement implements CQLStatement
         return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) ? parameters.limit + 1 : parameters.limit;
     }
 
-    private ConsistencyLevel getConsistencyLevel()
-    {
-        return parameters.consistencyLevel == null ? cfDef.cfm.getReadConsistencyLevel() : parameters.consistencyLevel;
-    }
-
     private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
@@ -971,8 +971,6 @@ public class SelectStatement implements CQLStatement
         public ParsedStatement.Prepared prepare() throws InvalidRequestException
         {
             CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
-            if (parameters.consistencyLevel != null)
-                parameters.consistencyLevel.validateForRead(keyspace());
 
             if (parameters.limit <= 0)
                 throw new InvalidRequestException("LIMIT must be strictly positive");
@@ -1280,12 +1278,11 @@ public class SelectStatement implements CQLStatement
         @Override
         public String toString()
         {
-            return String.format("SelectRawStatement[name=%s, selectClause=%s, whereClause=%s, isCount=%s, cLevel=%s, limit=%s]",
+            return String.format("SelectRawStatement[name=%s, selectClause=%s, whereClause=%s, isCount=%s, limit=%s]",
                     cfName,
                     selectClause,
                     whereClause,
                     parameters.isCount,
-                    parameters.consistencyLevel,
                     parameters.limit);
         }
     }
@@ -1428,13 +1425,11 @@ public class SelectStatement implements CQLStatement
     public static class Parameters
     {
         private final int limit;
-        private final ConsistencyLevel consistencyLevel;
         private final Map<ColumnIdentifier, Boolean> orderings;
         private final boolean isCount;
 
-        public Parameters(ConsistencyLevel consistency, int limit, Map<ColumnIdentifier, Boolean> orderings, boolean isCount)
+        public Parameters(int limit, Map<ColumnIdentifier, Boolean> orderings, boolean isCount)
         {
-            this.consistencyLevel = consistency;
             this.limit = limit;
             this.orderings = orderings;
             this.isCount = isCount;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index e709a06..af4b4e0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
@@ -52,7 +53,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
+    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index c3401b2..2065f32 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -96,9 +96,16 @@ public class UpdateStatement extends ModificationStatement
         this.columns = null;
     }
 
+    protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+    {
+        if (type != Type.COUNTER) // non-counter statement: checked against the keyspace name
+            cl.validateForWrite(cfDef.cfm.ksName);
+        else // counter statement: checked against the column family metadata
+            cl.validateCounterForWrite(cfDef.cfm);
+    }
 
     /** {@inheritDoc} */
-    public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local)
+    public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);
@@ -125,13 +132,13 @@ public class UpdateStatement extends ModificationStatement
             }
         }
 
-        Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null;
+        Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
 
         Collection<IMutation> mutations = new LinkedList<IMutation>();
         UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), getTimeToLive());
 
         for (ByteBuffer key: keys)
-            mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key)));
+            mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key), cl));
 
         return mutations;
     }
@@ -196,7 +203,7 @@ public class UpdateStatement extends ModificationStatement
      *
      * @throws InvalidRequestException on the wrong request
      */
-    private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ColumnGroupMap group)
+    private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ColumnGroupMap group, ConsistencyLevel cl)
     throws InvalidRequestException
     {
         validateKey(key);
@@ -252,7 +259,7 @@ public class UpdateStatement extends ModificationStatement
             }
         }
 
-        return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
+        return (hasCounterColumn) ? new CounterMutation(rm, cl) : rm;
     }
 
     private boolean addToMutation(ColumnFamily cf,
@@ -319,9 +326,6 @@ public class UpdateStatement extends ModificationStatement
 
         // Deal here with the keyspace overwrite thingy to avoid mistake
         CFMetaData metadata = validateColumnFamily(keyspace(), columnFamily(), type == Type.COUNTER);
-        if (type == Type.COUNTER)
-            getConsistencyLevel().validateCounterForWrite(metadata);
-
         cfDef = metadata.getCfDef();
 
         if (columns == null)
@@ -440,11 +444,10 @@ public class UpdateStatement extends ModificationStatement
 
     public String toString()
     {
-        return String.format("UpdateStatement(name=%s, keys=%s, columns=%s, consistency=%s, timestamp=%s, timeToLive=%s)",
+        return String.format("UpdateStatement(name=%s, keys=%s, columns=%s, timestamp=%s, timeToLive=%s)",
                              cfName,
                              whereClause,
                              columns,
-                             getConsistencyLevel(),
                              isSetTimestamp() ? getTimestamp(null) : "<now>",
                              getTimeToLive());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 70e95f6..c381978 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
@@ -48,7 +49,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
     }
 
-    public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException
+    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws InvalidRequestException
     {
         state.setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 6a5ef45..16b953f 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1651,11 +1651,41 @@ public class CassandraServer implements Cassandra.Iface
                 logger.debug("execute_cql_query");
             }
 
-            ClientState cState = state();
-            if (cState.getCQLVersion().major == 2)
-                return QueryProcessor.process(queryString, state());
+            return QueryProcessor.process(queryString, state());
+        }
+        catch (RequestExecutionException e)
+        {
+            ThriftConversion.rethrow(e);
+            return null;
+        }
+        catch (RequestValidationException e)
+        {
+            throw ThriftConversion.toThrift(e);
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
+    }
+
+    public CqlResult execute_cql3_query(ByteBuffer query, Compression compression, ConsistencyLevel cLevel)
+    throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        try
+        {
+            String queryString = uncompress(query, compression);
+            if (startSessionIfRequested())
+            {
+                Tracing.instance().begin("execute_cql3_query",
+                                         ImmutableMap.of("query", queryString));
+            }
             else
-                return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
+            {
+                logger.debug("execute_cql3_query");
+            }
+
+            ClientState cState = state();
+            return org.apache.cassandra.cql3.QueryProcessor.process(queryString, ThriftConversion.fromThrift(cLevel), cState).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -1680,13 +1710,27 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
+            ClientState cState = state();
             String queryString = uncompress(query,compression);
+            return QueryProcessor.prepare(queryString, cState);
+        }
+        catch (RequestValidationException e)
+        {
+            throw ThriftConversion.toThrift(e);
+        }
+    }
 
+    public CqlPreparedResult prepare_cql3_query(ByteBuffer query, Compression compression)
+    throws InvalidRequestException, TException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("prepare_cql3_query");
+
+        try
+        {
             ClientState cState = state();
-            if (cState.getCQLVersion().major == 2)
-                return QueryProcessor.prepare(queryString, cState);
-            else
-                return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult();
+            String queryString = uncompress(query,compression);
+            return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult();
         }
         catch (RequestValidationException e)
         {
@@ -1710,30 +1754,13 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             ClientState cState = state();
-            if (cState.getCQLVersion().major == 2)
-            {
-                CQLStatement statement = cState.getPrepared().get(itemId);
+            CQLStatement statement = cState.getPrepared().get(itemId);
 
-                if (statement == null)
-                    throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
-                logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
+            if (statement == null)
+                throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
+            logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
 
-                return QueryProcessor.processPrepared(statement, cState, bindVariables);
-            }
-            else
-            {
-                org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId);
-
-                if (statement == null)
-                    throw new InvalidRequestException(String.format("Prepared query with ID %d not found" +
-                                                                    " (either the query was not prepared on this host (maybe the host has been restarted?)" +
-                                                                    " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)",
-                                                                    itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
-                logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId,
-                        statement.getBoundsTerms());
-
-                return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables).toThriftResult();
-            }
+            return QueryProcessor.processPrepared(statement, cState, bindVariables);
         }
         catch (RequestExecutionException e)
         {
@@ -1750,18 +1777,51 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    public void set_cql_version(String version) throws InvalidRequestException
+    public CqlResult execute_prepared_cql3_query(int itemId, List<ByteBuffer> bindVariables, ConsistencyLevel cLevel)
+    throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
     {
-        logger.debug("set_cql_version: " + version);
+        if (startSessionIfRequested())
+        {
+            // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
+            Tracing.instance().begin("execute_prepared_cql3_query", Collections.<String, String>emptyMap());
+        }
+        else
+        {
+            logger.debug("execute_prepared_cql3_query");
+        }
 
         try
         {
-            state().setCQLVersion(version);
+            ClientState cState = state();
+            org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId);
+
+            if (statement == null)
+                throw new InvalidRequestException(String.format("Prepared query with ID %d not found" +
+                                                                " (either the query was not prepared on this host (maybe the host has been restarted?)" +
+                                                                " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)",
+                                                                itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
+            logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
+
+            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState, bindVariables).toThriftResult();
+        }
+        catch (RequestExecutionException e)
+        {
+            ThriftConversion.rethrow(e);
+            return null;
         }
         catch (RequestValidationException e)
         {
             throw ThriftConversion.toThrift(e);
         }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
+    }
+
+    public void set_cql_version(String version) throws InvalidRequestException
+    {
+        // Deprecated, no-op: the version argument is ignored and nothing is thrown from here
     }
 
     public ByteBuffer trace_next_query() throws TException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index fe8863a..45affc6 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -31,6 +31,8 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.util.CharsetUtil;
 
+import org.apache.cassandra.db.ConsistencyLevel;
+
 /**
  * ChannelBuffer utility methods.
  * Note that contrarily to ByteBufferUtil, these method do "read" the
@@ -131,6 +133,29 @@ public abstract class CBUtil
         }
     }
 
+    public static ChannelBuffer consistencyLevelToCB(ConsistencyLevel consistency)
+    {
+        // No level to send: emit shortToCB(0) instead of the level's name
+        if (consistency == null)
+            return shortToCB(0);
+        return stringToCB(consistency.toString());
+    }
+
+    public static ConsistencyLevel readConsistencyLevel(ChannelBuffer cb)
+    {
+        String cl = CBUtil.readString(cb); // the level is read as a string holding its enum name, in any case
+        if (cl.isEmpty())
+            return null; // an empty string decodes to "no consistency level"
+        try
+        {
+            return Enum.valueOf(ConsistencyLevel.class, cl.toUpperCase(java.util.Locale.ROOT)); // fixed locale: lookup must not depend on the JVM default locale
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ProtocolException("Unknown consistency level: " + cl); // cl names no ConsistencyLevel constant
+        }
+    }
+
     public static ChannelBuffer longStringToCB(String str)
     {
         ChannelBuffer bytes = bytes(str);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 3b4ace9..46a3297 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -25,8 +25,9 @@ import java.util.*;
 
 import com.google.common.base.Splitter;
 
-import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.utils.Hex;
 
 public class Client extends SimpleClient
@@ -99,7 +100,7 @@ public class Client extends SimpleClient
         else if (msgType.equals("QUERY"))
         {
             String query = line.substring(6);
-            return new QueryMessage(query);
+            return new QueryMessage(query, ConsistencyLevel.ONE);
         }
         else if (msgType.equals("PREPARE"))
         {
@@ -127,7 +128,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(id, values);
+                return new ExecuteMessage(id, values, ConsistencyLevel.ONE);
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 8132e65..1d67317 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -36,6 +36,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.logging.InternalLoggerFactory;
 import org.jboss.netty.logging.Slf4JLoggerFactory;
 
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.service.ClientState;
 
@@ -112,9 +113,9 @@ public class SimpleClient
         execute(msg);
     }
 
-    public ResultMessage execute(String query)
+    public ResultMessage execute(String query, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new QueryMessage(query));
+        Message.Response msg = execute(new QueryMessage(query, consistency));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }
@@ -126,9 +127,9 @@ public class SimpleClient
         return (ResultMessage.Prepared)msg;
     }
 
-    public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values)
+    public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new ExecuteMessage(statementId, values));
+        Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 73a965f..b42ba62 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -61,7 +61,7 @@ public class ErrorMessage extends Message.Response
                     break;
                 case UNAVAILABLE:
                     {
-                        ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body));
+                        ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
                         int required = body.readInt();
                         int alive = body.readInt();
                         te = new UnavailableException(cl, required, alive);
@@ -132,10 +132,8 @@ public class ErrorMessage extends Message.Response
             {
                 case UNAVAILABLE:
                     UnavailableException ue = (UnavailableException)msg.error;
-                    ByteBuffer ueCl = ByteBufferUtil.bytes(ue.consistency.toString());
-
-                    acb = ChannelBuffers.buffer(2 + ueCl.remaining() + 8);
-                    acb.writeShort((short)ueCl.remaining());
+                    ChannelBuffer ueCl = CBUtil.consistencyLevelToCB(ue.consistency);
+                    acb = ChannelBuffers.buffer(ueCl.readableBytes() + 8);
                     acb.writeBytes(ueCl);
                     acb.writeInt(ue.required);
                     acb.writeInt(ue.alive);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 4400d12..842fb22 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -25,6 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.utils.MD5Digest;
@@ -42,7 +43,8 @@ public class ExecuteMessage extends Message.Request
             for (int i = 0; i < count; i++)
                 values.add(CBUtil.readValue(body));
 
-            return new ExecuteMessage(id, values);
+            ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+            return new ExecuteMessage(id, values, consistency);
         }
 
         public ChannelBuffer encode(ExecuteMessage msg)
@@ -53,7 +55,7 @@ public class ExecuteMessage extends Message.Request
             //   - The values
             //   - options
             int vs = msg.values.size();
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, vs);
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
             builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
             builder.add(CBUtil.shortToCB(vs));
 
@@ -61,23 +63,26 @@ public class ExecuteMessage extends Message.Request
             for (ByteBuffer value : msg.values)
                 builder.addValue(value);
 
+            builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
             return builder.build();
         }
     };
 
     public final MD5Digest statementId;
     public final List<ByteBuffer> values;
+    public final ConsistencyLevel consistency;
 
-    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values)
+    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        this(MD5Digest.wrap(statementId), values);
+        this(MD5Digest.wrap(statementId), values, consistency);
     }
 
-    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values)
+    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
         this.values = values;
+        this.consistency = consistency;
     }
 
     public ChannelBuffer encode()
@@ -95,7 +100,7 @@ public class ExecuteMessage extends Message.Request
             if (statement == null)
                 throw new PreparedQueryNotFoundException(statementId);
 
-            return QueryProcessor.processPrepared(statement, c.clientState(), values);
+            return QueryProcessor.processPrepared(statement, consistency, c.clientState(), values);
         }
         catch (Exception e)
         {
@@ -106,6 +111,6 @@ public class ExecuteMessage extends Message.Request
     @Override
     public String toString()
     {
-        return "EXECUTE " + statementId + " with " + values.size() + " values";
+        return "EXECUTE " + statementId + " with " + values.size() + " values at consistency " + consistency;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/297f530c/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 0880ee0..5223528 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -18,8 +18,10 @@
 package org.apache.cassandra.transport.messages;
 
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.transport.*;
 
@@ -33,21 +35,25 @@ public class QueryMessage extends Message.Request
         public QueryMessage decode(ChannelBuffer body)
         {
             String query = CBUtil.readLongString(body);
-            return new QueryMessage(query);
+            ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+            return new QueryMessage(query, consistency);
         }
 
         public ChannelBuffer encode(QueryMessage msg)
         {
-            return CBUtil.longStringToCB(msg.query);
+
+            return ChannelBuffers.wrappedBuffer(CBUtil.longStringToCB(msg.query), CBUtil.consistencyLevelToCB(msg.consistency));
         }
     };
 
     public final String query;
+    public final ConsistencyLevel consistency;
 
-    public QueryMessage(String query)
+    public QueryMessage(String query, ConsistencyLevel consistency)
     {
         super(Message.Type.QUERY);
         this.query = query;
+        this.consistency = consistency;
     }
 
     public ChannelBuffer encode()
@@ -59,7 +65,7 @@ public class QueryMessage extends Message.Request
     {
         try
         {
-            return QueryProcessor.process(query, ((ServerConnection)connection).clientState());
+            return QueryProcessor.process(query, consistency, ((ServerConnection)connection).clientState());
         }
         catch (Exception e)
         {


Mime
View raw message