tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [14/23] git commit: TEZ-1097. Tez assumes that the scratch directory has to be same as the default filesystem. (hitesh)
Date Fri, 20 Jun 2014 22:35:52 GMT
TEZ-1097. Tez assumes that the scratch directory has to be same as the
default filesystem. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ad7f1cbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ad7f1cbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ad7f1cbe

Branch: refs/heads/branch-0.4.1-incubating
Commit: ad7f1cbed475fed96f5c639e9a90a8ac661a508b
Parents: a141218
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Jun 20 15:16:11 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:45 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/AMConfiguration.java  |  5 ++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 10 +++--
 .../mapreduce/examples/OrderedWordCount.java    |  7 +++-
 .../java/org/apache/tez/test/TestTezJobs.java   | 43 ++++++++++++++++++++
 4 files changed, 57 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad7f1cbe/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 2016ffc..9972e8c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -77,8 +77,9 @@ public class AMConfiguration {
           + ", property=" + TezConfiguration.TEZ_AM_STAGING_DIR);
     }
     try {
-      FileSystem fs = FileSystem.get(amConf);
-      this.stagingDir = fs.resolvePath(new Path(stagingDirStr));
+      Path p = new Path(stagingDirStr);
+      FileSystem fs = p.getFileSystem(amConf);
+      this.stagingDir = fs.resolvePath(p);
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad7f1cbe/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 94857b4..76a694f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -371,14 +371,16 @@ public class DAGAppMaster extends AbstractService {
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
 
-    recoveryDataDir = FileSystem.get(conf).makeQualified(new Path(
+    Path recoveryPath = new Path(
         conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
             TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
-              this.appAttemptID.getApplicationId().toString() +
-                  File.separator + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME));
+        this.appAttemptID.getApplicationId().toString() +
+            Path.SEPARATOR + TezConfiguration.DAG_RECOVERY_DATA_DIR_NAME);
+
+    recoveryFS = recoveryPath.getFileSystem(conf);
+    recoveryDataDir = recoveryFS.makeQualified(recoveryPath);
     currentRecoveryDataDir = new Path(recoveryDataDir,
         Integer.toString(this.appAttemptID.getAttemptId()));
-    recoveryFS = FileSystem.get(recoveryDataDir.toUri(), conf);
 
     if (isSession) {
       FileInputStream sessionResourcesStream = null;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad7f1cbe/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 9874f7b..e48ad3c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -347,8 +347,11 @@ public class OrderedWordCount {
         + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
         + Path.SEPARATOR + appId.toString();
     Path stagingDir = new Path(stagingDirStr);
+    FileSystem pathFs = stagingDir.getFileSystem(tezConf);
+    pathFs.mkdirs(new Path(stagingDirStr));
+
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
-    stagingDir = fs.makeQualified(stagingDir);
+    stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
     
     TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
     TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
@@ -514,7 +517,7 @@ public class OrderedWordCount {
       throw e;
     } finally {
       if (!retainStagingDir) {
-        fs.delete(stagingDir, true);
+        pathFs.delete(stagingDir, true);
       }
       if (useTezSession) {
         LOG.info("Shutting down session");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ad7f1cbe/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index dd2fd75..56162a1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -142,4 +142,47 @@ public class TestTezJobs {
     assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
     ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
   }
+
+  @Test
+  public void testNonDefaultFSStagingDir() throws Exception {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir"
+        + String.valueOf(new Random().nextInt(100000)));
+    FileSystem localFs = FileSystem.getLocal(tezConf);
+    stagingDir = localFs.makeQualified(stagingDir);
+    localFs.mkdirs(stagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+        new HashMap<String, LocalResource>(), tezConf, null);
+
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500l);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+
+
+  }
+
 }


Mime
View raw message