cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Add CQL support for CAS
Date Tue, 30 Apr 2013 12:13:41 GMT
Updated Branches:
  refs/heads/trunk 851fe6400 -> e431fb722


Add CQL support for CAS

patch by slebresne; reviewed by iamaleksey for CASSANDRA-5443


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e431fb72
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e431fb72
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e431fb72

Branch: refs/heads/trunk
Commit: e431fb722f80d8957a0a7fd2ecf80333e9275c53
Parents: 851fe64
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Thu Apr 25 18:18:37 2013 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Tue Apr 30 14:12:43 2013 +0200

----------------------------------------------------------------------
 src/java/org/apache/cassandra/cql3/Attributes.java |   12 +
 src/java/org/apache/cassandra/cql3/Cql.g           |   41 +-
 src/java/org/apache/cassandra/cql3/ResultSet.java  |    2 +-
 .../cassandra/cql3/statements/BatchStatement.java  |  130 +++--
 .../cassandra/cql3/statements/DeleteStatement.java |  157 ++----
 .../cql3/statements/ModificationStatement.java     |  457 ++++++++++++---
 .../cassandra/cql3/statements/UpdateStatement.java |  340 +++--------
 src/java/org/apache/cassandra/db/Column.java       |    5 +
 .../org/apache/cassandra/db/DeletedColumn.java     |    6 +
 .../org/apache/cassandra/db/ExpiringColumn.java    |    6 +
 .../org/apache/cassandra/service/StorageProxy.java |   44 ++-
 .../org/apache/cassandra/service/paxos/Commit.java |    2 +-
 .../apache/cassandra/thrift/CassandraServer.java   |    4 -
 13 files changed, 702 insertions(+), 504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index d6fbd1f..62f98b2 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.cql3;
 
+import org.apache.cassandra.db.ExpiringColumn;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
 /**
  * Utility class for the Parser to gather attributes for modification
  * statements.
@@ -25,4 +28,13 @@ public class Attributes
 {
     public Long timestamp;
     public int timeToLive;
+
+    public void validate() throws InvalidRequestException
+    {
+        if (timeToLive < 0)
+            throw new InvalidRequestException("A TTL must be greater or equal to 0");
+
+        if (timeToLive > ExpiringColumn.MAX_TTL)
+            throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 774a0e8..45b6d77 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -273,7 +273,7 @@ orderByClause[Map<ColumnIdentifier, Boolean> orderings]
  * USING TIMESTAMP <long>;
  *
  */
-insertStatement returns [UpdateStatement expr]
+insertStatement returns [UpdateStatement.ParsedInsert expr]
     @init {
         Attributes attrs = new Attributes();
         List<ColumnIdentifier> columnNames  = new ArrayList<ColumnIdentifier>();
@@ -285,7 +285,7 @@ insertStatement returns [UpdateStatement expr]
           '(' v1=term { values.add(v1); } ( ',' vn=term { values.add(vn); } )* ')'
         ( usingClause[attrs] )?
       {
-          $expr = new UpdateStatement(cf, attrs, columnNames, values);
+          $expr = new UpdateStatement.ParsedInsert(cf, attrs, columnNames, values);
       }
     ;
 
@@ -312,27 +312,39 @@ usingClauseObjective[Attributes attrs]
  * SET name1 = value1, name2 = value2
  * WHERE key = value;
  */
-updateStatement returns [UpdateStatement expr]
+updateStatement returns [UpdateStatement.ParsedUpdate expr]
     @init {
         Attributes attrs = new Attributes();
         List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations = new ArrayList<Pair<ColumnIdentifier, Operation.RawUpdate>>();
+        boolean ifNotExists = false;
     }
     : K_UPDATE cf=columnFamilyName
       ( usingClause[attrs] )?
       K_SET columnOperation[operations] (',' columnOperation[operations])*
       K_WHERE wclause=whereClause
+      ( K_IF (K_NOT K_EXISTS { ifNotExists = true; } | conditions=updateCondition) )?
       {
-          return new UpdateStatement(cf, operations, wclause, attrs);
+          return new UpdateStatement.ParsedUpdate(cf,
+                                                  attrs,
+                                                  operations,
+                                                  wclause,
+                                                  conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions,
+                                                  ifNotExists);
       }
     ;
 
+updateCondition returns [List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions]
+    @init { conditions = new ArrayList<Pair<ColumnIdentifier, Operation.RawUpdate>>(); }
+    : columnOperation[conditions] ( K_AND columnOperation[conditions] )*
+    ;
+
 /**
  * DELETE name1, name2
  * FROM <CF>
  * USING TIMESTAMP <long>
  * WHERE KEY = keyname;
  */
-deleteStatement returns [DeleteStatement expr]
+deleteStatement returns [DeleteStatement.Parsed expr]
     @init {
         Attributes attrs = new Attributes();
         List<Operation.RawDeletion> columnDeletions = Collections.emptyList();
@@ -341,8 +353,13 @@ deleteStatement returns [DeleteStatement expr]
       K_FROM cf=columnFamilyName
       ( usingClauseDelete[attrs] )?
       K_WHERE wclause=whereClause
+      ( K_IF conditions=updateCondition )?
       {
-          return new DeleteStatement(cf, columnDeletions, wclause, attrs);
+          return new DeleteStatement.Parsed(cf,
+                                            attrs,
+                                            columnDeletions,
+                                            wclause,
+                                            conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions);
       }
     ;
 
@@ -381,10 +398,10 @@ deleteOp returns [Operation.RawDeletion op]
  *   ...
  * APPLY BATCH
  */
-batchStatement returns [BatchStatement expr]
+batchStatement returns [BatchStatement.Parsed expr]
     @init {
         BatchStatement.Type type = BatchStatement.Type.LOGGED;
-        List<ModificationStatement> statements = new ArrayList<ModificationStatement>();
+        List<ModificationStatement.Parsed> statements = new ArrayList<ModificationStatement.Parsed>();
         Attributes attrs = new Attributes();
     }
     : K_BEGIN
@@ -393,11 +410,11 @@ batchStatement returns [BatchStatement expr]
           s1=batchStatementObjective ';'? { statements.add(s1); } ( sN=batchStatementObjective ';'? { statements.add(sN); } )*
       K_APPLY K_BATCH
       {
-          return new BatchStatement(type, statements, attrs);
+          return new BatchStatement.Parsed(type, attrs, statements);
       }
     ;
 
-batchStatementObjective returns [ModificationStatement statement]
+batchStatementObjective returns [ModificationStatement.Parsed statement]
     : i=insertStatement  { $statement = i; }
     | u=updateStatement  { $statement = u; }
     | d=deleteStatement  { $statement = d; }
@@ -873,6 +890,7 @@ unreserved_function_keyword returns [String str]
         | K_SUPERUSER
         | K_NOSUPERUSER
         | K_PASSWORD
+        | K_EXISTS
         ) { $str = $k.text; }
     | t=native_type { $str = t.toString(); }
     ;
@@ -926,6 +944,7 @@ K_ASC:         A S C;
 K_DESC:        D E S C;
 K_ALLOW:       A L L O W;
 K_FILTERING:   F I L T E R I N G;
+K_IF:          I F;
 
 K_GRANT:       G R A N T;
 K_ALL:         A L L;
@@ -963,6 +982,8 @@ K_TOKEN:       T O K E N;
 K_WRITETIME:   W R I T E T I M E;
 
 K_NULL:        N U L L;
+K_NOT:         N O T;
+K_EXISTS:      E X I S T S;
 
 K_MAP:         M A P;
 K_LIST:        L I S T;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 5f0dcf1..1d914da 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -47,7 +47,7 @@ public class ResultSet
         this(new Metadata(metadata), new ArrayList<List<ByteBuffer>>());
     }
 
-    private ResultSet(Metadata metadata, List<List<ByteBuffer>> rows)
+    public ResultSet(Metadata metadata, List<List<ByteBuffer>> rows)
     {
         this.metadata = metadata;
         this.rows = rows;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index d5038da..15e9a09 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -28,16 +28,26 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.Pair;
 
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
  *
  */
-public class BatchStatement extends ModificationStatement
+public class BatchStatement implements CQLStatement
 {
-    // statements to execute
-    protected final List<ModificationStatement> statements;
+    public static enum Type
+    {
+        LOGGED, UNLOGGED, COUNTER
+    }
+
+    private final int boundTerms;
+    public final Type type;
+    private final List<ModificationStatement> statements;
+    private final Attributes attrs;
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -47,64 +57,53 @@ public class BatchStatement extends ModificationStatement
      * @param statements a list of UpdateStatements
      * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
      */
-    public BatchStatement(Type type, List<ModificationStatement> statements, Attributes attrs)
+    public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
     {
-        super(null, attrs);
+        this.boundTerms = boundTerms;
         this.type = type;
         this.statements = statements;
+        this.attrs = attrs;
     }
 
-    @Override
-    public void prepareKeyspace(ClientState state) throws InvalidRequestException
+    public int getBoundsTerms()
     {
-        for (ModificationStatement statement : statements)
-            statement.prepareKeyspace(state);
+        return boundTerms;
     }
 
-    @Override
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
     {
         for (ModificationStatement statement : statements)
-            state.hasColumnFamilyAccess(statement.keyspace(), statement.columnFamily(), Permission.MODIFY);
+            statement.checkAccess(state);
     }
 
     public void validate(ClientState state) throws InvalidRequestException
     {
-        if (getTimeToLive() != 0)
+        if (attrs.timeToLive != 0)
             throw new InvalidRequestException("Global TTL on the BATCH statement is not supported.");
 
         for (ModificationStatement statement : statements)
         {
-            if (isSetTimestamp() && statement.isSetTimestamp())
-                throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
+            statement.validate(state);
 
-            if (statement.getTimeToLive() < 0)
-                throw new InvalidRequestException("A TTL must be greater or equal to 0");
+            if (attrs.timestamp != null && statement.isSetTimestamp())
+                throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
         }
     }
 
-    protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+    public long getTimestamp(long now)
     {
-        for (ModificationStatement statement : statements)
-            statement.validateConsistency(cl);
+        return attrs.timestamp == null ? now : attrs.timestamp;
     }
 
-    @Override
-    public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
         for (ModificationStatement statement : statements)
         {
             // Group mutation together, otherwise they won't get applied atomically
-            for (IMutation m : statement.getMutationsInternal(variables, local, cl, getTimestamp(now), true))
+            for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true))
             {
-                if (m instanceof CounterMutation && type != Type.COUNTER)
-                    throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
-
-                if (m instanceof RowMutation && type == Type.COUNTER)
-                    throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches");
-
                 Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
                 IMutation existing = mutations.get(key);
 
@@ -114,7 +113,6 @@ public class BatchStatement extends ModificationStatement
                 }
                 else
                 {
-
                     existing.addAll(m);
                 }
             }
@@ -123,30 +121,74 @@ public class BatchStatement extends ModificationStatement
         return mutations.values();
     }
 
-    protected Collection<? extends IMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
     {
-        // batch statements should not contain other batches
-        throw new UnsupportedOperationException();
-    }
+        if (cl == null)
+            throw new InvalidRequestException("Invalid empty consistency level");
 
-    public ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException
-    {
-        // XXX: we use our knowledge that Modification don't create new statement upon call to prepare()
-        for (ModificationStatement statement : statements)
-        {
-            statement.prepare(boundNames);
-        }
-        return new ParsedStatement.Prepared(this, Arrays.<ColumnSpecification>asList(boundNames));
+        Collection<? extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp());
+        if (type == Type.LOGGED && mutations.size() > 1)
+            StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl);
+        else
+            StorageProxy.mutate(mutations, cl);
+
+        return null;
     }
 
-    public ParsedStatement.Prepared prepare() throws InvalidRequestException
+    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
-        CFDefinition.Name[] boundNames = new CFDefinition.Name[getBoundsTerms()];
-        return prepare(boundNames);
+        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+            mutation.apply();
+        return null;
     }
 
     public String toString()
     {
         return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
     }
+
+    public static class Parsed extends CFStatement
+    {
+        private final Type type;
+        private final Attributes attrs;
+        private final List<ModificationStatement.Parsed> parsedStatements;
+
+        public Parsed(Type type, Attributes attrs, List<ModificationStatement.Parsed> parsedStatements)
+        {
+            super(null);
+            this.type = type;
+            this.attrs = attrs;
+            this.parsedStatements = parsedStatements;
+        }
+
+        @Override
+        public void prepareKeyspace(ClientState state) throws InvalidRequestException
+        {
+            for (ModificationStatement.Parsed statement : parsedStatements)
+                statement.prepareKeyspace(state);
+        }
+
+        public ParsedStatement.Prepared prepare() throws InvalidRequestException
+        {
+            ColumnSpecification[] boundNames = new ColumnSpecification[getBoundsTerms()];
+
+            List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size());
+            for (ModificationStatement.Parsed parsed : parsedStatements)
+            {
+                ModificationStatement stmt = parsed.prepare(boundNames);
+                if (stmt.hasConditions())
+                    throw new InvalidRequestException("Conditional updates are not allowed in batches");
+
+                if (stmt.isCounter() && type != Type.COUNTER)
+                    throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
+
+                if (!stmt.isCounter() && type == Type.COUNTER)
+                    throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches");
+
+                statements.add(stmt);
+            }
+
+            return new ParsedStatement.Prepared(new BatchStatement(getBoundsTerms(), type, statements, attrs), Arrays.<ColumnSpecification>asList(boundNames));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 5e2bb23..c2a7e00 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -27,81 +27,37 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * A <code>DELETE</code> parsed from a CQL query statement.
  */
 public class DeleteStatement extends ModificationStatement
 {
-    private CFDefinition cfDef;
-    private final List<Operation.RawDeletion> deletions;
-    private final List<Relation> whereClause;
-
-    private final List<Operation> toRemove;
-    private final Map<ColumnIdentifier, List<Term>> processedKeys = new HashMap<ColumnIdentifier, List<Term>>();
-
-    public DeleteStatement(CFName name, List<Operation.RawDeletion> deletions, List<Relation> whereClause, Attributes attrs)
+    private DeleteStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
     {
-        super(name, attrs);
-
-        this.deletions = deletions;
-        this.whereClause = whereClause;
-        this.toRemove = new ArrayList<Operation>(deletions.size());
+        super(boundTerms, cfm, attrs);
     }
 
-    protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
+    protected boolean requireFullClusteringKey()
     {
-        if (type == Type.COUNTER)
-            cl.validateCounterForWrite(cfDef.cfm);
-        else
-            cl.validateForWrite(cfDef.cfm.ksName);
+        return false;
     }
 
-    public Collection<RowMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
-    throws RequestExecutionException, RequestValidationException
+    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+    throws InvalidRequestException
     {
-        // keys
-        List<ByteBuffer> keys = UpdateStatement.buildKeyNames(cfDef, processedKeys, variables);
-
-        // columns
-        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
-        CFDefinition.Name firstEmpty = UpdateStatement.buildColumnNames(cfDef, processedKeys, builder, variables, false);
+        CFDefinition cfDef = cfm.getCfDef();
+        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
+        List<Operation> deletions = getOperations();
 
         boolean fullKey = builder.componentCount() == cfDef.columns.size();
-        boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || toRemove.isEmpty());
-
-        if (!toRemove.isEmpty() && isRange)
-            throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", firstEmpty, toRemove.get(0).columnName));
-
-        Set<ByteBuffer> toRead = null;
-        for (Operation op : toRemove)
-        {
-            if (op.requiresRead())
-            {
-                if (toRead == null)
-                    toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
-                toRead.add(op.columnName.key);
-            }
-        }
-
-        Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
-
-        Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size());
-        UpdateParameters params = new UpdateParameters(cfDef.cfm, variables, getTimestamp(now), -1, rows);
-
-        for (ByteBuffer key : keys)
-            rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params, isBatch));
-
-        return rowMutations;
-    }
+        boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || deletions.isEmpty());
 
-    public RowMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, boolean isRange, UpdateParameters params, boolean isBatch)
-    throws InvalidRequestException
-    {
-        QueryProcessor.validateKey(key);
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(cfDef.cfm.ksName, columnFamily()));
+        if (!deletions.isEmpty() && isRange)
+            throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.get(0).columnName));
 
-        if (toRemove.isEmpty() && builder.componentCount() == 0)
+        if (deletions.isEmpty() && builder.componentCount() == 0)
         {
             // No columns specified, delete the row
             cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
@@ -110,7 +66,7 @@ public class DeleteStatement extends ModificationStatement
         {
             if (isRange)
             {
-                assert toRemove.isEmpty();
+                assert deletions.isEmpty();
                 ByteBuffer start = builder.build();
                 ByteBuffer end = builder.buildAsEndOfRange();
                 cf.addAtom(params.makeRangeTombstone(start, end));
@@ -125,64 +81,53 @@ public class DeleteStatement extends ModificationStatement
                 }
                 else
                 {
-                    for (Operation op : toRemove)
-                        op.execute(key, cf, builder.copy(), params);
+                    for (Operation deletion : deletions)
+                        deletion.execute(key, cf, builder.copy(), params);
                 }
             }
         }
 
-        RowMutation rm;
-        if (isBatch)
-        {
-            // we might group other mutations together with this one later, so make it mutable
-            rm = new RowMutation(cfDef.cfm.ksName, key);
-            rm.add(cf);
-        }
-        else
-        {
-            rm = new RowMutation(cfDef.cfm.ksName, key, cf);
-        }
-        return rm;
+        return cf;
     }
 
-    public ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException
+    public static class Parsed extends ModificationStatement.Parsed
     {
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
-        type = metadata.getDefaultValidator().isCommutative() ? Type.COUNTER : Type.LOGGED;
-
-        cfDef = metadata.getCfDef();
-        UpdateStatement.processKeys(cfDef, whereClause, processedKeys, boundNames);
-
-        for (Operation.RawDeletion deletion : deletions)
+        private final List<Operation.RawDeletion> deletions;
+        private final List<Relation> whereClause;
+
+        public Parsed(CFName name,
+                      Attributes attrs,
+                      List<Operation.RawDeletion> deletions,
+                      List<Relation> whereClause,
+                      List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions)
         {
-            CFDefinition.Name name = cfDef.get(deletion.affectedColumn());
-            if (name == null)
-                throw new InvalidRequestException(String.format("Unknown identifier %s", deletion.affectedColumn()));
-
-            // For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
-            // list. However, we support having the value name for coherence with the static/sparse case
-            if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
-                throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", name));
-
-            Operation op = deletion.prepare(name);
-            op.collectMarkerSpecification(boundNames);
-            toRemove.add(op);
+            super(name, attrs, conditions, false);
+            this.deletions = deletions;
+            this.whereClause = whereClause;
         }
 
-        return new ParsedStatement.Prepared(this, Arrays.<ColumnSpecification>asList(boundNames));
-    }
+        protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException
+        {
+            DeleteStatement stmt = new DeleteStatement(getBoundsTerms(), cfDef.cfm, attrs);
 
-    public ParsedStatement.Prepared prepare() throws InvalidRequestException
-    {
-        ColumnSpecification[] boundNames = new ColumnSpecification[getBoundsTerms()];
-        return prepare(boundNames);
-    }
+            for (Operation.RawDeletion deletion : deletions)
+            {
+                CFDefinition.Name name = cfDef.get(deletion.affectedColumn());
+                if (name == null)
+                    throw new InvalidRequestException(String.format("Unknown identifier %s", deletion.affectedColumn()));
+
+                // For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
+                // list. However, we support having the value name for coherence with the static/sparse case
+                if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
+                    throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", name));
+
+                Operation op = deletion.prepare(name);
+                op.collectMarkerSpecification(boundNames);
+                stmt.addOperation(op);
+            }
 
-    public String toString()
-    {
-        return String.format("DeleteStatement(name=%s, columns=%s, keys=%s)",
-                             cfName,
-                             deletions,
-                             whereClause);
+            stmt.processWhereClause(whereClause, boundNames);
+            return stmt;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 7fb7786..542b6d6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,46 +21,84 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.Pair;
 
-/**
- * Abstract class for statements that apply on a given column family.
+/*
+ * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
  */
-public abstract class ModificationStatement extends CFStatement implements CQLStatement
+public abstract class ModificationStatement implements CQLStatement
 {
     public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE;
+    private static final ColumnIdentifier RESULT_COLUMN = new ColumnIdentifier("result", false);
+
+    private final int boundTerms;
+    public final CFMetaData cfm;
+    private final Attributes attrs;
+
+    private final Map<ColumnIdentifier, List<Term>> processedKeys = new HashMap<ColumnIdentifier, List<Term>>();
+    private final List<Operation> columnOperations = new ArrayList<Operation>();
+
+    private List<Operation> columnConditions;
+    private boolean ifNotExists;
 
-    public static enum Type
+    public ModificationStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
     {
-        LOGGED, UNLOGGED, COUNTER
+        this.boundTerms = boundTerms;
+        this.cfm = cfm;
+        this.attrs = attrs;
     }
 
-    protected Type type;
+    protected abstract boolean requireFullClusteringKey();
+    public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
 
-    private Long timestamp;
-    private final int timeToLive;
+    public int getBoundsTerms()
+    {
+        return boundTerms;
+    }
 
-    public ModificationStatement(CFName name, Attributes attrs)
+    public String keyspace()
     {
-        this(name, attrs.timestamp, attrs.timeToLive);
+        return cfm.ksName;
     }
 
-    public ModificationStatement(CFName name, Long timestamp, int timeToLive)
+    public String columnFamily()
     {
-        super(name);
-        this.timestamp = timestamp;
-        this.timeToLive = timeToLive;
+        return cfm.cfName;
+    }
+
+    public boolean isCounter()
+    {
+        return cfm.getDefaultValidator().isCommutative();
+    }
+
+    public int getTimeToLive()
+    {
+        return attrs.timeToLive;
+    }
+
+    public long getTimestamp(long now)
+    {
+        return attrs.timestamp == null ? now : attrs.timestamp;
+    }
+
+    public boolean isSetTimestamp()
+    {
+        return attrs.timestamp != null;
     }
 
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@ -70,74 +108,176 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     public void validate(ClientState state) throws InvalidRequestException
     {
-        if (timeToLive < 0)
-            throw new InvalidRequestException("A TTL must be greater or equal to 0");
-
-        if (timeToLive > ExpiringColumn.MAX_TTL)
-            throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
+        attrs.validate();
     }
 
-    protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException;
+    public void addOperation(Operation op)
+    {
+        columnOperations.add(op);
+    }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public List<Operation> getOperations()
     {
-        if (cl == null)
-            throw new InvalidRequestException("Invalid empty consistency level");
+        return columnOperations;
+    }
 
-        validateConsistency(cl);
+    public void addCondition(Operation op)
+    {
+        if (columnConditions == null)
+            columnConditions = new ArrayList<Operation>();
 
-        Collection<? extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp());
+        columnConditions.add(op);
+    }
 
-        // The type should have been set by now or we have a bug
-        assert type != null;
+    public void setIfNotExistCondition()
+    {
+        ifNotExists = true;
+    }
 
-        switch (type)
-        {
-            case LOGGED:
-                if (mutations.size() > 1)
-                    StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl);
-                else
-                    StorageProxy.mutate(mutations, cl);
-                break;
-            case UNLOGGED:
-            case COUNTER:
-                StorageProxy.mutate(mutations, cl);
-                break;
-            default:
-                throw new AssertionError();
-        }
+    private void addKeyValues(ColumnIdentifier name, List<Term> values) throws InvalidRequestException
+    {
+        if (processedKeys.put(name, values) != null)
+            throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
+    }
 
-        return null;
+    public void addKeyValue(ColumnIdentifier name, Term value) throws InvalidRequestException
+    {
+        addKeyValues(name, Collections.singletonList(value));
     }
 
-    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+    public void processWhereClause(List<Relation> whereClause, ColumnSpecification[] names) throws InvalidRequestException
     {
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
-            mutation.apply();
-        return null;
+        CFDefinition cfDef = cfm.getCfDef();
+        for (Relation rel : whereClause)
+        {
+            CFDefinition.Name name = cfDef.get(rel.getEntity());
+            if (name == null)
+                throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity()));
+
+            switch (name.kind)
+            {
+                case KEY_ALIAS:
+                case COLUMN_ALIAS:
+                    List<Term.Raw> rawValues;
+                    if (rel.operator() == Relation.Type.EQ)
+                        rawValues = Collections.singletonList(rel.getValue());
+                    else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN)
+                        rawValues = rel.getInValues();
+                    else
+                        throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
+
+                    List<Term> values = new ArrayList<Term>(rawValues.size());
+                    for (Term.Raw raw : rawValues)
+                    {
+                        Term t = raw.prepare(name);
+                        t.collectMarkerSpecification(names);
+                        values.add(t);
+                    }
+                    addKeyValues(name.name, values);
+                    break;
+                case VALUE_ALIAS:
+                case COLUMN_METADATA:
+                    throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
+            }
+        }
     }
 
-    public long getTimestamp(long now)
+    private List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+    throws InvalidRequestException
     {
-        return timestamp == null ? now : timestamp;
+        CFDefinition cfDef = cfm.getCfDef();
+        ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
+        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
+        for (CFDefinition.Name name : cfDef.keys.values())
+        {
+            List<Term> values = processedKeys.get(name.name);
+            if (values == null || values.isEmpty())
+                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+
+            if (keyBuilder.remainingCount() == 1)
+            {
+                for (Term t : values)
+                {
+                    ByteBuffer val = t.bindAndGet(variables);
+                    if (val == null)
+                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+                    keys.add(keyBuilder.copy().add(val).build());
+                }
+            }
+            else
+            {
+                if (values.size() > 1)
+                    throw new InvalidRequestException("IN is only supported on the last column of the partition key");
+                ByteBuffer val = values.get(0).bindAndGet(variables);
+                if (val == null)
+                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+                keyBuilder.add(val);
+            }
+        }
+        return keys;
     }
 
-    public void setTimestamp(long timestamp)
+    private ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+    throws InvalidRequestException
     {
-        this.timestamp = timestamp;
+        CFDefinition cfDef = cfm.getCfDef();
+        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
+        CFDefinition.Name firstEmptyKey = null;
+        for (CFDefinition.Name name : cfDef.columns.values())
+        {
+            List<Term> values = processedKeys.get(name.name);
+            if (values == null || values.isEmpty())
+            {
+                firstEmptyKey = name;
+                if (requireFullClusteringKey() && cfDef.isComposite && !cfDef.isCompact)
+                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+            }
+            else if (firstEmptyKey != null)
+            {
+                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, name.name));
+            }
+            else
+            {
+                assert values.size() == 1; // We only allow IN for row keys so far
+                ByteBuffer val = values.get(0).bindAndGet(variables);
+                if (val == null)
+                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name));
+                builder.add(val);
+            }
+        }
+        return builder;
     }
 
-    public boolean isSetTimestamp()
+    protected CFDefinition.Name getFirstEmptyKey()
     {
-        return timestamp != null;
+        for (CFDefinition.Name name : cfm.getCfDef().columns.values())
+        {
+            List<Term> values = processedKeys.get(name.name);
+            if (values == null || values.isEmpty())
+                return name;
+        }
+        return null;
     }
 
-    public int getTimeToLive()
+    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+    throws RequestExecutionException, RequestValidationException
     {
-        return timeToLive;
+        // Lists SET operation incurs a read.
+        Set<ByteBuffer> toRead = null;
+        for (Operation op : columnOperations)
+        {
+            if (op.requiresRead())
+            {
+                if (toRead == null)
+                    toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
+                toRead.add(op.columnName.key);
+            }
+        }
+
+        return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
     }
 
-    protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+    private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         try
@@ -153,13 +293,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
         int i = 0;
         for (ByteBuffer name : toRead)
         {
-            ByteBuffer start = builder.copy().add(name).build();
-            ByteBuffer finish = builder.copy().add(name).buildAsEndOfRange();
+            ByteBuffer start = clusteringPrefix.copy().add(name).build();
+            ByteBuffer finish = clusteringPrefix.copy().add(name).buildAsEndOfRange();
             slices[i++] = new ColumnSlice(start, finish);
         }
 
-        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
-        for (ByteBuffer key : keys)
+        List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
+        for (ByteBuffer key : partitionKeys)
             commands.add(new SliceFromReadCommand(keyspace(),
                                                   key,
                                                   columnFamily(),
@@ -187,6 +327,68 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
         return map;
     }
 
+    public boolean hasConditions()
+    {
+        return ifNotExists || (columnConditions != null && !columnConditions.isEmpty());
+    }
+
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    throws RequestExecutionException, RequestValidationException
+    {
+        if (cl == null)
+            throw new InvalidRequestException("Invalid empty consistency level");
+
+        return hasConditions()
+             ? executeWithCondition(cl, queryState, variables)
+             : executeWithoutCondition(cl, queryState, variables);
+    }
+
+    private ResultMessage executeWithoutCondition(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    throws RequestExecutionException, RequestValidationException
+    {
+        if (isCounter())
+            cl.validateCounterForWrite(cfm);
+        else
+            cl.validateForWrite(cfm.ksName);
+
+        StorageProxy.mutate(getMutations(variables, false, cl, queryState.getTimestamp(), false), cl);
+        return null;
+    }
+
+    public ResultMessage executeWithCondition(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    throws RequestExecutionException, RequestValidationException
+    {
+        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+        // We don't support IN for CAS operations so far
+        if (keys.size() > 1)
+            throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
+
+        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+        UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(queryState.getTimestamp()), getTimeToLive(), null);
+
+        ByteBuffer key = keys.get(0);
+        ThriftValidation.validateKey(cfm, key);
+
+        ColumnFamily updates = updateForKey(key, clusteringPrefix, params);
+        ColumnFamily expected = buildConditions(key, clusteringPrefix, params);
+
+        boolean result = StorageProxy.cas(keyspace(), columnFamily(), key, expected, updates);
+
+        ResultSet.Metadata metadata = new ResultSet.Metadata(Collections.singletonList(new ColumnSpecification(keyspace(), columnFamily(), RESULT_COLUMN, BooleanType.instance)));
+        List<List<ByteBuffer>> newRows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(result)));
+        return new ResultMessage.Rows(new ResultSet(metadata, newRows));
+    }
+
+    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+    {
+        if (hasConditions())
+            throw new UnsupportedOperationException();
+
+        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp(), false))
+            mutation.apply();
+        return null;
+    }
+
     /**
      * Convert statement into a list of mutations to apply on the server
      *
@@ -198,16 +400,133 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    protected Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
     throws RequestExecutionException, RequestValidationException
     {
-        return getMutationsInternal(variables, local, cl, now, false);
+        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+
+        // Some list operations require reading
+        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
+        UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now), getTimeToLive(), rows);
+
+        Collection<IMutation> mutations = new ArrayList<IMutation>();
+        for (ByteBuffer key: keys)
+        {
+            ThriftValidation.validateKey(cfm, key);
+            ColumnFamily cf = updateForKey(key, clusteringPrefix, params);
+            mutations.add(makeMutation(key, cf, cl, isBatch));
+        }
+        return mutations;
+    }
+
+    private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
+    {
+        RowMutation rm;
+        if (isBatch)
+        {
+            // we might group other mutations together with this one later, so make it mutable
+            rm = new RowMutation(cfm.ksName, key);
+            rm.add(cf);
+        }
+        else
+        {
+            rm = new RowMutation(cfm.ksName, key, cf);
+        }
+        return isCounter() ? new CounterMutation(rm, cl) : rm;
+    }
+
+    private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params)
+    throws InvalidRequestException
+    {
+        if (ifNotExists)
+            return null;
+
+        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
+        for (Operation condition : columnConditions)
+            condition.execute(key, cf, clusteringPrefix.copy(), params);
+        return cf;
     }
 
-    // hack to allow us to special-case RowMutation construction depending on if it's part of a batch
-    // (in which case we need the CF collection to be mutable), or not (in which case we can use more-efficient SingletonMap)
-    protected abstract Collection<? extends IMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
-    throws RequestExecutionException, RequestValidationException;
+    public static abstract class Parsed extends CFStatement
+    {
+        protected final Attributes attrs;
+        private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions;
+        private final boolean ifNotExists;
+
+        protected Parsed(CFName name, Attributes attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists)
+        {
+            super(name);
+            this.attrs = attrs;
+            this.conditions = conditions;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public ParsedStatement.Prepared prepare() throws InvalidRequestException
+        {
+            ColumnSpecification[] boundNames = new ColumnSpecification[getBoundsTerms()];
+            ModificationStatement statement = prepare(boundNames);
+            return new ParsedStatement.Prepared(statement, Arrays.<ColumnSpecification>asList(boundNames));
+        }
 
-    public abstract ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException;
+        public ModificationStatement prepare(ColumnSpecification[] boundNames) throws InvalidRequestException
+        {
+            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+            CFDefinition cfDef = metadata.getCfDef();
+
+            ModificationStatement stmt = prepareInternal(cfDef, boundNames);
+
+            if (stmt.isCounter())
+                throw new InvalidRequestException("Conditional updates are not supported on counter tables");
+
+            if (ifNotExists)
+            {
+                // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense.
+                // So far this is enforced by the parser, but let's assert it for sanity if the parser ever changes.
+                assert conditions.isEmpty();
+                stmt.setIfNotExistCondition();
+            }
+            else
+            {
+                for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions)
+                {
+                    CFDefinition.Name name = cfDef.get(entry.left);
+                    if (name == null)
+                        throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
+
+                    /*
+                     * List column names are based on a server-side generated timeuuid. So we can't allow list
+                     * operations or that would yield unexpected results (updates that should apply wouldn't). So for
+                     * now, we just refuse lists, which also saves us from having to bother about the read that some
+                     * list operations involve.
+                     */
+                    if (name.type instanceof ListType)
+                        throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", name));
+
+                    Operation condition = entry.right.prepare(name);
+                    assert !condition.requiresRead();
+
+                    condition.collectMarkerSpecification(boundNames);
+
+                    switch (name.kind)
+                    {
+                        case KEY_ALIAS:
+                        case COLUMN_ALIAS:
+                            throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
+                        case VALUE_ALIAS:
+                        case COLUMN_METADATA:
+                            stmt.addCondition(condition);
+                            break;
+                    }
+                }
+            }
+
+            if (stmt.hasConditions() && attrs.timestamp != null)
+                throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
+
+            return stmt;
+        }
+
+        protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 3213bd4..a841192 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -24,195 +24,34 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
-import static org.apache.cassandra.cql.QueryProcessor.validateKey;
-import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
-
 /**
  * An <code>UPDATE</code> statement parsed from a CQL query statement.
  *
  */
 public class UpdateStatement extends ModificationStatement
 {
-    private CFDefinition cfDef;
-
-    // Provided for an UPDATE
-    private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations;
-    private final List<Relation> whereClause;
-
-    // Provided for an INSERT
-    private final List<ColumnIdentifier> columnNames;
-    private final List<Term.Raw> columnValues;
-
-    private final List<Operation> processedColumns = new ArrayList<Operation>();
-    private final Map<ColumnIdentifier, List<Term>> processedKeys = new HashMap<ColumnIdentifier, List<Term>>();
-
     private static final Operation setToEmptyOperation = new Constants.Setter(null, new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-    /**
-     * Creates a new UpdateStatement from a column family name, columns map, consistency
-     * level, and key term.
-     *
-     * @param name column family being operated on
-     * @param operations a map of column operations to perform
-     * @param whereClause the where clause
-     * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
-     */
-    public UpdateStatement(CFName name,
-                           List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations,
-                           List<Relation> whereClause,
-                           Attributes attrs)
-    {
-        super(name, attrs);
-        this.operations = operations;
-        this.whereClause = whereClause;
-        this.columnNames = null;
-        this.columnValues = null;
-    }
-
-    /**
-     * Creates a new UpdateStatement from a column family name, a consistency level,
-     * key, and lists of column names and values.  It is intended for use with the
-     * alternate update format, <code>INSERT</code>.
-     *
-     * @param name column family being operated on
-     * @param columnNames list of column names
-     * @param columnValues list of column values (corresponds to names)
-     * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
-     */
-    public UpdateStatement(CFName name,
-                           Attributes attrs,
-                           List<ColumnIdentifier> columnNames,
-                           List<Term.Raw> columnValues)
-    {
-        super(name, attrs);
-        this.columnNames = columnNames;
-        this.columnValues = columnValues;
-        this.operations = null;
-        this.whereClause = null;
-    }
-
-    protected void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException
-    {
-        if (type == Type.COUNTER)
-            cl.validateCounterForWrite(cfDef.cfm);
-        else
-            cl.validateForWrite(cfDef.cfm.ksName);
-    }
-
-    public Collection<IMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
-    throws RequestExecutionException, RequestValidationException
-    {
-        List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);
-
-        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
-        buildColumnNames(cfDef, processedKeys, builder, variables, true);
-
-        // Lists SET operation incurs a read.
-        Set<ByteBuffer> toRead = null;
-        for (Operation op : processedColumns)
-        {
-            if (op.requiresRead())
-            {
-                if (toRead == null)
-                    toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
-                toRead.add(op.columnName.key);
-            }
-        }
-
-        Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null;
-
-        Collection<IMutation> mutations = new LinkedList<IMutation>();
-        UpdateParameters params = new UpdateParameters(cfDef.cfm, variables, getTimestamp(now), getTimeToLive(), rows);
-
-        for (ByteBuffer key: keys)
-            mutations.add(mutationForKey(cfDef, key, builder, params, cl, isBatch));
-
-        return mutations;
-    }
-
-    // Returns the first empty component or null if none are
-    static CFDefinition.Name buildColumnNames(CFDefinition cfDef, Map<ColumnIdentifier, List<Term>> processed, ColumnNameBuilder builder, List<ByteBuffer> variables, boolean requireAllComponent)
-    throws InvalidRequestException
+    private UpdateStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
     {
-        CFDefinition.Name firstEmpty = null;
-        for (CFDefinition.Name name : cfDef.columns.values())
-        {
-            List<Term> values = processed.get(name.name);
-            if (values == null || values.isEmpty())
-            {
-                firstEmpty = name;
-                if (requireAllComponent && cfDef.isComposite && !cfDef.isCompact)
-                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
-            }
-            else if (firstEmpty != null)
-            {
-                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmpty.name, name.name));
-            }
-            else
-            {
-                assert values.size() == 1; // We only allow IN for row keys so far
-                ByteBuffer val = values.get(0).bindAndGet(variables);
-                if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name));
-                builder.add(val);
-            }
-        }
-        return firstEmpty;
+        super(boundTerms, cfm, attrs);
     }
 
-    static List<ByteBuffer> buildKeyNames(CFDefinition cfDef, Map<ColumnIdentifier, List<Term>> processed, List<ByteBuffer> variables)
-    throws InvalidRequestException
+    protected boolean requireFullClusteringKey()
     {
-        ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
-        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        for (CFDefinition.Name name : cfDef.keys.values())
-        {
-            List<Term> values = processed.get(name.name);
-            if (values == null || values.isEmpty())
-                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
-
-            if (keyBuilder.remainingCount() == 1)
-            {
-                for (Term t : values)
-                {
-                    ByteBuffer val = t.bindAndGet(variables);
-                    if (val == null)
-                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                    keys.add(keyBuilder.copy().add(val).build());
-                }
-            }
-            else
-            {
-                if (values.size() > 1)
-                    throw new InvalidRequestException("IN is only supported on the last column of the partition key");
-                ByteBuffer val = values.get(0).bindAndGet(variables);
-                if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                keyBuilder.add(val);
-            }
-        }
-        return keys;
+        return true;
     }
 
-    /**
-     * Compute a row mutation for a single key
-     *
-     * @return row mutation
-     *
-     * @throws InvalidRequestException on the wrong request
-     */
-    private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ConsistencyLevel cl, boolean isBatch)
+    public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
     throws InvalidRequestException
     {
-        validateKey(key);
-
-        QueryProcessor.validateKey(key);
-        ColumnFamily cf = UnsortedColumns.factory.create(Schema.instance.getCFMetaData(cfDef.cfm.ksName, cfDef.cfm.cfName));
+        CFDefinition cfDef = cfm.getCfDef();
+        ColumnFamily cf = UnsortedColumns.factory.create(cfm);
 
         // Inserting the CQL row marker (see #4361)
         // We always need to insert a marker, because of the following situation:
@@ -225,12 +64,14 @@ public class UpdateStatement extends ModificationStatement
         // 'DELETE FROM t WHERE k = 1' does remove the row entirely)
         //
         // We never insert markers for Super CF as this would confuse the thrift side.
-        if (cfDef.isComposite && !cfDef.isCompact && !cfDef.cfm.isSuper())
+        if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper())
         {
             ByteBuffer name = builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
             cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
         }
 
+        List<Operation> updates = getOperations();
+
         if (cfDef.isCompact)
         {
             if (builder.componentCount() == 0)
@@ -240,52 +81,55 @@ public class UpdateStatement extends ModificationStatement
             {
                 // compact + no compact value implies there is no column outside the PK. So no operation could
                 // have passed through validation
-                assert processedColumns.isEmpty();
+                assert updates.isEmpty();
                 setToEmptyOperation.execute(key, cf, builder.copy(), params);
             }
             else
             {
                 // compact means we don't have a row marker, so don't accept to set only the PK (Note: we
                 // could accept it and use an empty value!?)
-                if (processedColumns.isEmpty())
+                if (updates.isEmpty())
                     throw new InvalidRequestException(String.format("Missing mandatory column %s", cfDef.value));
 
-                for (Operation op : processedColumns)
-                    op.execute(key, cf, builder.copy(), params);
+                for (Operation update : updates)
+                    update.execute(key, cf, builder.copy(), params);
             }
         }
         else
         {
-            for (Operation op : processedColumns)
-                op.execute(key, cf, builder.copy(), params);
+            for (Operation update : updates)
+                update.execute(key, cf, builder.copy(), params);
         }
 
-        RowMutation rm;
-        if (isBatch)
-        {
-            // we might group other mutations together with this one later, so make it mutable
-            rm = new RowMutation(cfDef.cfm.ksName, key);
-            rm.add(cf);
-        }
-        else
-        {
-            rm = new RowMutation(cfDef.cfm.ksName, key, cf);
-        }
-        return type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
+        return cf;
     }
 
-    public ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException
+    public static class ParsedInsert extends ModificationStatement.Parsed
     {
-        // Deal here with the keyspace overwrite thingy to avoid mistake
-        CFMetaData metadata = validateColumnFamily(keyspace(), columnFamily());
-        cfDef = metadata.getCfDef();
-
-        type = metadata.getDefaultValidator().isCommutative() ? Type.COUNTER : Type.LOGGED;
+        private final List<ColumnIdentifier> columnNames;
+        private final List<Term.Raw> columnValues;
+
+        /**
+         * A parsed <code>INSERT</code> statement.
+         *
+         * @param name column family being operated on
+         * @param columnNames list of column names
+         * @param columnValues list of column values (corresponds to names)
+         * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
+         */
+        public ParsedInsert(CFName name, Attributes attrs, List<ColumnIdentifier> columnNames, List<Term.Raw> columnValues)
+        {
+            super(name, attrs, Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList(), false);
+            this.columnNames = columnNames;
+            this.columnValues = columnValues;
+        }
 
-        if (operations == null)
+        protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException
         {
+            UpdateStatement stmt = new UpdateStatement(getBoundsTerms(), cfDef.cfm, attrs);
+
             // Created from an INSERT
-            if (type == Type.COUNTER)
+            if (stmt.isCounter())
                 throw new InvalidRequestException("INSERT statement are not allowed on counter tables, use UPDATE instead");
             if (columnNames.size() != columnValues.size())
                 throw new InvalidRequestException("Unmatched column names/values");
@@ -298,7 +142,6 @@ public class UpdateStatement extends ModificationStatement
                 if (name == null)
                     throw new InvalidRequestException(String.format("Unknown identifier %s", columnNames.get(i)));
 
-                // For UPDATES, the parser validates we don't set the same value twice but we must check it here for INSERT
                 for (int j = 0; j < i; j++)
                     if (name.name.equals(columnNames.get(j)))
                         throw new InvalidRequestException(String.format("Multiple definitions found for column %s", name));
@@ -311,22 +154,52 @@ public class UpdateStatement extends ModificationStatement
                     case COLUMN_ALIAS:
                         Term t = value.prepare(name);
                         t.collectMarkerSpecification(boundNames);
-                        if (processedKeys.put(name.name, Collections.singletonList(t)) != null)
-                            throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
+                        stmt.addKeyValue(name.name, t);
                         break;
                     case VALUE_ALIAS:
                     case COLUMN_METADATA:
                         Operation operation = new Operation.SetValue(value).prepare(name);
                         operation.collectMarkerSpecification(boundNames);
-                        processedColumns.add(operation);
+                        stmt.addOperation(operation);
                         break;
                 }
             }
+            return stmt;
         }
-        else
+    }
+
+    public static class ParsedUpdate extends ModificationStatement.Parsed
+    {
+        // Provided for an UPDATE
+        private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> updates;
+        private final List<Relation> whereClause;
+
+        /**
+         * Creates a new UpdateStatement from a column family name, columns map, consistency
+         * level, and key term.
+         *
+         * @param name column family being operated on
+         * @param attrs additional attributes for statement (timestamp, timeToLive)
+         * @param updates a map of column operations to perform
+         * @param whereClause the where clause
+         */
+        public ParsedUpdate(CFName name,
+                            Attributes attrs,
+                            List<Pair<ColumnIdentifier, Operation.RawUpdate>> updates,
+                            List<Relation> whereClause,
+                            List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions,
+                            boolean ifNotExists)
+        {
+            super(name, attrs, conditions, ifNotExists);
+            this.updates = updates;
+            this.whereClause = whereClause;
+        }
+
+        protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException
         {
-            // Created from an UPDATE
-            for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : operations)
+            UpdateStatement stmt = new UpdateStatement(getBoundsTerms(), cfDef.cfm, attrs);
+
+            for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : updates)
             {
                 CFDefinition.Name name = cfDef.get(entry.left);
                 if (name == null)
@@ -342,68 +215,13 @@ public class UpdateStatement extends ModificationStatement
                         throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
                     case VALUE_ALIAS:
                     case COLUMN_METADATA:
-                        processedColumns.add(operation);
+                        stmt.addOperation(operation);
                         break;
                 }
             }
-            processKeys(cfDef, whereClause, processedKeys, boundNames);
-        }
-
-        return new ParsedStatement.Prepared(this, Arrays.<ColumnSpecification>asList(boundNames));
-    }
-
-    public ParsedStatement.Prepared prepare() throws InvalidRequestException
-    {
-        ColumnSpecification[] names = new ColumnSpecification[getBoundsTerms()];
-        return prepare(names);
-    }
 
-    // Reused by DeleteStatement
-    static void processKeys(CFDefinition cfDef, List<Relation> keys, Map<ColumnIdentifier, List<Term>> processed, ColumnSpecification[] names) throws InvalidRequestException
-    {
-        for (Relation rel : keys)
-        {
-            CFDefinition.Name name = cfDef.get(rel.getEntity());
-            if (name == null)
-                throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity()));
-
-            switch (name.kind)
-            {
-                case KEY_ALIAS:
-                case COLUMN_ALIAS:
-                    List<Term.Raw> rawValues;
-                    if (rel.operator() == Relation.Type.EQ)
-                        rawValues = Collections.singletonList(rel.getValue());
-                    else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN)
-                        rawValues = rel.getInValues();
-                    else
-                        throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
-
-                    List<Term> values = new ArrayList<Term>(rawValues.size());
-                    for (Term.Raw raw : rawValues)
-                    {
-                        Term t = raw.prepare(name);
-                        t.collectMarkerSpecification(names);
-                        values.add(t);
-                    }
-
-                    if (processed.put(name.name, values) != null)
-                        throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
-                    break;
-                case VALUE_ALIAS:
-                case COLUMN_METADATA:
-                    throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
-            }
+            stmt.processWhereClause(whereClause, boundNames);
+            return stmt;
         }
     }
-
-    public String toString()
-    {
-        return String.format("UpdateStatement(name=%s, keys=%s, columns=%s, timestamp=%s, timeToLive=%s)",
-                             cfName,
-                             whereClause,
-                             operations,
-                             isSetTimestamp() ? getTimestamp(-1) : "<now>",
-                             getTimeToLive());
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index c2ee226..bf4f5b3 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -110,6 +110,11 @@ public class Column implements OnDiskAtom
         return new Column(newName, value, timestamp);
     }
 
+    public Column withUpdatedTimestamp(long newTimestamp)
+    {
+        return new Column(name, value, newTimestamp);
+    }
+
     public ByteBuffer name()
     {
         return name;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
index c1ca18c..ecf7cfd 100644
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ b/src/java/org/apache/cassandra/db/DeletedColumn.java
@@ -44,6 +44,12 @@ public class DeletedColumn extends Column
     }
 
     @Override
+    public Column withUpdatedTimestamp(long newTimestamp)
+    {
+        return new DeletedColumn(name, value, newTimestamp);
+    }
+
+    @Override
     public boolean isMarkedForDelete()
     {
         // We don't rely on the column implementation because it could mistakenly return false if

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/db/ExpiringColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringColumn.java b/src/java/org/apache/cassandra/db/ExpiringColumn.java
index 80631ee..7660bad 100644
--- a/src/java/org/apache/cassandra/db/ExpiringColumn.java
+++ b/src/java/org/apache/cassandra/db/ExpiringColumn.java
@@ -84,6 +84,12 @@ public class ExpiringColumn extends Column
     }
 
     @Override
+    public Column withUpdatedTimestamp(long newTimestamp)
+    {
+        return new ExpiringColumn(name, value, newTimestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
     public int dataSize()
     {
         return super.dataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5673063..6e0c9e4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -197,7 +197,7 @@ public class StorageProxy implements StorageProxyMBean
      * @return true if the operation succeeds in updating the row
      */
     public static boolean cas(String table, String cfName, ByteBuffer key, ColumnFamily expected, ColumnFamily updates)
-    throws UnavailableException, IOException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException
+    throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException
     {
         CFMetaData metadata = Schema.instance.getCFMetaData(table, cfName);
 
@@ -214,7 +214,7 @@ public class StorageProxy implements StorageProxyMBean
             // are not large enough to bother with.
             List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive));
             if (liveEndpoints.size() < requiredParticipants)
-               throw new UnavailableException(ConsistencyLevel.SERIAL, requiredParticipants, liveEndpoints.size());
+                throw new UnavailableException(ConsistencyLevel.SERIAL, requiredParticipants, liveEndpoints.size());
 
             // prepare
             logger.debug("Preparing {}", ballot);
@@ -265,12 +265,7 @@ public class StorageProxy implements StorageProxyMBean
                                     : new SliceByNamesReadCommand(table, key, cfName, new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getColumnNames())));
             List<Row> rows = read(Arrays.asList(readCommand), ConsistencyLevel.QUORUM);
             ColumnFamily current = rows.get(0).cf;
-            if ((current == null) != (expected == null))
-            {
-                logger.debug("CAS precondition {} does not match current values {}", expected, current);
-                return false;
-            }
-            if (current != null && !com.google.common.base.Objects.equal(current.asMap(), expected.asMap()))
+            if (!casApplies(expected, current))
             {
                 logger.debug("CAS precondition {} does not match current values {}", expected, current);
                 return false;
@@ -295,6 +290,39 @@ public class StorageProxy implements StorageProxyMBean
         throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1);
     }
 
+    private static boolean hasLiveColumns(ColumnFamily cf)
+    {
+        return cf != null && !cf.hasOnlyTombstones();
+    }
+
+    private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
+    {
+        if (!hasLiveColumns(expected))
+            return !hasLiveColumns(current);
+        else if (!hasLiveColumns(current))
+            return false;
+
+        // current has been built from expected, so we know that it can't have columns
+        // that expected doesn't have. So we just check that for each column in expected:
+        //   - if it is a tombstone, whether current has no column or a tombstone;
+        //   - otherwise, that current has a live column with the same value.
+        for (Column e : expected)
+        {
+            Column c = current.getColumn(e.name());
+            if (e.isLive())
+            {
+                if (!(c != null && c.isLive() && c.value().equals(e.value())))
+                    return false;
+            }
+            else
+            {
+                if (c != null && c.isLive())
+                    return false;
+            }
+        }
+        return true;
+    }
+
     private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants)
     throws WriteTimeoutException, UnavailableException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index b12a6f7..82a43e6 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -94,7 +94,7 @@ public class Commit
         ColumnFamily cf = updates.cloneMeShallow();
         long t = UUIDGen.microsTimestamp(ballot);
         for (Column column : updates)
-            cf.addColumn(column.name(), column.value(), t);
+            cf.addAtom(column.withUpdatedTimestamp(t));
         return cf;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e431fb72/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 6a70344..6b5eb9a 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -782,10 +782,6 @@ public class CassandraServer implements Cassandra.Iface
             schedule(DatabaseDescriptor.getWriteRpcTimeout());
             return StorageProxy.cas(cState.getKeyspace(), column_family, key, cfExpected, cfUpdates);
         }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
         catch (RequestTimeoutException e)
         {
             throw ThriftConversion.toThrift(e);


Mime
View raw message