cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject svn commit: r992001 - in /cassandra/trunk: interface/cassandra.genavro src/java/org/apache/cassandra/avro/AvroValidation.java src/java/org/apache/cassandra/avro/CassandraServer.java
Date Thu, 02 Sep 2010 16:17:42 GMT
Author: brandonwilliams
Date: Thu Sep  2 16:17:21 2010
New Revision: 992001

URL: http://svn.apache.org/viewvc?rev=992001&view=rev
Log:
avro: get_range_slices implementation. Patch by Jeremy Hanna, reviewed by brandonwilliams

Modified:
    cassandra/trunk/interface/cassandra.genavro
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java

Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=992001&r1=992000&r2=992001&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Thu Sep  2 16:17:21 2010
@@ -89,6 +89,22 @@ protocol Cassandra {
         int count;
     }
 
+    /**
+     * The semantics of start keys and tokens are slightly different.
+     * Keys are start-inclusive; tokens are start-exclusive.  Token
+     * ranges may also wrap -- that is, the end token may be less
+     * than the start one.  Thus, a range from keyX to keyX is a
+     * one-element range, but a range from tokenY to tokenY is the
+     * full ring.
+     */
+    record KeyRange {
+        union { bytes, null } start_key;
+        union { bytes, null } end_key;
+        union { string, null } start_token;
+        union { string, null } end_token;
+        int count;
+    }
+
     record KeySlice {
         bytes key;
         array<ColumnOrSuperColumn> columns;
@@ -346,4 +362,12 @@ protocol Cassandra {
      */
     array<TokenRange> describe_ring(string keyspace)
     throws InvalidRequestException;
+
+    /**
+     *returns a subset of columns for a contiguous range of keys.
+     */
+    array<KeySlice> get_range_slices(ColumnParent column_parent,
+                                     SlicePredicate predicate,
+                                     KeyRange range,
+                                     union { ConsistencyLevel, null } consistency_level);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=992001&r1=992000&r2=992001&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Thu Sep  2 16:17:21 2010
@@ -36,6 +36,10 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TimestampClock;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.avro.ErrorFactory.newInvalidRequestException;
@@ -300,6 +304,42 @@ public class AvroValidation
             validateColumns(keyspace, cp, predicate.column_names);
     }
 
+    public static void validateKeyRange(KeyRange range)
+    throws InvalidRequestException
+    {
+        if ((range.start_key == null) != (range.end_key == null))
+        {
+            throw newInvalidRequestException("start key and end key must either both be non-null, or both be null");
+        }
+        if ((range.start_token == null) != (range.end_token == null))
+        {
+            throw newInvalidRequestException("start token and end token must either both be non-null, or both be null");
+        }
+        if ((range.start_key == null) == (range.start_token == null))
+        {
+            throw newInvalidRequestException("exactly one of {start key, end key} or {start token, end token} must be specified");
+        }
+
+        if (range.start_key != null)
+        {
+            IPartitioner p = StorageService.getPartitioner();
+            Token startToken = p.getToken(range.start_key.array());
+            Token endToken = p.getToken(range.end_key.array());
+            if (startToken.compareTo(endToken) > 0 && !endToken.equals(p.getMinimumToken()))
+            {
+                if (p instanceof RandomPartitioner)
+                    throw newInvalidRequestException("start key's md5 sorts after end key's md5.  this is not allowed; you probably should not specify end key at all, under RandomPartitioner");
+                else
+                    throw newInvalidRequestException("start key must sort before (or equal to) finish key in your partitioner!");
+            }
+        }
+
+        if (range.count <= 0)
+        {
+            throw newInvalidRequestException("maxRows must be positive");
+        }
+    }
+
     static void validateIndexClauses(String keyspace, String columnFamily, IndexClause index_clause)
     throws InvalidRequestException
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=992001&r1=992000&r2=992001&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu Sep  2 16:17:21 2010
@@ -43,6 +43,8 @@ import org.apache.avro.util.Utf8;
 import org.apache.cassandra.avro.InvalidRequestException;
 import org.apache.cassandra.db.migration.DropKeyspace;
 import org.apache.cassandra.db.migration.RenameKeyspace;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.thrift.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,8 +67,6 @@ import org.apache.cassandra.db.migration
 import org.apache.cassandra.db.migration.DropColumnFamily;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.db.migration.RenameColumnFamily;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.service.ClientState;
@@ -1064,6 +1064,71 @@ public class CassandraServer implements 
     }
 
     @Override
+    public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate slice_predicate, KeyRange range, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, TimedOutException
+    {
+        String keyspace = clientState.getKeyspace();
+        try
+        {
+            clientState.hasKeyspaceAccess(Permission.READ_VALUE);
+        }
+        catch (org.apache.cassandra.thrift.InvalidRequestException thriftE)
+        {
+            throw newInvalidRequestException(thriftE);
+        }
+
+        AvroValidation.validateColumnParent(keyspace, column_parent);
+        AvroValidation.validatePredicate(keyspace, column_parent, slice_predicate);
+        AvroValidation.validateKeyRange(range);
+
+        List<Row> rows;
+        try
+        {
+            IPartitioner p = StorageService.getPartitioner();
+            AbstractBounds bounds;
+            if (range.start_key == null)
+            {
+                Token.TokenFactory tokenFactory = p.getTokenFactory();
+                Token left = tokenFactory.fromString(range.start_token.toString());
+                Token right = tokenFactory.fromString(range.end_token.toString());
+                bounds = new Range(left, right);
+            }
+            else
+            {
+                bounds = new Bounds(p.getToken(range.start_key.array()), p.getToken(range.end_key.array()));
+            }
+            try
+            {
+                schedule();
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
+                                                                        thriftColumnParent(column_parent),
+                                                                        thriftSlicePredicate(slice_predicate),
+                                                                        bounds,
+                                                                        range.count),
+                                                  thriftConsistencyLevel(consistency_level));
+            }
+            catch (org.apache.cassandra.thrift.UnavailableException thriftE)
+            {
+                throw newUnavailableException(thriftE);
+            }
+            finally
+            {
+                release();
+            }
+            assert rows != null;
+        }
+        catch (TimeoutException e)
+        {
+        	throw new TimedOutException();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return avronateKeySlices(rows, column_parent, slice_predicate);
+    }
+
+    @Override
     public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
@@ -1074,7 +1139,8 @@ public class CassandraServer implements 
         {
             clientState.hasKeyspaceAccess(Permission.READ_VALUE);
         }
-        catch (org.apache.cassandra.thrift.InvalidRequestException thriftE) {
+        catch (org.apache.cassandra.thrift.InvalidRequestException thriftE)
+        {
             throw newInvalidRequestException(thriftE);
         }
 
@@ -1115,13 +1181,32 @@ public class CassandraServer implements 
 
         return keySlices;
     }
-    
+
+    private org.apache.cassandra.thrift.ColumnParent thriftColumnParent(ColumnParent avro_column_parent)
+    {
+        org.apache.cassandra.thrift.ColumnParent cp = new org.apache.cassandra.thrift.ColumnParent(avro_column_parent.column_family.toString());
+        if (avro_column_parent.super_column != null)
+            cp.super_column = avro_column_parent.super_column.array();
+
+        return cp;
+    }
+
     private org.apache.cassandra.thrift.SlicePredicate thriftSlicePredicate(SlicePredicate avro_pred) {
-        List<byte[]> bufs = new ArrayList<byte[]>();
-        for(ByteBuffer buf : avro_pred.column_names)
-            bufs.add(buf.array());
+        // One or the other are set, so check for nulls of either
+
+        List<byte[]> bufs = null;
+        if (avro_pred.column_names != null)
+        {
+            bufs = new ArrayList<byte[]>();
+            for(ByteBuffer buf : avro_pred.column_names)
+                bufs.add(buf.array());
+        }
+
+        org.apache.cassandra.thrift.SliceRange slice_range = (avro_pred.slice_range != null)
+                                                                ? thriftSliceRange(avro_pred.slice_range)
+                                                                : null;
 
-        return new org.apache.cassandra.thrift.SlicePredicate().setColumn_names(bufs).setSlice_range(thriftSliceRange(avro_pred.slice_range));
+        return new org.apache.cassandra.thrift.SlicePredicate().setColumn_names(bufs).setSlice_range(slice_range);
     }
 
     private org.apache.cassandra.thrift.SliceRange thriftSliceRange(SliceRange avro_range) {



Mime
View raw message