incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-91 - Add FileNamingScheme for custom output file names
Date Wed, 10 Oct 2012 11:52:58 GMT
Updated Branches:
  refs/heads/master 255800fb3 -> 3eb2d3f3b


CRUNCH-91 - Add FileNamingScheme for custom output file names


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/3eb2d3f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/3eb2d3f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/3eb2d3f3

Branch: refs/heads/master
Commit: 3eb2d3f3bcf0d2952a8e9a13df8df2500d1e529b
Parents: 255800f
Author: Gabriel Reid <greid@apache.org>
Authored: Sun Oct 7 22:58:54 2012 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Wed Oct 10 10:13:37 2012 +0200

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/MRPipelineIT.java    |    1 -
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |    5 +
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |   54 +++++++---
 .../crunch/impl/mr/plan/MSCROutputHandler.java     |    6 +-
 .../org/apache/crunch/io/FileNamingScheme.java     |   58 ++++++++++
 .../main/java/org/apache/crunch/io/PathTarget.java |   12 ++
 .../crunch/io/SequentialFileNamingScheme.java      |   51 +++++++++
 crunch/src/main/java/org/apache/crunch/io/To.java  |    2 +-
 .../crunch/io/avro/AvroFileSourceTarget.java       |    8 ++-
 .../org/apache/crunch/io/avro/AvroFileTarget.java  |    9 ++-
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |   11 ++-
 .../io/impl/ReadableSourcePathTargetImpl.java      |    5 +-
 .../crunch/io/impl/SourcePathTargetImpl.java       |   11 ++-
 .../crunch/io/impl/TableSourcePathTargetImpl.java  |    8 ++-
 .../apache/crunch/io/seq/SeqFileSourceTarget.java  |    9 ++-
 .../crunch/io/seq/SeqFileTableSourceTarget.java    |    8 ++-
 .../org/apache/crunch/io/seq/SeqFileTarget.java    |    8 ++-
 .../crunch/io/text/TextFileSourceTarget.java       |    9 ++-
 .../org/apache/crunch/io/text/TextFileTarget.java  |    8 ++-
 .../apache/crunch/impl/mr/exec/CrunchJobTest.java  |   42 +++++++
 .../crunch/io/SequentialFileNamingSchemeTest.java  |   84 +++++++++++++++
 21 files changed, 376 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
Review note: this hunk only removes the java.util.Arrays import from MRPipelineIT.java.
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
index 0865820..8664550 100644
--- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
 
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
Review note: adds a commons-logging Log to CrunchJobControl and logs any exception caught in the run loop before runnerState is set to STOPPED; the diff --git header, which the archive had wrapped onto two lines, is rejoined so the patch parses.
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index e2dd39f..80f701b 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -24,6 +24,8 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
 import org.apache.hadoop.conf.Configuration;
 
@@ -57,6 +59,8 @@ public class CrunchJobControl implements Runnable {
   private Map<String, CrunchControlledJob> successfulJobs;
   private Map<String, CrunchControlledJob> failedJobs;
 
+  private Log log = LogFactory.getLog(CrunchJobControl.class);
+
   private long nextJobID;
   private String groupName;
   private int jobPollInterval;
@@ -274,6 +278,7 @@ public class CrunchJobControl implements Runnable {
         checkWaitingJobs();
         startReadyJobs();
       } catch (Exception e) {
+        log.error("Error in run loop", e);
         this.runnerState = ThreadState.STOPPED;
       }
       if (this.runnerState != ThreadState.RUNNING

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
Review note: CrunchJob now keeps a PathTarget per multi-output index and names each moved output file through that target's FileNamingScheme (getMapOutputName for map-only jobs, otherwise getReduceOutputName with the partition parsed by extractPartitionNumber); archive-wrapped lines are rejoined and the extractPartitionNumber @param tag now names the real parameter, so the index line refers to the originally committed blob rather than this exact post-image.
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
index 74c6ff3..b4981db 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
@@ -19,16 +19,21 @@ package org.apache.crunch.impl.mr.exec;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.impl.mr.plan.MSCROutputHandler;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Lists;
@@ -38,7 +43,7 @@ public class CrunchJob extends CrunchControlledJob {
   private final Log log = LogFactory.getLog(CrunchJob.class);
 
   private final Path workingPath;
-  private final Map<Integer, Path> multiPaths;
+  private final Map<Integer, PathTarget> multiPaths;
   private final boolean mapOnlyJob;
 
   public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException {
@@ -53,20 +58,21 @@ public class CrunchJob extends CrunchControlledJob {
       // Need to handle moving the data from the output directory of the
       // job to the output locations specified in the paths.
       FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
-      for (Map.Entry<Integer, Path> entry : multiPaths.entrySet()) {
+      for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
         final int i = entry.getKey();
-        final Path dst = entry.getValue();
+        final Path dst = entry.getValue().getPath();
+        FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
 
         Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
         Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
-        FileSystem dstFs = dst.getFileSystem(job.getConfiguration());
+        Configuration conf = job.getConfiguration();
+        FileSystem dstFs = dst.getFileSystem(conf);
         if (!dstFs.exists(dst)) {
           dstFs.mkdirs(dst);
         }
         boolean sameFs = isCompatible(srcFs, dst);
-        int minPartIndex = getMinPartIndex(dst, dstFs);
         for (Path s : srcs) {
-          Path d = getDestFile(s, dst, minPartIndex++);
+          Path d = getDestFile(conf, s, dst, fileNamingScheme);
           if (sameFs) {
             srcFs.rename(s, d);
           } else {
@@ -86,17 +92,18 @@ public class CrunchJob extends CrunchControlledJob {
     }
   }
 
-  private Path getDestFile(Path src, Path dir, int index) {
-    String form = "part-%s-%05d";
+  private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
+      throws IOException {
+    String outputFilename = null;
+    if (mapOnlyJob) {
+      outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
+    } else {
+      outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, CrunchJob.extractPartitionNumber(src.getName()));
+    }
     if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
-      form = form + org.apache.avro.mapred.AvroOutputFormat.EXT;
+      outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
     }
-    return new Path(dir, String.format(form, mapOnlyJob ? "m" : "r", index));
-  }
-
-  private int getMinPartIndex(Path path, FileSystem fs) throws IOException {
-    // Quick and dirty way to ensure unique naming in the directory
-    return fs.listStatus(path).length;
+    return new Path(dir, outputFilename);
   }
 
   @Override
@@ -134,4 +141,19 @@ public class CrunchJob extends CrunchControlledJob {
       log.info(getMessage());
     }
   }
+
+  /**
+   * Extract the partition number from a raw reducer output filename.
+   * 
+   * @param reduceOutputFileName The raw reducer output file name
+   * @return The partition number encoded in the filename
+   */
+  static int extractPartitionNumber(String reduceOutputFileName) {
+    Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
+    if (matcher.find()) {
+      return Integer.parseInt(matcher.group(1), 10);
+    } else {
+      throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
Review note: MSCROutputHandler now records the whole PathTarget for each multi-output index instead of only its Path, and getMultiPaths() returns Map<Integer, PathTarget> accordingly.
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
index b6a41da..36c565e 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
@@ -36,7 +36,7 @@ public class MSCROutputHandler implements OutputHandler {
   private final boolean mapOnlyJob;
 
   private DoNode workingNode;
-  private Map<Integer, Path> multiPaths;
+  private Map<Integer, PathTarget> multiPaths;
   private int jobCount;
 
   public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
@@ -54,7 +54,7 @@ public class MSCROutputHandler implements OutputHandler {
   public boolean configure(Target target, PType<?> ptype) {
     if (target instanceof MapReduceTarget) {
       if (target instanceof PathTarget) {
-        multiPaths.put(jobCount, ((PathTarget) target).getPath());
+        multiPaths.put(jobCount, (PathTarget) target);
       }
 
       String name = PlanningParameters.MULTI_OUTPUT_PREFIX + jobCount;
@@ -71,7 +71,7 @@ public class MSCROutputHandler implements OutputHandler {
     return mapOnlyJob;
   }
 
-  public Map<Integer, Path> getMultiPaths() {
+  public Map<Integer, PathTarget> getMultiPaths() {
     return multiPaths;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
----------------------------------------------------------------------
Review note: adds the FileNamingScheme interface (getMapOutputName(Configuration, Path) and getReduceOutputName(Configuration, Path, int)); the getReduceOutputName declaration, which the archive had wrapped, is rejoined so the hunk again holds the 58 lines its header declares.
diff --git a/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
new file mode 100644
index 0000000..cf93651
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Encapsulates rules for naming output files. It is the responsibility of
+ * implementors to avoid file name collisions.
+ */
+public interface FileNamingScheme {
+
+  /**
+   * Get the output file name for a map task. Note that the implementation is
+   * responsible for avoiding naming collisions.
+   * 
+   * @param configuration The configuration of the job for which the map output
+   *          is being written
+   * @param outputDirectory The directory where the output will be written
+   * @return The filename for the output of the map task
+   * @throws IOException if an exception occurs while accessing the output file
+   *           system
+   */
+  String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException;
+
+  /**
+   * Get the output file name for a reduce task. Note that the implementation is
+   * responsible for avoiding naming collisions.
+   * 
+   * @param configuration The configuration of the job for which output is being
+   *          written
+   * @param outputDirectory The directory where the file will be written
+   * @param partitionId The partition of the reduce task being output
+   * @return The filename for the output of the reduce task
+   * @throws IOException if an exception occurs while accessing output file
+   *           system
+   */
+  String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
Review note: PathTarget gains class javadoc and an abstract getFileNamingScheme() method, so every PathTarget implementation must now supply a FileNamingScheme.
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
index 884ba43..7a35209 100644
--- a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -19,6 +19,18 @@ package org.apache.crunch.io;
 
 import org.apache.hadoop.fs.Path;
 
+/**
+ * A target whose output goes to a given path on a file system.
+ */
 public interface PathTarget extends MapReduceTarget {
+
   Path getPath();
+
+  /**
+   * Get the naming scheme to be used for outputs being written to an output
+   * path.
+   * 
+   * @return the naming scheme to be used
+   */
+  FileNamingScheme getFileNamingScheme();
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
----------------------------------------------------------------------
Review note: default scheme that names files part-m-NNNNN / part-r-NNNNN, where NNNNN is the number of entries already in the output directory (partitionId is ignored). The directory is now listed through outputDirectory.getFileSystem(configuration) instead of FileSystem.get(configuration), which resolves the default file system rather than the one the output path lives on (CrunchJob resolves the destination the same way via dst.getFileSystem(conf)). Wrapped declarations are rejoined; the index line still names the originally committed blob.
diff --git a/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
new file mode 100644
index 0000000..0b2affa
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Default {@link FileNamingScheme} that uses an incrementing sequence number in
+ * order to generate unique file names.
+ */
+public class SequentialFileNamingScheme implements FileNamingScheme {
+
+  @Override
+  public String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException {
+    return getSequentialFileName(configuration, outputDirectory, "m");
+  }
+
+  @Override
+  public String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId)
+      throws IOException {
+    return getSequentialFileName(configuration, outputDirectory, "r");
+  }
+
+  private String getSequentialFileName(Configuration configuration, Path outputDirectory, String jobTypeName)
+      throws IOException {
+    FileSystem fileSystem = outputDirectory.getFileSystem(configuration);
+    int fileSequenceNumber = fileSystem.listStatus(outputDirectory).length;
+
+    return String.format("part-%s-%05d", jobTypeName, fileSequenceNumber);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
Review note: To.formattedFile now builds its FileTargetImpl with a SequentialFileNamingScheme; the formattedFile context line, which the archive had wrapped, is rejoined so the hunk matches its 7-line header.
diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java
index faaa4d8..da92727 100644
--- a/crunch/src/main/java/org/apache/crunch/io/To.java
+++ b/crunch/src/main/java/org/apache/crunch/io/To.java
@@ -36,7 +36,7 @@ public class To {
   }
 
   public static Target formattedFile(Path path, Class<? extends FileOutputFormat> formatClass) {
-    return new FileTargetImpl(path, formatClass);
+    return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme());
   }
 
   public static Target avroFile(String pathName) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
Review note: adds an AvroFileSourceTarget(Path, AvroType, FileNamingScheme) constructor and makes the two-argument form default to SequentialFileNamingScheme; the supplied scheme goes to the outer source-target while the inner AvroFileTarget is still built with its own default. Archive-wrapped lines are rejoined.
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
index 8b6208d..76103e5 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
@@ -17,13 +17,19 @@
  */
 package org.apache.crunch.io.avro;
 
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.hadoop.fs.Path;
 
 public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
   public AvroFileSourceTarget(Path path, AvroType<T> atype) {
-    super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path));
+    this(path, atype, new SequentialFileNamingScheme());
+  }
+
+  public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
+    super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
Review note: AvroFileTarget(Path) now delegates to a new AvroFileTarget(Path, FileNamingScheme) constructor, passing a SequentialFileNamingScheme as the default.
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index 91deac4..3a9e42c 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -19,7 +19,9 @@ package org.apache.crunch.io.avro;
 
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.AvroOutputFormat;
@@ -31,12 +33,17 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 
 public class AvroFileTarget extends FileTargetImpl {
+
   public AvroFileTarget(String path) {
     this(new Path(path));
   }
 
   public AvroFileTarget(Path path) {
-    super(path, AvroOutputFormat.class);
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  public AvroFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, AvroOutputFormat.class, fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
Review note: FileTargetImpl's constructor now requires a FileNamingScheme, kept in a final field and returned from getFileNamingScheme(); the removed-constructor line, which the archive had wrapped, is rejoined so the hunk matches its header counts.
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index ecae0de..00df45e 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io.impl;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.Converter;
@@ -32,10 +33,13 @@ public class FileTargetImpl implements PathTarget {
 
   protected final Path path;
   private final Class<? extends FileOutputFormat> outputFormatClass;
+  private final FileNamingScheme fileNamingScheme;
 
-  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass) {
+  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
+      FileNamingScheme fileNamingScheme) {
     this.path = path;
     this.outputFormatClass = outputFormatClass;
+    this.fileNamingScheme = fileNamingScheme;
   }
 
   @Override
@@ -74,6 +78,11 @@ public class FileTargetImpl implements PathTarget {
   }
 
   @Override
+  public FileNamingScheme getFileNamingScheme() {
+    return fileNamingScheme;
+  }
+
+  @Override
   public boolean equals(Object other) {
     if (other == null || !getClass().equals(other.getClass())) {
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
Review note: the constructor now takes a FileNamingScheme and forwards it to SourcePathTargetImpl; the old two-argument constructor is replaced rather than kept as an overload. The diff --git header and code lines that the archive had wrapped are rejoined.
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
index 465797a..6506816 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.impl;
 
 import java.io.IOException;
 
+import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
@@ -26,8 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 
 public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> implements ReadableSourceTarget<T> {
 
-  public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target) {
-    super(source, target);
+  public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
+    super(source, target, fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
Review note: SourcePathTargetImpl stores the FileNamingScheme passed to its constructor and returns it from getFileNamingScheme(); unlike getPath(), it does not delegate to the wrapped PathTarget. The class-declaration line and the hunk header that the archive had wrapped are rejoined.
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
index 87f4901..c0d7ce0 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.io.impl;
 
 import org.apache.crunch.Source;
+import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
@@ -25,8 +26,11 @@ import org.apache.hadoop.mapreduce.Job;
 
 public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements PathTarget {
 
-  public SourcePathTargetImpl(Source<T> source, PathTarget target) {
+  private final FileNamingScheme fileNamingScheme;
+
+  public SourcePathTargetImpl(Source<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
     super(source, target);
+    this.fileNamingScheme = fileNamingScheme;
   }
 
   @Override
@@ -38,4 +42,9 @@ public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements Path
   public Path getPath() {
     return ((PathTarget) target).getPath();
   }
+
+  @Override
+  public FileNamingScheme getFileNamingScheme() {
+    return fileNamingScheme;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
----------------------------------------------------------------------
Review note: adds a TableSourcePathTargetImpl constructor taking a FileNamingScheme; the existing two-argument constructor defaults to SequentialFileNamingScheme. The diff --git header and code lines that the archive had wrapped are rejoined.
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
index eb500dd..a8ff639 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/TableSourcePathTargetImpl.java
@@ -19,13 +19,19 @@ package org.apache.crunch.io.impl;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.TableSource;
+import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.types.PTableType;
 
 public class TableSourcePathTargetImpl<K, V> extends SourcePathTargetImpl<Pair<K, V>> implements TableSource<K, V> {
 
   public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target) {
-    super(source, target);
+    this(source, target, new SequentialFileNamingScheme());
+  }
+
+  public TableSourcePathTargetImpl(TableSource<K, V> source, PathTarget target, FileNamingScheme fileNamingScheme) {
+    super(source, target, fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
----------------------------------------------------------------------
Review note: adds a SeqFileSourceTarget(Path, PType, FileNamingScheme) constructor and makes the Path-based two-argument form default to SequentialFileNamingScheme; the supplied scheme goes to the outer source-target while the inner SeqFileTarget keeps its own default. The archive-wrapped constructor line is rejoined.
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
index f532472..adc739f 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileSourceTarget.java
@@ -17,17 +17,24 @@
  */
 package org.apache.crunch.io.seq;
 
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
 
 public class SeqFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+
   public SeqFileSourceTarget(String path, PType<T> ptype) {
     this(new Path(path), ptype);
   }
 
   public SeqFileSourceTarget(Path path, PType<T> ptype) {
-    super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path));
+    this(path, ptype, new SequentialFileNamingScheme());
+  }
+
+  public SeqFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) {
+    super(new SeqFileSource<T>(path, ptype), new SeqFileTarget(path), fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
----------------------------------------------------------------------
Review note: adds a SeqFileTableSourceTarget(Path, PTableType, FileNamingScheme) constructor that also sets tableType; the Path-based two-argument form now delegates to it with a SequentialFileNamingScheme, and the inner SeqFileTarget keeps its own default scheme. The archive-wrapped constructor line is rejoined.
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
index a1fccb5..e13de1d 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableSourceTarget.java
@@ -19,6 +19,8 @@ package org.apache.crunch.io.seq;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.TableSource;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.fs.Path;
@@ -32,7 +34,11 @@ public class SeqFileTableSourceTarget<K, V> extends ReadableSourcePathTargetImpl
   }
 
   public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType) {
-    super(new SeqFileTableSource<K, V>(path, tableType), new SeqFileTarget(path));
+    this(path, tableType, new SequentialFileNamingScheme());
+  }
+
+  public SeqFileTableSourceTarget(Path path, PTableType<K, V> tableType, FileNamingScheme fileNamingScheme) {
+    super(new SeqFileTableSource<K, V>(path, tableType), new SeqFileTarget(path), fileNamingScheme);
     this.tableType = tableType;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
----------------------------------------------------------------------
Review note: SeqFileTarget(Path) now delegates to a new SeqFileTarget(Path, FileNamingScheme) constructor, passing a SequentialFileNamingScheme as the default.
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
index c03543a..60e4739 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
@@ -18,6 +18,8 @@
 package org.apache.crunch.io.seq;
 
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -30,7 +32,11 @@ public class SeqFileTarget extends FileTargetImpl {
   }
 
   public SeqFileTarget(Path path) {
-    super(path, SequenceFileOutputFormat.class);
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  public SeqFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, SequenceFileOutputFormat.class, fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
index 1a3d522..1d1211e 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
@@ -17,17 +17,24 @@
  */
 package org.apache.crunch.io.text;
 
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
 
 public class TextFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+
   public TextFileSourceTarget(String path, PType<T> ptype) {
     this(new Path(path), ptype);
   }
 
   public TextFileSourceTarget(Path path, PType<T> ptype) {
-    super(new TextFileSource<T>(path, ptype), new TextFileTarget(path));
+    this(path, ptype, new SequentialFileNamingScheme());
+  }
+
+  public TextFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNamingScheme) {
+    super(new TextFileSource<T>(path, ptype), new TextFileTarget(path), fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index aa2f8e8..c7e06d3 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -18,6 +18,8 @@
 package org.apache.crunch.io.text;
 
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
@@ -43,7 +45,11 @@ public class TextFileTarget extends FileTargetImpl {
   }
 
   public <T> TextFileTarget(Path path) {
-    super(path, null);
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  public <T> TextFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, null, fileNamingScheme);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
new file mode 100644
index 0000000..00ad830
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class CrunchJobTest {
+
+  @Test
+  public void testExtractPartitionNumber() {
+    assertEquals(0, CrunchJob.extractPartitionNumber("out1-r-00000"));
+    assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010"));
+    assertEquals(99999, CrunchJob.extractPartitionNumber("out3-r-99999"));
+  }
+
+  @Test
+  public void testExtractPartitionNumber_WithSuffix() {
+    assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010.avro"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExtractPartitionNumber_MapOutputFile() {
+    CrunchJob.extractPartitionNumber("out1-m-00000");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3eb2d3f3/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java b/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
new file mode 100644
index 0000000..467da15
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/io/SequentialFileNamingSchemeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class SequentialFileNamingSchemeTest {
+
+  // The partition id used for testing. This partition id should be ignored by
+  // the SequentialFileNamingScheme.
+  private static final int PARTITION_ID = 42;
+
+  private SequentialFileNamingScheme namingScheme;
+  private Configuration configuration;
+
+  @Rule
+  public TemporaryFolder tmpOutputDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    configuration = new Configuration();
+    namingScheme = new SequentialFileNamingScheme();
+  }
+
+  @Test
+  public void testGetMapOutputName_EmptyDirectory() throws IOException {
+    assertEquals("part-m-00000",
+        namingScheme.getMapOutputName(configuration, new Path(tmpOutputDir.getRoot().getAbsolutePath())));
+  }
+
+  @Test
+  public void testGetMapOutputName_NonEmptyDirectory() throws IOException {
+    File outputDirectory = tmpOutputDir.getRoot();
+
+    new File(outputDirectory, "existing-1").createNewFile();
+    new File(outputDirectory, "existing-2").createNewFile();
+
+    assertEquals("part-m-00002",
+        namingScheme.getMapOutputName(configuration, new Path(outputDirectory.getAbsolutePath())));
+  }
+
+  @Test
+  public void testGetReduceOutputName_EmptyDirectory() throws IOException {
+    assertEquals("part-r-00000", namingScheme.getReduceOutputName(configuration, new Path(tmpOutputDir.getRoot()
+        .getAbsolutePath()), PARTITION_ID));
+  }
+
+  @Test
+  public void testGetReduceOutputName_NonEmptyDirectory() throws IOException {
+    File outputDirectory = tmpOutputDir.getRoot();
+
+    new File(outputDirectory, "existing-1").createNewFile();
+    new File(outputDirectory, "existing-2").createNewFile();
+
+    assertEquals("part-r-00002",
+        namingScheme.getReduceOutputName(configuration, new Path(outputDirectory.getAbsolutePath()), PARTITION_ID));
+  }
+
+}


Mime
View raw message