tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-319. Fix TEZ due to changes brought in by YARN-926. (hitesh)
Date Wed, 24 Jul 2013 18:56:57 GMT
Updated Branches:
  refs/heads/master 699e168e4 -> e563d0e54


TEZ-319. Fix TEZ due to changes brought in by YARN-926. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e563d0e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e563d0e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e563d0e5

Branch: refs/heads/master
Commit: e563d0e54a6f29637f941a1646a60b5f8abac800
Parents: 699e168
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Jul 24 11:56:35 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Jul 24 11:56:35 2013 -0700

----------------------------------------------------------------------
 .../dag/app/launcher/ContainerLauncherImpl.java | 54 ++++++++++++--------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e563d0e5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 73f6a48..28886b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.launcher;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -35,10 +36,12 @@ import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
@@ -74,8 +77,8 @@ public class ContainerLauncherImpl extends AbstractService implements
  // TODO XXX Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
   static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
 
-  private ConcurrentHashMap<ContainerId, Container> containers = 
-    new ConcurrentHashMap<ContainerId, Container>(); 
+  private ConcurrentHashMap<ContainerId, Container> containers =
+    new ConcurrentHashMap<ContainerId, Container>();
   private AppContext context;
   protected ThreadPoolExecutor launcherPool;
   protected static final int INITIAL_POOL_SIZE = 10;
@@ -100,14 +103,14 @@ public class ContainerLauncherImpl extends AbstractService implements
     }
     return c;
   }
-  
+
   private void removeContainerIfDone(ContainerId id) {
     Container c = containers.get(id);
     if(c != null && c.isCompletelyDone()) {
       containers.remove(id);
     }
   }
-  
+
   private static enum ContainerState {
     PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
   }
@@ -118,7 +121,7 @@ public class ContainerLauncherImpl extends AbstractService implements
     private ContainerId containerID;
     final private String containerMgrAddress;
     private Token containerToken;
-    
+
     public Container(ContainerId containerID,
         String containerMgrAddress, Token containerToken) {
       this.state = ContainerState.PREP;
@@ -126,21 +129,21 @@ public class ContainerLauncherImpl extends AbstractService implements
       this.containerID = containerID;
       this.containerToken = containerToken;
     }
-    
+
     public synchronized boolean isCompletelyDone() {
       return state == ContainerState.DONE || state == ContainerState.FAILED;
     }
-    
+
     @SuppressWarnings("unchecked")
     public synchronized void launch(NMCommunicatorLaunchRequestEvent event) {
       LOG.info("Launching Container with Id: " + event.getContainerId());
       if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
         state = ContainerState.DONE;
-        sendContainerLaunchFailedMsg(event.getContainerId(), 
+        sendContainerLaunchFailedMsg(event.getContainerId(),
             "Container was killed before it was launched");
         return;
       }
-      
+
       ContainerManagementProtocolProxyData proxy = null;
       try {
 
@@ -156,8 +159,15 @@ public class ContainerLauncherImpl extends AbstractService implements
           .newRecord(StartContainerRequest.class);
         startRequest.setContainerToken(event.getContainerToken());
         startRequest.setContainerLaunchContext(containerLaunchContext);
-        StartContainerResponse response = 
-            proxy.getContainerManagementProtocol().startContainer(startRequest);
+
+        StartContainersResponse response =
+            proxy.getContainerManagementProtocol().startContainers(
+                StartContainersRequest.newInstance(
+                    Collections.singletonList(startRequest)));
+        if (response.getFailedRequests() != null
+            && !response.getFailedRequests().isEmpty()) {
+          throw response.getFailedRequests().get(containerID).deSerialize();
+        }
 
         ByteBuffer portInfo = response.getAllServicesMetaData().get(
             ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
@@ -182,7 +192,7 @@ public class ContainerLauncherImpl extends AbstractService implements
             containerID, clock.getTime());
         context.getEventHandler().handle(new DAGHistoryEvent(
             context.getDAGID(), lEvt));
-        
+
         this.state = ContainerState.RUNNING;
       } catch (Throwable t) {
         String message = "Container launch failed for " + containerID + " : "
@@ -195,11 +205,11 @@ public class ContainerLauncherImpl extends AbstractService implements
         }
       }
     }
-    
+
     @SuppressWarnings("unchecked")
     public synchronized void kill() {
 
-      if(isCompletelyDone()) { 
+      if(isCompletelyDone()) {
         return;
       }
       if(this.state == ContainerState.PREP) {
@@ -214,10 +224,12 @@ public class ContainerLauncherImpl extends AbstractService implements
               this.containerToken);
 
             // kill the remote container if already launched
-            StopContainerRequest stopRequest = Records
-              .newRecord(StopContainerRequest.class);
-            stopRequest.setContainerId(this.containerID);
-            proxy.getContainerManagementProtocol().stopContainer(stopRequest);
+            StopContainersRequest stopRequest = Records
+              .newRecord(StopContainersRequest.class);
+            stopRequest.setContainerIds(Collections.singletonList(containerID));
+
+            proxy.getContainerManagementProtocol().stopContainers(stopRequest);
+
             // If stopContainer returns without an error, assuming the stop made
             // it over to the NodeManager.
           context.getEventHandler().handle(
@@ -260,7 +272,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
   }
-  
+
   @Override
   public void serviceStart() {
     cmProxy =


Mime
View raw message