tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [10/23] git commit: TEZ-1004. AM relocalization doesn't handle conflicting resources correctly. (Sergey Shelukhin via hitesh)
Date Fri, 20 Jun 2014 22:35:48 GMT
TEZ-1004. AM relocalization doesn't handle conflicting resources correctly. (Sergey Shelukhin via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b82d5f35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b82d5f35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b82d5f35

Branch: refs/heads/branch-0.4.1-incubating
Commit: b82d5f35d227468521337ad5e8ffd2783333f6d2
Parents: c15d3ac
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Apr 3 17:02:22 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:45 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  95 +++++++++----
 .../tez/dag/utils/RelocalizationUtils.java      |  26 ++++
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 136 +++++++++++--------
 3 files changed, 178 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b82d5f35/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6a92161..8d1fc1c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -30,6 +30,7 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -1094,27 +1095,75 @@ public class DAGAppMaster extends AbstractService {
   }
 
   private Map<String, LocalResource> getAdditionalLocalResourceDiff(
-      Map<String, LocalResource> additionalResources) {
+      DAG dag, Map<String, LocalResource> additionalResources) throws TezException {
     if (additionalResources == null) {
       return Collections.emptyMap();
-    } else {
-      // Check for existing resources.
-      Iterator<Entry<String, LocalResource>> lrIter = additionalResources.entrySet().iterator();
-      while (lrIter.hasNext()) {
-        Entry<String, LocalResource> lrEntry = lrIter.next();
-        LocalResource existing = amResources.get(lrEntry.getKey());
-        if (existing != null) {
-          if (!existing.equals(lrEntry.getValue())) {
-            throw new TezUncheckedException(
-                "Cannot add different additional resources with the same name : "
-                    + lrEntry.getKey() + ", Existing: [" + existing + "], New: ["
-                    + lrEntry.getValue() + "]");
-          } else {
-            lrIter.remove();
-          }
+    }
+    // Check for existing resources.
+    Iterator<Entry<String, LocalResource>> lrIter = additionalResources.entrySet().iterator();
+    while (lrIter.hasNext()) {
+      Entry<String, LocalResource> lrEntry = lrIter.next();
+      LocalResource existing = amResources.get(lrEntry.getKey());
+      if (existing != null) {
+        if (!isSameFile(dag, lrEntry.getKey(), existing, lrEntry.getValue())) {
+          throw new TezUncheckedException(
+              "Cannot add different additional resources with the same name : "
+                  + lrEntry.getKey() + ", Existing: [" + existing + "], New: ["
+                  + lrEntry.getValue() + "]");
+        } else {
+          lrIter.remove();
         }
       }
-      return containerSignatureMatcher.getAdditionalResources(amResources, additionalResources);
+    }
+    return containerSignatureMatcher.getAdditionalResources(amResources, additionalResources);
+  }
+
+  private boolean isSameFile(DAG dag, final String fileName,
+      final LocalResource oldLr, final LocalResource newLr) throws TezException {
+    try {
+      return oldLr.equals(newLr) || dag.getDagUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws Exception {
+          Configuration conf = getConfig();
+          byte[] oldSha = null;
+          try {
+            // The existing file must already be in usercache... let's try to find it.
+            Path localFile = findLocalFileForResource(fileName);
+            if (localFile != null) {
+              oldSha = RelocalizationUtils.getLocalSha(localFile, conf);
+            } else {
+              LOG.warn("Couldn't find local file for " + oldLr);
+            }
+          } catch (Exception ex) {
+            LOG.warn("Error getting SHA from local file for " + oldLr, ex);
+          }
+          if (oldSha == null) { // Well, no dice.
+            oldSha = RelocalizationUtils.getResourceSha(getLocalResourceUri(oldLr), conf);
+          }
+          // Get the new SHA directly from Hadoop stream. If it matches, we already have the
+          // file, and if it doesn't we are going to fail; no need to download either way.
+          byte[] newSha = RelocalizationUtils.getResourceSha(getLocalResourceUri(newLr), conf);
+          return Arrays.equals(oldSha, newSha);
+        }
+      });
+    } catch (InterruptedException ex) {
+      throw new TezException(ex);
+    } catch (IOException ex) {
+      throw new TezException(ex);
+    }
+  }
+
+  private static Path findLocalFileForResource(String fileName) {
+    URL localResource = ClassLoader.getSystemClassLoader().getResource(fileName);
+    if (localResource == null) return null;
+    return new Path(localResource.getPath());
+  }
+
+  private static URI getLocalResourceUri(LocalResource input) {
+    try {
+      return TezConverterUtils.getURIFromYarnURL(input.getResource());
+    } catch (URISyntaxException e) {
+      throw new TezUncheckedException("Failed while handling : " + input, e);
     }
   }
 
@@ -1131,11 +1180,7 @@ public class DAGAppMaster extends AbstractService {
 
               @Override
               public URI apply(LocalResource input) {
-                try {
-                  return TezConverterUtils.getURIFromYarnURL(input.getResource());
-                } catch (URISyntaxException e) {
-                  throw new TezUncheckedException("Failed while handling : " + input, e);
-                }
+                return getLocalResourceUri(input);
               }
             }), getConfig());
       } catch (IOException e) {
@@ -1817,8 +1862,8 @@ public class DAGAppMaster extends AbstractService {
         LOG.debug("DAG has vertex " + v.getName());
       }
     }
-    
-    Map<String, LocalResource> lrDiff = getAdditionalLocalResourceDiff(additionalAMResources);
+    Map<String, LocalResource> lrDiff = getAdditionalLocalResourceDiff(
+        newDAG, additionalAMResources);
     if (lrDiff != null) {
       amResources.putAll(lrDiff);
       cumulativeAdditionalResources.putAll(lrDiff);
@@ -1835,7 +1880,7 @@ public class DAGAppMaster extends AbstractService {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    
+
     startDAGExecution(newDAG, lrDiff);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b82d5f35/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
index b3e0bcf..aae82f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.utils;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
@@ -26,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -65,4 +67,28 @@ public class RelocalizationUtils {
     fs.copyToLocalFile(srcPath, dFile);
     return dFile.makeQualified(FileSystem.getLocal(conf).getUri(), cwd);
   }
+
+  public static byte[] getLocalSha(Path path, Configuration conf) throws IOException {
+    InputStream is = null;
+    try {
+      is = FileSystem.getLocal(conf).open(path);
+      return DigestUtils.sha256(is);
+    } finally {
+      if (is != null) {
+        is.close();
+      }
+    }
+  }
+
+  public static byte[] getResourceSha(URI uri, Configuration conf) throws IOException {
+    InputStream is = null;
+    try {
+      is = FileSystem.get(uri, conf).open(new Path(uri));
+      return DigestUtils.sha256(is);
+    } finally {
+      if (is != null) {
+        is.close();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b82d5f35/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index d091e53..394df72 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -25,6 +25,8 @@ import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.rmi.RemoteException;
+import java.security.CodeSource;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -113,7 +115,6 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-
 public class TestMRRJobsDAGApi {
 
   private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
@@ -199,35 +200,19 @@ public class TestMRRJobsDAGApi {
   // additional local resource, which is verified by the initializer.
   @Test(timeout = 120000)
   public void testAMRelocalization() throws Exception {
-    Map<String, String> commonEnv = createCommonEnv();
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
-        .valueOf(new Random().nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    TezConfiguration tezConf = new TezConfiguration(
-        mrrTezCluster.getConfig());
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
-        remoteStagingDir.toString());
-
-    Map<String, LocalResource> amLocalResources =
-        new HashMap<String, LocalResource>();
-
-    AMConfiguration amConfig = new AMConfiguration(
-        commonEnv, amLocalResources,
-        tezConf, null);
-    TezSessionConfiguration tezSessionConfig =
-        new TezSessionConfiguration(amConfig, tezConf);
-    TezSession tezSession = new TezSession("testrelocalizationsession", tezSessionConfig);
-    tezSession.start();
-    Assert.assertEquals(TezSessionStatus.INITIALIZING,
-        tezSession.getSessionStatus());
+    Path relocPath = new Path("/tmp/relocalizationfilefound");
+    if (remoteFs.exists(relocPath)) {
+      remoteFs.delete(relocPath, true);
+    }
+    TezSession tezSession = createTezSession();
 
     State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
         tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
     Assert.assertFalse(remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
-    
+
     // Start the second job with some additional resources.
-    
+
     // Create a test jar directly to HDFS
     LOG.info("Creating jar for relocalization test");
     Path relocFilePath = new Path("/tmp/test.jar");
@@ -235,10 +220,14 @@ public class TestMRRJobsDAGApi {
     OutputStream os = remoteFs.create(relocFilePath, true);
     createTestJar(os, RELOCALIZATION_TEST_CLASS_NAME);
 
+    // Also upload one of Tez's own JARs to HDFS and add as resource; should be ignored
+    Path tezAppJar = new Path(MiniTezCluster.APPJAR);
+    Path tezAppJarRemote = remoteFs.makeQualified(new Path("/tmp/" + tezAppJar.getName()));
+    remoteFs.copyFromLocalFile(tezAppJar, tezAppJarRemote);
+
     Map<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
-    additionalResources.put("test.jar", LocalResource.newInstance(
-        ConverterUtils.getYarnUrlFromPath(relocFilePath), LocalResourceType.FILE,
-        LocalResourceVisibility.PRIVATE, 0, 0));
+    additionalResources.put("test.jar", createLrObjFromPath(relocFilePath));
+    additionalResources.put("TezAppJar.jar", createLrObjFromPath(tezAppJarRemote));
 
     Assert.assertEquals(TezSessionStatus.READY,
         tezSession.getSessionStatus());
@@ -249,6 +238,11 @@ public class TestMRRJobsDAGApi {
         tezSession.getSessionStatus());
     Assert.assertTrue(remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
 
+    stopAndVerifyYarnApp(tezSession);
+  }
+
+  private void stopAndVerifyYarnApp(TezSession tezSession) throws TezException,
+      IOException, YarnException {
     ApplicationId appId = tezSession.getApplicationId();
     tezSession.stop();
     Assert.assertEquals(TezSessionStatus.SHUTDOWN,
@@ -276,7 +270,66 @@ public class TestMRRJobsDAGApi {
     Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
         appReport.getFinalApplicationStatus());
   }
-  
+
+
+  @Test(timeout = 120000)
+  public void testAMRelocalizationConflict() throws Exception {
+    Path relocPath = new Path("/tmp/relocalizationfilefound");
+    if (remoteFs.exists(relocPath)) {
+      remoteFs.delete(relocPath, true);
+    }
+
+    // Run a DAG w/o a file.
+    TezSession tezSession = createTezSession();
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertFalse(remoteFs.exists(relocPath));
+
+    // Create a bogus TezAppJar directly to HDFS
+    LOG.info("Creating jar for relocalization test");
+    Path tezAppJar = new Path(MiniTezCluster.APPJAR);
+    Path tezAppJarRemote = remoteFs.makeQualified(new Path("/tmp/" + tezAppJar.getName()));
+    OutputStream os = remoteFs.create(tezAppJarRemote, true);
+    createTestJar(os, RELOCALIZATION_TEST_CLASS_NAME);
+
+    Map<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
+    additionalResources.put("TezAppJar.jar", createLrObjFromPath(tezAppJarRemote));
+
+    try {
+      testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
+      Assert.fail("should have failed");
+    } catch (Exception ex) {
+      // expected
+    }
+
+    stopAndVerifyYarnApp(tezSession);
+  }
+
+  private LocalResource createLrObjFromPath(Path filePath) {
+    return LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(filePath),
+        LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, 0, 0);
+  }
+
+  private TezSession createTezSession() throws IOException, TezException {
+    Map<String, String> commonEnv = createCommonEnv();
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    Map<String, LocalResource> amLocalResources = new HashMap<String, LocalResource>();
+
+    AMConfiguration amConfig = new AMConfiguration(commonEnv, amLocalResources, tezConf, null);
+    TezSessionConfiguration tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+    TezSession tezSession = new TezSession("testrelocalizationsession", tezSessionConfig);
+    tezSession.start();
+    Assert.assertEquals(TezSessionStatus.INITIALIZING, tezSession.getSessionStatus());
+    return tezSession;
+  }
+
   // Submits a DAG to AM via RPC after AM has started
   @Test(timeout = 120000)
   public void testMultipleMRRSleepJobViaSession() throws IOException,
@@ -314,32 +367,7 @@ public class TestMRRJobsDAGApi {
     Assert.assertEquals(TezSessionStatus.READY,
         tezSession.getSessionStatus());
 
-    ApplicationId appId = tezSession.getApplicationId();
-    tezSession.stop();
-    Assert.assertEquals(TezSessionStatus.SHUTDOWN,
-        tezSession.getSessionStatus());
-
-    YarnClient yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(mrrTezCluster.getConfig());
-    yarnClient.start();
-
-    while (true) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState().equals(
-          YarnApplicationState.FINISHED)
-          || appReport.getYarnApplicationState().equals(
-              YarnApplicationState.FAILED)
-          || appReport.getYarnApplicationState().equals(
-              YarnApplicationState.KILLED)) {
-        break;
-      }
-    }
-
-    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-    Assert.assertEquals(YarnApplicationState.FINISHED,
-        appReport.getYarnApplicationState());
-    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
-        appReport.getFinalApplicationStatus());
+    stopAndVerifyYarnApp(tezSession);
   }
 
   // Submits a simple 5 stage sleep job using tez session. Then kills it.


Mime
View raw message