cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject git commit: Extend triggers to support CAS updates
Date Tue, 18 Mar 2014 20:45:19 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 5b3b52f59 -> 75ff51e12


Extend triggers to support CAS updates

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-6882


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75ff51e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75ff51e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75ff51e1

Branch: refs/heads/cassandra-2.0
Commit: 75ff51e12485c16f3c408d40f357e07bb26905ea
Parents: 5b3b52f
Author: Sam Tunnicliffe <sam@beobal.com>
Authored: Tue Mar 18 23:42:33 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Tue Mar 18 23:44:53 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/service/StorageProxy.java  |  10 ++
 .../cassandra/triggers/TriggerExecutor.java     |  40 ++++-
 .../apache/cassandra/triggers/TriggersTest.java | 180 +++++++++++++++++++
 4 files changed, 229 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e738a2e..9caca38 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Properly use the Paxos consistency for (non-protocol) batch (CASSANDRA-6837)
  * Add paranoid disk failure option (CASSANDRA-6646)
  * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
+ * Extend triggers to support CAS updates (CASSANDRA-6882)
 Merged from 1.2:
  * add extra SSL cipher suites (CASSANDRA-6613)
  * fix nodetool getsstables for blob PK (CASSANDRA-6803)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index a5542e6..fda9819 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -234,6 +234,16 @@ public class StorageProxy implements StorageProxyMBean
 
             // finish the paxos round w/ the desired updates
             // TODO turn null updates into delete?
+
+            // Apply triggers to cas updates. A consideration here is that
+            // triggers emit RowMutations, and so a given trigger implementation
+            // may generate mutations for partitions other than the one this
+            // paxos round is scoped for. In this case, TriggerExecutor will
+            // validate that the generated mutations are targetted at the same
+            // partition as the initial updates and reject (via an
+            // InvalidRequestException) any which aren't.
+            updates = TriggerExecutor.instance.execute(key, updates);
+
             Commit proposal = Commit.newProposal(key, ballot, updates);
             Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
             if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 4b3c24a..8ccf937 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -33,8 +34,10 @@ import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HeapAllocator;
 
 public class TriggerExecutor
 {
@@ -62,6 +65,21 @@ public class TriggerExecutor
         cachedTriggers.clear();
     }
 
+    public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
+    {
+        List<RowMutation> intermediate = executeInternal(key, updates);
+        if (intermediate == null)
+            return updates;
+
+        validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
+
+        for (RowMutation mutation : intermediate)
+            for (ColumnFamily cf : mutation.getColumnFamilies())
+                updates.addAll(cf, HeapAllocator.instance);
+
+        return updates;
+    }
+
     public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
     {
         boolean hasCounters = false;
@@ -70,7 +88,7 @@ public class TriggerExecutor
         {
             for (ColumnFamily cf : mutation.getColumnFamilies())
             {
-                List<RowMutation> intermediate = execute(mutation.key(), cf);
+                List<RowMutation> intermediate = executeInternal(mutation.key(), cf);
                 if (intermediate == null)
                     continue;
 
@@ -88,6 +106,24 @@ public class TriggerExecutor
         return tmutations;
     }
 
+    private void validateForSinglePartition(AbstractType<?> keyValidator,
+                                            UUID cfId,
+                                            ByteBuffer key,
+                                            Collection<RowMutation> tmutations)
+    throws InvalidRequestException
+    {
+        for (RowMutation mutation : tmutations)
+        {
+            if (keyValidator.compare(mutation.key(), key) != 0)
+                throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");
+
+            for (ColumnFamily cf : mutation.getColumnFamilies())
+                if (!cf.id().equals(cfId))
+                    throw new InvalidRequestException("Column family of additional mutation does not match primary update cf");
+        }
+        validate(tmutations);
+    }
+
     private void validate(Collection<RowMutation> tmutations) throws InvalidRequestException
     {
         for (RowMutation mutation : tmutations)
@@ -103,7 +139,7 @@ public class TriggerExecutor
      * Switch class loader before using the triggers for the column family, if
      * not loaded them with the custom class loader.
      */
-    private List<RowMutation> execute(ByteBuffer key, ColumnFamily columnFamily)
+    private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
     {
         Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
         if (triggers.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 6ca3880..5b9b27d 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -35,10 +35,12 @@ import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.cassandra.thrift.TFramedTransportFactory;
 import org.apache.cassandra.thrift.ThriftServer;
@@ -46,6 +48,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TriggersTest extends SchemaLoader
 {
@@ -54,6 +57,7 @@ public class TriggersTest extends SchemaLoader
 
     private static String ksName = "triggers_test_ks";
     private static String cfName = "test_table";
+    private static String otherCf = "other_table";
 
     @Before
     public void setup() throws Exception
@@ -73,6 +77,9 @@ public class TriggersTest extends SchemaLoader
         cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cfName);
         QueryProcessor.process(cql, ConsistencyLevel.ONE);
 
+        cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, otherCf);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
         // no conditional execution of create trigger stmt yet
         if (! triggerCreated)
         {
@@ -148,13 +155,157 @@ public class TriggersTest extends SchemaLoader
         assertUpdateIsAugmented(3);
     }
 
+    @Test
+    public void executeTriggerOnCqlInsertWithConditions() throws Exception
+    {
+        String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (4, 4) IF NOT EXISTS", ksName, cfName);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+        assertUpdateIsAugmented(4);
+    }
+
+    @Test
+    public void executeTriggerOnCqlBatchWithConditions() throws Exception
+    {
+        String cql = String.format("BEGIN BATCH " +
+                                   "  INSERT INTO %1$s.%2$s (k, v1) VALUES (5, 5) IF NOT EXISTS; " +
+                                   "  INSERT INTO %1$s.%2$s (k, v1) VALUES (5, 5); " +
+                                   "APPLY BATCH",
+                                    ksName, cfName);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+        assertUpdateIsAugmented(5);
+    }
+
+    @Test
+    public void executeTriggerOnThriftCASOperation() throws Exception
+    {
+        Cassandra.Client client = new Cassandra.Client(
+                new TBinaryProtocol(
+                        new TFramedTransportFactory().openTransport(
+                                InetAddress.getLocalHost().getHostName(), 9170)));
+        client.set_keyspace(ksName);
+        client.cas(ByteBufferUtil.bytes(6),
+                   cfName,
+                   Collections.EMPTY_LIST,
+                   Collections.singletonList(getColumnForInsert("v1", 6)),
+                   org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
+                   org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+
+        assertUpdateIsAugmented(6);
+    }
+
+    // Unfortunately, an IRE thrown from StorageProxy.cas
+    // results in a RuntimeException from QueryProcessor.process
+    @Test(expected=RuntimeException.class)
+    public void onCqlUpdateWithConditionsRejectGeneratedUpdatesForDifferentPartition() throws Exception
+    {
+        String cf = "cf" + System.nanoTime();
+        try
+        {
+            setupTableWithTrigger(cf, CrossPartitionTrigger.class);
+            String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (7, 7) IF NOT EXISTS", ksName, cf);
+            QueryProcessor.process(cql, ConsistencyLevel.ONE);
+        }
+        finally
+        {
+            assertUpdateNotExecuted(cf, 7);
+        }
+    }
+
+    // Unfortunately, an IRE thrown from StorageProxy.cas
+    // results in a RuntimeException from QueryProcessor.process
+    @Test(expected=RuntimeException.class)
+    public void onCqlUpdateWithConditionsRejectGeneratedUpdatesForDifferentTable() throws Exception
+    {
+        String cf = "cf" + System.nanoTime();
+        try
+        {
+            setupTableWithTrigger(cf, CrossTableTrigger.class);
+            String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (8, 8) IF NOT EXISTS", ksName, cf);
+            QueryProcessor.process(cql, ConsistencyLevel.ONE);
+        }
+        finally
+        {
+            assertUpdateNotExecuted(cf, 7);
+        }
+    }
+
+    @Test(expected=InvalidRequestException.class)
+    public void onThriftCASRejectGeneratedUpdatesForDifferentPartition() throws Exception
+    {
+        String cf = "cf" + System.nanoTime();
+        try
+        {
+            setupTableWithTrigger(cf, CrossPartitionTrigger.class);
+            Cassandra.Client client = new Cassandra.Client(
+                    new TBinaryProtocol(
+                            new TFramedTransportFactory().openTransport(
+                                    InetAddress.getLocalHost().getHostName(), 9170)));
+            client.set_keyspace(ksName);
+            client.cas(ByteBufferUtil.bytes(9),
+                       cf,
+                       Collections.EMPTY_LIST,
+                       Collections.singletonList(getColumnForInsert("v1", 9)),
+                       org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
+                       org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+        }
+        finally
+        {
+            assertUpdateNotExecuted(cf, 9);
+        }
+    }
+
+    @Test(expected=InvalidRequestException.class)
+    public void onThriftCASRejectGeneratedUpdatesForDifferentCF() throws Exception
+    {
+        String cf = "cf" + System.nanoTime();
+        try
+        {
+            setupTableWithTrigger(cf, CrossTableTrigger.class);
+            Cassandra.Client client = new Cassandra.Client(
+                    new TBinaryProtocol(
+                            new TFramedTransportFactory().openTransport(
+                                    InetAddress.getLocalHost().getHostName(), 9170)));
+            client.set_keyspace(ksName);
+            client.cas(ByteBufferUtil.bytes(10),
+                       cf,
+                       Collections.EMPTY_LIST,
+                       Collections.singletonList(getColumnForInsert("v1", 10)),
+                       org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
+                       org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+        }
+        finally
+        {
+            assertUpdateNotExecuted(cf, 10);
+        }
+    }
+
+    private void setupTableWithTrigger(String cf, Class<? extends ITrigger> triggerImpl)
+    throws RequestExecutionException
+    {
+        String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cf);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
+        // no conditional execution of create trigger stmt yet
+        cql = String.format("CREATE TRIGGER trigger_1 ON %s.%s USING '%s'",
+                            ksName, cf, triggerImpl.getName());
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+    }
+
     private void assertUpdateIsAugmented(int key)
     {
         UntypedResultSet rs = QueryProcessor.processInternal(
                                 String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, key));
+        assertTrue(String.format("Expected value (%s) for augmented cell v2 was not found", key), rs.one().has("v2"));
         assertEquals(999, rs.one().getInt("v2"));
     }
 
+    private void assertUpdateNotExecuted(String cf, int key)
+    {
+        UntypedResultSet rs = QueryProcessor.processInternal(
+                String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cf, key));
+        assertTrue(rs.isEmpty());
+    }
+
     private org.apache.cassandra.thrift.Column getColumnForInsert(String columnName, int value)
     {
         org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
@@ -176,4 +327,33 @@ public class TriggersTest extends SchemaLoader
             return Collections.singletonList(rm);
         }
     }
+
+    public static class CrossPartitionTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
+            extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
+                                             ByteBufferUtil.bytes(999)));
+
+            int newKey = ByteBufferUtil.toInt(key) + 1000;
+            RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey));
+            rm.add(extraUpdate);
+            return Collections.singletonList(rm);
+        }
+    }
+
+    public static class CrossTableTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
+            extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
+                                             ByteBufferUtil.bytes(999)));
+
+            RowMutation rm = new RowMutation(ksName, key);
+            rm.add(extraUpdate);
+            return Collections.singletonList(rm);
+        }
+    }
 }


Mime
View raw message