tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-1106. Tez framework should use a unique subdir for staging data. (Mohammad Kamrul Islam via hitesh)
Date Wed, 18 Jun 2014 01:18:51 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master aed87d2db -> b25c8668d


TEZ-1106. Tez framework should use a unique subdir for staging data. (Mohammad Kamrul Islam
via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b25c8668
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b25c8668
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b25c8668

Branch: refs/heads/master
Commit: b25c8668d32fdc4839c5bae1358f4d8396160bd7
Parents: aed87d2
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Jun 17 18:17:24 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Jun 17 18:17:53 2014 -0700

----------------------------------------------------------------------
 tez-api/pom.xml                                 |  11 +
 .../org/apache/tez/client/AMConfiguration.java  |  15 +-
 .../org/apache/tez/client/TezClientUtils.java   |  57 ++--
 .../org/apache/tez/common/TezCommonUtils.java   | 287 +++++++++++++++++++
 .../apache/tez/dag/api/TezConfiguration.java    |   4 +-
 .../apache/tez/common/TestTezCommonUtils.java   | 211 ++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  25 +-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  23 +-
 .../dag/history/recovery/RecoveryService.java   |  11 +-
 9 files changed, 563 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-api/pom.xml b/tez-api/pom.xml
index 0bfea9e..559d18a 100644
--- a/tez-api/pom.xml
+++ b/tez-api/pom.xml
@@ -57,6 +57,17 @@
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
+     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 9972e8c..9891daa 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -70,19 +71,7 @@ public class AMConfiguration {
     }
 
     this.localResources = localResources;
-    String stagingDirStr = amConf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
-    if (stagingDirStr == null || stagingDirStr.isEmpty()) {
-      throw new TezUncheckedException("Staging directory for AM resources"
-          + " not specified in config"
-          + ", property=" + TezConfiguration.TEZ_AM_STAGING_DIR);
-    }
-    try {
-      Path p = new Path(stagingDirStr);
-      FileSystem fs = p.getFileSystem(amConf);
-      this.stagingDir = fs.resolvePath(p);
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
+    this.stagingDir = TezCommonUtils.getTezBaseStagingPath(amConf);
     this.credentials = credentials;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 89e5219..6cebde9 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -75,6 +74,7 @@ import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.impl.LogUtils;
@@ -104,12 +104,6 @@ import com.google.common.collect.Lists;
 public class TezClientUtils {
 
   private static Log LOG = LogFactory.getLog(TezClientUtils.class);
-
-  public static final FsPermission TEZ_AM_DIR_PERMISSION =
-      FsPermission.createImmutable((short) 0700); // rwx--------
-  public static final FsPermission TEZ_AM_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644); // rw-r--r--
-
   private static final int UTF8_CHUNK_SIZE = 16 * 1024;
 
   /**
@@ -238,15 +232,15 @@ public class TezClientUtils {
             + ". The directory must " + "be owned by the submitter "
             + currentUser + " or " + "by " + realUser);
       }
-      if (!fsStatus.getPermission().equals(TEZ_AM_DIR_PERMISSION)) {
+      if (!fsStatus.getPermission().equals(TezCommonUtils.TEZ_AM_DIR_PERMISSION)) {
         LOG.info("Permissions on staging directory " + stagingArea + " are "
             + "incorrect: " + fsStatus.getPermission()
             + ". Fixing permissions " + "to correct value "
-            + TEZ_AM_DIR_PERMISSION);
-        fs.setPermission(stagingArea, TEZ_AM_DIR_PERMISSION);
+            + TezCommonUtils.TEZ_AM_DIR_PERMISSION);
+        fs.setPermission(stagingArea, TezCommonUtils.TEZ_AM_DIR_PERMISSION);
       }
     } else {
-      fs.mkdirs(stagingArea, new FsPermission(TEZ_AM_DIR_PERMISSION));
+      TezCommonUtils.mkDirForAM(fs, stagingArea);
     }
     return fs;
   }
@@ -335,8 +329,9 @@ public class TezClientUtils {
     
     FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
         amConfig.getStagingDir());
-    Path binaryConfPath =  new Path(amConfig.getStagingDir(),
-        TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+    String strAppId = appId.toString();
+    Path tezSysStagingPath = TezCommonUtils.createTezSystemStagingPath(conf, strAppId);
+    Path binaryConfPath = TezCommonUtils.getTezConfStagingPath(tezSysStagingPath);
     binaryConfPath = fs.makeQualified(binaryConfPath);
 
     // Setup resource requirements
@@ -454,8 +449,7 @@ public class TezClientUtils {
         confProtoBuilder.addConfKeyValues(kvp);
       }
       //binary output
-      amConfPBOutBinaryStream = FileSystem.create(fs, binaryConfPath,
-          new FsPermission(TEZ_AM_FILE_PERMISSION));
+      amConfPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryConfPath);
       confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);
     } finally {
       if(amConfPBOutBinaryStream != null){
@@ -471,9 +465,7 @@ public class TezClientUtils {
         binaryConfLRsrc);
 
     // Create Session Jars definition to be sent to AM as a local resource
-    Path sessionJarsPath = new Path(amConfig.getStagingDir(),
-      TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME + "."
-      + appId.toString());
+    Path sessionJarsPath = TezCommonUtils.getTezSessionJarStagingPath(tezSysStagingPath);
     FSDataOutputStream sessionJarsPBOutStream = null;
     try {
       Map<String, LocalResource> sessionJars =
@@ -483,8 +475,7 @@ public class TezClientUtils {
         binaryConfLRsrc);
       DAGProtos.PlanLocalResourcesProto proto =
         DagTypeConverters.convertFromLocalResources(sessionJars);
-      sessionJarsPBOutStream = FileSystem.create(fs, sessionJarsPath,
-        new FsPermission(TEZ_AM_FILE_PERMISSION));
+      sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs, sessionJarsPath);
       proto.writeDelimitedTo(sessionJarsPBOutStream);
       
       // Write out the initial list of resources which will be available in the AM
@@ -529,8 +520,12 @@ public class TezClientUtils {
       }
 
       // emit protobuf DAG file style
-      Path binaryPath =  new Path(amConfig.getStagingDir(),
-          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME + "." + appId.toString());
+      Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Stage directory information for AppId :" + appId + " tezSysStagingPath :"
+            + tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :"
+            + sessionJarsPath + " binaryPlanPath :" + binaryPath);
+      }
       amConfig.getAMConf().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
           binaryPath.toUri().toString());
 
@@ -540,8 +535,7 @@ public class TezClientUtils {
 
       try {
         //binary output
-        dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
-            new FsPermission(TEZ_AM_FILE_PERMISSION));
+        dagPBOutBinaryStream = TezCommonUtils.createFileForAM(fs, binaryPath);
         dagPB.writeTo(dagPBOutBinaryStream);
       } finally {
         if(dagPBOutBinaryStream != null){
@@ -555,15 +549,13 @@ public class TezClientUtils {
           LocalResourceVisibility.APPLICATION));
 
       if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
-        Path textPath = localizeDagPlanAsText(dagPB, fs,
-            amConfig.getStagingDir(), appId);
+        Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath);
         localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
             TezClientUtils.createLocalResource(fs,
                 textPath, LocalResourceType.FILE,
                 LocalResourceVisibility.APPLICATION));
       }
     }
-
     Map<ApplicationAccessType, String> acls
         = new HashMap<ApplicationAccessType, String>();
 
@@ -690,14 +682,12 @@ public class TezClientUtils {
     return rsrc;
   }
 
-  private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs,
-      Path appStagingDir, ApplicationId appId) throws IOException {
-    Path textPath = new Path(appStagingDir,
-        TezConfiguration.TEZ_PB_PLAN_TEXT_NAME + "." + appId.toString());
+  private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs, AMConfiguration amConfig,
+      String strAppId, Path tezSysStagingPath) throws IOException {
+    Path textPath = TezCommonUtils.getTezTextPlanStagingPath(tezSysStagingPath);
     FSDataOutputStream dagPBOutTextStream = null;
     try {
-      dagPBOutTextStream = FileSystem.create(fs, textPath, new FsPermission(
-          TEZ_AM_FILE_PERMISSION));
+      dagPBOutTextStream = TezCommonUtils.createFileForAM(fs, textPath);
       String dagPBStr = dagPB.toString();
       int dagPBStrLen = dagPBStr.length();
       if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
@@ -797,5 +787,4 @@ public class TezClientUtils {
     TokenCache.setSessionToken(sessionToken, credentials);
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
new file mode 100644
index 0000000..a7c1031
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class TezCommonUtils {
+  public static final FsPermission TEZ_AM_DIR_PERMISSION = FsPermission
+      .createImmutable((short) 0700); // rwx--------
+  public static final FsPermission TEZ_AM_FILE_PERMISSION = FsPermission
+      .createImmutable((short) 0644); // rw-r--r--
+  private static final Log LOG = LogFactory.getLog(TezClient.class);
+
+  public static final String TEZ_SYSTEM_SUB_DIR = ".tez";
+
+  /**
+   * <p>
+   * This function returns the staging directory defined in the config with
+   * property name <code>TezConfiguration.TEZ_AM_STAGING_DIR</code>. If the
+   * property is not defined in the conf, Tez uses the value defined as
+   * <code>TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT</code>. In addition, the
+   * function makes sure if the staging directory exists. If not, it creates the
+   * directory with permission <code>TEZ_AM_DIR_PERMISSION</code>.
+   * </p>
+   * 
+   * @param conf
+   *          TEZ configuration
+   * @return Fully qualified staging directory
+   */
+  public static Path getTezBaseStagingPath(Configuration conf) {
+    String stagingDirStr = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
+        TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT);
+    Path baseStagingDir;
+    try {
+      Path p = new Path(stagingDirStr);
+      FileSystem fs = p.getFileSystem(conf);
+      if (!fs.exists(p)) {
+        mkDirForAM(fs, p);
+        LOG.info("Stage directory " + p + " doesn't exist and is created");
+      }
+      baseStagingDir = fs.resolvePath(p);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    return baseStagingDir;
+  }
+
+  /**
+   * <p>
+   * This function returns the staging directory for TEZ system. Tez creates all
+   * its temporary files under this sub-directory. The function creates a
+   * sub-directory (<code>TEZ_SYSTEM_SUB_DIR</code>/<APP_ID>) under the base
+   * staging directory, often provided by user.
+   * </p>
+   * 
+   * @param conf
+   *          Tez configuration
+   * @param strAppId
+   *          Application ID as string
+   * @return TEZ system level staging directory used for Tez internals
+   */
+  @Private
+  public static Path createTezSystemStagingPath(Configuration conf, String strAppId) {
+    Path baseStagingPath = getTezBaseStagingPath(conf);
+    Path tezStagingDir;
+    try {
+      tezStagingDir = new Path(baseStagingPath, TEZ_SYSTEM_SUB_DIR);
+      FileSystem fs = tezStagingDir.getFileSystem(conf);
+      tezStagingDir = new Path(tezStagingDir, strAppId);
+      if (!fs.exists(tezStagingDir)) {
+        mkDirForAM(fs, tezStagingDir);
+        LOG.info("Tez system stage directory " + tezStagingDir + " doesn't exist and is created");
+      }
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    return tezStagingDir;
+  }
+
+  /**
+   * <p>
+   * This function returns the staging directory for TEZ system. Tez creates all
+   * its temporary files under this sub-directory. The function normally doesn't
+   * creates any sub-directory under the base staging directory.
+   * </p>
+   * 
+   * @param conf
+   *          Tez configuration
+   * @param strAppId
+   *          Application ID as string
+   * @return TEZ system level staging directory used for Tez internals
+   */
+  @Private
+  public static Path getTezSystemStagingPath(Configuration conf, String strAppId) {
+    Path baseStagingPath = getTezBaseStagingPath(conf);
+    Path tezStagingDir;
+    tezStagingDir = new Path(baseStagingPath, TEZ_SYSTEM_SUB_DIR);
+    tezStagingDir = new Path(tezStagingDir, strAppId);
+    return tezStagingDir;
+  }
+
+  /**
+   * <p>
+   * Returns a path to store binary configuration
+   * </p>
+   * 
+   * @param tezSysStagingPath
+   *          TEZ system level staging directory used for Tez internals
+   * @return path to configuration
+   */
+  @Private
+  public static Path getTezConfStagingPath(Path tezSysStagingPath) {
+    return new Path(tezSysStagingPath, TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+  }
+
+  /**
+   * <p>
+   * Returns a path to store local resources/session jars
+   * </p>
+   * 
+   * @param tezSysStagingPath
+   *          TEZ system level staging directory used for Tez internals
+   * @return path to store the session jars
+   */
+  @Private
+  public static Path getTezSessionJarStagingPath(Path tezSysStagingPath) {
+    return new Path(tezSysStagingPath, TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
+  }
+
+  /**
+   * <p>
+   * Returns a path to store binary plan
+   * </p>
+   * 
+   * @param tezSysStagingPath
+   *          TEZ system level staging directory used for Tez internals
+   * @return path to store the plan in binary
+   */
+  @Private
+  public static Path getTezBinPlanStagingPath(Path tezSysStagingPath) {
+    return new Path(tezSysStagingPath, TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
+  }
+
+  /**
+   * <p>
+   * Returns a path to store text plan
+   * </p>
+   * 
+   * @param tezSysStagingPath
+   *          TEZ system level staging directory used for Tez internals
+   * @return path to store the plan in text
+   */
+  @Private
+  public static Path getTezTextPlanStagingPath(Path tezSysStagingPath) {
+    return new Path(tezSysStagingPath, TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
+  }
+
+  /**
+   * <p>
+   * Returns a path to store recovery information
+   * </p>
+   * 
+   * @param tezSysStagingPath
+   *          TEZ system level staging directory used for Tez internals
+   * @param conf
+   *          Tez configuration
+   * @return App recovery path
+   * @throws IOException
+   */
+  @Private
+  public static Path getRecoveryPath(Path tezSysStagingPath, Configuration conf)
+      throws IOException {
+    Path baseReecoveryPath = new Path(tezSysStagingPath,
+        TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME);
+    FileSystem recoveryFS = baseReecoveryPath.getFileSystem(conf);
+    return recoveryFS.makeQualified(baseReecoveryPath);
+  }
+
+  /**
+   * <p>
+   * Returns a path to store app attempt specific recovery details
+   * </p>
+   * 
+   * @param recoveryPath
+   *          TEZ recovery directory used for Tez internals
+   * @param conf
+   *          Tez configuration
+   * @param attemptID
+   *          Application Attempt Id
+   * @return App attempt specific recovery path
+   */
+  @Private
+  public static Path getAttemptRecoveryPath(Path recoveryPath, int attemptID) {
+    return new Path(recoveryPath, Integer.toString(attemptID));
+  }
+
+  /**
+   * <p>
+   * Returns a path to store DAG specific recovery info
+   * </p>
+   * 
+   * @param attemptRecoverPath
+   *          :TEZ system level staging directory used for Tez internals
+   * @param dagID
+   *          DagID as string
+   * @return DAG specific recovery path
+   */
+  @Private
+  public static Path getDAGRecoveryPath(Path attemptRecoverPath, String dagID) {
+    return new Path(attemptRecoverPath, dagID + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+  }
+
+  /**
+   * <p>
+   * Returns a path to store summary info for recovery
+   * </p>
+   * 
+   * @param attemptRecoverPath
+   *          TEZ system level staging directory used for Tez internals
+   * @return Summary event path used in recovery
+   */
+  @Private
+  public static Path getSummaryRecoveryPath(Path attemptRecoverPath) {
+    return new Path(attemptRecoverPath, TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+  }
+
+  /**
+   * <p>
+   * Create a directory with predefined directory permission
+   * </p>
+   * 
+   * @param fs
+   *          Filesystem
+   * @param dir
+   *          directory to be created
+   * @throws IOException
+   */
+  public static void mkDirForAM(FileSystem fs, Path dir) throws IOException {
+    fs.mkdirs(dir, new FsPermission(TEZ_AM_DIR_PERMISSION));
+  }
+
+  /**
+   * <p>
+   * Create a file with <code>TEZ_AM_FILE_PERMISSION</code> permission and
+   * returns OutputStream
+   * </p>
+   * 
+   * @param fs
+   *          Filesystem
+   * @param filePath
+   *          file path to create the file
+   * @return FSDataOutputStream
+   * @throws IOException
+   */
+  public static FSDataOutputStream createFileForAM(FileSystem fs, Path filePath) throws IOException {
+    return FileSystem.create(fs, filePath, new FsPermission(TEZ_AM_FILE_PERMISSION));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 18e468a..97a35eb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -320,7 +320,7 @@ public class TezConfiguration extends Configuration {
       TEZ_PREFIX + "session.";
 
   public static final String TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME =
-    TEZ_SESSION_PREFIX + "local-resources.pb.file-name";
+    TEZ_SESSION_PREFIX + "local-resources.pb";
 
   /**
    * Time (in seconds) to wait for AM to come up when trying to submit a DAG
@@ -416,7 +416,7 @@ public class TezConfiguration extends Configuration {
   public static final int DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT = 30;
 
   public static final String DAG_RECOVERY_DATA_DIR_NAME = "recovery";
-  public static final String DAG_RECOVERY_SUMMARY_FILE_SUFFIX = ".summary";
+  public static final String DAG_RECOVERY_SUMMARY_FILE_SUFFIX = "summary";
   public static final String DAG_RECOVERY_RECOVER_FILE_SUFFIX = ".recovery";
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
new file mode 100644
index 0000000..121c673
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
@@ -0,0 +1,211 @@
+package org.apache.tez.common;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.common.TezCommonUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTezCommonUtils {
+  private static final String STAGE_DIR = "/tmp/mystage";
+  private static String RESOLVED_STAGE_DIR;
+  private static Configuration conf = new Configuration();;
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestTezCommonUtils.class.getName() + "-tmpDir";
+  private static MiniDFSCluster dfsCluster = null;
+  private static FileSystem remoteFs = null;
+  private static final Log LOG = LogFactory.getLog(TestTezCommonUtils.class);
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, STAGE_DIR);
+    LOG.info("Starting mini clusters");
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+      RESOLVED_STAGE_DIR = remoteFs.getUri() + STAGE_DIR;
+      conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (dfsCluster != null) {
+      try {
+        LOG.info("Stopping DFSCluster");
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  // Testing base staging dir
+  @Test
+  public void testTezBaseStagingPath() throws Exception {
+    Configuration localConf = new Configuration();
+    // Check if default works with localFS
+    localConf.unset(TezConfiguration.TEZ_AM_STAGING_DIR);
+    localConf.set("fs.defaultFS", "file:///");
+    Path stageDir = TezCommonUtils.getTezBaseStagingPath(localConf);
+    Assert.assertEquals(stageDir.toString(), "file:" + TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT);
+
+    // check if user set something, indeed works
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, STAGE_DIR);
+    stageDir = TezCommonUtils.getTezBaseStagingPath(conf);
+    Assert.assertEquals(stageDir.toString(), RESOLVED_STAGE_DIR);
+  }
+
+  // Testing System staging dir if createed
+  @Test
+  public void testCreateTezSysStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    String expectedStageDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId;
+    String unResolvedStageDir = STAGE_DIR + File.separatorChar + TezCommonUtils.TEZ_SYSTEM_SUB_DIR
+        + File.separatorChar + strAppId;
+
+    Path stagePath = new Path(unResolvedStageDir);
+    FileSystem fs = stagePath.getFileSystem(conf);
+    if (fs.exists(stagePath)) {
+      fs.delete(stagePath, true);
+    }
+    Assert.assertFalse(fs.exists(stagePath));
+    Path stageDir = TezCommonUtils.createTezSystemStagingPath(conf, strAppId);
+    Assert.assertEquals(stageDir.toString(), expectedStageDir);
+    Assert.assertTrue(fs.exists(stagePath));
+  }
+
+  // Testing System staging dir
+  @Test
+  public void testTezSysStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    String expectedStageDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId;
+    Assert.assertEquals(stageDir.toString(), expectedStageDir);
+  }
+
+  // Testing conf staging dir
+  @Test
+  public void testTezConfStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path confStageDir = TezCommonUtils.getTezConfStagingPath(stageDir);
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.TEZ_PB_BINARY_CONF_NAME;
+    Assert.assertEquals(confStageDir.toString(), expectedDir);
+  }
+
+  // Testing session jars staging dir
+  @Test
+  public void testTezSessionJarStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path confStageDir = TezCommonUtils.getTezSessionJarStagingPath(stageDir);
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME;
+    Assert.assertEquals(confStageDir.toString(), expectedDir);
+  }
+
+  // Testing bin plan staging dir
+  @Test
+  public void testTezBinPlanStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path confStageDir = TezCommonUtils.getTezBinPlanStagingPath(stageDir);
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.TEZ_PB_PLAN_BINARY_NAME;
+    Assert.assertEquals(confStageDir.toString(), expectedDir);
+  }
+
+  // Testing text plan staging dir
+  @Test
+  public void testTezTextPlanStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path confStageDir = TezCommonUtils.getTezTextPlanStagingPath(stageDir);
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.TEZ_PB_PLAN_TEXT_NAME;
+    Assert.assertEquals(confStageDir.toString(), expectedDir);
+  }
+
+  // Testing recovery path staging dir
+  @Test
+  public void testTezRecoveryStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path confStageDir = TezCommonUtils.getRecoveryPath(stageDir, conf);
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME;
+    Assert.assertEquals(confStageDir.toString(), expectedDir);
+  }
+
+  // Testing app attempt specific recovery path staging dir
+  @Test
+  public void testTezAttemptRecoveryStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path recoveryPath = TezCommonUtils.getRecoveryPath(stageDir, conf);
+    Path recoveryStageDir = TezCommonUtils.getAttemptRecoveryPath(recoveryPath, 2);
+
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME + File.separator + "2";
+    Assert.assertEquals(recoveryStageDir.toString(), expectedDir);
+  }
+
+  // Testing DAG specific recovery path staging dir
+  @Test
+  public void testTezDAGRecoveryStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path recoveryPath = TezCommonUtils.getRecoveryPath(stageDir, conf);
+    Path recoveryStageDir = TezCommonUtils.getAttemptRecoveryPath(recoveryPath, 2);
+
+    Path dagRecoveryPathj = TezCommonUtils.getDAGRecoveryPath(recoveryStageDir, "dag_123");
+
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME + File.separator + "2" + File.separator
+        + "dag_123" + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX;
+    Assert.assertEquals(dagRecoveryPathj.toString(), expectedDir);
+  }
+
+  // Testing Summary recovery path staging dir
+  @Test
+  public void testTezSummaryRecoveryStagingPath() throws Exception {
+    String strAppId = "testAppId";
+    Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    Path recoveryPath = TezCommonUtils.getRecoveryPath(stageDir, conf);
+    Path recoveryStageDir = TezCommonUtils.getAttemptRecoveryPath(recoveryPath, 2);
+    Path summaryRecoveryPathj = TezCommonUtils.getSummaryRecoveryPath(recoveryStageDir);
+
+    String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
+        + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
+        + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME + File.separator + "2" + File.separator
+        + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX;
+    Assert.assertEquals(summaryRecoveryPathj.toString(), expectedDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index aed9aa7..7189300 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.Limits;
@@ -238,6 +239,7 @@ public class DAGAppMaster extends AbstractService {
   private boolean recoveryEnabled;
   private Path recoveryDataDir;
   private Path currentRecoveryDataDir;
+  private Path tezSystemStagingDir; // set from TezCommonUtils.getTezSystemStagingPath(conf, appId)
   private FileSystem recoveryFS;
   /**
    * set of already executed dag names.
@@ -377,18 +379,17 @@ public class DAGAppMaster extends AbstractService {
     this.sessionTimeoutInterval = 1000 * amConf.getInt(
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
-
-    Path recoveryPath = new Path(
-        conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
-            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
-        this.appAttemptID.getApplicationId().toString() +
-            Path.SEPARATOR + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME);
-
-    recoveryFS = recoveryPath.getFileSystem(conf);
-    recoveryDataDir = recoveryFS.makeQualified(recoveryPath);
-    currentRecoveryDataDir = new Path(recoveryDataDir,
-        Integer.toString(this.appAttemptID.getAttemptId()));
-
+    String strAppId = this.appAttemptID.getApplicationId().toString();
+    this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
+    recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf);
+    recoveryFS = recoveryDataDir.getFileSystem(conf);
+    currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
+        appAttemptID.getAttemptId()); // one recovery subdir per AM attempt
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID
+          + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir
+          + " recoveryAttemptDir :" + currentRecoveryDataDir);
+    }
     if (isSession) {
       FileInputStream sessionResourcesStream = null;
       try {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 4321daf..85254cd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
@@ -90,12 +91,11 @@ public class RecoveryParser {
     this.recoveryFS = recoveryFS;
     this.recoveryDataDir = recoveryDataDir;
     this.currentAttemptId = currentAttemptId;
-    this.currentAttemptRecoveryDataDir =
-        getAttemptRecoveryDataDir(recoveryDataDir, currentAttemptId);
+    this.currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,
+        currentAttemptId);
     recoveryBufferSize = dagAppMaster.getConfig().getInt(
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
-
     this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir);
   }
 
@@ -246,11 +246,6 @@ public class RecoveryParser {
     }
   }
 
-  private Path getAttemptRecoveryDataDir(Path recoveryDataDir,
-      int attemptId) {
-    return new Path(recoveryDataDir, Integer.toString(attemptId));
-  }
-
   public static void main(String argv[]) throws IOException {
     // TODO clean up with better usage and error handling
     Configuration conf = new Configuration();
@@ -270,10 +265,8 @@ public class RecoveryParser {
     }
   }
 
-  private Path getSummaryPath(Path recoveryDataDir) {
-    return new Path(recoveryDataDir,
-        dagAppMaster.getAttemptID().getApplicationId().toString()
-        + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+  private Path getSummaryPath(Path attemptRecoveryDataDir) { // summary file inside this attempt's dir
+    return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir);
   }
 
   private FSDataOutputStream getSummaryOutputStream(Path summaryPath)
@@ -339,7 +332,7 @@ public class RecoveryParser {
     LOG.info("Looking for the correct attempt directory to recover from");
     int foundPreviousAttempt = -1;
     for (int i = currentAttemptId - 1; i > 0; --i) {
-      Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+      Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
       LOG.info("Looking at attempt directory, path=" + attemptPath);
       Path fatalErrorOccurred = new Path(attemptPath,
           RecoveryService.RECOVERY_FATAL_OCCURRED_DIR);
@@ -371,7 +364,7 @@ public class RecoveryParser {
       LOG.info("Did not find any attempt dir that had data recovered file."
           + " Looking for oldest summary file");
       for (int i = 1; i < currentAttemptId; ++i) {
-        Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+        Path attemptPath = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, i);
         Path summaryPath = getSummaryPath(attemptPath);
         if (recoveryFS.exists(summaryPath)) {
           LOG.info("Found summary file in attempt directory"
@@ -391,7 +384,7 @@ public class RecoveryParser {
       foundPreviousAttempt = 1;
     }
 
-    return getAttemptRecoveryDataDir(recoveryDataDir, foundPreviousAttempt);
+    return TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, foundPreviousAttempt);
   }
 
   private static class DAGSummaryData {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b25c8668/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 840ad1d..db0dcf1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -292,9 +293,10 @@ public class RecoveryService extends AbstractService {
           + ", eventType=" + eventType);
     }
     if (summaryStream == null) {
-      Path summaryPath = new Path(recoveryPath,
-          appContext.getApplicationID()
-              + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+      Path summaryPath = TezCommonUtils.getSummaryRecoveryPath(recoveryPath); // file name no longer prefixed with app ID
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppId :" + appContext.getApplicationID() + " summaryPath " + summaryPath);
+      }
       if (!recoveryDirFS.exists(summaryPath)) {
         summaryStream = recoveryDirFS.create(summaryPath, false,
             bufferSize);
@@ -331,8 +333,7 @@ public class RecoveryService extends AbstractService {
     }
 
     if (!outputStreamMap.containsKey(dagID)) {
-      Path dagFilePath = new Path(recoveryPath,
-          dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+      Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString()); // <dagID><DAG_RECOVERY_RECOVER_FILE_SUFFIX>
       FSDataOutputStream outputStream;
       if (recoveryDirFS.exists(dagFilePath)) {
         if (LOG.isDebugEnabled()) {


Mime
View raw message