tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: Revert "TEZ-3914. Recovering a large DAG fails to size limit exceeded (Jonathan Eagles via jlowe)"
Date Fri, 27 Apr 2018 19:15:13 GMT
Repository: tez
Updated Branches:
  refs/heads/master 086d7bad2 -> 21cd02417


Revert "TEZ-3914. Recovering a large DAG fails to size limit exceeded (Jonathan Eagles via jlowe)"

This reverts commit ebc9f4f6dee1badeca39cac26c00818be3e4d77d.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/21cd0241
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/21cd0241
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/21cd0241

Branch: refs/heads/master
Commit: 21cd0241776c46821c83a67aac2c979e405651b6
Parents: 086d7ba
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Fri Apr 27 14:14:40 2018 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Fri Apr 27 14:14:40 2018 -0500

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/RecoveryParser.java  | 19 ++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 11 +--
 .../apache/tez/dag/history/HistoryEvent.java    |  9 +--
 .../tez/dag/history/events/AMLaunchedEvent.java | 12 +--
 .../tez/dag/history/events/AMStartedEvent.java  | 12 +--
 .../dag/history/events/AppLaunchedEvent.java    |  8 +-
 .../history/events/ContainerLaunchedEvent.java  | 13 ++--
 .../history/events/ContainerStoppedEvent.java   | 13 ++--
 .../history/events/DAGCommitStartedEvent.java   | 11 ++-
 .../dag/history/events/DAGFinishedEvent.java    | 11 ++-
 .../dag/history/events/DAGInitializedEvent.java | 14 ++--
 .../dag/history/events/DAGKillRequestEvent.java | 17 ++--
 .../dag/history/events/DAGRecoveredEvent.java   |  8 +-
 .../tez/dag/history/events/DAGStartedEvent.java | 12 +--
 .../dag/history/events/DAGSubmittedEvent.java   | 11 ++-
 .../events/TaskAttemptFinishedEvent.java        | 13 ++--
 .../history/events/TaskAttemptStartedEvent.java | 12 +--
 .../dag/history/events/TaskFinishedEvent.java   | 12 +--
 .../dag/history/events/TaskStartedEvent.java    | 12 +--
 .../events/VertexCommitStartedEvent.java        | 11 ++-
 .../events/VertexConfigurationDoneEvent.java    | 12 +--
 .../dag/history/events/VertexFinishedEvent.java | 11 ++-
 .../events/VertexGroupCommitFinishedEvent.java  | 11 ++-
 .../events/VertexGroupCommitStartedEvent.java   | 11 ++-
 .../history/events/VertexInitializedEvent.java  | 13 ++--
 .../dag/history/events/VertexStartedEvent.java  | 12 +--
 .../dag/history/recovery/RecoveryService.java   | 58 ++++----------
 .../apache/tez/dag/app/TestRecoveryParser.java  | 82 +-------------------
 .../TestHistoryEventsProtoConversion.java       |  8 +-
 .../org/apache/tez/test/MiniTezCluster.java     |  6 --
 .../RecoveryServiceWithEventHandlingHook.java   |  9 +--
 .../org/apache/tez/test/TestAMRecovery.java     |  4 -
 .../org/apache/tez/test/TestDAGRecovery.java    |  8 +-
 .../java/org/apache/tez/test/TestRecovery.java  |  4 -
 34 files changed, 173 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 99ac283..368dd17 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.google.protobuf.CodedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -252,15 +251,11 @@ public class RecoveryParser {
     }
   }
 
-  private static HistoryEvent getNextEvent(CodedInputStream inputStream)
+  private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
       throws IOException {
-    boolean isAtEnd = inputStream.isAtEnd();
-    if (isAtEnd) {
-      return null;
-    }
     int eventTypeOrdinal = -1;
     try {
-      eventTypeOrdinal = inputStream.readFixed32();
+      eventTypeOrdinal = inputStream.readInt();
     } catch (EOFException eof) {
       return null;
     }
@@ -358,15 +353,13 @@ public class RecoveryParser {
   public static List<HistoryEvent> parseDAGRecoveryFile(FSDataInputStream inputStream)
       throws IOException {
     List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
-    CodedInputStream codedInputStream = CodedInputStream.newInstance(inputStream);
-    codedInputStream.setSizeLimit(Integer.MAX_VALUE);
     while (true) {
-      HistoryEvent historyEvent = getNextEvent(codedInputStream);
+      HistoryEvent historyEvent = getNextEvent(inputStream);
       if (historyEvent == null) {
         LOG.info("Reached end of stream");
         break;
       }
-      LOG.debug("Read HistoryEvent, eventType={}, event={}", historyEvent.getEventType(), historyEvent);
+      LOG.debug("Read HistoryEvent, eventType=" + historyEvent.getEventType() + ", event=" + historyEvent);
       historyEvents.add(historyEvent);
     }
     return historyEvents;
@@ -752,12 +745,10 @@ public class RecoveryParser {
           + ", dagRecoveryFile=" + dagRecoveryFile
           + ", len=" + fileStatus.getLen());
       FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize);
-      CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream);
-      codedInputStream.setSizeLimit(Integer.MAX_VALUE);
       while (true) {
         HistoryEvent event;
         try {
-          event = getNextEvent(codedInputStream);
+          event = getNextEvent(dagRecoveryStream);
           if (event == null) {
             LOG.info("Reached end of dag recovery stream");
             break;

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f3fc269..ad26173 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -42,8 +42,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.Nullable;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -65,6 +63,7 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
 import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -2696,11 +2695,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
       NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream();
       try {
-        CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out);
-        reconfigureDoneEvent.toProtoStream(codedOutputStream);
-        codedOutputStream.flush();
+        reconfigureDoneEvent.toProtoStream(out);
       } catch (IOException e) {
-        throw new TezUncheckedException("Unable to deserialize VertexReconfigureDoneEvent");
+        throw new TezUncheckedException("Unable to deserilize VertexReconfigureDoneEvent");
       }
       this.vertexManager = new VertexManager(
           VertexManagerPluginDescriptor.create(NoOpVertexManager.class.getName())
@@ -4592,7 +4589,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         LOG.debug("initialize NoOpVertexManager");
       }
       configurationDoneEvent = new VertexConfigurationDoneEvent();
-      configurationDoneEvent.fromProtoStream(CodedInputStream.newInstance(getContext().getUserPayload().deepCopyAsArray()));
+      configurationDoneEvent.fromProtoStream(new NonSyncByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
       String vertexName = getContext().getVertexName();
       if (getContext().getVertexNumTasks(vertexName) == -1) {
         Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called "

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 5b077e9..1ca0d5f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -18,10 +18,9 @@
 
 package org.apache.tez.dag.history;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
-
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 public interface HistoryEvent {
 
@@ -31,8 +30,8 @@ public interface HistoryEvent {
 
   public boolean isHistoryEvent();
 
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException;
+  public void toProtoStream(OutputStream outputStream) throws IOException;
 
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException;
+  public void fromProtoStream(InputStream inputStream) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 001cbf0..fa332d6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -84,13 +84,13 @@ public class AMLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    AMLaunchedProto proto = inputStream.readMessage(AMLaunchedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    AMLaunchedProto proto = AMLaunchedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index 87daba6..8a59d84 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -79,13 +79,13 @@ public class AMStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    AMStartedProto proto = inputStream.readMessage(AMStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    AMStartedProto proto = AMStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
index 0b812f0..08d2aff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AppLaunchedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.VersionInfo;
@@ -67,12 +67,12 @@ public class AppLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+  public void toProtoStream(OutputStream outputStream) throws IOException {
     throw new UnsupportedOperationException("Not a recovery event");
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+  public void fromProtoStream(InputStream inputStream) throws IOException {
     throw new UnsupportedOperationException("Not a recovery event");
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 11528e2..45d0261 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -77,13 +77,14 @@ public class ContainerLaunchedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    ContainerLaunchedProto proto = inputStream.readMessage(ContainerLaunchedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    ContainerLaunchedProto proto =
+        ContainerLaunchedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
index 528f629..86971ce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -82,13 +82,14 @@ public class ContainerStoppedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    ContainerStoppedProto proto = inputStream.readMessage(ContainerStoppedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    ContainerStoppedProto proto =
+        ContainerStoppedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 241dada..016bb60 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -19,10 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -70,13 +69,13 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    DAGCommitStartedProto proto = inputStream.readMessage(DAGCommitStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 0a7ef56..c395297 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -19,11 +19,10 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -122,13 +121,13 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    DAGFinishedProto proto = inputStream.readMessage(DAGFinishedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    DAGFinishedProto proto = DAGFinishedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 9e6c8b2..98d64d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -19,16 +19,15 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Map;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGInitializedProto;
 
 public class DAGInitializedEvent implements HistoryEvent {
 
@@ -84,13 +83,14 @@ public class DAGInitializedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    DAGInitializedProto proto = inputStream.readMessage(DAGInitializedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    RecoveryProtos.DAGInitializedProto proto =
+        RecoveryProtos.DAGInitializedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
index c87f5ce..525e361 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
@@ -18,16 +18,14 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGKillRequestProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.utils.ProtoUtils;
 
@@ -62,12 +60,12 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
-  public DAGKillRequestProto toProto() {
-    return DAGKillRequestProto.newBuilder()
+  public RecoveryProtos.DAGKillRequestProto toProto() {
+    return RecoveryProtos.DAGKillRequestProto.newBuilder()
         .setDagId(dagID.toString())
         .setKillRequestTime(killRequestTime)
         .setIsSessionStopped(isSessionStopped)
@@ -75,8 +73,9 @@ public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    DAGKillRequestProto proto = inputStream.readMessage(DAGKillRequestProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    RecoveryProtos.DAGKillRequestProto proto =
+        RecoveryProtos.DAGKillRequestProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
index e5f5614..2bfa43b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGRecoveredEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -76,13 +76,13 @@ public class DAGRecoveredEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
+  public void toProtoStream(OutputStream outputStream) throws IOException {
     throw new UnsupportedOperationException("Invalid operation for eventType "
         + getEventType().name());
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
+  public void fromProtoStream(InputStream inputStream) throws IOException {
     throw new UnsupportedOperationException("Invalid operation for eventType "
         + getEventType().name());
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index f1fdcac..d0e0e69 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -74,13 +74,13 @@ public class DAGStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    DAGStartedProto proto = inputStream.readMessage(DAGStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    DAGStartedProto proto = DAGStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index e04ee80..1b1fdf3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -19,11 +19,10 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -127,13 +126,13 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    DAGSubmittedProto proto = inputStream.readMessage(DAGSubmittedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    DAGSubmittedProto proto = DAGSubmittedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 96dc099..e9100e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -20,10 +20,10 @@ package org.apache.tez.dag.history.events;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
@@ -226,13 +226,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    TaskAttemptFinishedProto proto = inputStream.readMessage(TaskAttemptFinishedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    TaskAttemptFinishedProto proto =
+        TaskAttemptFinishedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index a49e47c..71d4419 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -91,13 +91,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    TaskAttemptStartedProto proto = inputStream.readMessage(TaskAttemptStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    TaskAttemptStartedProto proto = TaskAttemptStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 6befa1a..71ff6c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.common.counters.TezCounters;
@@ -107,13 +107,13 @@ public class TaskFinishedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    TaskFinishedProto proto = inputStream.readMessage(TaskFinishedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    TaskFinishedProto proto = TaskFinishedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index cc62969..07dc2f9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -83,13 +83,13 @@ public class TaskStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    TaskStartedProto proto = inputStream.readMessage(TaskStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    TaskStartedProto proto = TaskStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 8ff86b8..c452187 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -19,11 +19,10 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -74,13 +73,13 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    VertexCommitStartedProto proto = inputStream.readMessage(VertexCommitStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
index a2e2039..137342c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
@@ -18,12 +18,12 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -155,13 +155,13 @@ public class VertexConfigurationDoneEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    VertexConfigurationDoneProto proto = inputStream.readMessage(VertexConfigurationDoneProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexConfigurationDoneProto proto = VertexConfigurationDoneProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 58cb628..a2cdae2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -19,11 +19,10 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,13 +123,13 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    VertexFinishedProto proto = inputStream.readMessage(VertexFinishedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexFinishedProto proto = VertexFinishedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index c9d5aae..ec8f3e1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -19,11 +19,10 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collection;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -95,13 +94,13 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    VertexGroupCommitFinishedProto proto = inputStream.readMessage(VertexGroupCommitFinishedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitFinishedProto proto = VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index cdd11bc..3de355c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -19,11 +19,10 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collection;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -95,13 +94,13 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    VertexGroupCommitStartedProto proto = inputStream.readMessage(VertexGroupCommitStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitStartedProto proto = VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index e7452e6..90099fc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -19,12 +19,12 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -151,13 +151,14 @@ public class VertexInitializedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    VertexInitializedProto proto = inputStream.readMessage(VertexInitializedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    RecoveryProtos.VertexInitializedProto proto =
+        RecoveryProtos.VertexInitializedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 4a3e05f..a8bd21e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.history.events;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -74,13 +74,13 @@ public class VertexStartedEvent implements HistoryEvent {
   }
 
   @Override
-  public void toProtoStream(CodedOutputStream outputStream) throws IOException {
-    outputStream.writeMessageNoTag(toProto());
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
   }
 
   @Override
-  public void fromProtoStream(CodedInputStream inputStream) throws IOException {
-    VertexStartedProto proto = inputStream.readMessage(VertexStartedProto.PARSER, null);
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexStartedProto proto = VertexStartedProto.parseDelimitedFrom(inputStream);
     if (proto == null) {
       throw new IOException("No data found in stream");
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index d874e0a..8c29172 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -85,7 +84,8 @@ public class RecoveryService extends AbstractService {
   private FileSystem recoveryDirFS; // FS where staging dir exists
   Path recoveryPath;
   @VisibleForTesting
-  public Map<TezDAGID, RecoveryStream> outputStreamMap = new HashMap<>();
+  public Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
+      HashMap<TezDAGID, FSDataOutputStream>();
   private int bufferSize;
   @VisibleForTesting
   public FSDataOutputStream summaryStream;
@@ -101,31 +101,6 @@ public class RecoveryService extends AbstractService {
   private volatile boolean drained = true;
   private Object waitForDrained = new Object();
 
-  @VisibleForTesting
-  public static class RecoveryStream {
-    private final FSDataOutputStream outputStream;
-    private final CodedOutputStream codedOutputStream;
-
-    RecoveryStream(FSDataOutputStream outputStream) {
-      this.outputStream = outputStream;
-      this.codedOutputStream = CodedOutputStream.newInstance(outputStream);
-    }
-
-    public void write(byte[] bytes) throws IOException {
-      codedOutputStream.writeRawBytes(bytes);
-    }
-
-    public void flush() throws IOException {
-      codedOutputStream.flush();
-      outputStream.hflush();
-    }
-
-    public void close() throws IOException {
-      flush();
-      outputStream.close();
-    }
-  }
-
   public RecoveryService(AppContext appContext) {
     super(RecoveryService.class.getName());
     this.appContext = appContext;
@@ -256,9 +231,10 @@ public class RecoveryService extends AbstractService {
           }
         }
       }
-      for (Entry<TezDAGID, RecoveryStream> entry : outputStreamMap.entrySet()) {
+      for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
         try {
           LOG.info("Closing Output Stream for DAG " + entry.getKey());
+          entry.getValue().hflush();
           entry.getValue().close();
         } catch (IOException ioe) {
           if (!recoveryDirFS.exists(recoveryPath)) {
@@ -327,7 +303,7 @@ public class RecoveryService extends AbstractService {
     if (event.getHistoryEvent() instanceof SummaryEvent) {
       synchronized (lock) {
         if (stopped.get()) {
-          LOG.warn("Ignoring event as service stopped, eventType"
+          LOG.warn("Igoring event as service stopped, eventType"
               + event.getHistoryEvent().getEventType());
           return;
         }
@@ -453,9 +429,9 @@ public class RecoveryService extends AbstractService {
       return;
     }
 
-    RecoveryStream recoveryStream = outputStreamMap.get(dagID);
-    if (recoveryStream == null) {
+    if (!outputStreamMap.containsKey(dagID)) {
       Path dagFilePath = TezCommonUtils.getDAGRecoveryPath(recoveryPath, dagID.toString());
+      FSDataOutputStream outputStream;
       if (recoveryDirFS.exists(dagFilePath)) {
         createFatalErrorFlagDir();
         return;
@@ -464,12 +440,12 @@ public class RecoveryService extends AbstractService {
           LOG.debug("Opening DAG recovery file in create mode"
               + ", filePath=" + dagFilePath);
         }
-        FSDataOutputStream outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
-        recoveryStream = new RecoveryStream(outputStream);
+        outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
       }
-      outputStreamMap.put(dagID, recoveryStream);
+      outputStreamMap.put(dagID, outputStream);
     }
 
+    FSDataOutputStream outputStream = outputStreamMap.get(dagID);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing recovery event to output stream"
@@ -477,15 +453,15 @@ public class RecoveryService extends AbstractService {
           + ", eventType=" + eventType);
     }
     ++unflushedEventsCount;
-    recoveryStream.codedOutputStream.writeFixed32NoTag(event.getHistoryEvent().getEventType().ordinal());
-    event.getHistoryEvent().toProtoStream(recoveryStream.codedOutputStream);
+    outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
+    event.getHistoryEvent().toProtoStream(outputStream);
     if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
         HistoryEventType.DAG_FINISHED).contains(eventType)) {
-      maybeFlush(recoveryStream);
+      maybeFlush(outputStream);
     }
   }
 
-  private void maybeFlush(RecoveryStream recoveryStream) throws IOException {
+  private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
     long currentTime = appContext.getClock().getTime();
     boolean doFlush = false;
     if (maxUnflushedEvents >=0
@@ -506,12 +482,12 @@ public class RecoveryService extends AbstractService {
     if (!doFlush) {
       return;
     }
-    doFlush(recoveryStream, currentTime);
+    doFlush(outputStream, currentTime);
   }
 
-  private void doFlush(RecoveryStream recoveryStream,
+  private void doFlush(FSDataOutputStream outputStream,
       long currentTime) throws IOException {
-    recoveryStream.flush();
+    outputStream.hflush();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Flushing output stream"

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index 1c09d5d..6673b39 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -20,15 +20,12 @@ package org.apache.tez.dag.app;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import com.google.common.collect.Sets;
-import com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,8 +36,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -64,7 +59,6 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -97,8 +91,6 @@ public class TestRecoveryParser {
   private Path recoveryPath;
   private DAGAppMaster mockAppMaster;
   private DAGImpl mockDAGImpl;
-  // Protobuf message limit is 64 MB by default
-  private static final int PROTOBUF_DEFAULT_SIZE_LIMIT = 64 << 20;
 
   @Before
   public void setUp() throws IllegalArgumentException, IOException {
@@ -113,6 +105,7 @@ public class TestRecoveryParser {
     mockDAGImpl = mock(DAGImpl.class);
     when(mockAppMaster.createDAG(any(DAGPlan.class), any(TezDAGID.class))).thenReturn(mockDAGImpl);
     parser = new RecoveryParser(mockAppMaster, localFS, recoveryPath, 3);
+    LogManager.getRootLogger().setLevel(Level.DEBUG);
   }
 
   private DAGSummaryData createDAGSummaryData(TezDAGID dagId, boolean completed) {
@@ -274,7 +267,7 @@ public class TestRecoveryParser {
             null, "user", new Configuration(), null, null)));
     // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread
     rService.await();
-    rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8"));
+    rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
     rService.stop();
 
     // write data in attempt_2
@@ -285,7 +278,7 @@ public class TestRecoveryParser {
     rService.handle(new DAGHistoryEvent(dagID,
         new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
     rService.await();
-    rService.outputStreamMap.get(dagID).write("INVALID_DATA".getBytes("UTF-8"));
+    rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
     rService.stop();
 
     // corrupted last records will be skipped but the whole recovery logs will be read
@@ -625,75 +618,6 @@ public class TestRecoveryParser {
               + ", but its full recovery events are not seen"));
   }
 
-  @Test(timeout=20000)
-  public void testRecoveryLargeEventData() throws IOException {
-    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
-    AppContext appContext = mock(AppContext.class);
-    when(appContext.getCurrentRecoveryDir()).thenReturn(new Path(recoveryPath+"/1"));
-    when(appContext.getClock()).thenReturn(new SystemClock());
-    when(mockDAGImpl.getID()).thenReturn(dagID);
-    when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
-    when(appContext.getApplicationID()).thenReturn(appId);
-
-    RecoveryService rService = new RecoveryService(appContext);
-    Configuration conf = new Configuration();
-    conf.setBoolean(RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, true);
-    rService.init(conf);
-    rService.start();
-
-    DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
-    // DAG  DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent
-    rService.handle(new DAGHistoryEvent(dagID,
-        new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
-            null, "user", new Configuration(), null, null)));
-    DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L,
-        "user", "dagName", null);
-    DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName");
-    rService.handle(new DAGHistoryEvent(dagID, dagInitedEvent));
-    rService.handle(new DAGHistoryEvent(dagID, dagStartedEvent));
-
-    // Create a Recovery event larger than 64 MB to verify default max protobuf size
-    ArrayList<TaskLocationHint> taskLocationHints = new ArrayList<>(100000);
-    TaskLocationHint taskLocationHint = TaskLocationHint.createTaskLocationHint(
-        Sets.newHashSet("aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa.aaaaaaaaaaaaaaa",
-            "bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb.bbbbbbbbbbbbbbb",
-            "ccccccccccccccc.ccccccccccccccc.ccccccccccccccc",
-            "ddddddddddddddd.ddddddddddddddd.ddddddddddddddd",
-            "eeeeeeeeeeeeeee.eeeeeeeeeeeeeee.eeeeeeeeeeeeeee",
-            "fffffffffffffff.fffffffffffffff.fffffffffffffff",
-            "ggggggggggggggg.ggggggggggggggg.ggggggggggggggg",
-            "hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh.hhhhhhhhhhhhhhh",
-            "iiiiiiiiiiiiiii.iiiiiiiiiiiiiii.iiiiiiiiiiiiiii",
-            "jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj.jjjjjjjjjjjjjjj",
-            "kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk.kkkkkkkkkkkkkkk",
-            "lllllllllllllll.lllllllllllllll.lllllllllllllll",
-            "mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm.mmmmmmmmmmmmmmm",
-            "nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn.nnnnnnnnnnnnnnn"),
-        Sets.newHashSet("rack1", "rack2", "rack3"));
-    for (int i = 0; i < 100000; i++) {
-      taskLocationHints.add(taskLocationHint);
-    }
-
-    TezVertexID v0Id = TezVertexID.getInstance(dagID, 0);
-    VertexLocationHint vertexLocationHint = VertexLocationHint.create(taskLocationHints);
-    VertexConfigurationDoneEvent vertexConfigurationDoneEvent = new VertexConfigurationDoneEvent(
-        v0Id, 0, 100000, vertexLocationHint, null, null, false);
-    // Verify large protobuf message
-    assertTrue(vertexConfigurationDoneEvent.toProto().getSerializedSize() > PROTOBUF_DEFAULT_SIZE_LIMIT );
-    rService.handle(new DAGHistoryEvent(dagID, vertexConfigurationDoneEvent));
-    rService.stop();
-
-    DAGRecoveryData dagData = parser.parseRecoveryData();
-    VertexRecoveryData v0data = dagData.getVertexRecoveryData(v0Id);
-    assertNotNull("Vertex Recovery Data should be non-null", v0data);
-    VertexConfigurationDoneEvent parsedVertexConfigurationDoneEvent = v0data.getVertexConfigurationDoneEvent();
-    assertNotNull("Vertex Configuration Done Event should be non-null", parsedVertexConfigurationDoneEvent);
-    VertexLocationHint parsedVertexLocationHint = parsedVertexConfigurationDoneEvent.getVertexLocationHint();
-    assertNotNull("Vertex Location Hint should be non-null", parsedVertexLocationHint);
-    assertEquals(parsedVertexLocationHint.getTaskLocationHints().size(), 100000);
-  }
-
   @Test(timeout=5000)
   public void testRecoveryData() throws IOException {
     ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 50a80cb..47d8389 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,9 +89,7 @@ public class TestHistoryEventsProtoConversion {
   private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException, TezException {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     HistoryEvent deserializedEvent = null;
-    CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(os);
-    event.toProtoStream(codedOutputStream);
-    codedOutputStream.flush();
+    event.toProtoStream(os);
     os.flush();
     os.close();
     deserializedEvent = ReflectionUtils.createClazzInstance(
@@ -102,7 +98,7 @@ public class TestHistoryEventsProtoConversion {
         + ", eventType=" + event.getEventType()
         + ", bufLen=" + os.toByteArray().length);
     deserializedEvent.fromProtoStream(
-        CodedInputStream.newInstance(os.toByteArray()));
+        new ByteArrayInputStream(os.toByteArray()));
     return deserializedEvent;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
index bac0e8c..c727a8f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -136,11 +135,6 @@ public class MiniTezCluster extends MiniYARNCluster {
     conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
 
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000);
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
 
     try {
       Path stagingPath = FileContext.getFileContext(conf).makeQualified(

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
index 50c5a66..c08780f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
@@ -24,8 +24,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.protobuf.CodedInputStream;
-import com.google.protobuf.CodedOutputStream;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ReflectionUtils;
@@ -224,16 +222,15 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
 
     private String encodeHistoryEvent(HistoryEvent event) throws IOException {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
-      CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out);
-      event.toProtoStream(codedOutputStream);
-      codedOutputStream.flush();
+      event.toProtoStream(out);
       return event.getClass().getName() + ","
           + Base64.encodeBase64String(out.toByteArray());
     }
 
     private HistoryEvent decodeHistoryEvent(String eventClass, String base64)
         throws IOException {
-      CodedInputStream in = CodedInputStream.newInstance(Base64.decodeBase64(base64));
+      ByteArrayInputStream in = new ByteArrayInputStream(
+          Base64.decodeBase64(base64));
       try {
         HistoryEvent event = ReflectionUtils.createClazzInstance(eventClass);
         event.fromProtoStream(in);

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 6d3ab1c..f00ae5c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -26,7 +26,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -176,9 +175,6 @@ public class TestAMRecovery {
     tezConf.setBoolean(
         RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED,
         true);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index cf4744b..b0c9ccc 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -18,9 +18,6 @@
 
 package org.apache.tez.test;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -136,9 +133,6 @@ public class TestDAGRecovery {
     tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false");
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
 
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
@@ -160,7 +154,7 @@ public class TestDAGRecovery {
   void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
-    DAGStatus dagStatus = dagClient.getDAGStatus(null, 10);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
     while (!dagStatus.isCompleted()) {
       LOG.info("Waiting for dag to complete. Sleeping for 500ms."
           + " DAG name: " + dag.getName()

http://git-wip-us.apache.org/repos/asf/tez/blob/21cd0241/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index c7b1fb9..93fd972 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -32,7 +32,6 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -479,9 +478,6 @@ public class TestRecovery {
         RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false);
     tezConf.setBoolean(
         TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,0);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, 0);
-    tezConf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY,1000);
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG");
 
     hashJoinExample.setConf(tezConf);


Mime
View raw message