tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [2/2] git commit: TEZ-1176. Set parallelism should end up sending an update to ATS if numTasks are updated at run-time. (hitesh)
Date Thu, 16 Oct 2014 22:57:04 GMT
TEZ-1176. Set parallelism should end up sending an update to ATS if numTasks are updated at
run-time. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2600c538
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2600c538
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2600c538

Branch: refs/heads/master
Commit: 2600c538886c5864a9248cff04f445cfbde8b203
Parents: 4186c6d
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Oct 16 15:45:30 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Oct 16 15:56:28 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ATSConstants.java     |  2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  2 +-
 .../events/VertexParallelismUpdatedEvent.java   | 17 ++++-
 .../impl/HistoryEventJsonConversion.java        | 51 ++++++++++++-
 .../apache/tez/dag/history/utils/DAGUtils.java  | 77 ++++++++++++--------
 .../org/apache/tez/dag/app/TestPreemption.java  |  6 +-
 .../TestHistoryEventsProtoConversion.java       |  4 +-
 .../impl/TestHistoryEventJsonConversion.java    | 51 ++++++++++++-
 .../ats/HistoryEventTimelineConversion.java     | 39 +++++++++-
 .../ats/TestHistoryEventTimelineConversion.java | 42 +++++++++++
 11 files changed, 246 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d86b28..25da87f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,7 @@ ALL CHANGES:
   priority
   TEZ-1632. NPE at TestPreemption.testPreemptionWithoutSession
   TEZ-1674. Rename configuration parameters related to counters / memory scaling.
+  TEZ-1176. Set parallelism should end up sending an update to ATS if numTasks are updated at run-time.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 065614c..3859373 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -64,6 +64,7 @@ public class ATSConstants {
   public static final String COUNTERS = "counters";
   public static final String STATS = "stats";
   public static final String NUM_TASKS = "numTasks";
+  public static final String OLD_NUM_TASKS = "oldNumTasks";
   public static final String NUM_COMPLETED_TASKS = "numCompletedTasks";
   public static final String NUM_SUCCEEDED_TASKS = "numSucceededTasks";
   public static final String NUM_FAILED_TASKS = "numFailedTasks";
@@ -72,6 +73,7 @@ public class ATSConstants {
   public static final String IN_PROGRESS_LOGS_URL = "inProgressLogsURL";
   public static final String COMPLETED_LOGS_URL = "completedLogsURL";
   public static final String EXIT_STATUS = "exitStatus";
+  public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
 
   /* Counters-related keys */
   public static final String COUNTER_GROUPS = "counterGroups";

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 34fffd8..9f88523 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1336,7 +1336,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         VertexParallelismUpdatedEvent parallelismUpdatedEvent =
             new VertexParallelismUpdatedEvent(vertexId, numTasks,
                 vertexLocationHint,
-                sourceEdgeManagers, rootInputSpecUpdates);
+                sourceEdgeManagers, rootInputSpecUpdates, oldNumTasks);
         appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(),
             parallelismUpdatedEvent));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
index 2389ff2..ef21537 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -42,9 +42,11 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
 
   private TezVertexID vertexID;
   private int numTasks;
+  private int oldNumTasks;
   private VertexLocationHint vertexLocationHint;
   private Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers;
   private Map<String, InputSpecUpdate> rootInputSpecUpdates;
+  private long updateTime;
 
   public VertexParallelismUpdatedEvent() {
   }
@@ -52,12 +54,14 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
   public VertexParallelismUpdatedEvent(TezVertexID vertexID,
       int numTasks, VertexLocationHint vertexLocationHint,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
-      Map<String, InputSpecUpdate> rootInputSpecUpdates) {
+      Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldNumTasks) {
     this.vertexID = vertexID;
     this.numTasks = numTasks;
     this.vertexLocationHint = vertexLocationHint;
     this.sourceEdgeManagers = sourceEdgeManagers;
     this.rootInputSpecUpdates = rootInputSpecUpdates;
+    this.updateTime = System.currentTimeMillis();
+    this.oldNumTasks = oldNumTasks;
   }
 
   @Override
@@ -72,7 +76,7 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
 
   @Override
   public boolean isHistoryEvent() {
-    return false;
+    return true;
   }
 
   public VertexParallelismUpdatedProto toProto() {
@@ -189,4 +193,13 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
   public Map<String, InputSpecUpdate> getRootInputSpecUpdates() {
     return rootInputSpecUpdates;
   }
+
+  public long getUpdateTime() {
+    return updateTime;
+  }
+
+  public int getOldNumTasks() {
+    return oldNumTasks;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index a9987d6..37292ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,7 +18,12 @@
 
 package org.apache.tez.dag.history.logging.impl;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -35,6 +40,7 @@ import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
@@ -96,11 +102,13 @@ public class HistoryEventJsonConversion {
       case TASK_ATTEMPT_FINISHED:
         jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
         break;
+      case VERTEX_PARALLELISM_UPDATED:
+        jsonObject = convertVertexParallelismUpdatedEvent((VertexParallelismUpdatedEvent) historyEvent);
+        break;
       case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
       case VERTEX_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_FINISHED:
-      case VERTEX_PARALLELISM_UPDATED:
       case DAG_COMMIT_STARTED:
         throw new UnsupportedOperationException("Invalid Event, does not support history"
             + ", eventType=" + historyEvent.getEventType());
@@ -599,7 +607,8 @@ public class HistoryEventJsonConversion {
     return jsonObject;
   }
 
-  private static JSONObject convertVertexStartedEvent(VertexStartedEvent event) throws JSONException {
+  private static JSONObject convertVertexStartedEvent(VertexStartedEvent event)
+      throws JSONException {
     JSONObject jsonObject = new JSONObject();
     jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
     jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
@@ -631,4 +640,42 @@ public class HistoryEventJsonConversion {
     return jsonObject;
   }
 
+  private static JSONObject convertVertexParallelismUpdatedEvent(
+      VertexParallelismUpdatedEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject updateEvent = new JSONObject();
+    updateEvent.put(ATSConstants.TIMESTAMP, event.getUpdateTime());
+    updateEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
+
+    JSONObject eventInfo = new JSONObject();
+    eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
+    eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
+      JSONObject updatedEdgeManagers = new JSONObject();
+      for (Entry<String, EdgeManagerPluginDescriptor> entry :
+          event.getSourceEdgeManagers().entrySet()) {
+        updatedEdgeManagers.put(entry.getKey(),
+            new JSONObject(DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue())));
+      }
+      eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
+    }
+    updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
+    events.put(updateEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    // TODO add more on all other updated information
+    return jsonObject;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index cab3c83..5d364fd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.utils;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -29,6 +30,7 @@ import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
@@ -39,42 +41,43 @@ import org.codehaus.jettison.json.JSONObject;
 
 public class DAGUtils {
 
-  static final String DAG_NAME_KEY = "dagName";
-  static final String VERTICES_KEY = "vertices";
-  static final String EDGES_KEY = "edges";
-  static final String VERTEX_GROUPS_KEY = "vertexGroups";
-
-  static final String VERTEX_NAME_KEY = "vertexName";
-  static final String PROCESSOR_CLASS_KEY = "processorClass";
-  static final String IN_EDGE_IDS_KEY = "inEdgeIds";
-  static final String OUT_EDGE_IDS_KEY = "outEdgeIds";
-  static final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
-  static final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
-  static final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+  public static final String DAG_NAME_KEY = "dagName";
+  public static final String VERTICES_KEY = "vertices";
+  public static final String EDGES_KEY = "edges";
+  public static final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+  public static final String VERTEX_NAME_KEY = "vertexName";
+  public static final String PROCESSOR_CLASS_KEY = "processorClass";
+  public static final String IN_EDGE_IDS_KEY = "inEdgeIds";
+  public static final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+  public static final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+  public static final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+  public static final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
       "vertexManagerPluginClass";
-  static final String USER_PAYLOAD_AS_TEXT = "userPayloadAsText";
-  static final String OUTPUT_USER_PAYLOAD_AS_TEXT = "outputUserPayloadAsText";
-  static final String INPUT_USER_PAYLOAD_AS_TEXT = "inputUserPayloadAsText";
-
-  static final String EDGE_ID_KEY = "edgeId";
-  static final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
-  static final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
-  static final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
-  static final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
-  static final String SCHEDULING_TYPE_KEY = "schedulingType";
-  static final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
-  static final String EDGE_DESTINATION_CLASS_KEY =
+  public static final String USER_PAYLOAD_AS_TEXT = "userPayloadAsText";
+  public static final String OUTPUT_USER_PAYLOAD_AS_TEXT = "outputUserPayloadAsText";
+  public static final String INPUT_USER_PAYLOAD_AS_TEXT = "inputUserPayloadAsText";
+
+  public static final String EDGE_ID_KEY = "edgeId";
+  public static final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+  public static final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+  public static final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+  public static final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+  public static final String SCHEDULING_TYPE_KEY = "schedulingType";
+  public static final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+  public static final String EDGE_DESTINATION_CLASS_KEY =
       "edgeDestinationClass";
+  public static final String EDGE_MANAGER_CLASS_KEY = "edgeManagerClass";
 
-  static final String NAME_KEY = "name";
-  static final String CLASS_KEY = "class";
-  static final String INITIALIZER_KEY = "initializer";
+  public static final String NAME_KEY = "name";
+  public static final String CLASS_KEY = "class";
+  public static final String INITIALIZER_KEY = "initializer";
 
-  static final String VERTEX_GROUP_NAME_KEY = "groupName";
-  static final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
-  static final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
-  static final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
-  static final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
+  public static final String VERTEX_GROUP_NAME_KEY = "groupName";
+  public static final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+  public static final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+  public static final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+  public static final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
 
 
 
@@ -332,4 +335,14 @@ public class DAGUtils {
     return vertexStatsMap;
   }
 
+  public static Map<String,Object> convertEdgeManagerPluginDescriptor(
+      EdgeManagerPluginDescriptor descriptor) {
+    Map<String, Object> jsonDescriptor = new HashMap<String, Object>();
+    jsonDescriptor.put(EDGE_MANAGER_CLASS_KEY, descriptor.getClassName());
+    if (descriptor.getHistoryText() != null && !descriptor.getHistoryText().isEmpty()) {
+      jsonDescriptor.put(USER_PAYLOAD_AS_TEXT, descriptor.getHistoryText());
+    }
+    return jsonDescriptor;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index 0958c48..cfb1c9f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -99,11 +99,7 @@ public class TestPreemption {
     // now the MockApp has been started. sync with it to get the launcher
     syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
 
-    DAGImpl dagImpl;
-    do {
-      Thread.sleep(100); // usually needs to sleep 2-3 times
-    } while ((dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG()) == null);
-
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
     int vertexIndex = 0;
     int upToTaskVersion = 3;
     TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex);

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index f52671c..f030db7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -278,7 +278,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexParallelismUpdatedEvent(
               TezVertexID.getInstance(
                   TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
-              100, null, null, rootInputSpecUpdates);
+              100, null, null, rootInputSpecUpdates, 1);
       VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -315,7 +315,7 @@ public class TestHistoryEventsProtoConversion {
               100, VertexLocationHint.create(Arrays.asList(TaskLocationHint.createTaskLocationHint(
               new HashSet<String>(Arrays.asList("h1")),
               new HashSet<String>(Arrays.asList("r1"))))),
-              sourceEdgeManagers, null);
+              sourceEdgeManagers, null, 1);
 
       VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
           testProtoConversion(event);

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 2149053..d7aca55 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -18,12 +18,17 @@
 
 package org.apache.tez.dag.history.logging.impl;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -52,11 +57,14 @@ import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -122,7 +130,7 @@ public class TestHistoryEventJsonConversion {
           event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
           break;
         case VERTEX_PARALLELISM_UPDATED:
-          event = new VertexParallelismUpdatedEvent();
+          event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 10);
           break;
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
@@ -176,4 +184,45 @@ public class TestHistoryEventJsonConversion {
     }
   }
 
+  @Test
+  public void testConvertVertexParallelismUpdatedEvent() throws JSONException {
+    TezVertexID vId = TezVertexID.getInstance(
+        TezDAGID.getInstance(
+            ApplicationId.newInstance(1l, 1), 1), 1);
+    Map<String, EdgeManagerPluginDescriptor> edgeMgrs =
+        new HashMap<String, EdgeManagerPluginDescriptor>();
+    edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"));
+    VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
+        edgeMgrs, null, 10);
+
+    JSONObject jsonObject = HistoryEventJsonConversion.convertToJson(event);
+    Assert.assertNotNull(jsonObject);
+    Assert.assertEquals(vId.toString(), jsonObject.getString(ATSConstants.ENTITY));
+    Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, jsonObject.get(ATSConstants.ENTITY_TYPE));
+
+    JSONArray events = jsonObject.getJSONArray(ATSConstants.EVENTS);
+    Assert.assertEquals(1, events.length());
+
+    JSONObject evt = events.getJSONObject(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(),
+        evt.getString(ATSConstants.EVENT_TYPE));
+
+    JSONObject evtInfo = evt.getJSONObject(ATSConstants.EVENT_INFO);
+    Assert.assertEquals(1, evtInfo.getInt(ATSConstants.NUM_TASKS));
+    Assert.assertEquals(10, evtInfo.getInt(ATSConstants.OLD_NUM_TASKS));
+    Assert.assertNotNull(evtInfo.getJSONObject(ATSConstants.UPDATED_EDGE_MANAGERS));
+
+    JSONObject updatedEdgeMgrs = evtInfo.getJSONObject(ATSConstants.UPDATED_EDGE_MANAGERS);
+    Assert.assertEquals(1, updatedEdgeMgrs.length());
+    Assert.assertNotNull(updatedEdgeMgrs.getJSONObject("a"));
+    JSONObject updatedEdgeMgr = updatedEdgeMgrs.getJSONObject("a");
+
+    Assert.assertEquals("a.class", updatedEdgeMgr.getString(DAGUtils.EDGE_MANAGER_CLASS_KEY));
+
+    JSONObject otherInfo = jsonObject.getJSONObject(ATSConstants.OTHER_INFO);
+    Assert.assertEquals(1, otherInfo.getInt(ATSConstants.NUM_TASKS));
+
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index a81bdf4..3ed3077 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -19,11 +19,14 @@
 package org.apache.tez.dag.history.logging.ats;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -41,6 +44,7 @@ import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
@@ -99,11 +103,14 @@ public class HistoryEventTimelineConversion {
       case TASK_ATTEMPT_FINISHED:
         timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
         break;
+      case VERTEX_PARALLELISM_UPDATED:
+        timelineEntity = convertVertexParallelismUpdatedEvent(
+            (VertexParallelismUpdatedEvent) historyEvent);
+        break;
       case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
       case VERTEX_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_FINISHED:
-      case VERTEX_PARALLELISM_UPDATED:
       case DAG_COMMIT_STARTED:
         throw new UnsupportedOperationException("Invalid Event, does not support history"
             + ", eventType=" + historyEvent.getEventType());
@@ -478,4 +485,34 @@ public class HistoryEventTimelineConversion {
     return atsEntity;
   }
 
+  private static TimelineEntity convertVertexParallelismUpdatedEvent(
+      VertexParallelismUpdatedEvent event) {
+    TimelineEntity atsEntity = new TimelineEntity();
+    atsEntity.setEntityId(event.getVertexID().toString());
+    atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
+
+    TimelineEvent updateEvt = new TimelineEvent();
+    updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
+    updateEvt.setTimestamp(event.getUpdateTime());
+
+    Map<String,Object> eventInfo = new HashMap<String, Object>();
+    if (event.getSourceEdgeManagers() != null && !event.getSourceEdgeManagers().isEmpty()) {
+      Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>();
+      for (Entry<String, EdgeManagerPluginDescriptor> entry :
+          event.getSourceEdgeManagers().entrySet()) {
+        updatedEdgeManagers.put(entry.getKey(),
+            DAGUtils.convertEdgeManagerPluginDescriptor(entry.getValue()));
+      }
+      eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
+    }
+    eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
+    updateEvt.setEventInfo(eventInfo);
+    atsEntity.addEvent(updateEvt);
+
+    atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
+
+    return atsEntity;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2600c538/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index d2e366d..f275921 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -18,12 +18,18 @@
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -52,6 +58,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -176,4 +183,39 @@ public class TestHistoryEventTimelineConversion {
     }
   }
 
+  @Test
+  public void testConvertVertexParallelismUpdatedEvent() {
+    TezVertexID vId = TezVertexID.getInstance(
+        TezDAGID.getInstance(
+            ApplicationId.newInstance(1l, 1), 1), 1);
+    Map<String, EdgeManagerPluginDescriptor> edgeMgrs =
+        new HashMap<String, EdgeManagerPluginDescriptor>();
+    edgeMgrs.put("a", EdgeManagerPluginDescriptor.create("a.class").setHistoryText("text"));
+    VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
+        edgeMgrs, null, 10);
+
+    TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
+    Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
+    Assert.assertEquals(vId.toString(), timelineEntity.getEntityId());
+    Assert.assertEquals(1, timelineEntity.getEvents().size());
+
+    TimelineEvent evt = timelineEntity.getEvents().get(0);
+    Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType());
+    Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS));
+    Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS));
+    Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS));
+
+    Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
+        evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS);
+    Assert.assertEquals(1, updatedEdgeMgrs.size());
+    Assert.assertTrue(updatedEdgeMgrs.containsKey("a"));
+    Map<String, Object> updatedEdgeMgr = (Map<String, Object>) updatedEdgeMgrs.get("a");
+
+    Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
+
+    Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS));
+
+  }
+
+
 }


Mime
View raw message