cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jo...@apache.org
Subject svn commit: r924340 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/hadoop/
Date Wed, 17 Mar 2010 15:50:00 GMT
Author: johan
Date: Wed Mar 17 15:49:59 2010
New Revision: 924340

URL: http://svn.apache.org/viewvc?rev=924340&view=rev
Log:
Get Hadoop input format sub splits in parallel. Patch by johan, review by jbellis. CASSANDRA-890

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java 
 (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props
changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
  (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
  (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
  (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 17 15:49:59 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-924331
+/cassandra/branches/cassandra-0.6:922689-924337
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 17 15:49:59 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-924331
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-924337
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 17 15:49:59 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-924331
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-924337
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 17 15:49:59 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-924331
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-924337
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 17 15:49:59 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-924331
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-924337
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Mar 17 15:49:59 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-924331
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-924337
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=924340&r1=924339&r2=924340&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed
Mar 17 15:49:59 2010
@@ -23,6 +23,10 @@ package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,34 +87,75 @@ public class ColumnFamilyInputFormat ext
 
         int splitsize = ConfigHelper.getInputSplitSize(context.getConfiguration());
         
-        // cannonical ranges, split into pieces:
-        // for each range, pick a live owner and ask it to compute bite-sized splits
-        // TODO parallelize this thread-per-range
-        Map<TokenRange, List<String>> splitRanges = new HashMap<TokenRange,
List<String>>();
-        for (TokenRange range : masterRangeNodes)
+        // cannonical ranges, split into pieces, fetching the splits in parallel 
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+
+        try
+        {
+            List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
+            for (TokenRange range : masterRangeNodes)
+            {
+                // for each range, pick a live owner and ask it to compute bite-sized splits
+                splitfutures.add(executor.submit(new SplitCallable(range, splitsize)));
+            }
+    
+            // wait until we have all the results back
+            for (Future<List<InputSplit>> futureInputSplits : splitfutures)
+            {
+                try
+                {
+                    splits.addAll(futureInputSplits.get());
+                } 
+                catch (Exception e)
+                {
+                    throw new IOException("Could not get input splits", e);
+                } 
+            }
+        } 
+        finally
+        {
+            executor.shutdownNow();
+        }
+
+        assert splits.size() > 0;
+        
+        return splits;
+    }
+
+    /**
+     * Gets a token range and splits it up according to the suggested
+     * size into input splits that Hadoop can use. 
+     */
+    class SplitCallable implements Callable<List<InputSplit>>
+    {
+
+        private TokenRange range;
+        private int splitsize;
+        
+        public SplitCallable(TokenRange tr, int splitsize)
         {
-            splitRanges.put(range, getSubSplits(range, splitsize));
+            this.range = tr;
+            this.splitsize = splitsize;
         }
 
-        // turn the sub-ranges into InputSplits
-        ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-        for (Map.Entry<TokenRange, List<String>> entry : splitRanges.entrySet())
+        @Override
+        public List<InputSplit> call() throws Exception
         {
-            TokenRange range = entry.getKey();
-            List<String> tokens = entry.getValue();
+            ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+            List<String> tokens = getSubSplits(range, splitsize);
+
+            // turn the sub-ranges into InputSplits
             String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
 
-            int i = 1;
-            for ( ; i < tokens.size(); i++)
+            for (int i = 1; i < tokens.size(); i++)
             {
                 ColumnFamilySplit split = new ColumnFamilySplit(tokens.get(i - 1), tokens.get(i),
endpoints);
                 logger.debug("adding " + split);
                 splits.add(split);
             }
+            return splits;
         }
-        assert splits.size() > 0;
-        
-        return splits;
     }
 
     private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException



Mime
View raw message