crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-335: Better Configuration compression for multipath input sources. Contributed by Maxim Gurevich.
Date Wed, 29 Jan 2014 05:02:45 GMT
Updated Branches:
  refs/heads/master 234b51aa2 -> ffd8375a2


CRUNCH-335: Better Configuration compression for multipath input sources. Contributed by Maxim Gurevich.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ffd8375a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ffd8375a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ffd8375a

Branch: refs/heads/master
Commit: ffd8375a22ab42cfc28372516a367e6b7839a5d9
Parents: 234b51a
Author: Josh Wills <jwills@apache.org>
Authored: Tue Jan 28 20:46:47 2014 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Jan 28 21:01:17 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/io/CrunchInputs.java | 22 ++++++++++++++++++--
 .../apache/crunch/io/impl/FileSourceImpl.java   |  4 +---
 2 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ffd8375a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
index bcdcb55..27bd696 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch.io;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -28,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -41,12 +44,23 @@ public class CrunchInputs {
 
   private static final char RECORD_SEP = ',';
   private static final char FIELD_SEP = ';';
+  private static final char PATH_SEP = '|';
   private static final Joiner JOINER = Joiner.on(FIELD_SEP);
   private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
 
   public static void addInputPath(Job job, Path path, FormatBundle inputBundle, int nodeIndex) {
+    addInputPaths(job, Collections.singleton(path), inputBundle, nodeIndex);
+  }
+
+  public static void addInputPaths(Job job, Collection<Path> paths, FormatBundle inputBundle, int nodeIndex) {
     Configuration conf = job.getConfiguration();
-    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
+    List<String> pathStrs = Lists.newArrayListWithExpectedSize(paths.size());
+    for (Path path : paths) {
+      String pathStr = path.toString();
+      Preconditions.checkArgument(pathStr.indexOf(RECORD_SEP) == -1 && pathStr.indexOf(FIELD_SEP) == -1 && pathStr.indexOf(PATH_SEP) == -1);
+      pathStrs.add(pathStr);
+    }
+    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), Joiner.on(PATH_SEP).join(pathStrs));
     String existing = conf.get(CRUNCH_INPUTS);
     conf.set(CRUNCH_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
   }
@@ -68,7 +82,11 @@ public class CrunchInputs {
       if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
         formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
       }
-      formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
+      List<Path> formatNodePaths = formatNodeMap.get(inputBundle).get(nodeIndex);
+      String paths = fields.get(2);
+      for (String path : Splitter.on(PATH_SEP).split(paths)) {
+        formatNodePaths.add(new Path(path));
+      }
     }
     return formatNodeMap;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ffd8375a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 766b9b0..1151ad5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -100,9 +100,7 @@ public class FileSourceImpl<T> implements Source<T> {
   public void configureSource(Job job, int inputId) throws IOException {
     // Use Crunch to handle the combined input splits
     job.setInputFormatClass(CrunchInputFormat.class);
-    for (Path path : paths) {
-      CrunchInputs.addInputPath(job, path, inputBundle, inputId);
-    }
+    CrunchInputs.addInputPaths(job, paths, inputBundle, inputId);
   }
 
   public FormatBundle<? extends InputFormat> getBundle() {


Mime
View raw message