tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [37/50] [abbrv] git commit: Include container log urls and counters as part of history events in the AM logs.
Date Tue, 04 Jun 2013 05:33:39 GMT
Include container log urls and counters as part of history events in the AM logs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6d5625f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6d5625f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6d5625f4

Branch: refs/heads/master
Commit: 6d5625f4fa38cf35e6f7201fb4b18c47ec6964d3
Parents: 34de34b
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu May 30 13:34:38 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu May 30 13:34:38 2013 -0700

----------------------------------------------------------------------
 tez-dag/src/main/avro/HistoryEvents.avpr           |   35 +++++++-
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |    5 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      |   27 ++++++-
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  |    5 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |    6 +-
 .../apache/tez/dag/history/events/AvroUtils.java   |   53 ++++++++++++
 .../tez/dag/history/events/DAGFinishedEvent.java   |    9 ++-
 .../history/events/TaskAttemptFinishedEvent.java   |   10 ++-
 .../history/events/TaskAttemptStartedEvent.java    |   11 ++-
 .../tez/dag/history/events/TaskFinishedEvent.java  |    9 ++-
 .../dag/history/events/VertexFinishedEvent.java    |   10 ++-
 .../tez/dag/app/rm/container/TestAMContainer.java  |    3 +-
 .../apache/tez/mapreduce/examples/MRRSleepJob.java |   65 +++++++++------
 13 files changed, 200 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/avro/HistoryEvents.avpr
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/avro/HistoryEvents.avpr b/tez-dag/src/main/avro/HistoryEvents.avpr
index 04333e6..8805c6d 100644
--- a/tez-dag/src/main/avro/HistoryEvents.avpr
+++ b/tez-dag/src/main/avro/HistoryEvents.avpr
@@ -21,6 +21,29 @@
 
  "types": [
 
+     {"type": "record", "name": "TezCounter",
+      "fields": [
+          {"name": "name", "type": "string"},
+          {"name": "displayName", "type": "string"},
+          {"name": "value", "type": "long"}
+      ]
+     },
+
+     {"type": "record", "name": "TezCounterGroup",
+      "fields": [
+          {"name": "name", "type": "string"},
+          {"name": "displayName", "type": "string"},
+          {"name": "counts", "type": {"type": "array", "items": "TezCounter"}}
+      ]
+     },
+
+     {"type": "record", "name": "TezCounters",
+      "fields": [
+          {"name": "groups", "type": {"type": "array", "items": "TezCounterGroup"}}
+      ]
+     },
+
+
      {"type": "record", "name": "AMStarted",
       "fields": [
           {"name": "applicationAttemptId", "type": "string"},
@@ -50,7 +73,8 @@
           {"name": "dagId", "type": "string"},
           {"name": "finishTime", "type": "long"},
           {"name": "status", "type": "string"},
-          {"name": "diagnostics", "type": "string"}
+          {"name": "diagnostics", "type": "string"},
+          {"name": "counters", "type": "TezCounters"}
       ]
      },
 
@@ -71,7 +95,8 @@
           {"name": "vertexId", "type": "string"},
           {"name": "finishTime", "type": "long"},
           {"name": "status", "type": "string"},
-          {"name": "diagnostics", "type": "string"}
+          {"name": "diagnostics", "type": "string"},
+          {"name": "counters", "type": "TezCounters"}
       ]
      },
 
@@ -89,7 +114,8 @@
           {"name": "vertexName", "type": "string"},
           {"name": "taskId", "type": "string"},
           {"name": "finishTime", "type": "long"},
-          {"name": "status", "type": "string"}
+          {"name": "status", "type": "string"},
+          {"name": "counters", "type": "TezCounters"}
       ]
      },
 
@@ -109,7 +135,8 @@
           {"name": "taskAttemptId", "type": "string"},
           {"name": "finishTime", "type": "long"},
           {"name": "status", "type": "string"},
-          {"name": "diagnostics", "type": "string"}
+          {"name": "diagnostics", "type": "string"},
+          {"name": "counters", "type": "TezCounters"}
       ]
      },
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3681c85..d2395ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -625,7 +625,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   void logJobHistoryFinishedEvent() {
     this.setFinishTime();
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, finishTime,
-        DAGStatus.State.SUCCEEDED, "");
+        DAGStatus.State.SUCCEEDED, "", getAllCounters());
     this.eventHandler.handle(
         new DAGHistoryEvent(dagId, finishEvt));
   }
@@ -641,7 +641,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
     DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, clock.getTime(),
-        state, StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
+        state, StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
+        getAllCounters());
     this.eventHandler.handle(
         new DAGHistoryEvent(dagId, finishEvt));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5225ecf..c14ba8a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -794,9 +795,27 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   @SuppressWarnings("unchecked")
   protected void logJobHistoryAttemptStarted() {
+    final String containerIdStr = containerId.toString();
+    String inProgressLogsUrl = nodeHttpAddress
+       + "/" + "node/containerlogs"
+       + "/" + containerIdStr
+       + "/" + this.appContext.getUser();
+    String completedLogsUrl = "";
+    if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
+        && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
+      String contextStr = "v_" + getTask().getVertex().getName()
+          + "_" + this.attemptId.toString();
+      completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
+          + "/" + containerNodeId.toString()
+          + "/" + containerIdStr
+          + "/" + contextStr
+          + "/" + this.appContext.getUser();
+    }
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getTask().getVertex().getName(),
-        launchTime, containerId, containerNodeId);
+        launchTime, containerId, containerNodeId,
+        inProgressLogsUrl, completedLogsUrl);
     eventHandler.handle(new DAGHistoryEvent(
         attemptId.getTaskID().getVertexID().getDAGId(),
         startEvt));
@@ -809,7 +828,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getTask().getVertex().getName(),
-        getFinishTime(), TaskAttemptState.SUCCEEDED, "");
+        getFinishTime(), TaskAttemptState.SUCCEEDED, "",
+        getCounters());
     // FIXME how do we store information regd completion events
     eventHandler.handle(new DAGHistoryEvent(
         attemptId.getTaskID().getVertexID().getDAGId(),
@@ -823,7 +843,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         attemptId, getTask().getVertex().getName(),
         clock.getTime(), state,
         StringUtils.join(
-            LINE_SEPARATOR, getDiagnostics()));
+            LINE_SEPARATOR, getDiagnostics()),
+        getCounters());
     // FIXME how do we store information regd completion events
     eventHandler.handle(new DAGHistoryEvent(
         attemptId.getTaskID().getVertexID().getDAGId(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index fb738ce..575f32c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -806,14 +806,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // FIXME need to handle getting finish time as this function
     // is called from within a transition
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
-        getVertex().getName(), clock.getTime(), TaskState.SUCCEEDED);
+        getVertex().getName(), clock.getTime(), TaskState.SUCCEEDED,
+        getCounters());
     this.eventHandler.handle(new DAGHistoryEvent(
         taskId.getVertexID().getDAGId(), finishEvt));
   }
   
   protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
-        getVertex().getName(), clock.getTime(), finalState);
+        getVertex().getName(), clock.getTime(), finalState, getCounters());
     this.eventHandler.handle(new DAGHistoryEvent(
         taskId.getVertexID().getDAGId(), finishEvt));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e34382f..c7bceb6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -670,14 +670,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   void logJobHistoryVertexFinishedEvent() {
     this.setFinishTime();
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
-        vertexName, finishTime, VertexStatus.State.SUCCEEDED, "");
+        vertexName, finishTime, VertexStatus.State.SUCCEEDED, "",
+        getAllCounters());
     this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
   void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, clock.getTime(), state,
-        StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
+        StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
+        getAllCounters());
     this.eventHandler.handle(new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java
new file mode 100644
index 0000000..3587964
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AvroUtils.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.util.ArrayList;
+
+import org.apache.avro.util.Utf8;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.dag.history.avro.TezCounter;
+import org.apache.tez.dag.history.avro.TezCounterGroup;
+import org.apache.tez.dag.history.avro.TezCounters;
+
+public class AvroUtils {
+
+  public static TezCounters toAvro(
+      org.apache.tez.common.counters.TezCounters counters) {
+    TezCounters result = new TezCounters();
+    result.groups = new ArrayList<TezCounterGroup>(0);
+    if (counters == null) return result;
+    for (CounterGroup group : counters) {
+      TezCounterGroup g = new TezCounterGroup();
+      g.name = new Utf8(group.getName());
+      g.displayName = new Utf8(group.getDisplayName());
+      g.counts = new ArrayList<TezCounter>(group.size());
+      for (org.apache.tez.common.counters.TezCounter counter : group) {
+        TezCounter c = new TezCounter();
+        c.name = new Utf8(counter.getName());
+        c.displayName = new Utf8(counter.getDisplayName());
+        c.value = counter.getValue();
+        g.counts.add(c);
+      }
+      result.groups.add(g);
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 8794cea..8ded3d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.avro.DAGFinished;
@@ -27,14 +28,17 @@ import org.apache.tez.dag.records.TezDAGID;
 public class DAGFinishedEvent implements HistoryEvent {
 
   private DAGFinished datum = new DAGFinished();
+  // FIXME remove this when we have a proper history
+  private final TezCounters tezCounters;
 
   public DAGFinishedEvent(TezDAGID dagId,
       long finishTime, DAGStatus.State state,
-      String diagnostics) {
+      String diagnostics, TezCounters counters) {
     datum.dagId = dagId.toString();
     datum.finishTime = finishTime;
     datum.status = state.name();
     datum.diagnostics = diagnostics;
+    tezCounters = counters;
   }
 
   @Override
@@ -58,6 +62,7 @@ public class DAGFinishedEvent implements HistoryEvent {
     return "dagId=" + datum.dagId
         + ", finishTime=" + datum.finishTime
         + ", status=" + datum.status
-        + ", diagnostics=" + datum.diagnostics;
+        + ", diagnostics=" + datum.diagnostics
+        + ", counters=" + tezCounters.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 395ba93..9794938 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
@@ -27,17 +28,21 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 public class TaskAttemptFinishedEvent implements HistoryEvent {
 
   private TaskAttemptFinished datum = new TaskAttemptFinished();
+  // FIXME remove this when we have a proper history
+  private final TezCounters tezCounters;
 
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
       long finishTime,
       TaskAttemptState state,
-      String diagnostics) {
+      String diagnostics,
+      TezCounters counters) {
     datum.taskAttemptId = taId.toString();
     datum.vertexName = vertexName;
     datum.finishTime = finishTime;
     datum.status = state.name();
     datum.diagnostics = diagnostics;
+    tezCounters = counters;
   }
 
   @Override
@@ -62,6 +67,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", taskAttemptId=" + datum.taskAttemptId
         + ", finishTime=" + datum.finishTime
         + ", status=" + datum.status
-        + ", diagnostics=" + datum.diagnostics;
+        + ", diagnostics=" + datum.diagnostics
+        + ", counters=" + tezCounters.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 97c9f2f..5d29b21 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -28,15 +28,20 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 public class TaskAttemptStartedEvent implements HistoryEvent {
 
   private TaskAttemptStarted datum = new TaskAttemptStarted();
+  private final String inProgressLogsUrl;
+  private final String completedLogsUrl;
 
   public TaskAttemptStartedEvent(TezTaskAttemptID taId,
       String vertexName, long startTime,
-      ContainerId containerId, NodeId nodeId) {
+      ContainerId containerId, NodeId nodeId,
+      String inProgressLogsUrl, String completedLogsUrl) {
     datum.taskAttemptId = taId.toString();
     datum.vertexName = vertexName;
     datum.startTime = startTime;
     datum.containerId = containerId.toString();
     datum.nodeId = nodeId.toString();
+    this.inProgressLogsUrl = inProgressLogsUrl;
+    this.completedLogsUrl = completedLogsUrl;
   }
 
   @Override
@@ -61,6 +66,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
         + ", taskAttemptId=" + datum.taskAttemptId
         + ", startTime=" + datum.startTime
         + ", containerId=" + datum.containerId
-        + ", nodeId=" + datum.nodeId;
+        + ", nodeId=" + datum.nodeId
+        + ", inProgressLogs=" + inProgressLogsUrl
+        + ", completedLogs=" + completedLogsUrl;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 60f810b..5408a72 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
@@ -27,14 +28,17 @@ import org.apache.tez.dag.records.TezTaskID;
 public class TaskFinishedEvent implements HistoryEvent {
 
   private TaskFinished datum = new TaskFinished();
+  // FIXME remove this when we have a proper history
+  private final TezCounters tezCounters;
 
   public TaskFinishedEvent(TezTaskID taskId,
       String vertexName, long finishTime,
-      TaskState state) {
+      TaskState state, TezCounters counters) {
     datum.vertexName = vertexName;
     datum.taskId = taskId.toString();
     datum.finishTime = finishTime;
     datum.status = state.name();
+    tezCounters = counters;
   }
 
   @Override
@@ -58,6 +62,7 @@ public class TaskFinishedEvent implements HistoryEvent {
     return "vertexName=" + datum.vertexName
         + ", taskId=" + datum.taskId
         + ", finishTime=" + datum.finishTime
-        + ", status=" + datum.status;
+        + ", status=" + datum.status
+        + ", counters=" + tezCounters.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 7e86c66..ec13ebd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
@@ -27,15 +28,19 @@ import org.apache.tez.dag.records.TezVertexID;
 public class VertexFinishedEvent implements HistoryEvent {
 
   private VertexFinished datum = new VertexFinished();
+  // FIXME remove this when we have a proper history
+  private final TezCounters tezCounters;
 
   public VertexFinishedEvent(TezVertexID vertexId,
       String vertexName, long finishTime,
-      VertexStatus.State state, String diagnostics) {
+      VertexStatus.State state, String diagnostics,
+      TezCounters counters) {
     datum.vertexName = vertexName;
     datum.vertexId = vertexId.toString();
     datum.finishTime = finishTime;
     datum.status = state.name();
     datum.diagnostics = diagnostics;
+    tezCounters = counters;
   }
 
   @Override
@@ -60,6 +65,7 @@ public class VertexFinishedEvent implements HistoryEvent {
         + ", vertexId=" + datum.vertexId
         + ", finishTime=" + datum.finishTime
         + ", status=" + datum.status
-        + ", diagnostics=" + datum.diagnostics;
+        + ", diagnostics=" + datum.diagnostics
+        + ", counters=" + tezCounters.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 4541a2e..cf01f43 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -839,7 +839,8 @@ public class TestAMContainer {
       resource = BuilderUtils.newResource(1024, 1);
       priority = BuilderUtils.newPriority(1);
       container = BuilderUtils.newContainer(containerID, nodeID,
-          nodeHttpAddress, resource, priority, null, rmIdentifier);
+          nodeHttpAddress, resource, priority, null);
+
       chh = mock(ContainerHeartbeatHandler.class);
       
       InetSocketAddress addr = new InetSocketAddress("localhost", 0);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6d5625f4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 6115c6f..627d583 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -72,6 +72,8 @@ public class MRRSleepJob extends Configured implements Tool {
       "mrr.sleepjob.ireduce.sleep.time";
   public static String IREDUCE_STAGES_COUNT =
       "mrr.sleepjob.ireduces.stages.count";
+  public static String IREDUCE_TASKS_COUNT =
+      "mrr.sleepjob.ireduces.tasks.count";
 
   // Flags to inject failures
   public static String MAP_THROW_ERROR = "mrr.sleepjob.map.throw.error";
@@ -109,28 +111,21 @@ public class MRRSleepJob extends Configured implements Tool {
         InputSplit ignored, TaskAttemptContext taskContext)
         throws IOException {
       Configuration conf = taskContext.getConfiguration();
-      String vertexName = conf.get(
-          org.apache.tez.mapreduce.hadoop.MRJobConfig.VERTEX_NAME);
-      boolean isIntermediateReduce =
-          MultiStageMRConfigUtil.getIntermediateStageNum(vertexName) != -1;
-
-      final int count =
-          (isIntermediateReduce)?
-              conf.getInt(IREDUCE_SLEEP_COUNT, 1) :
-              conf.getInt(MAP_SLEEP_COUNT, 1);
-      if (count < 0) throw new IOException("Invalid map count: " + count);
 
-      int totalIReduces = MultiStageMRConfigUtil.getNumIntermediateStages(conf);
-      boolean finalIReduce = totalIReduces ==
-          (MultiStageMRConfigUtil.getIntermediateStageNum(vertexName) + 1);
+      final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
+      if (count < 0) {
+        throw new IOException("Invalid map count: " + count);
+      }
 
-      final int redcount = finalIReduce?
-          conf.getInt(REDUCE_SLEEP_COUNT, 1) :
-            conf.getInt(IREDUCE_SLEEP_COUNT, 1);
-      if (redcount < 0)
-        throw new IOException("Invalid reduce count: " + redcount);
+      int totalIReduces = conf.getInt(IREDUCE_STAGES_COUNT, 1);
 
-      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+      int reduceTasks = totalIReduces == 0?
+          taskContext.getNumReduceTasks() :
+            conf.getInt(IREDUCE_TASKS_COUNT, 1);
+      int sleepCount = totalIReduces == 0?
+          conf.getInt(REDUCE_SLEEP_COUNT,1) :
+            conf.getInt(IREDUCE_SLEEP_COUNT,1);
+      final int emitPerMapTask = sleepCount * reduceTasks;
 
       return new RecordReader<IntWritable,IntWritable>() {
         private int records = 0;
@@ -219,6 +214,9 @@ public class MRRSleepJob extends Configured implements Tool {
                ) throws IOException, InterruptedException {
       //it is expected that every map processes mapSleepCount number of records.
       try {
+        LOG.info("Reading in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + key.get());
         LOG.info("Sleeping in InitialMap"
             + ", vertexName=" + vertexName
             + ", taskAttemptId=" + context.getTaskAttemptID()
@@ -228,7 +226,9 @@ public class MRRSleepJob extends Configured implements Tool {
             + (mapSleepDuration * (mapSleepCount - count)));
         context.setStatus("Sleeping... (" +
           (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
-        Thread.sleep(mapSleepDuration);
+        if ((mapSleepCount - count) > 0) {
+          Thread.sleep(mapSleepDuration);
+        }
         if (throwError || throwFatal) {
           throw new IOException("Throwing a simulated error from map");
         }
@@ -242,6 +242,9 @@ public class MRRSleepJob extends Configured implements Tool {
       // each reducer will get reduceSleepCount number of keys.
       int k = key.get();
       for (int i = 0; i < value.get(); ++i) {
+        LOG.info("Writing in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + (k+i) + " value 1");
         context.write(new IntWritable(k + i), new IntWritable(1));
       }
     }
@@ -269,6 +272,10 @@ public class MRRSleepJob extends Configured implements Tool {
         Context context)
             throws IOException, InterruptedException {
       try {
+        LOG.info("Reading in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + key.get());
+
         LOG.info("Sleeping in IntermediateReduce"
             + ", vertexName=" + vertexName
             + ", taskAttemptId=" + context.getTaskAttemptID()
@@ -278,7 +285,9 @@ public class MRRSleepJob extends Configured implements Tool {
             + (iReduceSleepDuration * (iReduceSleepCount - count)));
         context.setStatus("Sleeping... (" +
           (iReduceSleepDuration * (iReduceSleepCount - count)) + ") ms left");
-        Thread.sleep(iReduceSleepDuration);
+        if ((iReduceSleepCount - count) > 0) {
+          Thread.sleep(iReduceSleepDuration);
+        }
       }
       catch (InterruptedException ex) {
         throw (IOException)new IOException(
@@ -290,6 +299,9 @@ public class MRRSleepJob extends Configured implements Tool {
       int k = key.get();
       for (IntWritable value : values) {
         for (int i = 0; i < value.get(); ++i) {
+          LOG.info("Writing in " + vertexName
+              + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+              + " key " + (k+i) + " value 1");
           context.write(new IntWritable(k + i), new IntWritable(1));
         }
       }
@@ -318,6 +330,9 @@ public class MRRSleepJob extends Configured implements Tool {
                        Context context)
       throws IOException {
       try {
+        LOG.info("Reading in " + vertexName
+            + " taskid " + context.getTaskAttemptID().getTaskID().getId()
+            + " key " + key.get());
         LOG.info("Sleeping in FinalReduce"
             + ", vertexName=" + vertexName
             + ", taskAttemptId=" + context.getTaskAttemptID()
@@ -327,8 +342,9 @@ public class MRRSleepJob extends Configured implements Tool {
             + (reduceSleepDuration * (reduceSleepCount - count)));
         context.setStatus("Sleeping... (" +
             (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
-        Thread.sleep(reduceSleepDuration);
-
+        if ((reduceSleepCount - count) > 0) {
+          Thread.sleep(reduceSleepDuration);
+        }
       }
       catch (InterruptedException ex) {
         throw (IOException)new IOException(
@@ -357,6 +373,7 @@ public class MRRSleepJob extends Configured implements Tool {
     conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
     conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
     conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
+    conf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
 
     // Configure intermediate reduces
     conf.setInt(
@@ -364,7 +381,7 @@ public class MRRSleepJob extends Configured implements Tool {
         iReduceStagesCount);
     LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");
 
-    for (int i = 0; i < iReduceStagesCount; ++i) {
+    for (int i = 1; i <= iReduceStagesCount; ++i) {
       // Set reducer class for intermediate reduce
       conf.setClass(
           MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,


Mime
View raw message