cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject svn commit: r930903 - in /cassandra/trunk: interface/ src/java/org/apache/cassandra/avro/ test/system/
Date Mon, 05 Apr 2010 17:01:29 GMT
Author: eevans
Date: Mon Apr  5 17:01:29 2010
New Revision: 930903

URL: http://svn.apache.org/viewvc?rev=930903&view=rev
Log:
batch_mutate() rpc implementation

Patch by eevans

Modified:
    cassandra/trunk/interface/cassandra.avpr
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/test/system/test_avro_server.py

Modified: cassandra/trunk/interface/cassandra.avpr
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Mon Apr  5 17:01:29 2010
@@ -131,7 +131,6 @@
     "batch_mutate": {
         "request": [
             {"name": "keyspace", "type": "string"},
-            /* Map<String, Map<String, List<Mutation>>> mutation_map */
             {"name": "mutation_map",
                 "type": {
                     "type": "map", "values": {

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java Mon Apr  5 17:01:29
2010
@@ -131,6 +131,11 @@ class ErrorFactory
     {
         return newTimedOutException(new Utf8(why));
     }
+
+    static TimedOutException newTimedOutException()
+    {
+        return newTimedOutException(new Utf8());
+    }
     
     static UnavailableException newUnavailableException(Utf8 why)
     {
@@ -143,4 +148,9 @@ class ErrorFactory
     {
         return newUnavailableException(new Utf8(why));
     }
+    
+    static UnavailableException newUnavailableException()
+    {
+        return newUnavailableException(new Utf8());
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Mon Apr  5 17:01:29
2010
@@ -21,7 +21,10 @@ package org.apache.cassandra.avro;
  */
 
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Comparator;
+
 import org.apache.avro.util.Utf8;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
@@ -88,13 +91,13 @@ public class AvroValidation {
         }
          
         if (column != null)
-            validateColumns(keyspace, column_family, super_column, Arrays.asList(column));
+            validateColumns(keyspace, column_family, super_column, Arrays.asList(cp.column));
         if (super_column != null)
-            validateColumns(keyspace, column_family, null, Arrays.asList(super_column));
+            validateColumns(keyspace, column_family, null, Arrays.asList(cp.super_column));
     }
     
     // FIXME: could use method in ThriftValidation
-    static void validateColumns(String keyspace, String cfName, byte[] superColumnName, Iterable<byte[]>
columnNames)
+    static void validateColumns(String keyspace, String cfName, byte[] superColumnName, Iterable<ByteBuffer>
columnNames)
     throws InvalidRequestException
     {
         if (superColumnName != null)
@@ -108,8 +111,10 @@ public class AvroValidation {
         }
         
         AbstractType comparator = ColumnFamily.getComparatorFor(keyspace, cfName, superColumnName);
-        for (byte[] name : columnNames)
+        for (ByteBuffer buff : columnNames)
         {
+            byte[] name = buff.array();
+
             if (name.length > IColumn.MAX_NAME_LENGTH)
                 throw newInvalidRequestException("column name length must not be greater
than " + IColumn.MAX_NAME_LENGTH);
             if (name.length == 0)
@@ -139,4 +144,77 @@ public class AvroValidation {
         if ((cosc.column == null) && (cosc.super_column == null))
             throw newInvalidRequestException("ColumnOrSuperColumn must have one or both of
Column or SuperColumn");
     }
+
+    static void validateRange(String keyspace, String cfName, byte[] superName, SliceRange
range)
+    throws InvalidRequestException
+    {
+        AbstractType comparator = ColumnFamily.getComparatorFor(keyspace, cfName, superName);
+        byte[] start = range.start.array();
+        byte[] finish = range.finish.array();
+
+        try
+        {
+            comparator.validate(start);
+            comparator.validate(finish);
+        }
+        catch (MarshalException me)
+        {
+            throw newInvalidRequestException(me.getMessage());
+        }
+
+        if (range.count < 0)
+            throw newInvalidRequestException("Ranges require a non-negative count.");
+
+        Comparator<byte[]> orderedComparator = range.reversed ? comparator.getReverseComparator()
: comparator;
+        if (start.length > 0 && finish.length > 0 && orderedComparator.compare(start,
finish) > 0)
+            throw newInvalidRequestException("range finish must come after start in the order
of traversal");
+    }
+
+    static void validateSlicePredicate(String keyspace, String cfName, byte[] superName,
SlicePredicate predicate)
+    throws InvalidRequestException
+    {
+        if (predicate.column_names == null && predicate.slice_range == null)
+            throw newInvalidRequestException("A SlicePredicate must be given a list of Columns,
a SliceRange, or both");
+
+        if (predicate.slice_range != null)
+            validateRange(keyspace, cfName, superName, predicate.slice_range);
+
+        if (predicate.column_names != null)
+            validateColumns(keyspace, cfName, superName, predicate.column_names);
+    }
+
+    static void validateDeletion(String keyspace, String  cfName, Deletion del) throws InvalidRequestException
+    {
+        if (del.super_column == null && del.predicate == null)
+            throw newInvalidRequestException("A Deletion must have a SuperColumn, a SlicePredicate,
or both.");
+
+        if (del.predicate != null)
+        {
+            validateSlicePredicate(keyspace, cfName, del.super_column.array(), del.predicate);
+            if (del.predicate.slice_range != null)
+                throw newInvalidRequestException("Deletion does not yet support SliceRange
predicates.");
+        }
+    }
+
+    static void validateMutation(String keyspace, String cfName, Mutation mutation) throws
InvalidRequestException
+    {
+        ColumnOrSuperColumn cosc = mutation.column_or_supercolumn;
+        Deletion del = mutation.deletion;
+
+        if (cosc != null && del != null)
+            throw newInvalidRequestException("Mutation may have either a ColumnOrSuperColumn
or a Deletion, but not both");
+
+        if (cosc != null)
+        {
+            validateColumnOrSuperColumn(keyspace, cfName, cosc);
+        }
+        else if (del != null)
+        {
+            validateDeletion(keyspace, cfName, del);
+        }
+        else
+        {
+            throw newInvalidRequestException("Mutation must have one ColumnOrSuperColumn,
or one Deletion");
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Mon Apr  5 17:01:29
2010
@@ -30,12 +30,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.util.Utf8;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
@@ -326,6 +326,110 @@ public class CassandraServer implements 
         return rm;
     }
 
+    @Override
+    public Void batch_mutate(Utf8 keyspace, Map<Utf8, Map<Utf8, GenericArray<Mutation>>>
mutationMap, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("batch_mutate");
+        
+        String keyspaceString = keyspace.toString();
+        
+        List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+        for (Map.Entry<Utf8, Map<Utf8, GenericArray<Mutation>>> mutationEntry:
mutationMap.entrySet())
+        {
+            String key = mutationEntry.getKey().toString();
+            AvroValidation.validateKey(key);
+            
+            Map<Utf8, GenericArray<Mutation>> cfToMutations = mutationEntry.getValue();
+            for (Map.Entry<Utf8, GenericArray<Mutation>> cfMutations : cfToMutations.entrySet())
+            {
+                String cfName = cfMutations.getKey().toString();
+                
+                for (Mutation mutation : cfMutations.getValue())
+                    AvroValidation.validateMutation(keyspaceString, cfName, mutation);
+            }
+            rowMutations.add(getRowMutationFromMutations(keyspaceString, key, cfToMutations));
+        }
+        
+        if (consistencyLevel == ConsistencyLevel.ZERO)
+        {
+            StorageProxy.mutate(rowMutations);
+        }
+        else
+        {
+            try
+            {
+                StorageProxy.mutateBlocking(rowMutations, thriftConsistencyLevel(consistencyLevel));
+            }
+            catch (TimeoutException te)
+            {
+                throw newTimedOutException();
+            }
+            // FIXME: StorageProxy.mutateBlocking throws Thrift's UnavailableException
+            catch (org.apache.cassandra.thrift.UnavailableException ue)
+            {
+                throw newUnavailableException();
+            }
+        }
+        
+        return null;
+    }
+    
+    // FIXME: This is copypasta from o.a.c.db.RowMutation, (RowMutation.getRowMutation uses
Thrift types directly).
+    private static RowMutation getRowMutationFromMutations(String keyspace, String key, Map<Utf8,
GenericArray<Mutation>> cfMap)
+    {
+        RowMutation rm = new RowMutation(keyspace, key.trim());
+        
+        for (Map.Entry<Utf8, GenericArray<Mutation>> entry : cfMap.entrySet())
+        {
+            String cfName = entry.getKey().toString();
+            
+            for (Mutation mutation : entry.getValue())
+            {
+                if (mutation.deletion != null)
+                    deleteColumnOrSuperColumnToRowMutation(rm, cfName, mutation.deletion);
+                else
+                    addColumnOrSuperColumnToRowMutation(rm, cfName, mutation.column_or_supercolumn);
+            }
+        }
+        
+        return rm;
+    }
+    
+    // FIXME: This is copypasta from o.a.c.db.RowMutation, (RowMutation.getRowMutation uses
Thrift types directly).
+    private static void addColumnOrSuperColumnToRowMutation(RowMutation rm, String cfName,
ColumnOrSuperColumn cosc)
+    {
+        if (cosc.column == null)
+        {
+            for (Column column : cosc.super_column.columns)
+                rm.add(new QueryPath(cfName, cosc.super_column.name.array(), column.name.array()),
column.value.array(), column.timestamp);
+        }
+        else
+        {
+            rm.add(new QueryPath(cfName, null, cosc.column.name.array()), cosc.column.value.array(),
cosc.column.timestamp);
+        }
+    }
+    
+    // FIXME: This is copypasta from o.a.c.db.RowMutation, (RowMutation.getRowMutation uses
Thrift types directly).
+    private static void deleteColumnOrSuperColumnToRowMutation(RowMutation rm, String cfName,
Deletion del)
+    {
+        if (del.predicate != null && del.predicate.column_names != null)
+        {
+            for (ByteBuffer col : del.predicate.column_names)
+            {
+                if (del.super_column == null && DatabaseDescriptor.getColumnFamilyType(rm.getTable(),
cfName).equals("Super"))
+                    rm.delete(new QueryPath(cfName, col.array()), del.timestamp);
+                else
+                    rm.delete(new QueryPath(cfName, del.super_column.array(), col.array()),
del.timestamp);
+            }
+        }
+        else
+        {
+            rm.delete(new QueryPath(cfName, del.super_column.array()), del.timestamp);
+        }
+    }
+    
     private org.apache.cassandra.thrift.ConsistencyLevel thriftConsistencyLevel(ConsistencyLevel
consistency)
     {
         switch (consistency)

Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Mon Apr  5 17:01:29 2010
@@ -17,6 +17,7 @@
 from . import AvroTester
 from time import time
 from random import randint
+from avro.ipc import AvroRemoteException
 
 COLUMNS = [
     dict(name="c0", value="v0", timestamp=1L),
@@ -142,6 +143,39 @@ class TestRpcOperations(AvroTester):
         for i in range(0,3):
             assert_cosc(_get_column(self.client, COLUMNS[i]['name']))
 
+    def test_batch_mutate(self):
+        "performing batch mutation operations"
+        params = dict()
+        params['keyspace'] = 'Keyspace1'
+        params['consistency_level'] = 'ONE'
+
+        mutation_map = dict()
+        mutation_map['key1'] = dict(Standard1=[
+            dict(column_or_supercolumn=dict(column=COLUMNS[0])),
+            dict(column_or_supercolumn=dict(column=COLUMNS[1])),
+            dict(column_or_supercolumn=dict(column=COLUMNS[2]))
+        ])
+
+        params['mutation_map'] = mutation_map
+
+        self.client.request('batch_mutate', params)
+
+        for i in range(0,3):
+            cosc = _get_column(self.client, COLUMNS[i]['name'])
+            assert_cosc(cosc)
+            assert_columns_match(cosc['column'], COLUMNS[i])
+
+        # FIXME: still need to apply a mutation that deletes
+
+        #try:
+        #    assert not _get_column(self.client, COLUMNS[1]['name']), \
+        #        "Mutation did not delete column %s" % COLUMNS[1]['name']
+        #    assert not _get_column(self.client, COLUMNS[2]['name']), \
+        #        "Mutation did not delete column %s" % COLUMNS[2]['name']
+        #except AvroRemoteException:
+        #    pass
+
+
     def test_get_api_version(self):
         "getting the remote api version string"
         vers = self.client.request('get_api_version', {})



Mime
View raw message