tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-567. Generate Shuffle/job token once per AM. (sseth)
Date Fri, 18 Oct 2013 02:03:40 GMT
Updated Branches:
  refs/heads/master 9f4e98b0a -> 5f587818d


TEZ-567. Generate Shuffle/job token once per AM. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5f587818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5f587818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5f587818

Branch: refs/heads/master
Commit: 5f587818de8d55f5045371cd9b1a4140131b4013
Parents: 9f4e98b
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Oct 17 19:03:10 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Oct 17 19:03:33 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 27 +++++-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 96 ++++++--------------
 .../input/BroadcastShuffleManager.java          |  2 +-
 .../runtime/library/shuffle/common/Fetcher.java |  2 +
 4 files changed, 55 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f587818/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 1873d62..cf00b31 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -48,6 +49,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -121,7 +123,9 @@ import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.avro.HistoryEventType;
 import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
 import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.security.TokenCache;
 
 /**
  * The Map-Reduce Application Master.
@@ -174,6 +178,7 @@ public class DAGAppMaster extends AbstractService {
   private TaskAttemptListener taskAttemptListener;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
+  private Token<JobTokenIdentifier> sessionToken;
   private DagEventDispatcher dagEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private TaskSchedulerEventHandler taskSchedulerEventHandler;
@@ -190,7 +195,7 @@ public class DAGAppMaster extends AbstractService {
   private DAGClientHandler clientHandler;
 
   private DAG currentDAG;
-  private Credentials fsTokens = new Credentials(); // Filled during init
+  private Credentials tokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
 
   private AtomicBoolean sessionStopped = new AtomicBoolean(false);
@@ -261,6 +266,13 @@ public class DAGAppMaster extends AbstractService {
     containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
     addIfService(containerHeartbeatHandler, true);
 
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(UUID
+        .randomUUID().toString()));
+    sessionToken = new Token<JobTokenIdentifier>(identifier,
+        jobTokenSecretManager);
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setJobToken(sessionToken, tokens);
+
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context,
         taskHeartbeatHandler, containerHeartbeatHandler);
@@ -464,6 +476,13 @@ public class DAGAppMaster extends AbstractService {
   protected DAG createDAG(DAGPlan dagPB) {
     TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(),
         dagCounter.incrementAndGet());
+    
+    // Prepare the TaskAttemptListener server for authentication of Containers
+    // TaskAttemptListener gets the information via jobTokenSecretManager.
+    String dagIdString = dagId.toString().replace("application", "job");
+    jobTokenSecretManager.addTokenForJob(dagIdString, sessionToken);
+    LOG.info("Adding job token for " + dagIdString
+        + " to jobTokenSecretManager");
 
     Iterator<PlanKeyValuePair> iter =
         dagPB.getDagKeyValues().getConfKeyValuesList().iterator();
@@ -477,7 +496,7 @@ public class DAGAppMaster extends AbstractService {
     // create single dag
     DAG newDag =
         new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
-            taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
+            taskAttemptListener, jobTokenSecretManager, tokens, clock,
             currentUser.getShortUserName(),
             taskHeartbeatHandler, context);
 
@@ -502,11 +521,11 @@ public class DAGAppMaster extends AbstractService {
                     .getAbsolutePath()));
         Path jobTokenFile =
             new Path(jobSubmitDir, TezConfiguration.APPLICATION_TOKENS_FILE);
-        fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
+        tokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
         LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
             + jobTokenFile);
 
-        for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
+        for (Token<? extends TokenIdentifier> tk : tokens.getAllTokens()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Token of kind " + tk.getKind()
                 + "in current ugi in the AppMaster for service "

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f587818/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 6bc151c..43479c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,10 +35,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -92,9 +89,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
 import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.library.common.security.TokenCache;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -323,8 +318,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private boolean isUber = false;
   private DAGTerminationCause terminationCause;
   private Credentials credentials;
-  private Token<JobTokenIdentifier> jobToken;
-  private JobTokenSecretManager jobTokenSecretManager;
 
   private long initTime;
   private long startTime;
@@ -358,7 +351,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.writeLock = readWriteLock.writeLock();
 
     this.credentials = fsTokenCredentials;
-    this.jobTokenSecretManager = jobTokenSecretManager;
 
     this.aclsManager = new ApplicationACLsManager(conf);
 
@@ -834,51 +826,42 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       // TODO Metrics
       //dag.metrics.submittedJob(dag);
       //dag.metrics.preparingJob(dag);
-      try {
-        setup(dag);
-
-        // If we have no vertices, fail the dag
-        dag.numVertices = dag.getJobPlan().getVertexCount();
-        if (dag.numVertices == 0) {
-          dag.addDiagnostic("No vertices for dag");
-          dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
-          dag.abortJob(DAGStatus.State.FAILED);
-          return dag.finished(DAGState.FAILED);
-        }
 
-        checkTaskLimits();
+      dag.initTime = dag.clock.getTime();
 
-        // create the vertices
-        for (int i=0; i < dag.numVertices; ++i) {
-          String vertexName = dag.getJobPlan().getVertex(i).getName();
-          VertexImpl v = createVertex(dag, vertexName, i);
-          dag.addVertex(v);
-        }
-
-        createDAGEdges(dag);
-        Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+      // If we have no vertices, fail the dag
+      dag.numVertices = dag.getJobPlan().getVertexCount();
+      if (dag.numVertices == 0) {
+        dag.addDiagnostic("No vertices for dag");
+        dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
+        dag.abortJob(DAGStatus.State.FAILED);
+        return dag.finished(DAGState.FAILED);
+      }
 
-        // setup the dag
-        for (Vertex v : dag.vertices.values()) {
-          parseVertexEdges(dag, edgePlans, v);
-        }
+      checkTaskLimits();
 
-        assignDAGScheduler(dag);
+      // create the vertices
+      for (int i=0; i < dag.numVertices; ++i) {
+        String vertexName = dag.getJobPlan().getVertex(i).getName();
+        VertexImpl v = createVertex(dag, vertexName, i);
+        dag.addVertex(v);
+      }
 
-        // TODO Metrics
-        //dag.metrics.endPreparingJob(dag);
-        return DAGState.INITED;
+      createDAGEdges(dag);
+      Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
 
-      } catch (IOException e) {
-        LOG.warn("Job init failed", e);
-        dag.addDiagnostic("Job init failed : "
-            + StringUtils.stringifyException(e));
-        dag.trySetTerminationCause(DAGTerminationCause.INIT_FAILURE);
-        dag.abortJob(DAGStatus.State.FAILED);
-        // TODO Metrics
-        //dag.metrics.endPreparingJob(dag);
-        return dag.finished(DAGState.FAILED);
+      // setup the dag
+      for (Vertex v : dag.vertices.values()) {
+        parseVertexEdges(dag, edgePlans, v);
       }
+
+      assignDAGScheduler(dag);
+
+      // TODO Metrics
+      //dag.metrics.endPreparingJob(dag);
+      return DAGState.INITED;
+
+
     }
 
     private void createDAGEdges(DAGImpl dag) {
@@ -982,27 +965,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       vertex.setOutputVertices(outVertices);
     }
 
-    protected void setup(DAGImpl job) throws IOException {
-      job.initTime = job.clock.getTime();
-      String dagIdString = job.dagId.toString().replace("application", "job");
-
-      // Prepare the TaskAttemptListener server for authentication of Containers
-      // TaskAttemptListener gets the information via jobTokenSecretManager.
-      JobTokenIdentifier identifier =
-          new JobTokenIdentifier(new Text(dagIdString));
-      job.jobToken =
-          new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
-      job.jobToken.setService(identifier.getJobId());
-      // Add it to the jobTokenSecretManager so that TaskAttemptListener server
-      // can authenticate containers(tasks)
-      job.jobTokenSecretManager.addTokenForJob(dagIdString, job.jobToken);
-      LOG.info("Adding job token for " + dagIdString
-          + " to jobTokenSecretManager");
-
-      // Populate the jobToken into job credentials.
-      TokenCache.setJobToken(job.jobToken, job.credentials);
-    }
-
     /**
      * If the number of tasks are greater than the configured value
      * throw an exception that will fail job initialization

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f587818/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 5f77cb1..09652d4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -445,7 +445,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
       InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
     // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
     // For now, reporting immediately.
-    LOG.info("Fetch failed for src: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed);
+    LOG.info("Fetch failed for src: " + srcAttemptIdentifier + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed);
     InputReadErrorEvent readError = new InputReadErrorEvent(
         "Fetch failure while fetching from "
             + TezRuntimeUtils.getTaskAttemptIdentifier(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f587818/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 0f41cda..d9bc101 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -166,6 +166,8 @@ public class Fetcher implements Callable<FetchResult> {
       // with the first map, typically lost map. So, penalize only that map
       // and add the rest
       InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
+      LOG.warn("Fetch Failure from host while connecting: " + host
+          + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e);
       fetcherCallback.fetchFailed(host, firstAttempt, false);
       remaining.remove(firstAttempt);
       return new FetchResult(host, port, partition, remaining);


Mime
View raw message