cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [2/3] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Date Tue, 18 Mar 2014 21:09:43 GMT
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/triggers/TriggerExecutor.java
	test/unit/org/apache/cassandra/triggers/TriggersTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8440bc55
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8440bc55
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8440bc55

Branch: refs/heads/trunk
Commit: 8440bc550226b20913d2fc77b8c00ff675f4b8cc
Parents: fc33c95 75ff51e
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Wed Mar 19 00:05:49 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Mar 19 00:05:49 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/service/StorageProxy.java  |  10 +
 .../cassandra/triggers/TriggerExecutor.java     |  44 ++++-
 .../apache/cassandra/triggers/TriggersTest.java | 190 ++++++++++++++++++-
 4 files changed, 236 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8440bc55/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c2b3606,9caca38..f03614c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -43,6 -17,8 +43,7 @@@ Merged from 2.0
   * Properly use the Paxos consistency for (non-protocol) batch (CASSANDRA-6837)
   * Add paranoid disk failure option (CASSANDRA-6646)
   * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
+  * Extend triggers to support CAS updates (CASSANDRA-6882)
 -Merged from 1.2:
   * add extra SSL cipher suites (CASSANDRA-6613)
   * fix nodetool getsstables for blob PK (CASSANDRA-6803)
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8440bc55/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 64e06fe,fda9819..b316921
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -237,6 -234,16 +237,16 @@@ public class StorageProxy implements St
  
              // finish the paxos round w/ the desired updates
              // TODO turn null updates into delete?
+ 
+             // Apply triggers to cas updates. A consideration here is that
 -            // triggers emit RowMutations, and so a given trigger implementation
++            // triggers emit Mutations, and so a given trigger implementation
+             // may generate mutations for partitions other than the one this
+             // paxos round is scoped for. In this case, TriggerExecutor will
+             // validate that the generated mutations are targeted at the same
+             // partition as the initial updates and reject (via an
+             // InvalidRequestException) any which aren't.
+             updates = TriggerExecutor.instance.execute(key, updates);
+ 
              Commit proposal = Commit.newProposal(key, ballot, updates);
              Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
              if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8440bc55/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 91d0ea0,8ccf937..8a8c51d
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@@ -29,10 -30,14 +30,11 @@@ import com.google.common.collect.Maps
  
  import org.apache.cassandra.config.TriggerDefinition;
  import org.apache.cassandra.cql.QueryProcessor;
 -import org.apache.cassandra.db.ColumnFamily;
 -import org.apache.cassandra.db.CounterMutation;
 -import org.apache.cassandra.db.IMutation;
 -import org.apache.cassandra.db.RowMutation;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.CellName;
+ import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.HeapAllocator;
  
  public class TriggerExecutor
  {
@@@ -60,15 -65,30 +62,33 @@@
          cachedTriggers.clear();
      }
  
+     public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
+     {
 -        List<RowMutation> intermediate = executeInternal(key, updates);
++        List<Mutation> intermediate = executeInternal(key, updates);
+         if (intermediate == null)
+             return updates;
+ 
+         validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
+ 
 -        for (RowMutation mutation : intermediate)
++        for (Mutation mutation : intermediate)
++        {
+             for (ColumnFamily cf : mutation.getColumnFamilies())
 -                updates.addAll(cf, HeapAllocator.instance);
 -
++            {
++                updates.addAll(cf);
++            }
++        }
+         return updates;
+     }
+ 
 -    public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
 +    public Collection<Mutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
      {
          boolean hasCounters = false;
 -        Collection<RowMutation> tmutations = null;
 +        Collection<Mutation> tmutations = null;
          for (IMutation mutation : updates)
          {
              for (ColumnFamily cf : mutation.getColumnFamilies())
              {
-                 List<Mutation> intermediate = execute(mutation.key(), cf);
 -                List<RowMutation> intermediate = executeInternal(mutation.key(), cf);
++                List<Mutation> intermediate = executeInternal(mutation.key(), cf);
                  if (intermediate == null)
                      continue;
  
@@@ -86,9 -106,27 +106,29 @@@
          return tmutations;
      }
  
+     private void validateForSinglePartition(AbstractType<?> keyValidator,
+                                             UUID cfId,
+                                             ByteBuffer key,
 -                                            Collection<RowMutation> tmutations)
++                                            Collection<Mutation> tmutations)
+     throws InvalidRequestException
+     {
 -        for (RowMutation mutation : tmutations)
++        for (Mutation mutation : tmutations)
+         {
+             if (keyValidator.compare(mutation.key(), key) != 0)
+                 throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");
+ 
+             for (ColumnFamily cf : mutation.getColumnFamilies())
 -                if (!cf.id().equals(cfId))
++            {
++                if (! cf.id().equals(cfId))
+                     throw new InvalidRequestException("Column family of additional mutation does not match primary update cf");
++            }
+         }
+         validate(tmutations);
+     }
+ 
 -    private void validate(Collection<RowMutation> tmutations) throws InvalidRequestException
 +    private void validate(Collection<Mutation> tmutations) throws InvalidRequestException
      {
 -        for (RowMutation mutation : tmutations)
 +        for (Mutation mutation : tmutations)
          {
              QueryProcessor.validateKey(mutation.key());
              for (ColumnFamily tcf : mutation.getColumnFamilies())
@@@ -101,7 -139,7 +141,7 @@@
       * Switch class loader before using the triggers for the column family, if
       * not loaded them with the custom class loader.
       */
-     private List<Mutation> execute(ByteBuffer key, ColumnFamily columnFamily)
 -    private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
++    private List<Mutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
      {
          Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
          if (triggers.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8440bc55/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/triggers/TriggersTest.java
index b374759,5b9b27d..79133e2
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@@ -31,22 -31,25 +31,21 @@@ import org.apache.cassandra.config.Sche
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.ArrayBackedSortedColumns;
 -import org.apache.cassandra.db.Column;
 +import org.apache.cassandra.db.Cell;
  import org.apache.cassandra.db.ColumnFamily;
  import org.apache.cassandra.db.ConsistencyLevel;
 -import org.apache.cassandra.db.RowMutation;
 +import org.apache.cassandra.db.Mutation;
- import org.apache.cassandra.db.composites.CellNames;
+ import org.apache.cassandra.exceptions.RequestExecutionException;
  import org.apache.cassandra.service.StorageService;
--import org.apache.cassandra.thrift.Cassandra;
--import org.apache.cassandra.thrift.ColumnOrSuperColumn;
--import org.apache.cassandra.thrift.ColumnParent;
- import org.apache.cassandra.thrift.TFramedTransportFactory;
- import org.apache.cassandra.thrift.ThriftServer;
 -import org.apache.cassandra.thrift.InvalidRequestException;
 -import org.apache.cassandra.thrift.Mutation;
 -import org.apache.cassandra.thrift.TFramedTransportFactory;
 -import org.apache.cassandra.thrift.ThriftServer;
 -import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.thrift.*;
  import org.apache.thrift.protocol.TBinaryProtocol;
  
- import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
++import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
++import static org.apache.cassandra.utils.ByteBufferUtil.toInt;
 +
  public class TriggersTest extends SchemaLoader
  {
      private static boolean triggerCreated = false;
@@@ -148,6 -155,142 +151,142 @@@
          assertUpdateIsAugmented(3);
      }
  
+     @Test
+     public void executeTriggerOnCqlInsertWithConditions() throws Exception
+     {
+         String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (4, 4) IF NOT EXISTS", ksName, cfName);
+         QueryProcessor.process(cql, ConsistencyLevel.ONE);
+         assertUpdateIsAugmented(4);
+     }
+ 
+     @Test
+     public void executeTriggerOnCqlBatchWithConditions() throws Exception
+     {
+         String cql = String.format("BEGIN BATCH " +
+                                    "  INSERT INTO %1$s.%2$s (k, v1) VALUES (5, 5) IF NOT EXISTS; " +
+                                    "  INSERT INTO %1$s.%2$s (k, v1) VALUES (5, 5); " +
+                                    "APPLY BATCH",
+                                     ksName, cfName);
+         QueryProcessor.process(cql, ConsistencyLevel.ONE);
+         assertUpdateIsAugmented(5);
+     }
+ 
+     @Test
+     public void executeTriggerOnThriftCASOperation() throws Exception
+     {
+         Cassandra.Client client = new Cassandra.Client(
+                 new TBinaryProtocol(
+                         new TFramedTransportFactory().openTransport(
+                                 InetAddress.getLocalHost().getHostName(), 9170)));
+         client.set_keyspace(ksName);
 -        client.cas(ByteBufferUtil.bytes(6),
++        client.cas(bytes(6),
+                    cfName,
 -                   Collections.EMPTY_LIST,
++                   Collections.<Column>emptyList(),
+                    Collections.singletonList(getColumnForInsert("v1", 6)),
+                    org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
+                    org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+ 
+         assertUpdateIsAugmented(6);
+     }
+ 
+     // Unfortunately, an IRE thrown from StorageProxy.cas
+     // results in a RuntimeException from QueryProcessor.process
+     @Test(expected=RuntimeException.class)
+     public void onCqlUpdateWithConditionsRejectGeneratedUpdatesForDifferentPartition() throws Exception
+     {
+         String cf = "cf" + System.nanoTime();
+         try
+         {
+             setupTableWithTrigger(cf, CrossPartitionTrigger.class);
+             String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (7, 7) IF NOT EXISTS", ksName, cf);
+             QueryProcessor.process(cql, ConsistencyLevel.ONE);
+         }
+         finally
+         {
+             assertUpdateNotExecuted(cf, 7);
+         }
+     }
+ 
+     // Unfortunately, an IRE thrown from StorageProxy.cas
+     // results in a RuntimeException from QueryProcessor.process
+     @Test(expected=RuntimeException.class)
+     public void onCqlUpdateWithConditionsRejectGeneratedUpdatesForDifferentTable() throws Exception
+     {
+         String cf = "cf" + System.nanoTime();
+         try
+         {
+             setupTableWithTrigger(cf, CrossTableTrigger.class);
+             String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (8, 8) IF NOT EXISTS", ksName, cf);
+             QueryProcessor.process(cql, ConsistencyLevel.ONE);
+         }
+         finally
+         {
+             assertUpdateNotExecuted(cf, 7);
+         }
+     }
+ 
+     @Test(expected=InvalidRequestException.class)
+     public void onThriftCASRejectGeneratedUpdatesForDifferentPartition() throws Exception
+     {
+         String cf = "cf" + System.nanoTime();
+         try
+         {
+             setupTableWithTrigger(cf, CrossPartitionTrigger.class);
+             Cassandra.Client client = new Cassandra.Client(
+                     new TBinaryProtocol(
+                             new TFramedTransportFactory().openTransport(
+                                     InetAddress.getLocalHost().getHostName(), 9170)));
+             client.set_keyspace(ksName);
 -            client.cas(ByteBufferUtil.bytes(9),
++            client.cas(bytes(9),
+                        cf,
 -                       Collections.EMPTY_LIST,
++                       Collections.<Column>emptyList(),
+                        Collections.singletonList(getColumnForInsert("v1", 9)),
+                        org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
+                        org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+         }
+         finally
+         {
+             assertUpdateNotExecuted(cf, 9);
+         }
+     }
+ 
+     @Test(expected=InvalidRequestException.class)
+     public void onThriftCASRejectGeneratedUpdatesForDifferentCF() throws Exception
+     {
+         String cf = "cf" + System.nanoTime();
+         try
+         {
+             setupTableWithTrigger(cf, CrossTableTrigger.class);
+             Cassandra.Client client = new Cassandra.Client(
+                     new TBinaryProtocol(
+                             new TFramedTransportFactory().openTransport(
+                                     InetAddress.getLocalHost().getHostName(), 9170)));
+             client.set_keyspace(ksName);
 -            client.cas(ByteBufferUtil.bytes(10),
++            client.cas(bytes(10),
+                        cf,
 -                       Collections.EMPTY_LIST,
++                       Collections.<Column>emptyList(),
+                        Collections.singletonList(getColumnForInsert("v1", 10)),
+                        org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
+                        org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+         }
+         finally
+         {
+             assertUpdateNotExecuted(cf, 10);
+         }
+     }
+ 
+     private void setupTableWithTrigger(String cf, Class<? extends ITrigger> triggerImpl)
+     throws RequestExecutionException
+     {
+         String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cf);
+         QueryProcessor.process(cql, ConsistencyLevel.ONE);
+ 
+         // no conditional execution of create trigger stmt yet
+         cql = String.format("CREATE TRIGGER trigger_1 ON %s.%s USING '%s'",
+                             ksName, cf, triggerImpl.getName());
+         QueryProcessor.process(cql, ConsistencyLevel.ONE);
+     }
+ 
      private void assertUpdateIsAugmented(int key)
      {
          UntypedResultSet rs = QueryProcessor.processInternal(
@@@ -166,14 -317,43 +313,43 @@@
  
      public static class TestTrigger implements ITrigger
      {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
 +        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
          {
              ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
 -            extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
 -                                             ByteBufferUtil.bytes(999)));
 -            RowMutation rm = new RowMutation(ksName, key);
 -            rm.add(extraUpdate);
 -            return Collections.singletonList(rm);
 +            extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")),
 +                                           bytes(999)));
 +            Mutation mutation = new Mutation(ksName, key);
 +            mutation.add(extraUpdate);
 +            return Collections.singletonList(mutation);
          }
      }
+ 
+     public static class CrossPartitionTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
 -            extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
 -                                             ByteBufferUtil.bytes(999)));
++            extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")),
++                                           bytes(999)));
+ 
 -            int newKey = ByteBufferUtil.toInt(key) + 1000;
 -            RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey));
 -            rm.add(extraUpdate);
 -            return Collections.singletonList(rm);
++            int newKey = toInt(key) + 1000;
++            Mutation mutation = new Mutation(ksName, bytes(newKey));
++            mutation.add(extraUpdate);
++            return Collections.singletonList(mutation);
+         }
+     }
+ 
+     public static class CrossTableTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
 -            extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
 -                                             ByteBufferUtil.bytes(999)));
++            extraUpdate.addColumn(new Cell(extraUpdate.metadata().comparator.makeCellName(bytes("v2")),
++                                           bytes(999)));
+ 
 -            RowMutation rm = new RowMutation(ksName, key);
 -            rm.add(extraUpdate);
 -            return Collections.singletonList(rm);
++            Mutation mutation = new Mutation(ksName, key);
++            mutation.add(extraUpdate);
++            return Collections.singletonList(mutation);
+         }
+     }
  }


Mime
View raw message