tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-463. Fixes for events in Child JVM. (hitesh)
Date Tue, 17 Sep 2013 23:50:40 GMT
Updated Branches:
  refs/heads/TEZ-398 9cad5f841 -> c2985e2e5


TEZ-463. Fixes for events in Child JVM. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c2985e2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c2985e2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c2985e2e

Branch: refs/heads/TEZ-398
Commit: c2985e2e5c4cb6920ca2a06a0a400673bb3aaa74
Parents: 9cad5f8
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Sep 17 16:50:18 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Sep 17 16:50:18 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |   2 +-
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  46 +++++---
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 117 ++-----------------
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   4 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  33 +++++-
 .../tez/common/TezTaskUmbilicalProtocol.java    |  21 ----
 .../shuffle/impl/ShuffleInputEventHandler.java  |   3 +-
 .../tez/engine/newapi/impl/EventMetaData.java   |   8 ++
 .../LogicalIOProcessorRuntimeTask.java          |  11 +-
 .../tez/engine/newruntime/RuntimeTask.java      |  12 +-
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |  85 +-------------
 .../apache/tez/mapreduce/processor/MRTask.java  |   4 +
 .../processor/TezTaskReporterImpl.java          |  13 ++-
 .../tez/mapreduce/task/MRRuntimeTask.java       |   2 +
 .../tez/mapreduce/TestUmbilicalProtocol.java    |  56 ---------
 15 files changed, 115 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index e40f4f5..7447974 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -166,7 +166,7 @@ public class TezConfiguration extends Configuration {
 
   public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
       + "am.heartbeat.interval-ms.max";
-  public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+  public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
 
   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
       + "max-events-per-heartbeat.max";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 39c3e4c..235d6d0 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -27,7 +27,9 @@ import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -152,7 +154,13 @@ public class YarnTezDagChild {
     return heartbeatThread;
   }
 
-  private static void heartbeat() throws TezException, IOException {
+  private static synchronized void heartbeat() throws TezException, IOException {
+    heartbeat(null);
+  }
+
+  private static synchronized void heartbeat(
+      Collection<TezEvent> outOfBandEvents)
+      throws TezException, IOException {
     TezEvent updateEvent = null;
     int eventCounter = 0;
     int eventsRange = 0;
@@ -163,10 +171,12 @@ public class YarnTezDagChild {
         taskAttemptID = currentTaskAttemptID;
         eventCounter = currentTask.getEventCounter();
         eventsRange = maxEventsToGet;
-        updateEvent = new TezEvent(new TaskStatusUpdateEvent(
-            currentTask.getCounters(), currentTask.getProgress()),
-              new EventMetaData(EventProducerConsumerType.SYSTEM,
-                  currentTask.getVertexName(), "", taskAttemptID));
+        if (!currentTask.isTaskDone() && !currentTask.hadFatalError()) {
+          updateEvent = new TezEvent(new TaskStatusUpdateEvent(
+              currentTask.getCounters(), currentTask.getProgress()),
+                new EventMetaData(EventProducerConsumerType.SYSTEM,
+                    currentTask.getVertexName(), "", taskAttemptID));
+        }
       }
     } finally {
       taskLock.readLock().unlock();
@@ -176,6 +186,9 @@ public class YarnTezDagChild {
       events.add(updateEvent);
     }
     eventsToSend.drainTo(events);
+    if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
+      events.addAll(outOfBandEvents);
+    }
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
         containerIdStr, taskAttemptID, eventCounter, eventsRange);
@@ -286,12 +299,12 @@ public class YarnTezDagChild {
             new TezEvent(new TaskAttemptFailedEvent(diagnostics),
                 sourceInfo);
         try {
-          umbilical.taskAttemptFailed(taskAttemptID, taskAttemptFailedEvent);
-        } catch (IOException e) {
+          heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+        } catch (Throwable t) {
           LOG.fatal("Failed to communicate task attempt failure to AM via"
-              + " umbilical", e);
+              + " umbilical", t);
           heartbeatError.set(true);
-          heartbeatErrorException = e;
+          heartbeatErrorException = t;
         }
       }
 
@@ -391,10 +404,15 @@ public class YarnTezDagChild {
             // TODONEWTEZ check if task had a fatal error before
             // sending completed event
             if (!currentTask.hadFatalError()) {
+              TezEvent statusUpdateEvent =
+                  new TezEvent(new TaskStatusUpdateEvent(
+                      currentTask.getCounters(), currentTask.getProgress()),
+                      new EventMetaData(EventProducerConsumerType.SYSTEM,
+                          currentTask.getVertexName(), "",
+                          currentTask.getTaskAttemptID()));
               TezEvent taskCompletedEvent =
                   new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
-              umbilical.taskAttemptCompleted(currentTaskAttemptID,
-                  taskCompletedEvent);
+              heartbeat(Arrays.asList(statusUpdateEvent, taskCompletedEvent));
             }
             try {
               taskLock.writeLock().lock();
@@ -419,11 +437,12 @@ public class YarnTezDagChild {
       }
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
+      // TODO NEWTEZ this should be a container failed event?
       TezEvent taskAttemptFailedEvent =
           new TezEvent(new TaskAttemptFailedEvent(
               StringUtils.stringifyException(e)),
               currentSourceInfo);
-      umbilical.taskAttemptFailed(currentTaskAttemptID, taskAttemptFailedEvent);
+      heartbeat(Collections.singletonList(taskAttemptFailedEvent));
     } catch (Throwable throwable) {
       String cause = StringUtils.stringifyException(throwable);
       LOG.fatal("Error running child : " + cause);
@@ -431,8 +450,7 @@ public class YarnTezDagChild {
         TezEvent taskAttemptFailedEvent =
             new TezEvent(new TaskAttemptFailedEvent(cause),
                 currentSourceInfo);
-        umbilical.taskAttemptFailed(currentTaskAttemptID,
-            taskAttemptFailedEvent);
+        heartbeat(Collections.singletonList(taskAttemptFailedEvent));
       }
     } finally {
       stopped.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d4cfce6..419682f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -19,7 +19,6 @@ package org.apache.tez.dag.app;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -46,13 +45,10 @@ import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerImpl;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -60,7 +56,6 @@ import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
 import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
@@ -260,6 +255,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     return task;
   }
 
+  /*
   @Override
   public boolean statusUpdate(TezTaskAttemptID taskAttemptId,
       TezTaskStatus taskStatus) throws IOException, InterruptedException {
@@ -348,46 +344,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             taskAttemptStatus));
     return true;
   }
-
-  @Override
-  public void reportDiagnosticInfo(TezTaskAttemptID taskAttemptId, String trace)
-      throws IOException {
-    LOG.info("Diagnostics report from " + taskAttemptId.toString() + ": "
-        + trace);
-
-    taskHeartbeatHandler.progressing(taskAttemptId);
-    pingContainerHeartbeatHandler(taskAttemptId);
-
-    // This is mainly used for cases where we want to propagate exception traces
-    // of tasks that fail.
-
-    // This call exists as a hadoop mapreduce legacy wherein all changes in
-    // counters/progress/phase/output-size are reported through statusUpdate()
-    // call but not diagnosticInformation.
-    context.getEventHandler().handle(
-        new TaskAttemptEventDiagnosticsUpdate(taskAttemptId, trace));
-
-  }
-
-  @Override
-  public boolean ping(TezTaskAttemptID taskAttemptId) throws IOException {
-    LOG.info("Ping from " + taskAttemptId.toString());
-    taskHeartbeatHandler.pinged(taskAttemptId);
-    pingContainerHeartbeatHandler(taskAttemptId);
-    return true;
-  }
-
-  @Override
-  public void done(TezTaskAttemptID taskAttemptId) throws IOException {
-    LOG.info("Done acknowledgement from " + taskAttemptId.toString());
-
-    taskHeartbeatHandler.progressing(taskAttemptId);
-    pingContainerHeartbeatHandler(taskAttemptId);
-
-    context.getEventHandler().handle(
-        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_DONE));
-
-  }
+  */
 
   /**
    * TaskAttempt is reporting that it is in commit_pending and it is waiting for
@@ -444,34 +401,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void shuffleError(TezTaskAttemptID taskId, String message)
-      throws IOException {
-    // TODO: This isn't really used in any MR code. Ask for removal.
-  }
-
-  @Override
-  public void fsError(TezTaskAttemptID taskAttemptId, String message)
-      throws IOException {
-    // This happens only in Child.
-    LOG.fatal("Task: " + taskAttemptId + " - failed due to FSError: " + message);
-    reportDiagnosticInfo(taskAttemptId, "FSError: " + message);
-
-    context.getEventHandler().handle(
-        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
-  }
-
-  @Override
-  public void fatalError(TezTaskAttemptID taskAttemptId, String message)
-      throws IOException {
-    // This happens only in Child and in the Task.
-    LOG.fatal("Task: " + taskAttemptId + " - exited : " + message);
-    reportDiagnosticInfo(taskAttemptId, "Error: " + message);
-
-    context.getEventHandler().handle(
-        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
-  }
-
-  @Override
   public void outputReady(TezTaskAttemptID taskAttemptId,
       OutputContext outputContext) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -624,14 +553,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     ContainerId containerId = ConverterUtils.toContainerId(request
         .getContainerIdentifier());
     long requestId = request.getRequestId();
-    LOG.info("Received request id " + requestId + 
-        " from child JVM : " + containerId.toString());
     ContainerInfo containerInfo = registeredContainers.get(containerId);
     if(containerInfo == null) {
       throw new TezException("Container " + containerId.toString()
           + " is not recognized for heartbeat");
     }
-    
+
     synchronized (containerInfo) {
       pingContainerHeartbeatHandler(containerId);
 
@@ -658,13 +585,14 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
               + containerInfo.lastRequestId+1
               + " and actual: " + requestId);
         }
-        
+
         List<TezEvent> inEvents = request.getEvents();
         LOG.info("Ping from " + taskAttemptID.toString() +
-            " events: " + (inEvents!=null? inEvents.size():0));
-        if(inEvents!=null && inEvents.size()>0) {    
+            " events: " + (inEvents != null? inEvents.size() : -1));
+        if(inEvents!=null && !inEvents.isEmpty()) {
           TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
-          context.getEventHandler().handle(new VertexEventRouteEvent(vertexId, inEvents));
+          context.getEventHandler().handle(
+              new VertexEventRouteEvent(vertexId, inEvents));
         }
         taskHeartbeatHandler.pinged(taskAttemptID);
         List<TezEvent> outEvents = context
@@ -681,33 +609,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
   }
 
-  @Override
-  public void taskAttemptFailed(TezTaskAttemptID taskAttemptId,
-      TezEvent tezAttemptFailedEvent) throws IOException {
-    TaskAttemptFailedEvent taskFailedEvent =
-        (TaskAttemptFailedEvent) tezAttemptFailedEvent.getEvent();
-    LOG.fatal("Task Attempt: " + taskAttemptId + " - failed : "
-        + taskFailedEvent.getDiagnostics());
-    reportDiagnosticInfo(taskAttemptId, "Error: "
-        + taskFailedEvent.getDiagnostics());
-
-    context.getEventHandler().handle(
-        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
-
-  }
-
-  @Override
-  public void taskAttemptCompleted(TezTaskAttemptID taskAttemptId,
-      TezEvent taskAttemptCompletedEvent) throws IOException {
-    LOG.info("Task attempt completed acknowledgement from "
-      + taskAttemptId.toString());
-
-    taskHeartbeatHandler.progressing(taskAttemptId);
-    pingContainerHeartbeatHandler(taskAttemptId);
-
-    context.getEventHandler().handle(
-        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_DONE));
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 5a8d4f6..6abaeec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -133,7 +133,7 @@ public class Edge {
   }
   
   public void sendTezEventToSourceTasks(TezEvent tezEvent) {
-    if (bufferEvents.get()) {
+    if (!bufferEvents.get()) {
       switch (tezEvent.getEventType()) {
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEvent event = (InputReadErrorEvent) tezEvent.getEvent();
@@ -154,7 +154,7 @@ public class Edge {
   }
   
   public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
-    if (bufferEvents.get()) {
+    if (!bufferEvents.get()) {
       List<Integer> destTaskIndices = new ArrayList<Integer>();
       switch (tezEvent.getEventType()) {
       case DATA_MOVEMENT_EVENT:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 12a677f..1cb8831 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -84,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -107,6 +108,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.newapi.events.DataMovementEvent;
 import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
 import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
 import org.apache.tez.engine.newapi.impl.EventMetaData;
 import org.apache.tez.engine.newapi.impl.InputSpec;
@@ -1422,10 +1424,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   
   private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) {
     if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) {
-      throw new TezUncheckedException(
-          "Bad routing of event. Event-vertex: "
-              + sourceMeta.getTaskVertexName() + " Expected: "
-              + vertex.getName());
+      throw new TezUncheckedException("Bad routing of event"
+          + ", Event-vertex=" + sourceMeta.getTaskVertexName()
+          + ", Expected=" + vertex.getName());
     }
   }
 
@@ -1468,12 +1469,34 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         case TASK_STATUS_UPDATE_EVENT:
           {
-            TaskStatusUpdateEvent sEvent = (TaskStatusUpdateEvent) tezEvent.getEvent();
+            TaskStatusUpdateEvent sEvent =
+                (TaskStatusUpdateEvent) tezEvent.getEvent();
             vertex.getEventHandler().handle(
                 new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(),
                     sEvent));
           }
           break;
+        case TASK_ATTEMPT_COMPLETED_EVENT:
+          {
+            vertex.getEventHandler().handle(
+                new TaskAttemptEvent(sourceMeta.getTaskAttemptID(),
+                    TaskAttemptEventType.TA_DONE));
+          }
+          break;
+        case TASK_ATTEMPT_FAILED_EVENT:
+          {
+            TaskAttemptFailedEvent taskFailedEvent =
+                (TaskAttemptFailedEvent) tezEvent.getEvent();
+            // TODO NEWTEZ combine these 2 events
+            vertex.getEventHandler().handle(
+                new TaskAttemptEventDiagnosticsUpdate(
+                    sourceMeta.getTaskAttemptID(),
+                    "Error: " + taskFailedEvent.getDiagnostics()));
+            vertex.getEventHandler().handle(
+                new TaskAttemptEvent(sourceMeta.getTaskAttemptID(),
+                    TaskAttemptEventType.TA_FAILED));
+          }
+          break;
         default:
           throw new TezUncheckedException("Unhandled tez event type: "
               + tezEvent.getEventType());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index c1289e6..bf95b37 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -26,7 +26,6 @@ import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.records.OutputContext;
@@ -43,26 +42,11 @@ public interface TezTaskUmbilicalProtocol extends Master {
 
   ContainerTask getTask(ContainerContext containerContext) throws IOException;
 
-  boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
-  throws IOException, InterruptedException;
-
-  void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) throws IOException;
-
-  boolean ping(TezTaskAttemptID taskid) throws IOException;
-
-  void done(TezTaskAttemptID taskid) throws IOException;
-
   void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
   throws IOException, InterruptedException;
 
   boolean canCommit(TezTaskAttemptID taskid) throws IOException;
 
-  void shuffleError(TezTaskAttemptID taskId, String message) throws IOException;
-
-  void fsError(TezTaskAttemptID taskId, String message) throws IOException;
-
-  void fatalError(TezTaskAttemptID taskId, String message) throws IOException;
-
   // TODO TEZAM5 Can commitPending and outputReady be collapsed into a single
   // call.
   // IAC outputReady followed by commit is a little confusing - since the output
@@ -87,9 +71,4 @@ public interface TezTaskUmbilicalProtocol extends Master {
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException;
 
-  public void taskAttemptFailed(TezTaskAttemptID attemptID,
-      TezEvent taskFailedEvent) throws IOException;
-
-  public void taskAttemptCompleted(TezTaskAttemptID attemptID,
-      TezEvent taskAttemptCompletedEvent) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
index 012103f..b31d36c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -84,7 +84,8 @@ public class ShuffleInputEventHandler {
   }
 
   private void processDataMovementEvent(DataMovementEvent dmEvent) {
-    Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
+    // FIXME TODO NEWTEZ
+    // Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
     DataMovementEventPayloadProto shufflePayload;
     try {
       shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
index 7d81449..9faafc5 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
@@ -141,4 +141,12 @@ public class EventMetaData implements Writable {
     this.index = index;
   }
 
+  @Override
+  public String toString() {
+    return "{ producerConsumerType=" + producerConsumerType
+        + ", taskVertexName=" + taskVertexName
+        + ", edgeVertexName=" + edgeVertexName
+        + ", taskAttemptId=" + taskAttemptID
+        + ", index=" + index + " }";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index a73a261..a76c564 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -28,7 +28,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -84,7 +83,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private Map<String, LogicalInput> inputMap;
   private Map<String, LogicalOutput> outputMap;
 
-  private AtomicBoolean stopped;
   private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
   private Thread eventRouterThread = null;
 
@@ -104,7 +102,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
         ShuffleUtils.convertJobTokenToBytes(jobToken));
-    this.stopped = new AtomicBoolean(false);
     this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();    
     this.state = State.NEW;
   }
@@ -174,7 +171,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             destVertexName, taskSpec.getTaskAttemptID());
       }
     } finally {
-      stopped.set(true);
+      setTaskDone();
       if (eventRouterThread != null) {
         eventRouterThread.interrupt();
       }
@@ -361,7 +358,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private void startRouterThread() {
     eventRouterThread = new Thread(new Runnable() {
       public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+        while (!isTaskDone() && !Thread.currentThread().isInterrupted()) {
           try {
             TezEvent e = eventsToBeProcessed.take();
             if (e == null) {
@@ -374,7 +371,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
               break;
             }
           } catch (InterruptedException e) {
-            if (!stopped.get()) {
+            if (!isTaskDone()) {
               LOG.warn("Event Router thread interrupted. Returning.");
             }
             return;
@@ -389,7 +386,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   public synchronized void cleanup() {
-    stopped.set(true);
+    setTaskDone();
     if (eventRouterThread != null) {
       eventRouterThread.interrupt();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
index 8b9327e..ee6cde8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -40,6 +40,7 @@ public abstract class RuntimeTask {
   protected final Configuration tezConf;
   protected final TezUmbilical tezUmbilical;
   protected final AtomicInteger eventCounter;
+  private final AtomicBoolean taskDone;
 
   protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
       TezUmbilical tezUmbilical) {
@@ -49,6 +50,7 @@ public abstract class RuntimeTask {
     this.tezCounters = new TezCounters();
     this.eventCounter = new AtomicInteger(0);
     this.progress = 0.0f;
+    this.taskDone = new AtomicBoolean(false);
   }
 
   protected enum State {
@@ -60,7 +62,7 @@ public abstract class RuntimeTask {
   public String getVertexName() {
     return taskSpec.getVertexName();
   }
-  
+
   public void setFatalError(Throwable t, String message) {
     hasFatalError.set(true);
     this.fatalError = t;
@@ -93,4 +95,12 @@ public abstract class RuntimeTask {
     return eventCounter.get();
   }
 
+  public boolean isTaskDone() {
+    return taskDone.get();
+  }
+
+  protected void setTaskDone() {
+    taskDone.set(true);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 06da4e6..9286d82 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -557,31 +557,6 @@ public class LocalJobRunnerTez implements ClientProtocol {
       return null;
     }
 
-    @Override
-    public synchronized boolean statusUpdate(TezTaskAttemptID taskId,
-        TezTaskStatus taskStatus) throws IOException, InterruptedException {
-      LOG.info(taskStatus.getStateString());
-      int taskIndex = mapIds.indexOf(taskId);
-      if (taskIndex >= 0) {                       // mapping
-        float numTasks = (float) this.numMapTasks;
-
-        partialMapProgress[taskIndex] = taskStatus.getProgress();
-        mapCounters[taskIndex] = taskStatus.getCounters();
-
-        float partialProgress = 0.0f;
-        for (float f : partialMapProgress) {
-          partialProgress += f;
-        }
-        status.setMapProgress(partialProgress / numTasks);
-      } else {
-        reduceCounters = taskStatus.getCounters();
-        status.setReduceProgress(taskStatus.getProgress());
-      }
-
-      // ignore phase
-      return true;
-    }
-
     /** Return the current values of the counters for this job,
      * including tasks that are in progress.
      */
@@ -599,61 +574,12 @@ public class LocalJobRunnerTez implements ClientProtocol {
       return current;
     }
 
-    /**
-     * Task is reporting that it is in commit_pending
-     * and it is waiting for the commit Response
-     */
-    @Override
-    public void commitPending(TezTaskAttemptID taskid,
-                              TezTaskStatus taskStatus)
-    throws IOException, InterruptedException {
-      statusUpdate(taskid, taskStatus);
-    }
-
-    @Override
-    public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) {
-      // Ignore for now
-    }
-
-    @Override
-    public boolean ping(TezTaskAttemptID taskid) throws IOException {
-      return true;
-    }
-
     @Override
     public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
       return true;
     }
 
     @Override
-    public void done(TezTaskAttemptID taskId) throws IOException {
-      int taskIndex = mapIds.indexOf(taskId);
-      if (taskIndex >= 0) {                       // mapping
-        status.setMapProgress(1.0f);
-      } else {
-        status.setReduceProgress(1.0f);
-      }
-    }
-
-    @Override
-    public synchronized void fsError(TezTaskAttemptID taskId, String message)
-    throws IOException {
-      LOG.fatal("FSError: "+ message + "from task: " + taskId);
-    }
-
-    @Override
-    public void shuffleError(TezTaskAttemptID taskId, String message)
-        throws IOException {
-      LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
-    }
-
-    @Override
-    public synchronized void fatalError(TezTaskAttemptID taskId, String msg)
-    throws IOException {
-      LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
-    }
-
-    @Override
     public TezTaskDependencyCompletionEventsUpdate
     getDependentTasksCompletionEvents(
         int fromEventIdx, int maxEventsToFetch,
@@ -682,20 +608,13 @@ public class LocalJobRunnerTez implements ClientProtocol {
       return null;
     }
 
-
     @Override
-    public void taskAttemptFailed(TezTaskAttemptID attemptID,
-        TezEvent taskFailedEvent) throws IOException {
+    public void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+        throws IOException, InterruptedException {
       // TODO Auto-generated method stub
       // TODO TODONEWTEZ
     }
 
-    @Override
-    public void taskAttemptCompleted(TezTaskAttemptID attemptID,
-        TezEvent taskAttemptCompletedEvent) throws IOException {
-      // TODO Auto-generated method stub
-      // TODO TODONEWTEZ
-    }
   }
 
   public LocalJobRunnerTez(Configuration conf) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index fe3b0e2..338b268 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -444,6 +444,7 @@ public abstract class MRTask extends RunningTaskContext {
    */
   public void statusUpdate() throws IOException, InterruptedException {
     int retries = MAX_RETRIES;
+    /* broken code due to engine re-factor
     while (true) {
       try {
         if (!getUmbilical().statusUpdate(taskAttemptId, status)) {
@@ -462,6 +463,7 @@ public abstract class MRTask extends RunningTaskContext {
         }
       }
     }
+     */
   }
 
   /**
@@ -533,6 +535,7 @@ public abstract class MRTask extends RunningTaskContext {
 
   private void sendDone(TezTaskUmbilicalProtocol umbilical) throws IOException {
     int retries = MAX_RETRIES;
+    /* broken code due to engine re-factor
     while (true) {
       try {
         umbilical.done(taskAttemptId);
@@ -546,6 +549,7 @@ public abstract class MRTask extends RunningTaskContext {
         }
       }
     }
+    */
   }
 
   public void updateCounters() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
index d602459..6323fc9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
@@ -159,14 +159,21 @@ class TezTaskReporterImpl
               taskProgress.get(),
               taskProgress.toString(), 
               this.mrTask.counters);
+
+          // broken code now due to tez engine changes
+          taskFound = false;
+          /*
           taskFound = 
               umbilical.statusUpdate(
                   this.mrTask.getTaskAttemptId(), this.mrTask.getStatus());
+           */
           this.mrTask.getStatus().clearStatus();
         }
         else {
           // send ping 
-          taskFound = umbilical.ping(this.mrTask.getTaskAttemptId());
+          taskFound = false;
+          // broken code now due to tez engine changes
+          //umbilical.ping(this.mrTask.getTaskAttemptId());
         }
 
         // if Task Tracker is not aware of our task ID (probably because it died and 
@@ -242,12 +249,14 @@ class TezTaskReporterImpl
     String cause = tCause == null 
                    ? StringUtils.stringifyException(throwable)
                    : StringUtils.stringifyException(tCause);
-    try {
+/*
+                   try {
       umbilical.fatalError(mrTask.getTaskAttemptId(), cause);
     } catch (IOException ioe) {
       LOG.fatal("Failed to contact the tasktracker", ioe);
       System.exit(-1);
     }
+    */
   }
 
   public TezTaskUmbilicalProtocol getUmbilical() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
index 435f33d..917ecc0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
@@ -159,9 +159,11 @@ public class MRRuntimeTask extends RuntimeTask {
       }
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       exception.printStackTrace(new PrintStream(baos));
+      /* broken code due to engine re-factor
       if (taskContext.getTaskAttemptId() != null) {
         umbilical.fatalError(taskContext.getTaskAttemptId(), baos.toString());
       }
+      */
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c2985e2e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index 1a40ead..006e383 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -29,7 +29,6 @@ import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.records.OutputContext;
@@ -81,29 +80,6 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
     return null;
   }
 
-  @Override
-  public boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
-      throws IOException, InterruptedException {
-    LOG.info("Got 'status-update' from " + taskId + ": status=" + taskStatus);
-    return true;
-  }
-
-  @Override
-  public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace)
-      throws IOException {
-    LOG.info("Got 'diagnostic-info' from " + taskid + ": trace=" + trace);
-  }
-
-  @Override
-  public boolean ping(TezTaskAttemptID taskid) throws IOException {
-    LOG.info("Got 'ping' from " + taskid);
-    return true;
-  }
-
-  @Override
-  public void done(TezTaskAttemptID taskid) throws IOException {
-    LOG.info("Got 'done' from " + taskid);
-  }
 
   @Override
   public void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
@@ -118,24 +94,6 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
   }
 
   @Override
-  public void shuffleError(TezTaskAttemptID taskId, String message)
-      throws IOException {
-    LOG.info("Got 'shuffle-error' from " + taskId + ": message=" + message);
-  }
-
-  @Override
-  public void fsError(TezTaskAttemptID taskId, String message)
-      throws IOException {
-    LOG.info("Got 'fs-error' from " + taskId + ": message=" + message);
-  }
-
-  @Override
-  public void fatalError(TezTaskAttemptID taskId, String message)
-      throws IOException {
-    LOG.info("Got 'fatal-error' from " + taskId + ": message=" + message);
-  }
-
-  @Override
   public void outputReady(TezTaskAttemptID taskAttemptId,
       OutputContext outputContext) throws IOException {
     // TODO Auto-generated method stub
@@ -155,18 +113,4 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
     return null;
   }
 
-  @Override
-  public void taskAttemptFailed(TezTaskAttemptID attemptID,
-      TezEvent taskFailedEvent) throws IOException {
-    // TODO Auto-generated method stub
-    // TODO TODONEWTEZ
-  }
-
-  @Override
-  public void taskAttemptCompleted(TezTaskAttemptID attemptID,
-      TezEvent taskAttemptCompletedEvent) throws IOException {
-    // TODO Auto-generated method stub
-    // TODO TODONEWTEZ
-  }
-
 }


Mime
View raw message