tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3914. Recovering a large DAG fails to size limit exceeded (Jonathan Eagles via jlowe)
Date Mon, 30 Apr 2018 19:10:32 GMT
Repository: tez
Updated Branches:
  refs/heads/master 21cd02417 -> fa6bc2acf


TEZ-3914. Recovering a large DAG fails to size limit exceeded (Jonathan Eagles via jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fa6bc2ac
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fa6bc2ac
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fa6bc2ac

Branch: refs/heads/master
Commit: fa6bc2acf1b548e481b236ab50903e0ed0269da4
Parents: 21cd024
Author: Jason Lowe <jlowe@apache.org>
Authored: Mon Apr 30 14:09:23 2018 -0500
Committer: Jason Lowe <jlowe@apache.org>
Committed: Mon Apr 30 14:09:23 2018 -0500

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/RecoveryParser.java  | 19 +++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 ++-
 .../apache/tez/dag/history/HistoryEvent.java    |  9 ++-
 .../tez/dag/history/events/AMLaunchedEvent.java | 12 +--
 .../tez/dag/history/events/AMStartedEvent.java  | 12 +--
 .../dag/history/events/AppLaunchedEvent.java    |  8 +-
 .../history/events/ContainerLaunchedEvent.java  | 13 ++--
 .../history/events/ContainerStoppedEvent.java   | 13 ++--
 .../history/events/DAGCommitStartedEvent.java   | 11 +--
 .../dag/history/events/DAGFinishedEvent.java    | 11 +--
 .../dag/history/events/DAGInitializedEvent.java | 14 ++--
 .../dag/history/events/DAGKillRequestEvent.java | 17 ++--
 .../dag/history/events/DAGRecoveredEvent.java   |  8 +-
 .../tez/dag/history/events/DAGStartedEvent.java | 12 +--
 .../dag/history/events/DAGSubmittedEvent.java   | 11 +--
 .../events/TaskAttemptFinishedEvent.java        | 13 ++--
 .../history/events/TaskAttemptStartedEvent.java | 12 +--
 .../dag/history/events/TaskFinishedEvent.java   | 12 +--
 .../dag/history/events/TaskStartedEvent.java    | 12 +--
 .../events/VertexCommitStartedEvent.java        | 11 +--
 .../events/VertexConfigurationDoneEvent.java    | 12 +--
 .../dag/history/events/VertexFinishedEvent.java | 11 +--
 .../events/VertexGroupCommitFinishedEvent.java  | 11 +--
 .../events/VertexGroupCommitStartedEvent.java   | 11 +--
 .../history/events/VertexInitializedEvent.java  | 13 ++--
 .../dag/history/events/VertexStartedEvent.java  | 12 +--
 .../dag/history/recovery/RecoveryService.java   | 58 ++++++++++----
 .../apache/tez/dag/app/TestRecoveryParser.java  | 82 +++++++++++++++++++-
 .../TestHistoryEventsProtoConversion.java       |  8 +-
 .../dag/history/ats/acls/TestATSHistoryV15.java | 13 ++--
 .../org/apache/tez/test/MiniTezCluster.java     |  6 ++
 .../RecoveryServiceWithEventHandlingHook.java   |  9 ++-
 .../org/apache/tez/test/TestAMRecovery.java     |  4 +
 .../org/apache/tez/test/TestDAGRecovery.java    |  8 +-
 .../java/org/apache/tez/test/TestRecovery.java  |  4 +
 35 files changed, 324 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 368dd17..99ac283 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.google.protobuf.CodedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -251,11 +252,15 @@ public class RecoveryParser {
     }
   }
 
-  private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
+  private static HistoryEvent getNextEvent(CodedInputStream inputStream)
       throws IOException {
+    boolean isAtEnd = inputStream.isAtEnd();
+    if (isAtEnd) {
+      return null;
+    }
     int eventTypeOrdinal = -1;
     try {
-      eventTypeOrdinal = inputStream.readInt();
+      eventTypeOrdinal = inputStream.readFixed32();
     } catch (EOFException eof) {
       return null;
     }
@@ -353,13 +358,15 @@ public class RecoveryParser {
   public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream)
       throws IOException {
     List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
+    CodedInputStream codedInputStream = CodedInputStream.newInstance(inputStream);
+    codedInputStream.setSizeLimit(Integer.MAX_VALUE);
     while (true) {
-      HistoryEvent historyEvent = getNextEvent(inputStream);
+      HistoryEvent historyEvent = getNextEvent(codedInputStream);
       if (historyEvent == null) {
         LOG.info("Reached end of stream");
         break;
       }
-      LOG.debug("Read HistoryEvent, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent);
+      LOG.debug("Read HistoryEvent, eventType={}, event={}", historyEvent.getEventType(), historyEvent);
       historyEvents.add(historyEvent);
     }
     return historyEvents;
@@ -745,10 +752,12 @@ public class RecoveryParser {
           + ", dagRecoveryFile=" + dagRecoveryFile
           + ", len=" + fileStatus.getLen());
       FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize);
+      CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream);
+      codedInputStream.setSizeLimit(Integer.MAX_VALUE);
       while (true) {
         HistoryEvent event;
         try {
-          event = getNextEvent(dagRecoveryStream);
+          event = getNextEvent(codedInputStream);
           if (event == null) {
             LOG.info("Reached end of dag recovery stream");
             break;

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ad26173..f3fc269 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -42,6 +42,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.Nullable;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -63,7 +65,6 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.io.NonSyncByteArrayInputStream;
 import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -2695,9 +2696,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
       NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream();
       try {
-        reconfigureDoneEvent.toProtoStream(out);
+        CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out);
+        reconfigureDoneEvent.toProtoStream(codedOutputStream);
+        codedOutputStream.flush();
       } catch (IOException e) {
-        throw new TezUncheckedException("Unable to deserilize VertexReconfigureDoneEvent");
+        throw new TezUncheckedException("Unable to deserialize VertexReconfigureDoneEvent");
       }
       this.vertexManager = new VertexManager(
           VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName())
@@ -4589,7 +4592,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         LOG.debug("initialize NoOpVertexManager");
       }
       configurationDoneEvent = new VertexConfigurationDoneEvent();
-      configurationDoneEvent.fromProtoStream(new NonSyncByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
+      configurationDoneEvent.fromProtoStream(CodedInputStream.newInstance(getContext().getUserPayload().deepCopyAsArray()));
       String vertexName = getContext().getVertexName();
       if (getContext().getVertexNumTasks(vertexName) == -1) {
         Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called "

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 1ca0d5f..5b077e9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -18,9 +18,10 @@
 
 package org.apache.tez.dag.history;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
 public interface HistoryEvent {
 
@@ -30,8 +31,8 @@ public interface HistoryEvent {
 
   public boolean isHistoryEvent();
 
-  public void toProtoStream(OutputStream outputStream) throws IOException;
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException;
 
-  public void fromProtoStream(InputStream inputStream) throws IOException;
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index fa332d6..001cbf0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -84,13 +84,13 @@ public class AMLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    AMLaunchedProto proto = AMLaunchedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    AMLaunchedProto proto = inputStream.readMessage(AMLaunchedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index 8a59d84..87daba6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -79,13 +79,13 @@ public class AMStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    AMStartedProto proto = AMStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    AMStartedProto proto = inputStream.readMessage(AMStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
index 08d2aff..0b812f0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.VersionInfo;
@@ -67,12 +67,12 @@ public class AppLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
     throw new UnsupportedOperationException("Not a recovery event");
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
     throw new UnsupportedOperationException("Not a recovery event");
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 45d0261..11528e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -77,14 +77,13 @@ public class ContainerLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    ContainerLaunchedProto proto =
-        ContainerLaunchedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    ContainerLaunchedProto proto = inputStream.readMessage(ContainerLaunchedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
index 86971ce..528f629 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -82,14 +82,13 @@ public class ContainerStoppedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    ContainerStoppedProto proto =
-        ContainerStoppedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    ContainerStoppedProto proto = inputStream.readMessage(ContainerStoppedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 016bb60..241dada 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -19,9 +19,10 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -69,13 +70,13 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    DAGCommitStartedProto proto = inputStream.readMessage(DAGCommitStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index c395297..0a7ef56 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -19,10 +19,11 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -121,13 +122,13 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    DAGFinishedProto proto = DAGFinishedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    DAGFinishedProto proto = inputStream.readMessage(DAGFinishedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 98d64d3..9e6c8b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -19,15 +19,16 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Map;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGInitializedProto;
 
 public class DAGInitializedEvent implements HistoryEvent {
 
@@ -83,14 +84,13 @@ public class DAGInitializedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    RecoveryProtos.DAGInitializedProto proto =
-        RecoveryProtos.DAGInitializedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    DAGInitializedProto proto = inputStream.readMessage(DAGInitializedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
index 525e361..c87f5ce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
@@ -18,14 +18,16 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGKillRequestProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
 
@@ -60,12 +62,12 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
-  public RecoveryProtos.DAGKillRequestProto toProto() {
-    return RecoveryProtos.DAGKillRequestProto.newBuilder()
+  public DAGKillRequestProto toProto() {
+    return DAGKillRequestProto.newBuilder()
         .setDagId(dagID.toString())
         .setKillRequestTime(killRequestTime)
         .setIsSessionStopped(isSessionStopped)
@@ -73,9 +75,8 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    RecoveryProtos.DAGKillRequestProto proto =
-        RecoveryProtos.DAGKillRequestProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    DAGKillRequestProto proto = inputStream.readMessage(DAGKillRequestProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
index 2bfa43b..e5f5614 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -76,13 +76,13 @@ public class DAGRecoveredEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
     throw new UnsupportedOperationException("Invalid operation for eventType "
         + getEventType().name());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
     throw new UnsupportedOperationException("Invalid operation for eventType "
         + getEventType().name());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index d0e0e69..f1fdcac 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -74,13 +74,13 @@ public class DAGStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    DAGStartedProto proto = DAGStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    DAGStartedProto proto = inputStream.readMessage(DAGStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 1b1fdf3..e04ee80 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -19,10 +19,11 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -126,13 +127,13 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    DAGSubmittedProto proto = DAGSubmittedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    DAGSubmittedProto proto = inputStream.readMessage(DAGSubmittedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index e9100e8..96dc099 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -20,10 +20,10 @@ package org.apache.tez.dag.history.events;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.List;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
@@ -226,14 +226,13 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    TaskAttemptFinishedProto proto =
-        TaskAttemptFinishedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    TaskAttemptFinishedProto proto = inputStream.readMessage(TaskAttemptFinishedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 71d4419..a49e47c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -91,13 +91,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    TaskAttemptStartedProto proto = TaskAttemptStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    TaskAttemptStartedProto proto = inputStream.readMessage(TaskAttemptStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 71ff6c8..6befa1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.common.counters.TezCounters;
@@ -107,13 +107,13 @@ public class TaskFinishedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    TaskFinishedProto proto = TaskFinishedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    TaskFinishedProto proto = inputStream.readMessage(TaskFinishedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index 07dc2f9..cc62969 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -83,13 +83,13 @@ public class TaskStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    TaskStartedProto proto = TaskStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    TaskStartedProto proto = inputStream.readMessage(TaskStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index c452187..8ff86b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -19,10 +19,11 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -73,13 +74,13 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    VertexCommitStartedProto proto = inputStream.readMessage(VertexCommitStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
index 137342c..a2e2039 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
@@ -18,12 +18,12 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -155,13 +155,13 @@ public class VertexConfigurationDoneEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexConfigurationDoneProto proto = VertexConfigurationDoneProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    VertexConfigurationDoneProto proto = inputStream.readMessage(VertexConfigurationDoneProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index a2cdae2..58cb628 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -19,10 +19,11 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,13 +124,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexFinishedProto proto = VertexFinishedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    VertexFinishedProto proto = inputStream.readMessage(VertexFinishedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index ec8f3e1..c9d5aae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -19,10 +19,11 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collection;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -94,13 +95,13 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexGroupCommitFinishedProto proto = VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    VertexGroupCommitFinishedProto proto = inputStream.readMessage(VertexGroupCommitFinishedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index 3de355c..cdd11bc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -19,10 +19,11 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collection;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -94,13 +95,13 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexGroupCommitStartedProto proto = VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    VertexGroupCommitStartedProto proto = inputStream.readMessage(VertexGroupCommitStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 90099fc..e7452e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -19,12 +19,12 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -151,14 +151,13 @@ public class VertexInitializedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    RecoveryProtos.VertexInitializedProto proto =
-        RecoveryProtos.VertexInitializedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    VertexInitializedProto proto = inputStream.readMessage(VertexInitializedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index a8bd21e..4a3e05f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -74,13 +74,13 @@ public class VertexStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
+  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+    outputStream.writeMessageNoTag(toProto());
   }
 
   @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexStartedProto proto = VertexStartedProto.parseDelimitedFrom(inputStream);
+  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+    VertexStartedProto proto = inputStream.readMessage(VertexStartedProto.PARSER, null);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 8c29172..d874e0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -84,8 +85,7 @@ public class RecoveryService extends AbstractService {
   private FileSystem recoveryDirFS; // FS where staging dir exists
   Path recoveryPath;
   @VisibleForTesting
-  public Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
-      HashMap<TezDAGID, FSDataOutputStream>();
+  public Map<TezDAGID, RecoveryStream> outputStreamMap = new HashMap<>();
   private int bufferSize;
   @VisibleForTesting
   public FSDataOutputStream summaryStream;
@@ -101,6 +101,31 @@ public class RecoveryService extends AbstractService {
   private volatile boolean drained = true;
   private Object waitForDrained = new Object();
 
+  @VisibleForTesting
+  public static class RecoveryStream {
+    private final FSDataOutputStream outputStream;
+    private final CodedOutputStream codedOutputStream;
+
+    RecoveryStream(FSDataOutputStream outputStream) {
+      this.outputStream = outputStream;
+      this.codedOutputStream = CodedOutputStream.newInstance(outputStream);
+    }
+
+    public void write(byte[] bytes) throws IOException {
+      codedOutputStream.writeRawBytes(bytes);
+    }
+
+    public void flush() throws IOException {
+      codedOutputStream.flush();
+      outputStream.hflush();
+    }
+
+    public void close() throws IOException {
+      flush();
+      outputStream.close();
+    }
+  }
+
   public RecoveryService(AppContext appContext) {
     super(RecoveryService.class.getName());
     this.appContext = appContext;
@@ -231,10 +256,9 @@ public class RecoveryService extends AbstractService {
           }
         }
       }
-      for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
+      for (Entry<TezDAGID, RecoveryStream> entry : outputStreamMap.entrySet()) {
         try {
           LOG.info("Closing Output Stream for DAG " + entry.getKey());
-          entry.getValue().hflush();
           entry.getValue().close();
         } catch (IOException ioe) {
           if (!recoveryDirFS.exists(recoveryPath)) {
@@ -303,7 +327,7 @@ public class RecoveryService extends AbstractService {
     if (event.getHistoryEvent() instanceof SummaryEvent) {
       synchronized (lock) {
         if (stopped.get()) {
-          LOG.warn("Igoring event as service stopped, eventType"
+          LOG.warn("Ignoring event as service stopped, eventType"
               + event.getHistoryEvent().getEventType());
           return;
         }
@@ -429,9 +453,9 @@ public class RecoveryService extends AbstractService {
       return;
     }
 
-    if (!outputStreamMap.containsKey(dagID)) {
+    RecoveryStream recoveryStream = outputStreamMap.get(dagID);
+    if (recoveryStream == null) {
       Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString());
-      FSDataOutputStream outputStream;
       if (recoveryDirFS.exists(dagFilePath)) {
         createFatalErrorFlagDir();
         return;
@@ -440,12 +464,12 @@ public class RecoveryService extends AbstractService {
           LOG.debug("Opening DAG recovery file in create mode"
               + ", filePath=" + dagFilePath);
         }
-        outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
+        FSDataOutputStream outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
+        recoveryStream = new RecoveryStream(outputStream);
       }
-      outputStreamMap.put(dagID, outputStream);
+      outputStreamMap.put(dagID, recoveryStream);
     }
 
-    FSDataOutputStream outputStream = outputStreamMap.get(dagID);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing recovery event to output stream"
@@ -453,15 +477,15 @@ public class RecoveryService extends AbstractService {
           + ", eventType=" + eventType);
     }
     ++unflushedEventsCount;
-    outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
-    event.getHistoryEvent().toProtoStream(outputStream);
+    recoveryStream.codedOutputStream.writeFixed32NoTag(event.getHistoryEvent().getEventType().ordinal());
+    event.getHistoryEvent().toProtoStream(recoveryStream.codedOutputStream);
     if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
         HistoryEventType.DAG_FINISHED).contains(eventType)) {
-      maybeFlush(outputStream);
+      maybeFlush(recoveryStream);
     }
   }
 
-  private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
+  private void maybeFlush(RecoveryStream recoveryStream) throws IOException {
     long currentTime = appContext.getClock().getTime();
     boolean doFlush = false;
     if (maxUnflushedEvents >=0
@@ -482,12 +506,12 @@ public class RecoveryService extends AbstractService {
     if (!doFlush) {
       return;
     }
-    doFlush(outputStream, currentTime);
+    doFlush(recoveryStream, currentTime);
   }
 
-  private void doFlush(FSDataOutputStream outputStream,
+  private void doFlush(RecoveryStream recoveryStream,
       long currentTime) throws IOException {
-    outputStream.hflush();
+    recoveryStream.flush();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Flushing output stream"

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index 6673b39..1c09d5d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -20,12 +20,15 @@ package org.apache.tez.dag.app;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import com.google.common.collect.Sets;
+import com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -59,6 +64,7 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -91,6 +97,8 @@ public class TestRecoveryParser {
   private Path recoveryPath;
   private DAGAppMaster mockAppMaster;
   private DAGImpl mockDAGImpl;
+  // Protobuf message limit is 64 MB by default
+  private static final int PROTOBUF_DEFAULT_SIZE_LIMIT = 64 << 20;
 
   @Before
   public void setUp() throws IllegalArgumentException, IOException {
@@ -105,7 +113,6 @@ public class TestRecoveryParser {
     mockDAGImpl = mock(DAGImpl.class);
     when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl);
     parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3);
-    LogManager.getRootLogger().setLevel(Level.DEBUG);
   }
 
   private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) {
@@ -267,7 +274,7 @@ public class TestRecoveryParser {
             null, "user", new Configuration(), null, null)));
     // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread
     rService.await();
-    rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
+    rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8"));
     rService.stop();
 
     // write data in attempt_2
@@ -278,7 +285,7 @@ public class TestRecoveryParser {
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
     rService.await();
-    rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
+    rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8"));
     rService.stop();
 
     // corrupted last records will be skipped but the whole recovery logs will be read
@@ -618,6 +625,75 @@ public class TestRecoveryParser {
               + ", but its full recovery events are not seen"));
   }
 
+  @Test(timeout=20000)
+  public void testRecoveryLargeEventData() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
+    when(appContext.getClock()).thenReturn(new SystemClock());
+    when(mockDAGImpl.getID()).thenReturn(dagID);
+    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getApplicationID()).thenReturn(appId);
+
+    RecoveryService rService = new RecoveryService(appContext);
+    Configuration conf = new Configuration();
+    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
+    rService.init(conf);
+    rService.start();
+
+    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
+    // DAG  DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent
+    rService.handle(new DAGHistoryEvent(dagID,
+        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
+            null, "user", new Configuration(), null, null)));
+    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L,
+        "user", "dagName", null);
+    DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName");
+    rService.handle(new DAGHistoryEvent(dagID, dagInitedEvent));
+    rService.handle(new DAGHistoryEvent(dagID, dagStartedEvent));
+
+    // Create a Recovery event larger than 64 MB to verify default max protobuf size
+    ArrayList<TaskLocationHint> taskLocationHints = new ArrayList<>(100000);
+    TaskLocationHint taskLocationHint = TaskLocationHint.createTaskLocationHint(
+        Sets.newHashSet("aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa",
+            "bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb",
+            "ccccccccccccccc.ccccccccccccccc.ccccccccccccccc",
+            "ddddddddddddddd.ddddddddddddddd.ddddddddddddddd",
+            "eeeeeeeeeeeeeee.eeeeeeeeeeeeeee.eeeeeeeeeeeeeee",
+            "fffffffffffffff.fffffffffffffff.fffffffffffffff",
+            "ggggggggggggggg.ggggggggggggggg.ggggggggggggggg",
+            "hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh",
+            "iiiiiiiiiiiiiii.iiiiiiiiiiiiiii.iiiiiiiiiiiiiii",
+            "jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj",
+            "kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk",
+            "lllllllllllllll.lllllllllllllll.lllllllllllllll",
+            "mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm",
+            "nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn"),
+        Sets.newHashSet("rack1", "rack2", "rack3"));
+    for (int i = 0; i < 100000; i++) {
+      taskLocationHints.add(taskLocationHint);
+    }
+
+    TezVertexID v0Id = TezVertexID.getInstance(dagID, 0);
+    VertexLocationHint vertexLocationHint = VertexLocationHint.create(taskLocationHints);
+    VertexConfigurationDoneEvent vertexConfigurationDoneEvent = new VertexConfigurationDoneEvent(
+        v0Id, 0, 100000, vertexLocationHint, null, null, false);
+    // Verify large protobuf message
+    assertTrue(vertexConfigurationDoneEvent.toProto().getSerializedSize() > PROTOBUF_DEFAULT_SIZE_LIMIT );
+    rService.handle(new DAGHistoryEvent(dagID, vertexConfigurationDoneEvent));
+    rService.stop();
+
+    DAGRecoveryData dagData = parser.parseRecoveryData();
+    VertexRecoveryData v0data = dagData.getVertexRecoveryData(v0Id);
+    assertNotNull("Vertex Recovery Data should be non-null", v0data);
+    VertexConfigurationDoneEvent parsedVertexConfigurationDoneEvent = v0data.getVertexConfigurationDoneEvent();
+    assertNotNull("Vertex Configuration Done Event should be non-null", parsedVertexConfigurationDoneEvent);
+    VertexLocationHint parsedVertexLocationHint = parsedVertexConfigurationDoneEvent.getVertexLocationHint();
+    assertNotNull("Vertex Location Hint should be non-null", parsedVertexLocationHint);
+    assertEquals(parsedVertexLocationHint.getTaskLocationHints().size(), 100000);
+  }
+
   @Test(timeout=5000)
   public void testRecoveryData() throws IOException {
     ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 47d8389..50a80cb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,7 +91,9 @@ public class TestHistoryEventsProtoConversion {
   private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException, TezException {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     HistoryEvent deserializedEvent = null;
-    event.toProtoStream(os);
+    CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(os);
+    event.toProtoStream(codedOutputStream);
+    codedOutputStream.flush();
     os.flush();
     os.close();
     deserializedEvent = ReflectionUtils.createClazzInstance(
@@ -98,7 +102,7 @@ public class TestHistoryEventsProtoConversion {
         + ", eventType=" + event.getEventType()
         + ", bufLen=" + os.toByteArray().length);
     deserializedEvent.fromProtoStream(
-        new ByteArrayInputStream(os.toByteArray()));
+        CodedInputStream.newInstance(os.toByteArray()));
     return deserializedEvent;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
index a690a19..54abd44 100644
--- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
+++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryV15.java
@@ -20,9 +20,10 @@ package org.apache.tez.dag.history.ats.acls;
 
 import static org.junit.Assert.assertEquals;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Random;
 
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -32,8 +33,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.logging.ats.ATSV15HistoryLoggingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +59,8 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -288,12 +289,12 @@ public class TestATSHistoryV15 {
         }
 
         @Override
-        public void toProtoStream(OutputStream outputStream) throws IOException {
+        public void toProtoStream(CodedOutputStream outputStream) throws IOException {
 
         }
 
         @Override
-        public void fromProtoStream(InputStream inputStream) throws IOException {
+        public void fromProtoStream(CodedInputStream inputStream) throws IOException {
 
         }
       };

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
index c727a8f..bac0e8c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -135,6 +136,11 @@ public class MiniTezCluster extends MiniYARNCluster {
     conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
 
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
 
     try {
       Path stagingPath = FileContext.getFileContext(conf).makeQualified(

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
index c08780f..50c5a66 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ReflectionUtils;
@@ -222,15 +224,16 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
 
     private String encodeHistoryEvent(HistoryEvent event) throws IOException {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
-      event.toProtoStream(out);
+      CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out);
+      event.toProtoStream(codedOutputStream);
+      codedOutputStream.flush();
       return event.getClass().getName() + ","
           + Base64.encodeBase64String(out.toByteArray());
     }
 
     private HistoryEvent decodeHistoryEvent(String eventClass, String base64)
         throws IOException {
-      ByteArrayInputStream in = new ByteArrayInputStream(
-          Base64.decodeBase64(base64));
+      CodedInputStream in = CodedInputStream.newInstance(Base64.decodeBase64(base64));
       try {
         HistoryEvent event = ReflectionUtils.createClazzInstance(eventClass);
         event.fromProtoStream(in);

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index f00ae5c..6d3ab1c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -26,6 +26,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -175,6 +176,9 @@ public class TestAMRecovery {
     tezConf.setBoolean(
         RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
         true);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index b0c9ccc..cf4744b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -18,6 +18,9 @@
 
 package org.apache.tez.test;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -133,6 +136,9 @@ public class TestDAGRecovery {
     tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false");
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
 
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
@@ -154,7 +160,7 @@ public class TestDAGRecovery {
   void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null, 10);
     while (!dagStatus.isCompleted()) {
       LOG.info("Waiting for dag to complete. Sleeping for 500ms."
           + " DAG name: " + dag.getName()

http://git-wip-us.apache.org/repos/asf/tez/blob/fa6bc2ac/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index 93fd972..c7b1fb9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -32,6 +32,7 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -478,6 +479,9 @@ public class TestRecovery {
         RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false);
     tezConf.setBoolean(
         TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
+    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG");
 
     hashJoinExample.setConf(tezConf);


Mime
View raw message