cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject svn commit: r955286 - in /cassandra/trunk: interface/cassandra.avpr interface/cassandra.genavro src/java/org/apache/cassandra/avro/AvroValidation.java src/java/org/apache/cassandra/avro/CassandraServer.java test/system/test_avro_server.py
Date Wed, 16 Jun 2010 16:16:25 GMT
Author: eevans
Date: Wed Jun 16 16:16:25 2010
New Revision: 955286

URL: http://svn.apache.org/viewvc?rev=955286&view=rev
Log:
working batch_mutate() for avro server w/ tests

Patch by eevans

Modified:
    cassandra/trunk/interface/cassandra.avpr
    cassandra/trunk/interface/cassandra.genavro
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/test/system/test_avro_server.py

Modified: cassandra/trunk/interface/cassandra.avpr
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=955286&r1=955285&r2=955286&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Wed Jun 16 16:16:25 2010
@@ -99,6 +99,16 @@
             {"name": "cf_defs", "type": {"type": "array", "items": "CfDef"}}
           ]
       },
+      {"name": "MutationsMapEntry", "type": "record",
+          "fields": [
+            {"name": "key", "type" : "bytes"},
+            {"name": "mutations", "type":
+                {"type": "map",
+                    "values": {"type": "array", "items": "Mutation"}
+                }
+            }
+          ]
+      },
       {"name": "ConsistencyLevel", "type": "enum",
           "symbols": [
             "ZERO", "ONE", "QUORUM", "DCQUORUM", "DCQUORUMSYNC", "ALL"
@@ -161,14 +171,8 @@
     },
     "batch_mutate": {
         "request": [
-            {"name": "keyspace", "type": "string"},
-            {"name": "mutation_map",
-                "type": {
-                    "type": "map", "values": {
-                        "type": "map", "values": {
-                            "type": "array", "items": "Mutation"}
-                    }
-                }
+            {"name": "mutation_map", "type":
+                {"type": "array", "items": "MutationsMapEntry"}
             },
             {"name": "consistency_level", "type": "ConsistencyLevel"}
         ],

Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=955286&r1=955285&r2=955286&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Wed Jun 16 16:16:25 2010
@@ -82,7 +82,12 @@ protocol Cassandra {
         string strategy_class;
         int replication_factor;
         array<CfDef> cf_defs;
-    } 
+    }
+    
+    record MutationsMapEntry {
+        bytes key;
+        map<array<Mutation>> mutations;
+    }
 
     enum ConsistencyLevel {
         ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL
@@ -122,8 +127,7 @@ protocol Cassandra {
                 ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException;
 
-    void batch_mutate(string keyspace,
-                      map<map<array<Mutation>>> mutation_map,
+    void batch_mutate(array<MutationsMapEntry> mutation_map,
                       ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=955286&r1=955285&r2=955286&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Wed Jun 16 16:16:25 2010
@@ -236,7 +236,8 @@ public class AvroValidation {
 
         if (del.predicate != null)
         {
-            validateSlicePredicate(keyspace, cfName, del.super_column.array(), del.predicate);
+            byte[] superName = del.super_column == null ? null : del.super_column.array();
+            validateSlicePredicate(keyspace, cfName, superName, del.predicate);
             if (del.predicate.slice_range != null)
                 throw newInvalidRequestException("Deletion does not yet support SliceRange predicates.");
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=955286&r1=955285&r2=955286&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Wed Jun 16 16:16:25 2010
@@ -326,29 +326,28 @@ public class CassandraServer implements 
         }
     }
 
-    public Void batch_mutate(Utf8 keyspace, Map<Utf8, Map<Utf8, GenericArray<Mutation>>> mutationMap, ConsistencyLevel consistencyLevel)
-    throws AvroRemoteException, UnavailableException, TimedOutException
+    @Override
+    public Void batch_mutate(GenericArray<MutationsMapEntry> mutationMap, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
     {
         if (logger.isDebugEnabled())
             logger.debug("batch_mutate");
         
-        String keyspaceString = keyspace.toString();
-        
         List<RowMutation> rowMutations = new ArrayList<RowMutation>();
-        for (Map.Entry<Utf8, Map<Utf8, GenericArray<Mutation>>> mutationEntry: mutationMap.entrySet())
+        
+        for (MutationsMapEntry pair: mutationMap)
         {
-            String key = mutationEntry.getKey().toString();
-            AvroValidation.validateKey(key);
+            AvroValidation.validateKey(pair.key.array());
+            Map<Utf8, GenericArray<Mutation>> cfToMutations = pair.mutations;
             
-            Map<Utf8, GenericArray<Mutation>> cfToMutations = mutationEntry.getValue();
             for (Map.Entry<Utf8, GenericArray<Mutation>> cfMutations : cfToMutations.entrySet())
             {
                 String cfName = cfMutations.getKey().toString();
                 
                 for (Mutation mutation : cfMutations.getValue())
-                    AvroValidation.validateMutation(keyspaceString, cfName, mutation);
+                    AvroValidation.validateMutation(curKeyspace.get(), cfName, mutation);
             }
-            rowMutations.add(getRowMutationFromMutations(keyspaceString, key, cfToMutations));
+            rowMutations.add(getRowMutationFromMutations(curKeyspace.get(), pair.key.array(), cfToMutations));
         }
         
         if (consistencyLevel == ConsistencyLevel.ZERO)
@@ -381,10 +380,9 @@ public class CassandraServer implements 
     }
     
     // FIXME: This is copypasta from o.a.c.db.RowMutation, (RowMutation.getRowMutation uses Thrift types directly).
-    private static RowMutation getRowMutationFromMutations(String keyspace, String key, Map<Utf8, GenericArray<Mutation>> cfMap)
+    private static RowMutation getRowMutationFromMutations(String keyspace, byte[] key, Map<Utf8, GenericArray<Mutation>> cfMap)
     {
-        // FIXME: string key
-        RowMutation rm = new RowMutation(keyspace, key.trim().getBytes(UTF8));
+        RowMutation rm = new RowMutation(keyspace, key);
         
         for (Map.Entry<Utf8, GenericArray<Mutation>> entry : cfMap.entrySet())
         {
@@ -419,6 +417,8 @@ public class CassandraServer implements 
     // FIXME: This is copypasta from o.a.c.db.RowMutation, (RowMutation.getRowMutation uses Thrift types directly).
     private static void deleteColumnOrSuperColumnToRowMutation(RowMutation rm, String cfName, Deletion del)
     {
+        byte[] superName = del.super_column == null ? null : del.super_column.array();
+        
         if (del.predicate != null && del.predicate.column_names != null)
         {
             for (ByteBuffer col : del.predicate.column_names)
@@ -426,12 +426,12 @@ public class CassandraServer implements 
                 if (del.super_column == null && DatabaseDescriptor.getColumnFamilyType(rm.getTable(), cfName) == ColumnFamilyType.Super)
                     rm.delete(new QueryPath(cfName, col.array()), unavronateClock(del.clock));
                 else
-                    rm.delete(new QueryPath(cfName, del.super_column.array(), col.array()), unavronateClock(del.clock));
+                    rm.delete(new QueryPath(cfName, superName, col.array()), unavronateClock(del.clock));
             }
         }
         else
         {
-            rm.delete(new QueryPath(cfName, del.super_column.array()), unavronateClock(del.clock));
+            rm.delete(new QueryPath(cfName, superName), unavronateClock(del.clock));
         }
     }
     

Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=955286&r1=955285&r2=955286&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Wed Jun 16 16:16:25 2010
@@ -23,6 +23,18 @@ import struct
 def i64(i):
     return struct.pack('>q', i)
 
+def timestamp():
+    return long(time() * 1e6)
+
+def new_column(suffix, stamp=None, ttl=0):
+    ts = isinstance(stamp, (long,int)) and stamp or timestamp()
+    column = dict()
+    column['name'] = 'name-%s' % suffix
+    column['value'] = 'value-%s' % suffix
+    column['clock'] = {'timestamp': ts}
+    column['ttl'] = ttl
+    return column
+
 def assert_columns_match(colA, colB):
     assert colA['name'] == colB['name'], \
             "column name mismatch: %s != %s" % (colA['name'], colB['name'])
@@ -33,7 +45,12 @@ def assert_cosc(thing, with_supercolumn=
     containing = with_supercolumn and 'super_column' or 'column'
     assert isinstance(thing, dict), "Expected dict, got %s" % type(thing)
     assert thing.has_key(containing) and thing[containing].has_key('name'), \
-            "Invalid or missing \"%s\"" % containing
+            "Invalid or missing \"%s\" member" % containing
+
+def assert_raises(excClass, func, *args, **kwargs):
+    try: r = func(*args, **kwargs)
+    except excClass: pass
+    else: raise Exception('expected %s; got %s' % (excClass.__name__, r))
 
 class TestRpcOperations(AvroTester):
     def test_insert_simple(self):       # Also tests get
@@ -125,6 +142,58 @@ class TestRpcOperations(AvroTester):
         except AvroRemoteException, err: pass
         else: assert False, "Expected exception, returned %s instead" % cosc
 
+    def test_batch_mutate(self):
+        "batching addition/removal mutations"
+        self.__set_keyspace('Keyspace1')
+
+        mutations = list()
+       
+        # New column mutations
+        for i in range(3):
+            cosc = {'column': new_column(i)}
+            mutation = {'column_or_supercolumn': cosc}
+            mutations.append(mutation)
+
+        map_entry = {'key': 'key1', 'mutations': {'Standard1': mutations}}
+
+        params = dict()
+        params['mutation_map'] = [map_entry]
+        params['consistency_level'] = 'ONE'
+
+        self.client.request('batch_mutate', params)
+
+        # Verify that new columns were added
+        for i in range(3):
+            column = new_column(i)
+            cosc = self.__get('key1', 'Standard1', None, column['name'])
+            assert_cosc(cosc)
+            assert_columns_match(cosc['column'], column)
+
+        # Add one more column; remove one column
+        extra_column = new_column(3); remove_column = new_column(0)
+        mutations = [{'column_or_supercolumn': {'column': extra_column}}]
+        deletion = dict()
+        deletion['clock'] = {'timestamp': timestamp()}
+        deletion['predicate'] = {'column_names': [remove_column['name']]}
+        mutations.append({'deletion': deletion})
+
+        map_entry = {'key': 'key1', 'mutations': {'Standard1': mutations}}
+
+        params = dict()
+        params['mutation_map'] = [map_entry]
+        params['consistency_level'] = 'ONE'
+
+        self.client.request('batch_mutate', params)
+
+        # Ensure successful column removal
+        assert_raises(AvroRemoteException,
+                self.__get, 'key1', 'Standard1', None, remove_column['name'])
+
+        # Ensure successful column addition
+        cosc = self.__get('key1', 'Standard1', None, extra_column['name'])
+        assert_cosc(cosc)
+        assert_columns_match(cosc['column'], extra_column)
+
     def test_describe_keyspaces(self):
         "retrieving a list of all keyspaces"
         keyspaces = self.client.request('describe_keyspaces', {})
@@ -143,4 +212,27 @@ class TestRpcOperations(AvroTester):
         assert len(segs) == 3 and len([i for i in segs if i.isdigit()]) == 3, \
                "incorrect api version format: " + vers
 
+    def __get(self, key, cf, super_name, col_name, consistency_level='ONE'):
+        """
+        Given arguments for the key, column family, super column name,
+        column name, and consistency level, returns a dictionary 
+        representing a ColumnOrSuperColumn record.
+
+        Raises an AvroRemoteException if the column is not found.
+        """
+        params = dict()
+        params['key'] = key
+        params['column_path'] = dict()
+        params['column_path']['column_family'] = cf
+        params['column_path']['column'] = col_name
+        params['consistency_level'] = consistency_level
+
+        if (super_name):
+            params['super_column'] = super_name
+
+        return self.client.request('get', params)
+
+    def __set_keyspace(self, keyspace_name):
+        self.client.request('set_keyspace', {'keyspace': keyspace_name})
+
 # vi:ai sw=4 ts=4 tw=0 et



Mime
View raw message