cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r881998 [2/2] - in /incubator/cassandra/trunk: ./ interface/ interface/gen-java/org/apache/cassandra/service/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ test/system/
Date Wed, 18 Nov 2009 23:27:15 GMT
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=881998&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Wed
Nov 18 23:27:14 2009
@@ -0,0 +1,190 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.ColumnParent;
+import org.apache.cassandra.service.SlicePredicate;
+import org.apache.cassandra.service.SliceRange;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Describes a range-slice read: a keyspace, a column family (with an optional
+ * super column), a column predicate, a start/finish key pair and a maximum
+ * number of keys. A command can be packed into a {@link Message} with
+ * {@link #getMessage()} and rebuilt from one with {@link #read(Message)}.
+ */
+public class RangeSliceCommand
+{
+    // shared serializer used by both getMessage() and read()
+    private static final SliceCommandSerializer serializer = new SliceCommandSerializer();
+    
+    public final String keyspace;
+
+    // both taken from the ColumnParent handed to the constructor
+    public final String column_family;
+    public final byte[] super_column;
+
+    public final SlicePredicate predicate;
+
+    public final String start_key;
+    public final String finish_key;
+    public final int max_keys;
+
+    /**
+     * Builds a command from its parts. The column family and super column are
+     * read out of <code>column_parent</code>; the remaining arguments are stored as given.
+     */
+    public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate
predicate, String start_key, String finish_key, int max_keys)
+    {
+        this.keyspace = keyspace;
+        column_family = column_parent.getColumn_family();
+        super_column = column_parent.getSuper_column();
+        this.predicate = predicate;
+        this.start_key = start_key;
+        this.finish_key = finish_key;
+        this.max_keys = max_keys;
+    }
+
+    /**
+     * Builds a command equal to <code>cmd</code> except for <code>max_keys</code>.
+     * The predicate is passed through SlicePredicate's copy constructor
+     * (presumably a Thrift-generated copy -- confirm) rather than shared.
+     */
+    public RangeSliceCommand(RangeSliceCommand cmd, int max_keys)
+    {
+        this(cmd.keyspace,
+             new ColumnParent(cmd.column_family, cmd.super_column),
+             new SlicePredicate(cmd.predicate),
+             cmd.start_key,
+             cmd.finish_key,
+             max_keys);
+
+    }
+
+    /**
+     * Serializes this command and wraps the bytes in a Message constructed with the
+     * local address, StageManager.readStage_ and StorageService.rangeSliceVerbHandler_.
+     *
+     * @throws IOException if serialization fails
+     */
+    public Message getMessage() throws IOException
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        serializer.serialize(this, dob);
+        // copy only the first getLength() bytes of the buffer's backing array
+        return new Message(FBUtilities.getLocalAddress(),
+                           StageManager.readStage_,
+                           StorageService.rangeSliceVerbHandler_,
+                           Arrays.copyOf(dob.getData(), dob.getLength()));
+    }
+
+    /**
+     * Rebuilds a command from the body of <code>message</code>.
+     *
+     * @throws IOException if the body cannot be deserialized
+     */
+    public static RangeSliceCommand read(Message message) throws IOException
+    {
+        byte[] bytes = message.getMessageBody();
+        DataInputBuffer dib = new DataInputBuffer();
+        dib.reset(bytes, bytes.length);
+        return serializer.deserialize(new DataInputStream(dib));
+    }
+}
+
+/**
+ * Wire format for {@link RangeSliceCommand}, in order:
+ * keyspace (UTF), column family (UTF), super column length (int) followed by
+ * that many bytes, Thrift-binary predicate length (int) followed by the
+ * predicate bytes, start key (UTF), finish key (UTF), max keys (int).
+ * A null super column is written as length 0, so an empty super column
+ * reads back as null.
+ */
+class SliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
+{
+    /**
+     * Writes <code>sliceCommand</code> to <code>dos</code> in the format described on this class.
+     *
+     * @throws IOException on write failure, or wrapping a TException raised while
+     *                     serializing the predicate
+     */
+    public void serialize(RangeSliceCommand sliceCommand, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(sliceCommand.keyspace);
+        dos.writeUTF(sliceCommand.column_family);
+        dos.writeInt(sliceCommand.super_column == null ? 0 : sliceCommand.super_column.length);
+        if (sliceCommand.super_column != null)
+            dos.write(sliceCommand.super_column);
+
+        TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            byte[] serPred = ser.serialize(sliceCommand.predicate);
+            dos.writeInt(serPred.length);
+            dos.write(serPred);
+        }
+        catch (TException ex)
+        {
+            throw new IOException(ex);
+        }
+
+        dos.writeUTF(sliceCommand.start_key);
+        dos.writeUTF(sliceCommand.finish_key);
+        dos.writeInt(sliceCommand.max_keys);
+    }
+
+    /**
+     * Reads a command previously written by {@link #serialize}.
+     *
+     * @throws IOException on read failure or premature end of stream, or wrapping a
+     *                     TException raised while deserializing the predicate
+     */
+    public RangeSliceCommand deserialize(DataInputStream dis) throws IOException
+    {
+        String keyspace = dis.readUTF();
+        String column_family = dis.readUTF();
+
+        // length 0 means "no super column" (see serialize)
+        int scLength = dis.readInt();
+        byte[] super_column = null;
+        if (scLength > 0)
+            super_column = readBuf(scLength, dis);
+
+        byte[] predBytes = new byte[dis.readInt()];
+        dis.readFully(predBytes);
+        TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
+        SlicePredicate pred =  new SlicePredicate();
+        try
+        {
+            dser.deserialize(pred, predBytes);
+        }
+        catch (TException ex)
+        {
+            throw new IOException(ex);
+        }
+
+        String start_key = dis.readUTF();
+        String finish_key = dis.readUTF();
+        int max_keys = dis.readInt();
+        return new RangeSliceCommand(keyspace,
+                                     new ColumnParent(column_family, super_column),
+                                     pred,
+                                     start_key,
+                                     finish_key,
+                                     max_keys);
+
+    }
+
+    /**
+     * Reads exactly <code>len</code> bytes from <code>dis</code>.
+     *
+     * readFully loops until the buffer is full and throws EOFException (an
+     * IOException) if the stream ends first. The previous hand-rolled loop
+     * overwrote its byte counter with each read() result instead of adding to
+     * it, so a short read restarted at the wrong offset and an EOF (-1) led to
+     * a negative offset on the next call.
+     *
+     * @throws IOException if the stream fails or ends before <code>len</code> bytes are read
+     */
+    static byte[] readBuf(int len, DataInputStream dis) throws IOException
+    {
+        byte[] buf = new byte[len];
+        dis.readFully(buf);
+        return buf;
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=881998&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Wed Nov
18 23:27:14 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.*;
+
+/**
+ * Result of a range slice on one node: the rows found plus a flag telling the
+ * caller whether the range was completed locally. On the wire it is a boolean
+ * (rangeCompletedLocally), an int row count, then each row as written by
+ * Row.serializer().
+ */
+public class RangeSliceReply
+{
+    public final List<Row> rows;
+    public final boolean rangeCompletedLocally;
+
+    /** Stores both arguments as given; the list is not copied. */
+    public RangeSliceReply(List<Row> rows, boolean rangeCompletedLocally)
+    {
+        this.rows = rows;
+        this.rangeCompletedLocally = rangeCompletedLocally;
+    }
+
+    /**
+     * Serializes this reply and returns it as the reply to <code>originalMessage</code>,
+     * sent from the local address.
+     *
+     * @throws IOException if a row cannot be serialized
+     */
+    public Message getReply(Message originalMessage) throws IOException
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        dob.writeBoolean(rangeCompletedLocally);
+        dob.writeInt(rows.size());
+        for (Row row : rows)
+        {
+            Row.serializer().serialize(row, dob);
+        }
+        // copy only the first getLength() bytes of the buffer's backing array
+        byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
+        return originalMessage.getReply(FBUtilities.getLocalAddress(), data);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "RangeSliceReply{" +
+               "rows=" + StringUtils.join(rows, ",") +
+               ", rangeCompletedLocally=" + rangeCompletedLocally +
+               '}';
+    }
+
+    /**
+     * Rebuilds a reply from bytes laid out as produced by {@link #getReply}.
+     *
+     * @throws IOException if the body cannot be read
+     */
+    public static RangeSliceReply read(byte[] body) throws IOException
+    {
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
+        boolean completed = bufIn.readBoolean();
+        int rowCount = bufIn.readInt();
+        List<Row> rows = new ArrayList<Row>(rowCount);
+        for (int i = 0; i < rowCount; i++)
+        {
+            rows.add(Row.serializer().deserialize(bufIn));
+        }
+        return new RangeSliceReply(rows, completed);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=881998&r1=881997&r2=881998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Nov 18 23:27:14
2009
@@ -118,6 +118,7 @@
         header_.setMessageId(id);
     }    
 
+    // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right length
     public Message getReply(InetAddress from, byte[] args)
     {
         Header header = new Header(getMessageId(), from, MessagingService.responseStage_,
MessagingService.responseVerbHandler_);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=881998&r1=881997&r2=881998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Wed
Nov 18 23:27:14 2009
@@ -33,10 +33,8 @@
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.filter.QueryPath;
-import java.net.InetAddress;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.Token;
 import org.apache.thrift.TException;
 
 import flexjson.JSONSerializer;
@@ -557,6 +555,60 @@
         return columnFamiliesMap;
     }
 
+    /**
+     * Returns, for each key between start_key and finish_key, the columns selected by
+     * <code>predicate</code> under <code>column_parent</code>.
+     *
+     * The predicate is validated as a slice range when one is set and as a list of
+     * column names otherwise. The request is rejected unless the partitioner
+     * preserves order and <code>maxRows</code> is positive. <code>consistency_level</code>
+     * is accepted but not used by this method.
+     *
+     * @return one KeySlice per key, in the iteration order of the map returned by
+     *         StorageProxy.getRangeSlice
+     * @throws InvalidRequestException if validation fails
+     */
+    public List<KeySlice> get_range_slice(String keyspace, ColumnParent column_parent, SlicePredicate predicate, String start_key, String finish_key, int maxRows, int consistency_level)
+    throws InvalidRequestException, UnavailableException, TException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("range_slice");
+        if (predicate.getSlice_range() != null)
+            ThriftValidation.validateRange(keyspace, column_parent, predicate.getSlice_range());
+        else
+            ThriftValidation.validateColumns(keyspace, column_parent, predicate.getColumn_names());
+        if (!StorageService.getPartitioner().preservesOrder())
+        {
+            throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
+        }
+        if (maxRows <= 0)
+        {
+            throw new InvalidRequestException("maxRows must be positive");
+        }
+
+        Map<String, Collection<IColumn>> colMap; // keys are sorted.
+        try
+        {
+            colMap = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, start_key, finish_key, maxRows));
+            if (colMap == null)
+                throw new RuntimeException("KeySlice list should never be null.");
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        // translate the internal db.* column objects into their Thrift service.* counterparts
+        List<KeySlice> keySlices = new ArrayList<KeySlice>(colMap.size());
+        for (Map.Entry<String, Collection<IColumn>> entry : colMap.entrySet())
+        {
+            Collection<IColumn> dbList = entry.getValue();
+            List<ColumnOrSuperColumn> svcList = new ArrayList<ColumnOrSuperColumn>(dbList.size());
+            for (org.apache.cassandra.db.IColumn col : dbList)
+            {
+                if (col instanceof org.apache.cassandra.db.Column)
+                    svcList.add(new ColumnOrSuperColumn(new org.apache.cassandra.service.Column(col.name(), col.value(), col.timestamp()), null));
+                else if (col instanceof org.apache.cassandra.db.SuperColumn)
+                {
+                    Collection<IColumn> subICols = col.getSubColumns();
+                    List<org.apache.cassandra.service.Column> subCols = new ArrayList<org.apache.cassandra.service.Column>(subICols.size());
+                    for (IColumn subCol : subICols)
+                        subCols.add(new org.apache.cassandra.service.Column(subCol.name(), subCol.value(), subCol.timestamp()));
+                    svcList.add(new ColumnOrSuperColumn(null, new org.apache.cassandra.service.SuperColumn(col.name(), subCols)));
+                }
+            }
+            keySlices.add(new KeySlice(entry.getKey(), svcList));
+        }
+
+        return keySlices;
+    }
+
     public List<String> get_key_range(String tablename, String columnFamily, String
startWith, String stopAt, int maxResults, int consistency_level)
     throws InvalidRequestException, TException, UnavailableException
     {

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=881998&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
(added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
Wed Nov 18 23:27:14 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.db.RangeSliceCommand;
+import org.apache.cassandra.db.RangeSliceReply;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+
+/**
+ * Verb handler for range-slice requests: decodes the RangeSliceCommand in the
+ * incoming message, runs it against the named column family store, and sends
+ * the resulting RangeSliceReply back to the sender.
+ */
+public class RangeSliceVerbHandler implements IVerbHandler
+{
+
+    private static final Logger logger = Logger.getLogger(RangeSliceVerbHandler.class);
+
+    /**
+     * Handles one range-slice message. Any exception raised while decoding,
+     * reading or replying is rethrown wrapped in a RuntimeException.
+     */
+    public void doVerb(Message message)
+    {
+        try
+        {
+            RangeSliceCommand command = RangeSliceCommand.read(message);
+            RangeSliceReply reply = Table.open(command.keyspace).getColumnFamilyStore(command.column_family).getRangeSlice(
+                    command.super_column,
+                    command.start_key,
+                    command.finish_key,
+                    command.max_keys,
+                    command.predicate.slice_range,
+                    command.predicate.column_names);
+            Message response = reply.getReply(message);
+            if (logger.isDebugEnabled())
+                logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" +
message.getFrom());
+            // one-way send: no response to the reply is awaited here
+            MessagingService.instance().sendOneWay(response, message.getFrom());
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException(ex);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=881998&r1=881997&r2=881998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed
Nov 18 23:27:14 2009
@@ -56,6 +56,7 @@
     private static TimedStatsDeque readStats = new TimedStatsDeque(60000);
     private static TimedStatsDeque rangeStats = new TimedStatsDeque(60000);
     private static TimedStatsDeque writeStats = new TimedStatsDeque(60000);
+
     private StorageProxy() {}
     static
     {
@@ -70,6 +71,23 @@
         }
     }
 
+    // Orders raw keys the way the configured partitioner orders them: each key is
+    // decorated and the pair is compared with the partitioner's decorated-key comparator.
+    private static Comparator<String> keyComparator = new Comparator<String>()
+    {
+        public int compare(String o1, String o2)
+        {
+            IPartitioner p = StorageService.getPartitioner();
+            return p.getDecoratedKeyComparator().compare(p.decorateKey(o1), p.decorateKey(o2));
+        }
+    };
+
+    // Orders rows by their keys, using keyComparator. Two rows with the same key compare as equal.
+    private static Comparator<Row> rowComparator = new Comparator<Row>()
+    {
+        public int compare(Row r1, Row r2)
+        {
+            return keyComparator.compare(r1.key(), r2.key());
+        }
+    };
+
     /**
      * Use this method to have this RowMutation applied
      * across all replicas. This method will take care
@@ -508,6 +526,95 @@
         return rows;
     }
 
+    /**
+     * Runs a range slice around the ring. Starting at the endpoint chosen for the
+     * command's start key, it sends the command to one endpoint at a time, merges the
+     * returned rows, and moves on to the next live successor. It stops once at least
+     * max_keys rows have been collected, an endpoint reports the range completed
+     * locally, or the walk is back at the starting endpoint.
+     *
+     * @param rawCommand the range slice to perform; its max_keys bounds the walk
+     * @return the first column family's sorted columns for each row, keyed by row key
+     *         in a TreeMap
+     * @throws IOException if a command or reply cannot be (de)serialized
+     * @throws RuntimeException wrapping a TimeoutException if an endpoint does not
+     *         answer within the configured RPC timeout
+     */
+    static Map<String, Collection<IColumn>> getRangeSlice(RangeSliceCommand rawCommand) throws IOException, UnavailableException
+    {
+        long startTime = System.currentTimeMillis();
+        TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
+        RangeSliceCommand command = rawCommand;
+
+        InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.start_key);
+        InetAddress startEndpoint = endPoint;
+        InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
+
+        // ordered by rowComparator, which also makes rows with the same key count as duplicates
+        TreeSet<Row> allRows = new TreeSet<Row>(rowComparator);
+        do
+        {
+            Message message = command.getMessage();
+            if (logger.isDebugEnabled())
+                logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endPoint);
+            IAsyncResult iar = MessagingService.instance().sendRR(message, endPoint);
+            byte[] responseBody;
+            try
+            {
+                responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            }
+            catch (TimeoutException ex)
+            {
+                throw new RuntimeException(ex);
+            }
+            RangeSliceReply reply = RangeSliceReply.read(responseBody);
+
+            // Combine these with what has been seen so far. The sorted set keeps the
+            // rows in key order and ignores a row whose key is already present, so
+            // boundary overlaps between endpoints need no special handling. The
+            // earlier overlap checks compared a Row with a String key and never
+            // changed the result; one of them would have handed a String to the
+            // TreeSet<Row> comparator had it ever fired.
+            allRows.addAll(reply.rows);
+
+            if (allRows.size() >= rawCommand.max_keys || reply.rangeCompletedLocally)
+                break;
+
+            do
+            {
+                endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move this into the Strategies & modify for RackAwareStrategy
+            }
+            while (!FailureDetector.instance().isAlive(endPoint));
+            // the first endpoint of the ring holds both the smallest and the largest
+            // keys, so ask it for the full max_keys rather than just the remainder;
+            // compare addresses by value, not by reference
+            int maxResults = endPoint.equals(wrapEndpoint) ? rawCommand.max_keys : rawCommand.max_keys - allRows.size();
+            command = new RangeSliceCommand(command, maxResults);
+        }
+        while (!endPoint.equals(startEndpoint));
+
+        Map<String, Collection<IColumn>> results = new TreeMap<String, Collection<IColumn>>();
+        for (Row row : allRows)
+        {
+            // for now, assume only one cf per row, since that is all we can specify in the Command.
+            ColumnFamily cf = row.getColumnFamilies().iterator().next();
+            results.put(row.key(),cf.getSortedColumns());
+        }
+        rangeStats.add(System.currentTimeMillis() - startTime);
+        return results;
+    }
+
     static List<String> getKeyRange(RangeCommand rawCommand) throws IOException, UnavailableException
     {
         long startTime = System.currentTimeMillis();
@@ -544,16 +651,7 @@
             {
                 if (allKeys.size() > 0)
                 {
-                    Comparator<String> comparator = new Comparator<String>()
-                    {
-                        public int compare(String o1, String o2)
-                        {
-                            IPartitioner p = StorageService.getPartitioner();
-                            return p.getDecoratedKeyComparator().compare(p.decorateKey(o1),
p.decorateKey(o2));
-                        }
-                    };
-
-                    if (comparator.compare(rangeKeys.get(rangeKeys.size() - 1), allKeys.get(0))
<= 0)
+                    if (keyComparator.compare(rangeKeys.get(rangeKeys.size() - 1), allKeys.get(0))
<= 0)
                     {
                         // unlikely, but possible
                         if (rangeKeys.get(rangeKeys.size() - 1).equals(allKeys.get(0)))
@@ -563,7 +661,7 @@
                         rangeKeys.addAll(allKeys);
                         allKeys = rangeKeys;
                     }
-                    else if (comparator.compare(allKeys.get(allKeys.size() - 1), rangeKeys.get(0))
<= 0)
+                    else if (keyComparator.compare(allKeys.get(allKeys.size() - 1), rangeKeys.get(0))
<= 0)
                     {
                         // common case. deal with simple start/end key overlaps
                         if (allKeys.get(allKeys.size() - 1).equals(rangeKeys.get(0)))

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=881998&r1=881997&r2=881998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed
Nov 18 23:27:14 2009
@@ -76,6 +76,7 @@
     public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
     public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
+    public final static String rangeSliceVerbHandler_ = "RANGE-SLICE-VERB-HANDLER";
     public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
 
     private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
@@ -223,6 +224,7 @@
         MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
         MessagingService.instance().registerVerbHandlers(dataFileVerbHandler_, new DataFileVerbHandler()
);
         MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
+        MessagingService.instance().registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new
BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new
BootstrapMetadataVerbHandler() );
@@ -275,7 +277,7 @@
             while (isBootstrapMode)
             {
                 try
-                {
+                {   
                     Thread.sleep(100);
                 }
                 catch (InterruptedException e)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java?rev=881998&r1=881997&r2=881998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
Wed Nov 18 23:27:14 2009
@@ -155,6 +155,8 @@
                 throw new InvalidRequestException("supercolumn name length must not be greater
than " + IColumn.MAX_NAME_LENGTH);
             if (superColumnName.length == 0)
                 throw new InvalidRequestException("supercolumn name must not be empty");
+            if (!DatabaseDescriptor.getColumnFamilyType(keyspace, columnFamilyName).equals("Super"))
+                throw new InvalidRequestException("supercolumn specified to ColumnFamily
" + columnFamilyName + " containing normal columns");
         }
         AbstractType comparator = ColumnFamily.getComparatorFor(keyspace, columnFamilyName,
superColumnName);
         for (byte[] name : column_names)

Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=881998&r1=881997&r2=881998&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Wed Nov 18 23:27:14 2009
@@ -512,13 +512,68 @@
         assert L == ['1', '10', '11', '12', '13', '14', '15', '16', '17', '18'], L
 
     def test_get_slice_range(self):
-	_insert_range()
-	_verify_range()
+        _insert_range()
+        _verify_range()
         
     def test_get_slice_super_range(self):
-	_insert_super_range()
-	_verify_super_range()
+        _insert_super_range()
+        _verify_super_range()
+
+    def test_get_range_slice_super(self):
+        """Insert five keys into Super3/sc1 and range-slice key2..key4, first by subcolumn names and then by super column name"""
+        for key in ['key1', 'key2', 'key3', 'key4', 'key5']:
+            for cname in ['col1', 'col2', 'col3', 'col4', 'col5']:
+                client.insert('Keyspace2', key, ColumnPath('Super3', 'sc1', cname), 'v-'
+ cname, 0, ConsistencyLevel.ONE)
+
+        # subcolumns of sc1 selected by name
+        cp = ColumnParent('Super3', 'sc1')
+        result = client.get_range_slice("Keyspace2", cp, SlicePredicate(column_names=['col1',
'col3']), 'key2', 'key4', 5, ConsistencyLevel.ONE)
+        assert len(result) == 3
+        sc = result[0].columns[0].super_column
+        assert sc.columns[0].name == 'col1'
+        assert sc.columns[1].name == 'col3'
+
+        # whole super column selected by name
+        cp = ColumnParent('Super3')
+        result = client.get_range_slice("Keyspace2", cp, SlicePredicate(column_names=['sc1']),
'key2', 'key4', 5, ConsistencyLevel.ONE)
+        assert len(result) == 3
+        assert list(set(row.columns[0].super_column.name for row in result))[0] == 'sc1'
         
+    def test_get_range_slice(self):
+        """Insert five keys into Standard1 and check get_range_slice with name and slice predicates, including row and column limits"""
+        for key in ['key1', 'key2', 'key3', 'key4', 'key5']:
+            for cname in ['col1', 'col2', 'col3', 'col4', 'col5']:
+                client.insert('Keyspace1', key, ColumnPath('Standard1', column=cname), 'v-'
+ cname, 0, ConsistencyLevel.ONE)
+        cp = ColumnParent('Standard1')
+
+        # test column_names predicate
+        result = client.get_range_slice("Keyspace1", cp, SlicePredicate(column_names=['col1',
'col3']), 'key2', 'key4', 5, ConsistencyLevel.ONE)
+        assert len(result) == 3
+        assert result[0].columns[0].column.name == 'col1'
+        assert result[0].columns[1].column.name == 'col3'
+
+        # row limiting via count.
+        result = client.get_range_slice("Keyspace1", cp, SlicePredicate(column_names=['col1',
'col3']), 'key2', 'key4', 1, ConsistencyLevel.ONE)
+        assert len(result) == 1
+
+        # test column slice predicate
+        result = client.get_range_slice('Keyspace1', cp, SlicePredicate(slice_range=SliceRange(start='col2',
finish='col4', reversed=False, count=5)), 'key1', 'key2', 5, ConsistencyLevel.ONE)
+        assert len(result) == 2
+        assert result[0].key == 'key1'
+        assert result[1].key == 'key2'
+        assert len(result[0].columns) == 3
+        assert result[0].columns[0].column.name == 'col2'
+        assert result[0].columns[2].column.name == 'col4'
+
+        # col limiting via count
+        result = client.get_range_slice('Keyspace1', cp, SlicePredicate(slice_range=SliceRange(start='col2',
finish='col4', reversed=False, count=2)), 'key1', 'key2', 5, ConsistencyLevel.ONE)
+        assert len(result[0].columns) == 2
+
+        # and reversed 
+        result = client.get_range_slice('Keyspace1', cp, SlicePredicate(slice_range=SliceRange(start='col4',
finish='col2', reversed=True, count=5)), 'key1', 'key2', 5, ConsistencyLevel.ONE)
+        assert result[0].columns[0].column.name == 'col2'
+        assert result[0].columns[2].column.name == 'col4'
+
+        # row limiting via count
+        result = client.get_range_slice('Keyspace1', cp, SlicePredicate(slice_range=SliceRange(start='col2',
finish='col4', reversed=False, count=5)), 'key1', 'key2', 1, ConsistencyLevel.ONE)
+        assert len(result) == 1
+    
     def test_get_slice_by_names(self):
         _insert_range()
         p = SlicePredicate(column_names=['c1', 'c2'])
@@ -536,17 +591,17 @@
     def test_multiget(self):
         """Insert multiple keys and retrieve them using the multiget interface"""
 
-        """Generate a list of 10 keys and insert them"""
+        # Generate a list of 10 keys and insert them
         num_keys = 10
         keys = ['key'+str(i) for i in range(1, num_keys+1)]
         _insert_multi(keys)
 
-        """Retrieve all 10 keys"""
+        # Retrieve all 10 keys
         rows = client.multiget('Keyspace1', keys, ColumnPath('Standard1', column='c1'), ConsistencyLevel.ONE)
         keys1 = rows.keys().sort()
         keys2 = keys.sort()
 
-        """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+        # Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted
         for key in keys:
             assert rows.has_key(key) == True
             assert rows[key] == ColumnOrSuperColumn(column=Column(timestamp=0, name='c1', value='value1'))
@@ -554,18 +609,18 @@
     def test_multiget_slice(self):
         """Insert multiple keys and retrieve them using the multiget_slice interface"""
 
-        """Generate a list of 10 keys and insert them"""
+        # Generate a list of 10 keys and insert them
         num_keys = 10
         keys = ['key'+str(i) for i in range(1, num_keys+1)]
         _insert_multi(keys)
 
-        """Retrieve all 10 key slices"""
+        # Retrieve all 10 key slices
         rows = _big_multislice('Keyspace1', keys, ColumnParent('Standard1'))
         keys1 = rows.keys().sort()
         keys2 = keys.sort()
 
         columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]
-        """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+        # Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted
         for key in keys:
             assert rows.has_key(key) == True
             assert columns == rows[key]



Mime
View raw message