tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] TEZ-407. Support multiple inputs and connection patterns in Tez (bikas)
Date Wed, 11 Sep 2013 19:57:57 GMT
Updated Branches:
  refs/heads/TEZ-398 0f5298a5e -> 8d89485fd


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 7e7a8a5..3a140a6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -62,6 +61,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
@@ -71,7 +71,6 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
@@ -108,7 +107,7 @@ public class TestVertexImpl {
   private AppContext appContext;
   private VertexLocationHint vertexLocationHint;
   private Configuration conf;
-  private Map<String, EdgeProperty> edges;
+  private Map<String, Edge> edges;
 
   private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
@@ -477,27 +476,32 @@ public class TestVertexImpl {
     Map<String, EdgePlan> edgePlans =
         DagTypeConverters.createEdgePlanMapFromDAGPlan(dagPlan.getEdgeList());
 
+    // TODO - this test logic is tightly linked to impl DAGImpl code.
     for (int i = 0; i < vCnt; ++i) {
       VertexPlan vertexPlan = dagPlan.getVertex(i);
       Vertex vertex = vertices.get(vertexPlan.getName());
-      Map<Vertex, EdgeProperty> inVertices =
-          new HashMap<Vertex, EdgeProperty>();
+      Map<Vertex, Edge> inVertices =
+          new HashMap<Vertex, Edge>();
 
-      Map<Vertex, EdgeProperty> outVertices =
-          new HashMap<Vertex, EdgeProperty>();
+      Map<Vertex, Edge> outVertices =
+          new HashMap<Vertex, Edge>();
 
       for(String inEdgeId : vertexPlan.getInEdgeIdList()){
         EdgePlan edgePlan = edgePlans.get(inEdgeId);
         Vertex inVertex = this.vertices.get(edgePlan.getInputVertexName());
-        EdgeProperty edgeProp = this.edges.get(inEdgeId);
-        inVertices.put(inVertex, edgeProp);
+        Edge edge = this.edges.get(inEdgeId);
+        edge.setSourceVertex(inVertex);
+        edge.setDestinationVertex(vertex);
+        inVertices.put(inVertex, edge);
       }
 
       for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
         EdgePlan edgePlan = edgePlans.get(outEdgeId);
         Vertex outVertex = this.vertices.get(edgePlan.getOutputVertexName());
-        EdgeProperty edgeProp = this.edges.get(outEdgeId);
-        outVertices.put(outVertex, edgeProp);
+        Edge edge = this.edges.get(outEdgeId);
+        edge.setSourceVertex(vertex);
+        edge.setDestinationVertex(outVertex);
+        outVertices.put(outVertex, edge);
       }
       LOG.info("Setting input vertices for vertex " + vertex.getName()
           + ", inputVerticesCnt=" + inVertices.size());
@@ -526,8 +530,15 @@ public class TestVertexImpl {
     doReturn(dagId).when(appContext).getCurrentDAGID();
     doReturn(dagId).when(dag).getID();
     setupVertices();
-    edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(
-        dagPlan.getEdgeList());
+    
+    // TODO - this test logic is tightly linked to impl DAGImpl code.
+    edges = new HashMap<String, Edge>();
+    for (EdgePlan edgePlan : dagPlan.getEdgeList()) {
+      EdgeProperty edgeProperty = DagTypeConverters
+          .createEdgePropertyMapFromDAGPlan(edgePlan);
+      edges.put(edgePlan.getId(), new Edge(edgeProperty, dispatcher.getEventHandler()));
+    }
+
     parseVertexEdges();
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
@@ -607,43 +618,43 @@ public class TestVertexImpl {
     Assert.assertEquals("x3.y3", v3.getProcessorName());
     Assert.assertEquals("foo", v3.getJavaOpts());
 
-    Assert.assertEquals(2, v3.getInputSpecList().size());
+    Assert.assertEquals(2, v3.getInputSpecList(0).size());
     Assert.assertEquals(2, v3.getInputVerticesCount());
     Assert.assertEquals(2, v3.getOutputVerticesCount());
     Assert.assertEquals(2, v3.getOutputVerticesCount());
 
-    Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(0)
+    Assert.assertTrue("vertex1".equals(v3.getInputSpecList(0).get(0)
         .getSourceVertexName())
-        || "vertex2".equals(v3.getInputSpecList().get(0)
+        || "vertex2".equals(v3.getInputSpecList(0).get(0)
             .getSourceVertexName()));
-    Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(1)
+    Assert.assertTrue("vertex1".equals(v3.getInputSpecList(0).get(1)
         .getSourceVertexName())
-        || "vertex2".equals(v3.getInputSpecList().get(1)
+        || "vertex2".equals(v3.getInputSpecList(0).get(1)
             .getSourceVertexName()));
-    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(0)
+    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(0)
         .getInputDescriptor().getClassName())
-        || "i3_v2".equals(v3.getInputSpecList().get(0)
+        || "i3_v2".equals(v3.getInputSpecList(0).get(0)
             .getInputDescriptor().getClassName()));
-    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(1)
+    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(1)
         .getInputDescriptor().getClassName())
-        || "i3_v2".equals(v3.getInputSpecList().get(1)
+        || "i3_v2".equals(v3.getInputSpecList(0).get(1)
             .getInputDescriptor().getClassName()));
 
-    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0)
+    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(0)
         .getDestinationVertexName())
-        || "vertex5".equals(v3.getOutputSpecList().get(0)
+        || "vertex5".equals(v3.getOutputSpecList(0).get(0)
             .getDestinationVertexName()));
-    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(1)
+    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(1)
         .getDestinationVertexName())
-        || "vertex5".equals(v3.getOutputSpecList().get(1)
+        || "vertex5".equals(v3.getOutputSpecList(0).get(1)
             .getDestinationVertexName()));
-    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(0)
+    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(0)
         .getOutputDescriptor().getClassName())
-        || "o3_v5".equals(v3.getOutputSpecList().get(0)
+        || "o3_v5".equals(v3.getOutputSpecList(0).get(0)
             .getOutputDescriptor().getClassName()));
-    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(1)
+    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(1)
         .getOutputDescriptor().getClassName())
-        || "o3_v5".equals(v3.getOutputSpecList().get(1)
+        || "o3_v5".equals(v3.getOutputSpecList(0).get(1)
             .getOutputDescriptor().getClassName()));
   }
 
@@ -657,43 +668,26 @@ public class TestVertexImpl {
 
   @Test//(timeout = 5000)
   public void testVertexSetParallelism() {
-    VertexImpl v2 = vertices.get("vertex2");
-    initVertex(v2);
-    Assert.assertEquals(2, v2.getTotalTasks());
-    Map<TezTaskID, Task> tasks = v2.getTasks();
+    VertexImpl v3 = vertices.get("vertex3");
+    initVertex(v3);
+    Assert.assertEquals(2, v3.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
     TezTaskID firstTask = tasks.keySet().iterator().next();
 
-    startVertex(v2);
+    startVertex(v3);
 
-    byte[] payload = new byte[0];
-    List<byte[]> taskPayloads = Collections.singletonList(payload);
-    v2.setParallelism(1, taskPayloads);
-    Assert.assertEquals(1, v2.getTotalTasks());
+    Vertex v1 = vertices.get("vertex1");
+    EdgeManager mockEdgeManager = mock(EdgeManager.class);
+    Map<Vertex, EdgeManager> edgeManager = Collections.singletonMap(
+       v1, mockEdgeManager);
+    v3.setParallelism(1, edgeManager);
+    Assert.assertEquals(1, v3.getTotalTasks());
     Assert.assertEquals(1, tasks.size());
     // the last one is removed
     Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
 
-    VertexImpl v1 = vertices.get("vertex1");
-    TezTaskID t1_v1 = new TezTaskID(v1.getVertexId(), 0);
-    TezTaskAttemptID ta1_t1_v1 = new TezTaskAttemptID(t1_v1, 0);
-
-    TezDependentTaskCompletionEvent cEvt1 =
-        new TezDependentTaskCompletionEvent(1, ta1_t1_v1,
-            Status.SUCCEEDED, "", 3, 0);
-    v2.handle(
-        new VertexEventSourceTaskAttemptCompleted(v2.getVertexId(), cEvt1));
-
-    TezTaskID t1_v2 = new TezTaskID(v2.getVertexId(), 0);
-    TezTaskAttemptID ta1_t1_v2 = new TezTaskAttemptID(t1_v2, 0);
-    TezDependentTaskCompletionEvent[] events =
-        v2.getTaskAttemptCompletionEvents(ta1_t1_v2, 0, 100);
-    Assert.assertEquals(1, events.length);
-    TezDependentTaskCompletionEvent clone = events[0];
-    // payload must be present in the first event
-    Assert.assertEquals(payload, clone.getUserPayload());
-    // event must be a copy
-    Assert.assertFalse(cEvt1 == clone);
+    Assert.assertTrue(v3.sourceVertices.get(v1).getEdgeManager() == mockEdgeManager);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 5661c27..1a07b5b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -19,15 +19,13 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
@@ -35,6 +33,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.records.TezDAGID;
@@ -61,9 +60,10 @@ public class TestVertexScheduler {
         true);
     conf.setLong(TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
1000L);
     ShuffleVertexManager scheduler = null;
+    EventHandler mockEventHandler = mock(EventHandler.class);
     TezDAGID dagId = new TezDAGID("1", 1, 1);
-    HashMap<Vertex, EdgeProperty> mockInputVertices = 
-        new HashMap<Vertex, EdgeProperty>();
+    HashMap<Vertex, Edge> mockInputVertices = 
+        new HashMap<Vertex, Edge>();
     Vertex mockSrcVertex1 = mock(Vertex.class);
     TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
     EdgeProperty eProp1 = new EdgeProperty(
@@ -97,12 +97,13 @@ public class TestVertexScheduler {
     when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
     when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
     
+    
     TezDependentTaskCompletionEvent mockEvent = 
         mock(TezDependentTaskCompletionEvent.class);
     
-    mockInputVertices.put(mockSrcVertex1, eProp1);
-    mockInputVertices.put(mockSrcVertex2, eProp2);
-    mockInputVertices.put(mockSrcVertex3, eProp3);
+    mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
+    mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
+    mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
 
     // check initialization
     scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
@@ -132,16 +133,16 @@ public class TestVertexScheduler {
           return null;
       }}).when(mockManagedVertex).scheduleTasks(anyCollection());
     
-    final List<byte[]> taskPayloads = new ArrayList<byte[]>();
+    final Map<Vertex, EdgeManager> newEdgeManagers = new HashMap<Vertex, EdgeManager>();
     
     doAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) {
           managedTasks.remove(mockTaskId3);
           managedTasks.remove(mockTaskId4);
-          taskPayloads.clear();
-          taskPayloads.addAll((List<byte[]>)invocation.getArguments()[1]);
+          newEdgeManagers.clear();
+          newEdgeManagers.putAll((Map<Vertex, EdgeManager>)invocation.getArguments()[1]);
           return null;
-      }}).when(mockManagedVertex).setParallelism(eq(2), anyList());
+      }}).when(mockManagedVertex).setParallelism(eq(2), anyMap());
     
     // source vertices have 0 tasks. immediate start of all managed tasks
     when(mockSrcVertex1.getTotalTasks()).thenReturn(0);
@@ -171,7 +172,7 @@ public class TestVertexScheduler {
     Assert.assertTrue(scheduler.numSourceTasks == 4);
     scheduler.onSourceTaskCompleted(mockSrcAttemptId11, mockEvent);
     // managedVertex tasks reduced
-    verify(mockManagedVertex, times(0)).setParallelism(anyInt(), anyList());
+    verify(mockManagedVertex, times(0)).setParallelism(anyInt(), anyMap());
     Assert.assertEquals(0, scheduler.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());
     Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
@@ -208,23 +209,19 @@ public class TestVertexScheduler {
     
     scheduler.onSourceTaskCompleted(mockSrcAttemptId12, mockEvent);
     // managedVertex tasks reduced
-    verify(mockManagedVertex).setParallelism(eq(2), anyList());
-    Assert.assertEquals(2, taskPayloads.size());
+    verify(mockManagedVertex).setParallelism(eq(2), anyMap());
+    Assert.assertEquals(2, newEdgeManagers.size());
+    // TODO improve tests for parallelism
     Assert.assertEquals(0, scheduler.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(2, scheduledTasks.size());
     Assert.assertTrue(scheduledTasks.contains(mockTaskId1));
     Assert.assertTrue(scheduledTasks.contains(mockTaskId2));
     Assert.assertEquals(2, scheduler.numSourceTasksCompleted);
     Assert.assertEquals(1000L, scheduler.completedSourceTasksOutputSize);
-    Configuration taskConf = TezUtils.createConfFromUserPayload(taskPayloads.get(0));
-    Assert.assertEquals(2,
-        taskConf.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE, 0));
-    taskConf = TezUtils.createConfFromUserPayload(taskPayloads.get(1));
-    Assert.assertEquals(2,
-        taskConf.getInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE, 0));
+    
     // more completions dont cause recalculation of parallelism
     scheduler.onSourceTaskCompleted(mockSrcAttemptId21, mockEvent);
-    verify(mockManagedVertex).setParallelism(eq(2), anyList());
+    verify(mockManagedVertex).setParallelism(eq(2), anyMap());
   }
   
   @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -232,9 +229,10 @@ public class TestVertexScheduler {
   public void testShuffleVertexManagerSlowStart() {
     Configuration conf = new Configuration();
     ShuffleVertexManager scheduler = null;
+    EventHandler mockEventHandler = mock(EventHandler.class);
     TezDAGID dagId = new TezDAGID("1", 1, 1);
-    HashMap<Vertex, EdgeProperty> mockInputVertices = 
-        new HashMap<Vertex, EdgeProperty>();
+    HashMap<Vertex, Edge> mockInputVertices = 
+        new HashMap<Vertex, Edge>();
     Vertex mockSrcVertex1 = mock(Vertex.class);
     TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1);
     EdgeProperty eProp1 = new EdgeProperty(
@@ -272,7 +270,7 @@ public class TestVertexScheduler {
         mock(TezDependentTaskCompletionEvent.class);
 
     // fail if there is no bipartite src vertex
-    mockInputVertices.put(mockSrcVertex3, eProp3);
+    mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
     try {
       scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
      Assert.assertFalse(true);
@@ -281,8 +279,8 @@ public class TestVertexScheduler {
           "Atleast 1 bipartite source should exist"));
     }
     
-    mockInputVertices.put(mockSrcVertex1, eProp1);
-    mockInputVertices.put(mockSrcVertex2, eProp2);
+    mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
+    mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
     
     // check initialization
     scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
index 182e8dc..92006b8 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
@@ -93,7 +93,7 @@ public final class DataMovementEvent extends Event {
   }
 
   @Private
-  void setTargetIndex(int targetIndex) {
+  public void setTargetIndex(int targetIndex) {
     this.targetIndex = targetIndex;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
index 042590e..0ca4a3e 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
@@ -74,7 +74,7 @@ public class InputFailedEvent extends Event{
   }
 
   @Private
-  void setTargetIndex(int targetIndex) {
+  public void setTargetIndex(int targetIndex) {
     this.targetIndex = targetIndex;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
index 9ad71e6..7d81449 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
@@ -107,7 +107,13 @@ public class EventMetaData implements Writable {
     } else {
       out.writeBoolean(false);
     }
-    taskAttemptID.write(out);
+    if(taskAttemptID != null) {
+      out.writeBoolean(true);
+      taskAttemptID.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    
     out.writeInt(index);
   }
 
@@ -120,8 +126,10 @@ public class EventMetaData implements Writable {
     if (in.readBoolean()) {
       edgeVertexName = in.readUTF();
     }
-    taskAttemptID = new TezTaskAttemptID();
-    taskAttemptID.readFields(in);
+    if (in.readBoolean()) {
+      taskAttemptID = new TezTaskAttemptID();
+      taskAttemptID.readFields(in);
+    }
     index = in.readInt();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d89485f/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
index 572c7b6..1dad82b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
@@ -47,24 +47,39 @@ public class TezHeartbeatResponse implements Writable {
     return lastRequestId;
   }
 
+  public void setEvents(List<TezEvent> events) {
+    this.events = events;
+  }
+
+  public void setLastRequestId(long lastRequestId ) {
+    this.lastRequestId = lastRequestId;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeLong(lastRequestId);
-    out.writeInt(events.size());
-    for (TezEvent e : events) {
-      e.write(out);
+    if(events != null) {
+      out.writeBoolean(true);
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
     }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     lastRequestId = in.readLong();
-    int eventCount = in.readInt();
-    events = new ArrayList<TezEvent>(eventCount);
-    for (int i = 0; i < eventCount; ++i) {
-      TezEvent e = new TezEvent();
-      e.readFields(in);
-      events.add(e);
+    if(in.readBoolean()) {
+      int eventCount = in.readInt();
+      events = new ArrayList<TezEvent>(eventCount);
+      for (int i = 0; i < eventCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        events.add(e);
+      }
     }
   }
 


Mime
View raw message