tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-456. Fix heartbeat response counting to depend on Container and not TaskAttempt (bikas)
Date Tue, 17 Sep 2013 01:46:01 GMT
Updated Branches:
  refs/heads/TEZ-398 c45e30e46 -> 62689dcc0


TEZ-456. Fix heartbeat response counting to depend on Container and not TaskAttempt (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/62689dcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/62689dcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/62689dcc

Branch: refs/heads/TEZ-398
Commit: 62689dcc01fd5f731739efec0534d9a4b76d1f4d
Parents: c45e30e
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Sep 16 18:44:07 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Sep 16 18:44:07 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |   3 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 156 ++++++++++++-------
 .../engine/newapi/impl/TezHeartbeatRequest.java |  11 +-
 3 files changed, 112 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62689dcc/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index ab52a4e..12dbb51 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -178,7 +178,7 @@ public class YarnTezDagChild {
     eventsToSend.drainTo(events);
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
-        taskAttemptID, eventCounter, eventsRange);
+        containerIdStr, taskAttemptID, eventCounter, eventsRange);
     TezHeartbeatResponse response = umbilical.heartbeat(request);
     if (response.getLastRequestId() != reqId) {
       // TODO TODONEWTEZ
@@ -229,6 +229,7 @@ public class YarnTezDagChild {
     }
     // FIXME fix initialize metrics in child runner
     DefaultMetricsSystem.initialize("VertexTask");
+    YarnTezDagChild.containerIdStr = containerIdentifier;
 
     ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
     @SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62689dcc/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 655119f..39a3720 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -20,9 +20,7 @@ package org.apache.tez.dag.app;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -91,22 +89,24 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private InetSocketAddress address;
   private Server server;
 
-
-  class AttemptInfo {
-    AttemptInfo(ContainerId containerId) {
+  class ContainerInfo {
+    ContainerInfo(ContainerId containerId) {
       this.containerId = containerId;
       this.lastReponse = null;
       this.lastRequestId = -1;
+      this.currentAttemptId = null;
     }
     ContainerId containerId;
     long lastRequestId;
     TezHeartbeatResponse lastReponse;
+    TezTaskAttemptID currentAttemptId;
   }
-  private ConcurrentMap<TezTaskAttemptID, AttemptInfo> attemptToInfoMap =
-      new ConcurrentHashMap<TezTaskAttemptID, AttemptInfo>();
+  
+  private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToInfoMap =
+      new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
 
-  private Set<ContainerId> registeredContainers = Collections
-      .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+  private ConcurrentHashMap<ContainerId, ContainerInfo> registeredContainers = 
+      new ConcurrentHashMap<ContainerId, ContainerInfo>();
 
   public TaskAttemptListenerImpTezDag(AppContext context,
       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
@@ -534,7 +534,22 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   @Override
   public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
-    attemptToInfoMap.remove(attemptId);
+    ContainerId containerId = attemptToInfoMap.get(attemptId);
+    if(containerId == null) {
+      LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
+      return;
+    }
+    ContainerInfo containerInfo = registeredContainers.get(containerId);
+    if(containerInfo == null) {
+      LOG.warn("Unregister task attempt: " + attemptId + 
+          " from non-registered container: " + containerId);
+      return;
+    }
+    synchronized (containerInfo) {
+      containerInfo.currentAttemptId = null;
+      attemptToInfoMap.remove(attemptId);
+    }
+      
   }
 
   public AMContainerTask pullTaskAttemptContext(ContainerId containerId) {
@@ -549,14 +564,36 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       LOG.debug("ContainerId: " + containerId
           + " registered with TaskAttemptListener");
     }
-    registeredContainers.add(containerId);
+    ContainerInfo oldInfo = registeredContainers.put(containerId,
+        new ContainerInfo(containerId));
+    if(oldInfo != null) {
+      throw new TezUncheckedException(
+          "Multiple registrations for containerId: " + containerId);
+    }
   }
 
   @Override
   public void registerTaskAttempt(TezTaskAttemptID attemptId,
       ContainerId containerId) {
-    AttemptInfo attemptInfo = new AttemptInfo(containerId);
-    attemptToInfoMap.put(attemptId, attemptInfo);
+    ContainerInfo containerInfo = registeredContainers.get(containerId);
+    if(containerInfo == null) {
+      throw new TezUncheckedException("Registering task attempt: "
+          + attemptId + " to unknown container: " + containerId);
+    }
+    synchronized (containerInfo) {
+      if(containerInfo.currentAttemptId != null) {
+        throw new TezUncheckedException("Registering task attempt: "
+            + attemptId + " to container: " + containerId
+            + " with existing assignment to: " + containerInfo.currentAttemptId);
+      }
+      containerInfo.currentAttemptId = attemptId;
+      ContainerId containerIdFromMap = attemptToInfoMap.put(attemptId, containerId);
+      if(containerIdFromMap != null) {
+        throw new TezUncheckedException("Registering task attempt: "
+            + attemptId + " to container: " + containerId
+            + " when already assigned to: " + containerIdFromMap);
+      }
+    }
   }
 
   @Override
@@ -573,7 +610,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
-    ContainerId containerId = attemptToInfoMap.get(taskAttemptId).containerId;
+    ContainerId containerId = attemptToInfoMap.get(taskAttemptId);
     if (containerId != null) {
       containerHeartbeatHandler.pinged(containerId);
     } else {
@@ -585,54 +622,61 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   @Override
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException {
+    ContainerId containerId = ConverterUtils.toContainerId(request
+        .getContainerIdentifier());
     long requestId = request.getRequestId();
-    LOG.info("Received request id " + requestId + " from child JVM");
-    TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
-    if (taskAttemptID == null) {
-      TezHeartbeatResponse response = new TezHeartbeatResponse();
-      response.setLastRequestId(requestId);
-      return response;
-    }
-    AttemptInfo attemptInfo = attemptToInfoMap.get(taskAttemptID);
-    if(attemptInfo == null) {
-      throw new TezException("Attempt " + taskAttemptID
+    LOG.info("Received request id " + requestId + 
+        " from child JVM : " + containerId.toString());
+    ContainerInfo containerInfo = registeredContainers.get(containerId);
+    if(containerInfo == null) {
+      throw new TezException("Container " + containerId.toString()
           + " is not recognized for heartbeat");
     }
-    synchronized (attemptInfo) {
-      if(attemptInfo.lastRequestId == requestId) {
-        LOG.warn("Old sequenceId received: " + requestId + ", Re-sending last response to client");
-        return attemptInfo.lastReponse;
-      }
-      if(attemptInfo.lastRequestId != -1
-         && attemptInfo.lastRequestId+1 < requestId) {
-        // TODO NEWTEZ fix checking of last req id to ensure heartbeat works
-        // across attempts with container reuse
-        throw new TezException("Attempt " + taskAttemptID
-            + " has invalid request id. Expected: "
-            + attemptInfo.lastRequestId+1
-            + " and actual: " + requestId);
+    
+    synchronized (containerInfo) {
+      pingContainerHeartbeatHandler(containerId);
+
+      if(containerInfo.lastRequestId == requestId) {
+        LOG.warn("Old sequenceId received: " + requestId
+            + ", Re-sending last response to client");
+        return containerInfo.lastReponse;
       }
-      
-      // not safe to multiple call from same task
-      LOG.info("Ping from " + taskAttemptID.toString());
-      List<TezEvent> inEvents = request.getEvents();
-      if(inEvents!=null && inEvents.size()>0) {    
-        TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
-        context.getEventHandler().handle(new VertexEventRouteEvent(vertexId, inEvents));
-      }
-      taskHeartbeatHandler.pinged(taskAttemptID);
-      pingContainerHeartbeatHandler(taskAttemptID);
+
       TezHeartbeatResponse response = new TezHeartbeatResponse();
       response.setLastRequestId(requestId);
-      List<TezEvent> outEvents = context
-          .getCurrentDAG()
-          .getVertex(taskAttemptID.getTaskID().getVertexID())
-          .getTask(taskAttemptID.getTaskID())
-          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
-              request.getMaxEvents());
-      response.setEvents(outEvents);
-      attemptInfo.lastRequestId = requestId;
-      attemptInfo.lastReponse = response;
+
+      TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
+      if (taskAttemptID != null) {        
+        ContainerId containerIdFromMap = attemptToInfoMap.get(taskAttemptID);
+        if(containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
+          throw new TezException("Attempt " + taskAttemptID
+            + " is not recognized for heartbeat");
+        }
+      
+        if(containerInfo.lastRequestId+1 != requestId) {
+          throw new TezException("Container " + containerId
+              + " has invalid request id. Expected: "
+              + containerInfo.lastRequestId+1
+              + " and actual: " + requestId);
+        }
+        
+        LOG.info("Ping from " + taskAttemptID.toString());
+        List<TezEvent> inEvents = request.getEvents();
+        if(inEvents!=null && inEvents.size()>0) {    
+          TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
+          context.getEventHandler().handle(new VertexEventRouteEvent(vertexId, inEvents));
+        }
+        taskHeartbeatHandler.pinged(taskAttemptID);
+        List<TezEvent> outEvents = context
+            .getCurrentDAG()
+            .getVertex(taskAttemptID.getTaskID().getVertexID())
+            .getTask(taskAttemptID.getTaskID())
+            .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
+                request.getMaxEvents());
+        response.setEvents(outEvents);        
+      }
+      containerInfo.lastRequestId = requestId;
+      containerInfo.lastReponse = response;
       return response;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62689dcc/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
index d21de95..462423b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
@@ -25,12 +25,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 
 public class TezHeartbeatRequest implements Writable {
 
+  private String containerIdentifier;
   private List<TezEvent> events;
   private TezTaskAttemptID currentTaskAttemptID;
   private int startIndex;
@@ -41,8 +43,9 @@ public class TezHeartbeatRequest implements Writable {
   }
 
   public TezHeartbeatRequest(long requestId, List<TezEvent> events,
-      TezTaskAttemptID taskAttemptID,
+      String containerIdentifier, TezTaskAttemptID taskAttemptID,
       int startIndex, int maxEvents) {
+    this.containerIdentifier = containerIdentifier;
     this.requestId = requestId;
     this.events = Collections.unmodifiableList(events);
     this.startIndex = startIndex;
@@ -50,6 +53,10 @@ public class TezHeartbeatRequest implements Writable {
     this.currentTaskAttemptID = taskAttemptID;
   }
 
+  public String getContainerIdentifier() {
+    return containerIdentifier;
+  }
+  
   public List<TezEvent> getEvents() {
     return events;
   }
@@ -90,6 +97,7 @@ public class TezHeartbeatRequest implements Writable {
     out.writeInt(startIndex);
     out.writeInt(maxEvents);
     out.writeLong(requestId);
+    Text.writeString(out, containerIdentifier);
   }
 
   @Override
@@ -112,6 +120,7 @@ public class TezHeartbeatRequest implements Writable {
     startIndex = in.readInt();
     maxEvents = in.readInt();
     requestId = in.readLong();
+    containerIdentifier = Text.readString(in);
   }
 
 }


Mime
View raw message