cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r906209 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ test/unit/org/apache/cassandra/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/service/
Date Wed, 03 Feb 2010 20:00:47 GMT
Author: jbellis
Date: Wed Feb  3 20:00:46 2010
New Revision: 906209

URL: http://svn.apache.org/viewvc?rev=906209&view=rev
Log:
allow wrapped range queries.  patch by jbellis; reviewed by stuhood for CASSANDRA-758

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreUtils.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Feb  3 20:00:46 2010
@@ -415,7 +415,7 @@
         return switchMemtable(memtable_, true);
     }
 
-    void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
+    public void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
     {
         Future<?> future = forceFlush();
         if (future != null)
@@ -937,9 +937,11 @@
        range_slice.  still opens one randomaccessfile per key, which sucks.  something like compactioniterator
        would be better.
      */
-    private RangeReply getKeyRange(final DecoratedKey startWith, final DecoratedKey stopAt, int maxResults)
+    private boolean getKeyRange(List<String> keys, final DecoratedKey startWith, final DecoratedKey stopAt, int maxResults)
     throws IOException, ExecutionException, InterruptedException
     {
+        // getKeyRange requires start <= stop.  getRangeSlice handles range wrapping if necessary.
+        assert stopAt.isEmpty() || startWith.compareTo(stopAt) <= 0;
         // create a CollatedIterator that will return unique keys from different sources
         // (current memtable, historical memtables, and SSTables) in the correct order.
         List<Iterator<DecoratedKey>> iterators = new ArrayList<Iterator<DecoratedKey>>();
@@ -1009,23 +1011,20 @@
         try
         {
             // pull keys out of the CollatedIterator
-            List<String> keys = new ArrayList<String>();
             boolean rangeCompletedLocally = false;
             for (DecoratedKey current : reduced)
             {
                 if (!stopAt.isEmpty() && stopAt.compareTo(current) < 0)
                 {
-                    rangeCompletedLocally = true;
-                    break;
+                    return true;
                 }
                 keys.add(current.key);
                 if (keys.size() >= maxResults)
                 {
-                    rangeCompletedLocally = true;
-                    break;
+                    return true;
                 }
             }
-            return new RangeReply(keys, rangeCompletedLocally);
+            return false;
         }
         finally
         {
@@ -1054,19 +1053,34 @@
     public RangeSliceReply getRangeSlice(byte[] super_column, final DecoratedKey startKey, final DecoratedKey finishKey, int keyMax, SliceRange sliceRange, List<byte[]> columnNames)
     throws IOException, ExecutionException, InterruptedException
     {
-        RangeReply rr = getKeyRange(startKey, finishKey, keyMax);
-        List<Row> rows = new ArrayList<Row>(rr.keys.size());
+        List<String> keys = new ArrayList<String>();
+        boolean completed;
+        if (finishKey.isEmpty() || startKey.compareTo(finishKey) <= 0)
+        {
+            completed = getKeyRange(keys, startKey, finishKey, keyMax);
+        }
+        else
+        {
+            // wrapped range
+            DecoratedKey emptyKey = new DecoratedKey(StorageService.getPartitioner().getMinimumToken(), null);
+            completed = getKeyRange(keys, startKey, emptyKey, keyMax);
+            if (!completed)
+            {
+                completed = getKeyRange(keys, emptyKey, finishKey, keyMax);
+            }
+        }
+        List<Row> rows = new ArrayList<Row>(keys.size());
         final QueryPath queryPath =  new QueryPath(columnFamily_, super_column, null);
         final SortedSet<byte[]> columnNameSet = new TreeSet<byte[]>(getComparator());
         if (columnNames != null)
             columnNameSet.addAll(columnNames);
-        for (String key : rr.keys)
+        for (String key : keys)
         {
             QueryFilter filter = sliceRange == null ? new NamesQueryFilter(key, queryPath, columnNameSet) : new SliceQueryFilter(key, queryPath, sliceRange.start, sliceRange.finish, sliceRange.reversed, sliceRange.count);
             rows.add(new Row(key, getColumnFamily(filter)));
         }
 
-        return new RangeSliceReply(rows, rr.rangeCompletedLocally);
+        return new RangeSliceReply(rows, completed);
     }
 
     public AbstractType getComparator()

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java Wed Feb  3 20:00:46 2010
@@ -71,4 +71,26 @@
                                  new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, false, 10000),
                                  null);
     }
+
+    /**
+     * Writes out a bunch of rows for a single column family.
+     *
+     * @param rows A group of RowMutations for the same table and column family.
+     * @return The ColumnFamilyStore that was used.
+     */
+    public static ColumnFamilyStore writeColumnFamily(List<RowMutation> rms) throws IOException, ExecutionException, InterruptedException
+    {
+        RowMutation first = rms.get(0);
+        String tablename = first.getTable();
+        String cfname = first.columnFamilyNames().iterator().next();
+
+        Table table = Table.open(tablename);
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
+
+        for (RowMutation rm : rms)
+            rm.apply();
+
+        store.forceBlockingFlush();
+        return store;
+    }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Feb  3 20:00:46 2010
@@ -28,6 +28,9 @@
 
 import static junit.framework.Assert.assertEquals;
 import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import java.net.InetAddress;
@@ -62,7 +65,7 @@
         rm.add(new QueryPath("Standard1", null, "Column1".getBytes()), "asdf".getBytes(), 0);
         rm.add(new QueryPath("Standard1", null, "Column2".getBytes()), "asdf".getBytes(), 0);
         rms.add(rm);
-        ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(rms);
+        ColumnFamilyStore store = Util.writeColumnFamily(rms);
 
         Table table = Table.open("Keyspace1");
         List<SSTableReader> ssTables = table.getAllSSTablesOnDisk();
@@ -111,7 +114,7 @@
             rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new byte[0], j);
             rms.add(rm);
         }
-        ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(rms);
+        ColumnFamilyStore store = Util.writeColumnFamily(rms);
 
         List<Range> ranges  = new ArrayList<Range>();
         IPartitioner partitioner = new CollatingOrderPreservingPartitioner();
@@ -126,5 +129,25 @@
     public void testAntiCompaction1() throws IOException, ExecutionException, InterruptedException
     {
         testAntiCompaction("Standard1", 100);
-    }    
+    }
+
+    @Test
+    public void testWrappedRangeQuery() throws IOException, ExecutionException, InterruptedException
+    {
+        List<RowMutation> rms = new LinkedList<RowMutation>();
+        RowMutation rm;
+        rm = new RowMutation("Keyspace2", "key1");
+        rm.add(new QueryPath("Standard1", null, "Column1".getBytes()), "asdf".getBytes(), 0);
+        rms.add(rm);
+        Util.writeColumnFamily(rms);
+
+        rm = new RowMutation("Keyspace2", "key2");
+        rm.add(new QueryPath("Standard1", null, "Column1".getBytes()), "asdf".getBytes(), 0);
+        rms.add(rm);
+        ColumnFamilyStore cfs = Util.writeColumnFamily(rms);
+
+        IPartitioner p = StorageService.getPartitioner();
+        RangeSliceReply result = cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY, p.decorateKey("key2"), p.decorateKey("key1"), 10, null, Arrays.asList("asdf".getBytes()));
+        assertEquals(2, result.rows.size());
+    }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Wed Feb  3 20:00:46 2010
@@ -38,6 +38,7 @@
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.config.DatabaseDescriptorTest;
+import org.apache.cassandra.Util;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -111,7 +112,7 @@
         rm = new RowMutation(tablename, "key1");
         rm.add(new QueryPath(cfname, null, "Column1".getBytes()), "asdf".getBytes(), 0);
         rms.add(rm);
-        ColumnFamilyStoreUtils.writeColumnFamily(rms);
+        Util.writeColumnFamily(rms);
 
         // sample
         validator = new Validator(new CFPair(tablename, cfname));
@@ -170,8 +171,8 @@
         rm.add(new QueryPath(cfname, null, "Column1".getBytes()), "asdf".getBytes(), 0);
         rms.add(rm);
         // with two SSTables
-        ColumnFamilyStoreUtils.writeColumnFamily(rms);
-        ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(rms);
+        Util.writeColumnFamily(rms);
+        ColumnFamilyStore store = Util.writeColumnFamily(rms);
         
         TreePair old = aes.getRendezvousPair(tablename, cfname, REMOTE);
         // force a readonly compaction, and wait for it to finish



Mime
View raw message