cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1145731 - in /cassandra/branches/cassandra-0.8: CHANGES.txt src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Date Tue, 12 Jul 2011 19:26:11 GMT
Author: jbellis
Date: Tue Jul 12 19:26:11 2011
New Revision: 1145731

URL: http://svn.apache.org/viewvc?rev=1145731&view=rev
Log:
add KeyRange option to Hadoop inputformat
patch by Mck SembWever; reviewed by jbellis for CASSANDRA-1125

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1145731&r1=1145730&r2=1145731&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jul 12 19:26:11 2011
@@ -24,6 +24,7 @@
  * add option to specify limit for get_slice in the CLI (CASSANDRA-2646)
  * decrease HH page size (CASSANDRA-2832)
  * reset cli keyspace after dropping the current one (CASSANDRA-2763)
+ * add KeyRange option to Hadoop inputformat (CASSANDRA-1125)
 
 
 0.8.1

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1145731&r1=1145730&r2=1145731&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Jul 12 19:26:11 2011
@@ -35,8 +35,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.TokenRange;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.hadoop.conf.Configuration;
@@ -102,10 +105,44 @@ public class ColumnFamilyInputFormat ext
         try
         {
             List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
+            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+            IPartitioner partitioner = null;
+            Range jobRange = null;
+            if (jobKeyRange != null)
+            {
+                partitioner = ConfigHelper.getPartitioner(context.getConfiguration());
+                assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
+                assert jobKeyRange.start_key == null : "only start_token supported";
+                assert jobKeyRange.end_key == null : "only end_token supported";
+                jobRange = new Range(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+                                     partitioner.getTokenFactory().fromString(jobKeyRange.end_token),
+                                     partitioner);
+            }
+
             for (TokenRange range : masterRangeNodes)
             {
+                if (jobRange == null)
+                {
                     // for each range, pick a live owner and ask it to compute bite-sized splits
                     splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                }
+                else
+                {
+                    Range dhtRange = new Range(partitioner.getTokenFactory().fromString(range.start_token),
+                                               partitioner.getTokenFactory().fromString(range.end_token),
+                                               partitioner);
+
+                    if (dhtRange.intersects(jobRange))
+                    {
+                        Set<Range> intersections = dhtRange.intersectionWith(jobRange);
+                        assert intersections.size() == 1 : "wrapping ranges not yet supported";
+                        Range intersection = intersections.iterator().next();
+                        range.start_token = partitioner.getTokenFactory().toString(intersection.left);
+                        range.end_token = partitioner.getTokenFactory().toString(intersection.right);
+                        // for each range, pick a live owner and ask it to compute bite-sized splits
+                        splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                    }
+                }
             }
 
             // wait until we have all the results back

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1145731&r1=1145730&r2=1145731&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Tue Jul 12 19:26:11 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.hadoop;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.FBUtilities;
@@ -42,6 +43,7 @@ public class ConfigHelper
     private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
     private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
+    private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
@@ -195,6 +197,53 @@ public class ConfigHelper
         return predicate;
     }
 
+    /**
+     * Set the KeyRange to limit the rows.
+     * @param conf Job configuration you are about to run
+     */
+    public static void setInputRange(Configuration conf, String startToken, String endToken)
+    {
+        KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken);
+        conf.set(INPUT_KEYRANGE_CONFIG, keyRangeToString(range));
+    }
+
+    /** may be null if unset */
+    public static KeyRange getInputKeyRange(Configuration conf)
+    {
+        String str = conf.get(INPUT_KEYRANGE_CONFIG);
+        return null != str ? keyRangeFromString(str) : null;
+    }
+
+    private static String keyRangeToString(KeyRange keyRange)
+    {
+        assert keyRange != null;
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return FBUtilities.bytesToHex(serializer.serialize(keyRange));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static KeyRange keyRangeFromString(String st)
+    {
+        assert st != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        KeyRange keyRange = new KeyRange();
+        try
+        {
+            deserializer.deserialize(keyRange, FBUtilities.hexToBytes(st));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return keyRange;
+    }
+
     public static String getInputKeyspace(Configuration conf)
     {
         return conf.get(INPUT_KEYSPACE_CONFIG);



Mime
View raw message