cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Correctly handle null in conditions with TTL
Date Thu, 06 Feb 2014 07:39:54 GMT
Updated Branches:
  refs/heads/cassandra-2.0 58e948185 -> e59ef16bf


Correctly handle null in conditions with TTL

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6623


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e59ef16b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e59ef16b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e59ef16b

Branch: refs/heads/cassandra-2.0
Commit: e59ef16bfcb3bd019202fc12bedeb04302066540
Parents: 58e9481
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Thu Feb 6 08:36:12 2014 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Feb 6 08:36:12 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/ModificationStatement.java  | 109 +++++++++++++++----
 .../apache/cassandra/service/CASConditions.java |  38 +++++++
 .../apache/cassandra/service/StorageProxy.java  |  75 ++-----------
 .../cassandra/thrift/CassandraServer.java       |  64 ++++++++++-
 5 files changed, 197 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bba5f20..7ba8044 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.6
+ * Correctly handle null with IF conditions and TTL (CASSANDRA-6623)
 Merged from 1.2:
  * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c0bf428..2567043 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -27,19 +27,20 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.CASConditions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /*
  * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
@@ -415,16 +416,17 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         UpdateParameters updParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), getTimeToLive(variables), null);
         ColumnFamily updates = updateForKey(key, clusteringPrefix, updParams);
 
-        // When building the conditions, we should not use the TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible
-        // for it to expire before actually build the conditions which would break since we would then test for the presence of tombstones.
-        UpdateParameters condParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), 0, null);
-        ColumnFamily expected = buildConditions(key, clusteringPrefix, condParams);
+        // It's cleaner to use the query timestamp below, but it's in seconds while the conditions expects microseconds, so just
+        // put it back in millis (we don't really lose precision because the ultimate consumer, Column.isLive, re-divide it).
+        long now = queryState.getTimestamp() * 1000;
+        CASConditions conditions = ifNotExists
+                                 ? new NotExistCondition(clusteringPrefix, now)
+                                 : new ColumnsConditions(clusteringPrefix, cfm, key, columnConditions, variables, now);
 
         ColumnFamily result = StorageProxy.cas(keyspace(),
                                                columnFamily(),
                                                key,
-                                               clusteringPrefix,
-                                               expected,
+                                               conditions,
                                                updates,
                                                options.getSerialConsistency(),
                                                options.getConsistency());
@@ -542,28 +544,91 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return isCounter() ? new CounterMutation(rm, cl) : rm;
     }
 
-    private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params)
-    throws InvalidRequestException
+    private static abstract class CQL3CasConditions implements CASConditions
     {
-        if (ifNotExists)
-            return null;
+        protected final ColumnNameBuilder rowPrefix;
+        protected final long now;
 
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
+        protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
+        {
+            this.rowPrefix = rowPrefix;
+            this.now = now;
+        }
 
-        // CQL row marker
-        CFDefinition cfDef = cfm.getCfDef();
-        if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper())
+        public IDiskAtomFilter readFilter()
         {
-            ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
-            cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+            // We always read the row entirely as on CAS failure we want to be able to distinguish between "row exists
+            // but all values on why there were conditions are null" and "row doesn't exists", and we can't rely on the
+            // row marker for that (see #6623)
+            return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount());
         }
+    }
 
-        // Conditions
-        for (Operation condition : columnConditions)
-            condition.execute(key, cf, clusteringPrefix.copy(), params);
+    private static class NotExistCondition extends CQL3CasConditions
+    {
+        private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
+        {
+            super(rowPrefix, now);
+        }
+
+        public boolean appliesTo(ColumnFamily current)
+        {
+            return current == null || current.hasOnlyTombstones(now);
+        }
+    }
+
+    private static class ColumnsConditions extends CQL3CasConditions
+    {
+        private final ColumnFamily expected;
+
+        private ColumnsConditions(ColumnNameBuilder rowPrefix,
+                                  CFMetaData cfm,
+                                  ByteBuffer key,
+                                  Collection<Operation> conditions,
+                                  List<ByteBuffer> variables,
+                                  long now) throws InvalidRequestException
+        {
+            super(rowPrefix, now);
+            this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
 
-        assert !cf.isEmpty();
-        return cf;
+            // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible
+            // for it to expire before the actual build of the conditions which would break since we would then testing for the presence of tombstones.
+            UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
+
+            // Conditions
+            for (Operation condition : conditions)
+                condition.execute(key, expected, rowPrefix.copy(), params);
+        }
+
+        public boolean appliesTo(ColumnFamily current)
+        {
+            if (current == null)
+                return false;
+
+            for (Column e : expected)
+            {
+                Column c = current.getColumn(e.name());
+                if (e.isLive(now))
+                {
+                    if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+                        return false;
+                }
+                else
+                {
+                    // If we have a tombstone in expected, it means the condition tests that the column is
+                    // null, so check that we have no value
+                    if (c != null && c.isLive(now))
+                        return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public String toString()
+        {
+            return expected.toString();
+        }
     }
 
     public static abstract class Parsed extends CFStatement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/CASConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASConditions.java b/src/java/org/apache/cassandra/service/CASConditions.java
new file mode 100644
index 0000000..d4b3e19
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/CASConditions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+
+/**
+ * Abstract the conditions to be fulfilled by a CAS operation.
+ */
+public interface CASConditions
+{
+    /**
+     * The filter to use to fetch the value to compare for the CAS.
+     */
+    public IDiskAtomFilter readFilter();
+
+    /**
+     * Returns whether the provided CF, that represents the values fetched using the
+     * readFilter(), match the CAS conditions this object stands for.
+     */
+    public boolean appliesTo(ColumnFamily current);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5671655..8d1f913 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -41,11 +41,8 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
@@ -157,7 +154,7 @@ public class StorageProxy implements StorageProxyMBean
 
     /**
      * Apply @param updates if and only if the current values in the row for @param key
-     * match the ones given by @param expected.  The algorithm is "raw" Paxos: that is, Paxos
+     * match the provided @param conditions.  The algorithm is "raw" Paxos: that is, Paxos
      * minus leader election -- any node in the cluster may propose changes for any row,
      * which (that is, the row) is the unit of values being proposed, not single columns.
      *
@@ -189,23 +186,18 @@ public class StorageProxy implements StorageProxyMBean
      * @param keyspaceName the keyspace for the CAS
      * @param cfName the column family for the CAS
      * @param key the row key for the row to CAS
-     * @param prefix a column name prefix that selects the CQL3 row to check if {@code expected} is null. If {@code expected}
-     * is not null, this is ignored. If {@code expected} is null and this is null, the full row existing is checked (by querying
-     * the first live column of the row).
-     * @param expected the expected column values. This can be null to check for existence (see {@code prefix}).
-     * @param updates the value to insert if {@code expected matches the current values}.
+     * @param conditions the conditions for the CAS to apply.
+     * @param updates the value to insert if {@code condtions} matches the current values.
      * @param consistencyForPaxos the consistency for the paxos prepare and propose round. This can only be either SERIAL or LOCAL_SERIAL.
      * @param consistencyForCommit the consistency for write done during the commit phase. This can be anything, except SERIAL or LOCAL_SERIAL.
      *
-     * @return null if the operation succeeds in updating the row, or the current values for the columns contained in
-     * expected (since, if the CAS doesn't succeed, it means the current value do not match the one in expected). If
-     * expected == null and the CAS is unsuccessfull, the first live column of the CF is returned.
+     * @return null if the operation succeeds in updating the row, or the current values corresponding to conditions.
+     * (since, if the CAS doesn't succeed, it means the current value do not match the conditions).
      */
     public static ColumnFamily cas(String keyspaceName,
                                    String cfName,
                                    ByteBuffer key,
-                                   ColumnNameBuilder prefix,
-                                   ColumnFamily expected,
+                                   CASConditions conditions,
                                    ColumnFamily updates,
                                    ConsistencyLevel consistencyForPaxos,
                                    ConsistencyLevel consistencyForCommit)
@@ -227,27 +219,15 @@ public class StorageProxy implements StorageProxyMBean
 
             UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos);
 
-            // read the current value and compare with expected
+            // read the current values and check they validate the conditions
             Tracing.trace("Reading existing values for CAS precondition");
             long timestamp = System.currentTimeMillis();
-            ReadCommand readCommand;
-            if (expected == null || expected.isEmpty())
-            {
-                SliceQueryFilter filter = prefix == null
-                                        ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
-                                        : new SliceQueryFilter(prefix.build(), prefix.buildAsEndOfRange(), false, 1, prefix.componentCount());
-                readCommand = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
-            }
-            else
-            {
-                assert !expected.isEmpty();
-                readCommand = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(metadata.comparator, expected.getColumnNames())));
-            }
+            ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, conditions.readFilter());
             List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM);
             ColumnFamily current = rows.get(0).cf;
-            if (!casApplies(expected, current))
+            if (!conditions.appliesTo(current))
             {
-                Tracing.trace("CAS precondition {} does not match current values {}", expected, current);
+                Tracing.trace("CAS precondition {} does not match current values {}", conditions, current);
                 // We should not return null as this means success
                 return current == null ? EmptyColumns.factory.create(metadata) : current;
             }
@@ -274,41 +254,6 @@ public class StorageProxy implements StorageProxyMBean
         throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
     }
 
-    private static boolean hasLiveColumns(ColumnFamily cf, long now)
-    {
-        return cf != null && !cf.hasOnlyTombstones(now);
-    }
-
-    private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
-    {
-        long now = System.currentTimeMillis();
-
-        if (!hasLiveColumns(expected, now))
-            return !hasLiveColumns(current, now);
-        else if (!hasLiveColumns(current, now))
-            return false;
-
-        // current has been built from expected, so we know that it can't have columns
-        // that excepted don't have. So we just check that for each columns in expected:
-        //   - if it is a tombstone, whether current has no column or a tombstone;
-        //   - otherwise, that current has a live column with the same value.
-        for (Column e : expected)
-        {
-            Column c = current.getColumn(e.name());
-            if (e.isLive(now))
-            {
-                if (!(c != null && c.isLive(now) && c.value().equals(e.value())))
-                    return false;
-            }
-            else
-            {
-                if (c != null && c.isLive(now))
-                    return false;
-            }
-        }
-        return true;
-    }
-
     private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
     {
         final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index beaae78..ef5eeb8 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,6 +30,7 @@ import java.util.zip.Inflater;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -59,6 +60,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.CASConditions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageProxy;
@@ -768,8 +770,7 @@ public class CassandraServer implements Cassandra.Iface
             ColumnFamily result = StorageProxy.cas(cState.getKeyspace(),
                                                    column_family,
                                                    key,
-                                                   null,
-                                                   cfExpected,
+                                                   new ThriftCASConditions(cfExpected),
                                                    cfUpdates,
                                                    ThriftConversion.fromThrift(serial_consistency_level),
                                                    ThriftConversion.fromThrift(commit_consistency_level));
@@ -2158,5 +2159,62 @@ public class CassandraServer implements Cassandra.Iface
             }
         });
     }
-    // main method moved to CassandraDaemon
+
+    private static class ThriftCASConditions implements CASConditions
+    {
+        private final ColumnFamily expected;
+
+        private ThriftCASConditions(ColumnFamily expected)
+        {
+            this.expected = expected;
+        }
+
+        public IDiskAtomFilter readFilter()
+        {
+            return expected == null || expected.isEmpty()
+                 ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
+                 : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getComparator(), expected.getColumnNames()));
+        }
+
+        public boolean appliesTo(ColumnFamily current)
+        {
+            long now = System.currentTimeMillis();
+
+            if (!hasLiveColumns(expected, now))
+                return !hasLiveColumns(current, now);
+            else if (!hasLiveColumns(current, now))
+                return false;
+
+            // current has been built from expected, so we know that it can't have columns
+            // that excepted don't have. So we just check that for each columns in expected:
+            //   - if it is a tombstone, whether current has no column or a tombstone;
+            //   - otherwise, that current has a live column with the same value.
+            for (org.apache.cassandra.db.Column e : expected)
+            {
+                org.apache.cassandra.db.Column c = current.getColumn(e.name());
+                if (e.isLive(now))
+                {
+                    if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+                        return false;
+                }
+                else
+                {
+                    if (c != null && c.isLive(now))
+                        return false;
+                }
+            }
+            return true;
+        }
+
+        private static boolean hasLiveColumns(ColumnFamily cf, long now)
+        {
+            return cf != null && !cf.hasOnlyTombstones(now);
+        }
+
+        @Override
+        public String toString()
+        {
+            return expected.toString();
+        }
+    }
 }


Mime
View raw message