tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-1840. Document TezTaskOutput. (sseth)
Date Tue, 16 Dec 2014 20:43:19 GMT
Repository: tez
Updated Branches:
  refs/heads/master edb59a97a -> 0f462d23a


TEZ-1840. Document TezTaskOutput. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0f462d23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0f462d23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0f462d23

Branch: refs/heads/master
Commit: 0f462d23a22814bd32057249d5dd8f5bba18db9e
Parents: edb59a9
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Dec 16 12:43:00 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Dec 16 12:43:00 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../shuffle/orderedgrouped/MapOutput.java       |   2 +
 .../local/output/TezLocalTaskOutputFiles.java   |  23 +--
 .../common/task/local/output/TezTaskOutput.java | 105 ++++++-----
 .../task/local/output/TezTaskOutputFiles.java   | 173 ++++++++++++++-----
 5 files changed, 191 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0f462d23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4b8e33c..b81cfd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1840. Document TezTaskOutput.
   TEZ-1576. Class level comment in {{MiniTezCluster}} ends abruptly.
   TEZ-1838. tez-ui/src/main/webapp/bower.json gets updated after compiling source code.
   TEZ-1789. Move speculator processing off the central dispatcher.

http://git-wip-us.apache.org/repos/asf/tez/blob/0f462d23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
index 231975b..49baf70 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapOutput.java
@@ -108,6 +108,8 @@ class MapOutput {
     FileSystem fs = FileSystem.getLocal(conf);
     Path outputpath = mapOutputFile.getInputFileForWrite(
         attemptIdentifier.getInputIdentifier().getInputIndex(), size);
+    // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
+    // otherwise fetches for the same task but from different attempts would clobber each other.
     Path tmpOuputPath = outputpath.suffix(String.valueOf(fetcher));
     long offset = 0;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0f462d23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
index e02011f..f8cd2d3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -45,7 +44,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
     super(conf, uniqueId);
   }
 
-  private LocalDirAllocator lDirAlloc =
+  private final LocalDirAllocator lDirAlloc =
     new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
 
 
@@ -232,24 +231,4 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
         Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR,
         spillNum));
   }
-
-  /** Removes all of the files related to a task. */
-  @Override
-  public void removeAll()
-      throws IOException {
-    deleteLocalFiles(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
-  }
-
-  private String[] getLocalDirs() throws IOException {
-    return conf.getStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-  }
-
-  @SuppressWarnings("deprecation")
-  private void deleteLocalFiles(String subdir) throws IOException {
-    String[] localDirs = getLocalDirs();
-    for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0f462d23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index e34e399..1c08f28 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -27,39 +27,46 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 /**
- * Manipulate the working area for the transient store for maps and reduces.
+ * Manipulate the working area for the transient store for components in tez-runtime-library
  *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
+ * This class is used by Inputs and Outputs in tez-runtime-library to identify the directories
+ * that they need to write to / read from for intermediate files.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public abstract class TezTaskOutput {
 
-  protected Configuration conf;
-  protected String uniqueId;
+  protected final Configuration conf;
+  protected final String uniqueId;
 
+  /**
+   * @param conf     the configuration from which local-dirs will be picked up
+   * @param uniqueId a unique identifier for the specific input / output. This is expected to be
+   *                 unique for all the Inputs / Outputs within a container - i.e. even if the
+   *                 container is used for multiple tasks, this id should be unique for inputs /
+   *                 outputs spanning across tasks. This is also expected to be unique across all
+   *                 tasks for a vertex.
+   */
   public TezTaskOutput(Configuration conf, String uniqueId) {
     this.conf = conf;
     this.uniqueId = uniqueId;
   }
 
   /**
-   * Return the path to local map output file created earlier
+   * Return the path to local output file created earlier.
+   *
+   * TODO TEZ-1855: Remove this. Leads to an extra localdir scan just to update counters.
    *
-   * @return path
+   * @return path the path of the local output file
    * @throws IOException
    */
   public abstract Path getOutputFile() throws IOException;
 
   /**
-   * Create a local map output file name.
+   * Create a local output file name.
    *
    * @param size the size of the file
-   * @return path
+   * @return path the path to write to
    * @throws IOException
    */
   public abstract Path getOutputFileForWrite(long size) throws IOException;
@@ -68,105 +75,117 @@ public abstract class TezTaskOutput {
    * Create a local output file name. This method is meant to be used *only* if
    * the size of the file is not know up front.
    * 
-   * @return path
+   * @return path the path to write to
    * @throws IOException
    */
   public abstract Path getOutputFileForWrite() throws IOException;
-  
+
   /**
-   * Create a local map output file name on the same volume.
+   * Create a local output file name on the same volume.
+   * This is only meant to be used to rename temporary files to their final destination within the
+   * same volume.
+   *
+   * @return path the path of the output file within the same volume
    */
   public abstract Path getOutputFileForWriteInVolume(Path existing);
 
   /**
-   * Return the path to a local map output index file created earlier
+   * Return the path to a local output index file created earlier
    *
-   * @return path
+   * TODO TEZ-1855: Remove this. Leads to an additional scan to find empty partitions.
+   *
+   * @return path the path of the index file
    * @throws IOException
    */
   public abstract Path getOutputIndexFile() throws IOException;
 
   /**
-   * Create a local map output index file name.
+   * Create a local output index file name.
    *
    * @param size the size of the file
-   * @return path
+   * @return path the path to write the index file to
    * @throws IOException
    */
   public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
 
   /**
-   * Create a local map output index file name on the same volume.
+   * Create a local output index file name on the same volume.
+   * The intended usage of this method is to write the index file on the same volume as the
+   * associated data file.
+   * @return path the path of the index file within the same volume
    */
   public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
 
   /**
-   * Return a local map spill file created earlier.
+   * Return a local output spill file created earlier.
    *
-   * @param spillNumber the number
-   * @return path
+   * @param spillNumber the spill number
+   * @return path the path of the previously written spill file corresponding to the spillNumber
    * @throws IOException
+   * // KKK Try removing this. Unnecessary file scans - can be stored in memory instead.
    */
   public abstract Path getSpillFile(int spillNumber) throws IOException;
 
   /**
-   * Create a local map spill file name.
+   * Create a local output spill file name.
    *
-   * @param spillNumber the number
+   * @param spillNumber the spill number
    * @param size the size of the file
-   * @return path
+   * @return path the path to write the spill file for the specific spillNumber
    * @throws IOException
    */
   public abstract Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException;
 
   /**
-   * Return a local map spill index file created earlier
+   * Return a local output spill index file created earlier
+   *
+   * TODO TEZ-1855: Remove this. Should be possible to cache this instead of requiring a directory scan.
    *
-   * @param spillNumber the number
-   * @return path
+   * @param spillNumber the spill number
+   * @return path the path of the previously written spill index file corresponding to the spillNumber
    * @throws IOException
    */
   public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
 
   /**
-   * Create a local map spill index file name.
+   * Create a local output spill index file name.
    *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
+   * @param spillNumber the spill number
+   * @param size the size of the spill file
+   * @return path the path to write the spill index file for the specific spillNumber
    * @throws IOException
    */
   public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
       throws IOException;
 
   /**
-   * Return a local reduce input file created earlier
+   * Return a local input file created earlier
+   *
+   * TODO: TEZ-1855. Remove this.
    *
-   * @param attemptIdentifier The identifier for the source task
-   * @return path
+   * @param attemptIdentifier The identifier for the source
+   * @return path the path to the input file
    * @throws IOException
    */
   public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
 
   /**
-   * Create a local reduce input file name.
+   * Create a local input file name.
    *
-   * @param taskIdentifier The identifier for the source task
+   * @param srcIdentifier The identifier for the source
    * @param size the size of the file
-   * @return path
+   * @return path the path to the input file.
    * @throws IOException
    */
   public abstract Path getInputFileForWrite(
-      int taskIdentifier, long size) throws IOException;
+      int srcIdentifier, long size) throws IOException;
 
   /**
    * Construct a spill file name, given a spill number
    * @param spillNum
-   * @return
+   * @return a spill file name independent of the unique identifier and local directories
    */
   public abstract String getSpillFileName(int spillNum);
 
-  /** Removes all of the files related to a task. */
-  public abstract void removeAll() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0f462d23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 86a83ac..be7e4ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -32,15 +32,11 @@ import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 /**
- * Manipulate the working area for the transient store for maps and reduces.
+ * Manipulate the working area for the transient store for components in tez-runtime-library
  *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
+ * This class is used by Inputs and Outputs in tez-runtime-library to identify the directories
+ * that they need to write to / read from for intermediate files.
  */
-
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TezTaskOutputFiles extends TezTaskOutput {
@@ -55,13 +51,17 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
       + ".index";
 
-  
-
-  // assume configured to $localdir/usercache/$user/appcache/$appId
-  private LocalDirAllocator lDirAlloc =
+  /*
+  Under YARN, this defaults to one or more of the local directories, along with the appId in the path.
+  Note: The containerId is not part of this.
+  ${yarnLocalDir}/usercache/${user}/appcache/${applicationId}. (Referred to as ${appDir} later in the docs
+   */
+  private final LocalDirAllocator lDirAlloc =
     new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-  
 
+  /*
+   * ${appDir}/output/${uniqueId}
+   */
   private Path getAttemptOutputDir() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("getAttemptOutputDir: "
@@ -72,11 +72,17 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Return the path to local map output file created earlier
+   * Return the path to local output file created earlier.
+   *
+   * ${appDir}/output/${uniqueId}/file.out
+   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out
+   *
+   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
    *
-   * @return path
+   * @return path the path of the local output file
    * @throws IOException
    */
+  @Override
   public Path getOutputFile() throws IOException {
     Path attemptOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
@@ -84,12 +90,18 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Create a local map output file name.
+   * Create a local output file name.
+   *
+   * ${appDir}/output/${uniqueId}/file.out
+   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out
+   *
+   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
    *
    * @param size the size of the file
-   * @return path
+   * @return path the path to write to
    * @throws IOException
    */
+  @Override
   public Path getOutputFileForWrite(long size) throws IOException {
     Path attemptOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
@@ -97,13 +109,19 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Create a local map output file name. This should *only* be used if the size
+   * Create a local output file name. This should *only* be used if the size
    * of the file is not known. Otherwise use the equivalent which accepts a size
    * parameter.
-   * 
-   * @return path
+   *
+   * ${appDir}/output/${uniqueId}/file.out
+   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out
+   *
+   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
+   *
+   * @return path the path to write to
    * @throws IOException
    */
+  @Override
   public Path getOutputFileForWrite() throws IOException {
     Path attemptOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
@@ -111,8 +129,18 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Create a local map output file name on the same volume.
+   * Create a local output file name on the same volume.
+   * This is only meant to be used to rename temporary files to their final destination within the
+   * same volume.
+   *
+   * ${appDir}/output/${uniqueId}/file.out
+   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out
+   *
+   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
+   *
+   * @return path the path of the output file within the same volume
    */
+  @Override
   public Path getOutputFileForWriteInVolume(Path existing) {
     Path outputDir = new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
     Path attemptOutputDir = new Path(outputDir, uniqueId);
@@ -120,11 +148,17 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Return the path to a local map output index file created earlier
+   * Return the path to a local output index file created earlier
+   *
+   * ${appDir}/output/${uniqueId}/file.out.index
+   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out.index
+   *
+   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
    *
-   * @return path
+   * @return path the path of the index file
    * @throws IOException
    */
+  @Override
   public Path getOutputIndexFile() throws IOException {
     Path attemptIndexOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
@@ -133,12 +167,18 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Create a local map output index file name.
+   * Create a local output index file name.
+   *
+   * ${appDir}/output/${uniqueId}/file.out.index
+   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out.index
+   *
+   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
    *
    * @param size the size of the file
-   * @return path
+   * @return path the path to write the index file to
    * @throws IOException
    */
+  @Override
   public Path getOutputIndexFileForWrite(long size) throws IOException {
     Path attemptIndexOutput =
       new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
@@ -148,8 +188,16 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Create a local map output index file name on the same volume.
+   * Create a local output index file name on the same volume.
+   * The intended usage of this method is to write the index file on the same volume as the
+   * associated data file.
+   *
+   * ${appDir}/output/${uniqueId}/file.out.index
+   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out.index
+   *
+   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
    */
+  @Override
   public Path getOutputIndexFileForWriteInVolume(Path existing) {
     Path outputDir = new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
     Path attemptOutputDir = new Path(outputDir, uniqueId);
@@ -158,12 +206,16 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Return a local map spill file created earlier.
+   * Return a local output spill file created earlier.
    *
-   * @param spillNumber the number
-   * @return path
+   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
+   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10003_spill_0.out
+   *
+   * @param spillNumber the spill number
+   * @return path the path of the previously written spill file corresponding to the spillNumber
    * @throws IOException
    */
+  @Override
   public Path getSpillFile(int spillNumber) throws IOException {
     return lDirAlloc.getLocalPathToRead(
         String.format(SPILL_FILE_PATTERN,
@@ -171,13 +223,17 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Create a local map spill file name.
+   * Create a local spill file name.
    *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
+   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
+   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10003_spill_0.out
+   *
+   * @param spillNumber the spill number
+   * @param size the size of the spill file
+   * @return path the path to write the spill file for the specific spillNumber
    * @throws IOException
    */
+  @Override
   public Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(
@@ -189,9 +245,10 @@ public class TezTaskOutputFiles extends TezTaskOutput {
    * Return a local map spill index file created earlier
    *
    * @param spillNumber the number
-   * @return path
+   * @return path the path of the previously written spill index file corresponding to the spillNumber
    * @throws IOException
    */
+  @Override
   public Path getSpillIndexFile(int spillNumber) throws IOException {
     return lDirAlloc.getLocalPathToRead(
         String.format(SPILL_INDEX_FILE_PATTERN,
@@ -199,13 +256,17 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Create a local map spill index file name.
+   * Create a local output spill index file name.
    *
-   * @param spillNumber the number
+   * ${appDir}/${uniqueId}_spill_${spillNumber}.out.index
+   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10003_spill_0.out.index
+   *
+   * @param spillNumber the spill number
    * @param size the size of the file
-   * @return path
+   * @return path the path to write the spill index file for the specific spillNumber
    * @throws IOException
    */
+  @Override
   public Path getSpillIndexFileForWrite(int spillNumber, long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(
@@ -214,36 +275,52 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Return a local reduce input file created earlier
+   * Return a local input file created earlier
+   *
+   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
+   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
    *
    * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
-   * @return path
+   * @return path the path to the input file
    * @throws IOException
    */
+  @Override
   public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
     throw new UnsupportedOperationException("Incompatible with LocalRunner");
   }
 
   /**
-   * Create a local reduce input file name.
+   * Create a local input file name.
    *
-   * @param srcTaskId an identifier for a task
+   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
+   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
+   *
+   * This is currently equivalent to getSpillFileForWrite. Files are not clobbered due to the uniqueId
+   * being different for Outputs / Inputs within the same task (and across tasks)
+   *
+   * @param srcIdentifier The identifier for the source
    * @param size the size of the file
-   * @return path
+   * @return path the path to the input file.
    * @throws IOException
    */
-  public Path getInputFileForWrite(int srcTaskId,
+  @Override
+  public Path getInputFileForWrite(int srcIdentifier,
       long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcTaskId),
+    return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier),
         size, conf);
   }
 
+  /**
+   * Construct a spill file name, given a spill number
+   *
+   * ${uniqueId}_spill_${spillNumber}.out
+   * e.g. attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
+   *
+   * @param spillNum
+   * @return a spill file name independent of the unique identifier and local directories
+   */
+  @Override
   public String getSpillFileName(int spillNum) {
     return String.format(SPILL_FILE_PATTERN, uniqueId, spillNum);
   }
-
-  /** Removes all of the files related to a task. */
-  public void removeAll() throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
 }


Mime
View raw message