tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-260 Fix Tez after YARN-694 (bikas)
Date Wed, 19 Jun 2013 01:26:25 GMT
Updated Branches:
  refs/heads/master 524f4624c -> a037d9693


TEZ-260 Fix Tez after YARN-694 (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a037d969
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a037d969
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a037d969

Branch: refs/heads/master
Commit: a037d969381e5289b4c94d4f40d07f4321fed953
Parents: 524f462
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Jun 18 18:22:41 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Jun 18 18:22:41 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/dag/app/AppContext.java |  5 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 29 +++++++---
 .../dag/app/launcher/ContainerLauncherImpl.java | 61 ++++++--------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  6 ++
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  7 ++-
 5 files changed, 54 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a037d969/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index fb9afbd..3d6a946 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -19,11 +19,13 @@
 package org.apache.tez.dag.app;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.dag.DAG;
@@ -39,6 +41,9 @@ import org.apache.tez.dag.records.TezDAGID;
 @InterfaceAudience.Private
 public interface AppContext {
 
+  ConcurrentMap<String, Token> getNMTokens();
+  void setNMTokens(ConcurrentMap<String, org.apache.hadoop.yarn.api.records.Token> tokens);
+  
   DAGAppMaster getAppMaster();
   
   ApplicationId getApplicationID();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a037d969/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 58fe0fd..118410e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -27,6 +27,8 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -275,19 +277,20 @@ public class DAGAppMaster extends CompositeService {
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
 
+    taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
+        clientRpcServer);
+    addIfService(taskSchedulerEventHandler);
+    dispatcher.register(AMSchedulerEventType.class,
+        taskSchedulerEventHandler);
+
     //    TODO XXX: Rename to NMComm
     //    corresponding service to launch allocated containers via NodeManager
     //    containerLauncher = createNMCommunicator(context);
+    // needs to start after TaskScheduler for nmtokens to be available
     containerLauncher = createContainerLauncher(context);
     addIfService(containerLauncher);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
 
-    taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
-        clientRpcServer);
-    addIfService(taskSchedulerEventHandler);
-    dispatcher.register(AMSchedulerEventType.class,
-        taskSchedulerEventHandler);
-
     historyEventHandler = new HistoryEventHandler(context);
     addIfService(historyEventHandler);
     dispatcher.register(HistoryEventType.class, historyEventHandler);
@@ -749,11 +752,11 @@ public class DAGAppMaster extends CompositeService {
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock rLock = rwLock.readLock();
     private final Lock wLock = rwLock.writeLock();
-
+    private ConcurrentMap<String, org.apache.hadoop.yarn.api.records.Token> nmTokens;
     public RunningAppContext(TezConfiguration config) {
       this.conf = config;
     }
-
+    
     @Override
     public DAGAppMaster getAppMaster() {
       return DAGAppMaster.this;
@@ -852,6 +855,16 @@ public class DAGAppMaster extends CompositeService {
         wLock.unlock();
       }
     }
+    
+    @Override
+    public void setNMTokens(ConcurrentMap<String, org.apache.hadoop.yarn.api.records.Token> tokens) {
+      nmTokens = tokens;
+    }
+
+    @Override
+    public ConcurrentMap<String, org.apache.hadoop.yarn.api.records.Token> getNMTokens() {
+      return nmTokens;
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a037d969/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index db91622..2c10a59 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -19,9 +19,7 @@
 package org.apache.tez.dag.app.launcher;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -34,21 +32,17 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
@@ -89,8 +83,9 @@ public class ContainerLauncherImpl extends AbstractService implements
   private Thread eventHandlingThread;
   protected BlockingQueue<NMCommunicatorEvent> eventQueue =
       new LinkedBlockingQueue<NMCommunicatorEvent>();
-  YarnRPC rpc;
   private Clock clock;
+  private ContainerManagementProtocolProxy cmProxy;
+
 
   private Container getContainer(NMCommunicatorEvent event) {
     ContainerId id = event.getContainerId();
@@ -146,7 +141,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         return;
       }
       
-      ContainerManagementProtocol proxy = null;
+      ContainerManagementProtocolProxyData proxy = null;
       try {
 
         proxy = getCMProxy(containerID, containerMgrAddress,
@@ -161,7 +156,8 @@ public class ContainerLauncherImpl extends AbstractService implements
           .newRecord(StartContainerRequest.class);
         startRequest.setContainerToken(event.getContainerToken());
         startRequest.setContainerLaunchContext(containerLaunchContext);
-        StartContainerResponse response = proxy.startContainer(startRequest);
+        StartContainerResponse response = 
+            proxy.getContainerManagementProtocol().startContainer(startRequest);
 
         ByteBuffer portInfo = response.getAllServicesMetaData().get(
             ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
@@ -195,7 +191,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         sendContainerLaunchFailedMsg(containerID, message);
       } finally {
         if (proxy != null) {
-          ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+          cmProxy.mayBeCloseProxy(proxy);
         }
       }
     }
@@ -212,7 +208,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         LOG.info("Sending a stop request to the NM for ContainerId: "
             + containerID);
 
-        ContainerManagementProtocol proxy = null;
+        ContainerManagementProtocolProxyData proxy = null;
         try {
           proxy = getCMProxy(this.containerID, this.containerMgrAddress,
               this.containerToken);
@@ -221,7 +217,7 @@ public class ContainerLauncherImpl extends AbstractService implements
             StopContainerRequest stopRequest = Records
               .newRecord(StopContainerRequest.class);
             stopRequest.setContainerId(this.containerID);
-            proxy.stopContainer(stopRequest);
+            proxy.getContainerManagementProtocol().stopContainer(stopRequest);
             // If stopContainer returns without an error, assuming the stop made
             // it over to the NodeManager.
           context.getEventHandler().handle(
@@ -239,7 +235,7 @@ public class ContainerLauncherImpl extends AbstractService implements
           return;
         } finally {
           if (proxy != null) {
-            ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+            cmProxy.mayBeCloseProxy(proxy);
           }
         }
         this.state = ContainerState.DONE;
@@ -263,15 +259,12 @@ public class ContainerLauncherImpl extends AbstractService implements
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
-    this.rpc = createYarnRPC(conf);
   }
   
-  protected YarnRPC createYarnRPC(Configuration conf) {
-    return YarnRPC.create(conf);
-  }
-
   @Override
   public void serviceStart() {
+    cmProxy =
+        new ContainerManagementProtocolProxy(getConfig(), context.getNMTokens());
 
     ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
         "ContainerLauncher #%d").setDaemon(true).build();
@@ -352,32 +345,12 @@ public class ContainerLauncherImpl extends AbstractService implements
     return new EventProcessor(event);
   }
 
-  protected ContainerManagementProtocol getCMProxy(ContainerId containerID,
-      final String containerManagerBindAddr, Token containerToken)
-      throws IOException {
-
-    final InetSocketAddress cmAddr =
-        NetUtils.createSocketAddr(containerManagerBindAddr);
-
-    // the user in createRemoteUser in this context has to be ContainerID
-    UserGroupInformation user = UserGroupInformation
-        .createRemoteUser(containerID.toString());
-    org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token = ConverterUtils
-        .convertFromYarn(containerToken, cmAddr);
-    user.addToken(token);
-
-    ContainerManagementProtocol proxy = user
-        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-          @Override
-          public ContainerManagementProtocol run() {
-            return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
-                cmAddr, getConfig());
-          }
-        });
-    return proxy;
+  protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(
+      ContainerId containerID, final String containerManagerBindAddr,
+      Token containerToken) throws IOException {
+    return cmProxy.getProxy(containerManagerBindAddr, containerID);
   }
 
-
   /**
    * Setup and start the container on remote nodemanager.
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a037d969/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index f8ae006..75aa434 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -368,6 +370,10 @@ public class TaskScheduler extends AbstractService
     appClient.onError(e);
   }
   
+  public ConcurrentMap<String, Token> getNMTokens() {
+    return amRmClient.getNMTokens();
+  }
+  
   public synchronized Resource getTotalResources() {
     return totalResources;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a037d969/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index cbae332..1bd088d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -76,7 +78,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   // Has a signal (SIGTERM etc) been issued?
   protected volatile boolean isSignalled = false;
   final DAGClientServer clientService;
-
+  
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
 
@@ -96,7 +98,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     this.isSignalled = isSignalled;
     LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
   }
-  
+    
   public Resource getAvailableResources() {
     return taskScheduler.getAvailableResources();
   }
@@ -327,6 +329,7 @@ public class TaskSchedulerEventHandler extends AbstractService
 
     dagAppMaster = appContext.getAppMaster();
     taskScheduler.start();
+    appContext.setNMTokens(taskScheduler.getNMTokens());
     this.eventHandlingThread = new Thread() {
       @Override
       public void run() {


Mime
View raw message