cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eev...@apache.org
Subject svn commit: r991701 - in /cassandra/trunk: interface/cassandra.genavro src/java/org/apache/cassandra/avro/AvroRecordFactory.java src/java/org/apache/cassandra/avro/AvroValidation.java src/java/org/apache/cassandra/avro/CassandraServer.java
Date Wed, 01 Sep 2010 20:53:49 GMT
Author: eevans
Date: Wed Sep  1 20:53:49 2010
New Revision: 991701

URL: http://svn.apache.org/viewvc?rev=991701&view=rev
Log:
avro: get_indexed_slices implementation

Patch by Nick Bailey; reviewed by eevans

Modified:
    cassandra/trunk/interface/cassandra.genavro
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java

Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Wed Sep  1 20:53:49 2010
@@ -73,6 +73,27 @@ protocol Cassandra {
 		array<string> endpoints;
 	}
 
+    enum IndexOperator {
+        EQ, GTE, GT, LTE, LT
+    }
+
+    record IndexExpression {
+        bytes column_name;
+        IndexOperator op;
+        bytes value;
+    }
+
+    record IndexClause {
+        array<IndexExpression> expressions;
+        bytes start_key;
+        int count;
+    }
+
+    record KeySlice {
+        bytes key;
+        array<ColumnOrSuperColumn> columns;
+    }
+
     record Deletion {
         Clock clock;
         union { bytes, null } super_column;
@@ -222,6 +243,16 @@ protocol Cassandra {
                                            ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException;
 
+    /** 
+     * Returns the subset of columns specified in SlicePredicate for 
+     * the rows matching the IndexClause.
+     */
+    array<KeySlice> get_indexed_slices(ColumnParent column_parent,
+                                      IndexClause index_clause,
+                                      SlicePredicate column_predicate,
+                                      ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException;
+
     /**
      * Returns the number of columns matching a predicate for a particular
      * key, ColumnFamily, and optionally SuperColumn.

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java Wed Sep  1 20:53:49
2010
@@ -110,6 +110,15 @@ public class AvroRecordFactory
         entry.columns = columns;
         return entry;
     }
+
+    public static KeySlice newKeySlice(byte[] key, List<ColumnOrSuperColumn> columns)
{
+        KeySlice slice = new KeySlice();
+        ByteBuffer wrappedKey = (key != null) ? ByteBuffer.wrap(key) : null;
+        slice.key = wrappedKey;
+        slice.columns = columns;
+        return slice;
+    }
+
 }
 
 class ErrorFactory

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Wed Sep  1 20:53:49
2010
@@ -24,6 +24,7 @@ package org.apache.cassandra.avro;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Set;
 
 import org.apache.avro.util.Utf8;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -31,6 +32,7 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.IClock;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TimestampClock;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
@@ -297,4 +299,19 @@ public class AvroValidation
         else
             validateColumns(keyspace, cp, predicate.column_names);
     }
+
+    static void validateIndexClauses(String keyspace, String columnFamily, IndexClause index_clause)
+    throws InvalidRequestException
+    {
+        if (index_clause.expressions.isEmpty())
+            throw newInvalidRequestException("index clause list may not be empty");
+        Set<byte[]> indexedColumns = Table.open(keyspace).getColumnFamilyStore(columnFamily).getIndexedColumns();
+        for (IndexExpression expression : index_clause.expressions)
+        {
+            if (expression.op.equals(IndexOperator.EQ) && indexedColumns.contains(expression.column_name))
+                return;
+        }
+        throw newInvalidRequestException("No indexed columns present in index clause with
operator EQ");
+    }
+
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Wed Sep  1 20:53:49
2010
@@ -1062,4 +1062,93 @@ public class CassandraServer implements 
         }
         return null;
     }
+
+    @Override
+    public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause
index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("scan");
+
+        try
+        {
+            clientState.hasKeyspaceAccess(Permission.READ_VALUE);
+        }
+        catch (org.apache.cassandra.thrift.InvalidRequestException thriftE) {
+            throw newInvalidRequestException(thriftE);
+        }
+
+        String keyspace = clientState.getKeyspace();
+        AvroValidation.validateColumnParent(keyspace, column_parent);
+        AvroValidation.validatePredicate(keyspace, column_parent, column_predicate);
+        AvroValidation.validateIndexClauses(keyspace, column_parent.column_family.toString(),
index_clause);
+
+        List<Row> rows;
+        try
+        {
+            rows = StorageProxy.scan(keyspace.toString(),
+                                     column_parent.column_family.toString(),
+                                     thriftIndexClause(index_clause),
+                                     thriftSlicePredicate(column_predicate),
+                                     thriftConsistencyLevel(consistency_level));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (TimeoutException e)
+        {
+            throw new TimedOutException();
+        }
+        return avronateKeySlices(rows, column_parent, column_predicate);
+    }
+
+    private List<KeySlice> avronateKeySlices(List<Row> rows, ColumnParent column_parent,
SlicePredicate predicate)
+    {
+        List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
+        boolean reversed = predicate.slice_range != null && predicate.slice_range.reversed;
+        for (Row row : rows)
+        {
+            List<ColumnOrSuperColumn> avronatedColumns = avronateColumnFamily(row.cf,
column_parent.super_column != null, reversed);
+            keySlices.add(newKeySlice(row.key.key, avronatedColumns));
+        }
+
+        return keySlices;
+    }
+    
+    private org.apache.cassandra.thrift.SlicePredicate thriftSlicePredicate(SlicePredicate
avro_pred) {
+        List<byte[]> bufs = new ArrayList<byte[]>();
+        for(ByteBuffer buf : avro_pred.column_names)
+            bufs.add(buf.array());
+
+        return new org.apache.cassandra.thrift.SlicePredicate().setColumn_names(bufs).setSlice_range(thriftSliceRange(avro_pred.slice_range));
+    }
+
+    private org.apache.cassandra.thrift.SliceRange thriftSliceRange(SliceRange avro_range)
{
+        return new org.apache.cassandra.thrift.SliceRange(avro_range.start.array(), avro_range.finish.array(),
avro_range.reversed, avro_range.count);
+    }
+
+    private org.apache.cassandra.thrift.IndexClause thriftIndexClause(IndexClause avro_clause)
{
+        List<org.apache.cassandra.thrift.IndexExpression> expressions = new ArrayList<org.apache.cassandra.thrift.IndexExpression>();
+        for(IndexExpression exp : avro_clause.expressions)
+            expressions.add(thriftIndexExpression(exp));
+
+        return new org.apache.cassandra.thrift.IndexClause(expressions, avro_clause.start_key.array(),
avro_clause.count);
+    }
+
+    private org.apache.cassandra.thrift.IndexExpression thriftIndexExpression(IndexExpression
avro_exp) {
+        return new org.apache.cassandra.thrift.IndexExpression(avro_exp.column_name.array(),
thriftIndexOperator(avro_exp.op), avro_exp.value.array());
+    }
+
+    private org.apache.cassandra.thrift.IndexOperator thriftIndexOperator(IndexOperator avro_op)
{
+        switch (avro_op)
+        {
+            case EQ: return org.apache.cassandra.thrift.IndexOperator.EQ;
+            case GTE: return org.apache.cassandra.thrift.IndexOperator.GTE;
+            case GT: return org.apache.cassandra.thrift.IndexOperator.GT;
+            case LTE: return org.apache.cassandra.thrift.IndexOperator.LTE;
+            case LT: return org.apache.cassandra.thrift.IndexOperator.LT;
+        }
+        return null;
+    }
 }



Mime
View raw message