hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1494369 - in /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client: hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/...
Date Tue, 18 Jun 2013 23:19:51 GMT
Author: vinodkv
Date: Tue Jun 18 23:19:49 2013
New Revision: 1494369

URL: http://svn.apache.org/r1494369
Log:
YARN-694. Starting to use NMTokens to authenticate all communication with NodeManagers. Contributed
by Omkar Vinit Joshi.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
Tue Jun 18 23:19:49 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
@@ -61,4 +62,6 @@ public interface AppContext {
   Set<String> getBlacklistedNodes();
   
   ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
+
+  Map<String, Token> getNMTokens();
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
Tue Jun 18 23:19:49 2013
@@ -886,6 +886,8 @@ public class MRAppMaster extends Composi
     private final Configuration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
+    private final ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token> nmTokens =
+        new ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token>();
 
     public RunningAppContext(Configuration config) {
       this.conf = config;
@@ -952,6 +954,11 @@ public class MRAppMaster extends Composi
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return clientToAMTokenSecretManager;
     }
+    
+    @Override
+    public Map<String, org.apache.hadoop.yarn.api.records.Token> getNMTokens() {
+      return this.nmTokens;
+    }
   }
 
   @SuppressWarnings("unchecked")

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
Tue Jun 18 23:19:49 2013
@@ -19,9 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -35,7 +33,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -55,6 +52,8 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -73,22 +72,22 @@ public class ContainerLauncherImpl exten
 
   private ConcurrentHashMap<ContainerId, Container> containers = 
     new ConcurrentHashMap<ContainerId, Container>(); 
-  private AppContext context;
+  private final AppContext context;
   protected ThreadPoolExecutor launcherPool;
   protected static final int INITIAL_POOL_SIZE = 10;
   private int limitOnPoolSize;
   private Thread eventHandlingThread;
   protected BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
-  YarnRPC rpc;
   private final AtomicBoolean stopped;
+  private ContainerManagementProtocolProxy cmProxy;
 
   private Container getContainer(ContainerLauncherEvent event) {
     ContainerId id = event.getContainerID();
     Container c = containers.get(id);
     if(c == null) {
       c = new Container(event.getTaskAttemptID(), event.getContainerID(),
-          event.getContainerMgrAddress(), event.getContainerToken());
+          event.getContainerMgrAddress());
       Container old = containers.putIfAbsent(id, c);
       if(old != null) {
         c = old;
@@ -114,16 +113,13 @@ public class ContainerLauncherImpl exten
     private TaskAttemptId taskAttemptID;
     private ContainerId containerID;
     final private String containerMgrAddress;
-    private org.apache.hadoop.yarn.api.records.Token containerToken;
     
     public Container(TaskAttemptId taId, ContainerId containerID,
-        String containerMgrAddress,
-        org.apache.hadoop.yarn.api.records.Token containerToken) {
+        String containerMgrAddress) {
       this.state = ContainerState.PREP;
       this.taskAttemptID = taId;
       this.containerMgrAddress = containerMgrAddress;
       this.containerID = containerID;
-      this.containerToken = containerToken;
     }
     
     public synchronized boolean isCompletelyDone() {
@@ -140,11 +136,10 @@ public class ContainerLauncherImpl exten
         return;
       }
       
-      ContainerManagementProtocol proxy = null;
+      ContainerManagementProtocolProxyData proxy = null;
       try {
 
-        proxy = getCMProxy(containerID, containerMgrAddress,
-            containerToken);
+        proxy = getCMProxy(containerMgrAddress, containerID);
 
         // Construct the actual Container
         ContainerLaunchContext containerLaunchContext =
@@ -155,7 +150,8 @@ public class ContainerLauncherImpl exten
           .newRecord(StartContainerRequest.class);
         startRequest.setContainerLaunchContext(containerLaunchContext);
         startRequest.setContainerToken(event.getContainerToken());
-        StartContainerResponse response = proxy.startContainer(startRequest);
+        StartContainerResponse response =
+            proxy.getContainerManagementProtocol().startContainer(startRequest);
 
         ByteBuffer portInfo =
             response.getAllServicesMetaData().get(
@@ -185,7 +181,7 @@ public class ContainerLauncherImpl exten
         sendContainerLaunchFailedMsg(taskAttemptID, message);
       } finally {
         if (proxy != null) {
-          ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+          cmProxy.mayBeCloseProxy(proxy);
         }
       }
     }
@@ -198,29 +194,30 @@ public class ContainerLauncherImpl exten
       } else if (!isCompletelyDone()) {
         LOG.info("KILLING " + taskAttemptID);
 
-        ContainerManagementProtocol proxy = null;
+        ContainerManagementProtocolProxyData proxy = null;
         try {
-          proxy = getCMProxy(this.containerID, this.containerMgrAddress,
-              this.containerToken);
+          proxy = getCMProxy(this.containerMgrAddress, this.containerID);
 
-            // kill the remote container if already launched
-            StopContainerRequest stopRequest = Records
+          // kill the remote container if already launched
+          StopContainerRequest stopRequest = Records
               .newRecord(StopContainerRequest.class);
-            stopRequest.setContainerId(this.containerID);
-            proxy.stopContainer(stopRequest);
+          stopRequest.setContainerId(this.containerID);
+          proxy.getContainerManagementProtocol().stopContainer(stopRequest);
 
         } catch (Throwable t) {
 
           // ignore the cleanup failure
           String message = "cleanup failed for container "
-            + this.containerID + " : "
-            + StringUtils.stringifyException(t);
-          context.getEventHandler().handle(
-            new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID, message));
+              + this.containerID + " : "
+              + StringUtils.stringifyException(t);
+          context.getEventHandler()
+              .handle(
+                  new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID,
+                      message));
           LOG.warn(message);
         } finally {
           if (proxy != null) {
-            ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+            cmProxy.mayBeCloseProxy(proxy);
           }
         }
         this.state = ContainerState.DONE;
@@ -239,21 +236,14 @@ public class ContainerLauncherImpl exten
   }
 
   @Override
-  protected void serviceInit(Configuration config) throws Exception {
-    Configuration conf = new Configuration(config);
-    conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
-        0);
+  protected void serviceInit(Configuration conf) throws Exception {
     this.limitOnPoolSize = conf.getInt(
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
-    this.rpc = createYarnRPC(conf);
     super.serviceInit(conf);
-  }
-  
-  protected YarnRPC createYarnRPC(Configuration conf) {
-    return YarnRPC.create(conf);
+    cmProxy =
+        new ContainerManagementProtocolProxy(conf, context.getNMTokens());
   }
 
   protected void serviceStart() throws Exception {
@@ -348,34 +338,6 @@ public class ContainerLauncherImpl exten
     return new EventProcessor(event);
   }
 
-  protected ContainerManagementProtocol getCMProxy(ContainerId containerID,
-      final String containerManagerBindAddr,
-      org.apache.hadoop.yarn.api.records.Token containerToken)
-      throws IOException {
-
-    final InetSocketAddress cmAddr =
-        NetUtils.createSocketAddr(containerManagerBindAddr);
-
-    // the user in createRemoteUser in this context has to be ContainerID
-    UserGroupInformation user =
-        UserGroupInformation.createRemoteUser(containerID.toString());
-
-    Token<ContainerTokenIdentifier> token =
-        ConverterUtils.convertFromYarn(containerToken, cmAddr);
-    user.addToken(token);
-
-    ContainerManagementProtocol proxy = user
-        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-          @Override
-          public ContainerManagementProtocol run() {
-            return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
-                cmAddr, getConfig());
-          }
-        });
-    return proxy;
-  }
-
-
   /**
    * Setup and start the container on remote nodemanager.
    */
@@ -410,7 +372,7 @@ public class ContainerLauncherImpl exten
       removeContainerIfDone(containerID);
     }
   }
-
+  
   @SuppressWarnings("unchecked")
   void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID,
       String message) {
@@ -430,4 +392,9 @@ public class ContainerLauncherImpl exten
       throw new YarnRuntimeException(e);
     }
   }
+  
+  public ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(
+      String containerMgrBindAddr, ContainerId containerId) throws IOException {
+    return cmProxy.getProxy(containerMgrBindAddr, containerId);
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
Tue Jun 18 23:19:49 2013
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -584,6 +585,14 @@ public class RMContainerAllocator extend
     }
     int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
     List<Container> newContainers = response.getAllocatedContainers();
+    // Setting NMTokens
+    if (response.getNMTokens() != null) {
+      for (NMToken nmToken : response.getNMTokens()) {
+        getContext().getNMTokens().put(nmToken.getNodeId().toString(),
+            nmToken.getToken());
+      }
+    }
+    
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
     if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
       //something changed

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
Tue Jun 18 23:19:49 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
@@ -130,4 +131,10 @@ public class MockAppContext implements A
     // Not implemented
     return null;
   }
+  
+  @Override
+  public Map<String, Token> getNMTokens() {
+    // Not Implemented
+    return null;
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
Tue Jun 18 23:19:49 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
 import org.junit.Test;
 
 /**
@@ -225,8 +226,8 @@ public class TestFail {
         }
 
         @Override
-        protected ContainerManagementProtocol getCMProxy(ContainerId contianerID,
-            String containerManagerBindAddr, Token containerToken)
+        public ContainerManagementProtocolProxyData getCMProxy(
+            String containerMgrBindAddr, ContainerId containerId)
             throws IOException {
           try {
             synchronized (this) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
Tue Jun 18 23:19:49 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -857,7 +858,13 @@ public class TestRuntimeEstimators {
       return null;
     }
     
+    @Override
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
+      return null;
+    }
+    
+    @Override
+    public Map<String, Token> getNMTokens() {
       // Not Implemented
       return null;
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
Tue Jun 18 23:19:49 2013
@@ -62,12 +62,15 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.junit.Test;
 
 public class TestContainerLauncher {
@@ -342,16 +345,26 @@ public class TestContainerLauncher {
     }
 
     @Override
-    protected ContainerLauncher createContainerLauncher(AppContext context) {
+    protected ContainerLauncher
+        createContainerLauncher(final AppContext context) {
       return new ContainerLauncherImpl(context) {
+
         @Override
-        protected ContainerManagementProtocol getCMProxy(ContainerId containerID,
-            String containerManagerBindAddr, Token containerToken)
+        public ContainerManagementProtocolProxyData getCMProxy(
+            String containerMgrBindAddr, ContainerId containerId)
             throws IOException {
-          // make proxy connect to our local containerManager server
-          ContainerManagementProtocol proxy = (ContainerManagementProtocol) rpc.getProxy(
-              ContainerManagementProtocol.class,
-              NetUtils.getConnectAddress(server), conf);
+          Token dummyToken =
+              Token.newInstance("NMTokenIdentifier".getBytes(),
+                  NMTokenIdentifier.KIND.toString(), "password".getBytes(),
+                  "NMToken");
+          ContainerManagementProtocolProxy cmProxy =
+              new ContainerManagementProtocolProxy(conf, context.getNMTokens());
+          InetSocketAddress addr = NetUtils.getConnectAddress(server);
+          ContainerManagementProtocolProxyData proxy =
+              cmProxy.new ContainerManagementProtocolProxyData(
+                  YarnRPC.create(conf),
+                  addr.getHostName() + ":" + addr.getPort(), containerId,
+                  dummyToken);
           return proxy;
         }
       };

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
Tue Jun 18 23:19:49 2013
@@ -95,11 +95,6 @@ public class TestContainerLauncherImpl {
       this.rpc = rpc;
     }
     
-    @Override
-    protected YarnRPC createYarnRPC(Configuration conf) {
-      return rpc;
-    }
-    
     public void waitForPoolToIdle() throws InterruptedException {
       //I wish that we did not need the sleep, but it is here so that we are sure
       // That the other thread had time to insert the event into the queue and

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
Tue Jun 18 23:19:49 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.service.Service
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -315,4 +316,10 @@ public class JobHistory extends Abstract
     // Not implemented.
     return null;
   }
+  
+  @Override
+  public Map<String, Token> getNMTokens() {
+    // Not Implemented.
+    return null;
+  }
 }



Mime
View raw message