tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3284. Synchronization for every write in UnorderdKVWriter (jeagles)
Date Thu, 08 Sep 2016 15:40:10 GMT
Repository: tez
Updated Branches:
  refs/heads/master 91a397b0b -> 495e6f0a4


TEZ-3284. Synchronization for every write in UnorderdKVWriter (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/495e6f0a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/495e6f0a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/495e6f0a

Branch: refs/heads/master
Commit: 495e6f0a41c4e359c4e4f6786bea74f914d2c8b7
Parents: 91a397b
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu Sep 8 10:37:58 2016 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu Sep 8 10:38:13 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/common/TezUtilsInternal.java |   6 +-
 .../common/io/NonSyncByteArrayInputStream.java  |  99 ++++++++++++++++
 .../common/io/NonSyncByteArrayOutputStream.java | 113 +++++++++++++++++++
 .../tez/common/io/NonSyncDataOutputStream.java  |  57 ++++++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   8 +-
 .../org/apache/tez/examples/JoinDataGen.java    |  12 +-
 .../tez/mapreduce/hadoop/MRInputHelpers.java    |   4 +-
 .../tez/history/parser/ATSFileParser.java       |   4 +-
 .../vertexmanager/ShuffleVertexManagerBase.java |   4 +-
 .../common/shuffle/MemoryFetchedInput.java      |   4 +-
 .../shuffle/orderedgrouped/InMemoryReader.java  |   4 +-
 .../shuffle/orderedgrouped/InMemoryWriter.java  |   4 +-
 .../common/sort/impl/PipelinedSorter.java       |   6 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   4 +-
 .../writers/UnorderedPartitionedKVWriter.java   |   6 +-
 .../examples/MultipleCommitsExample.java        |  12 +-
 17 files changed, 309 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3f6281f..060d6d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3284. Synchronization for every write in UnorderdKVWriter
   TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain
cases
   TEZ-3230. Implement vertex manager and edge manager of cartesian product edge.
   TEZ-3326. Display JVM system properties in AM and task logs.

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 08a9aa8..b0b8906 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -17,7 +17,6 @@
 
 package org.apache.tez.common;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -41,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.log4j.Appender;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -139,7 +139,7 @@ public class TezUtilsInternal {
   private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
     Deflater deflater = new Deflater(Deflater.BEST_SPEED);
     deflater.setInput(inBytes);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+    NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length);
     deflater.finish();
     byte[] buffer = new byte[1024 * 8];
     while (!deflater.finished()) {
@@ -153,7 +153,7 @@ public class TezUtilsInternal {
   private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException
{
     Inflater inflater = new Inflater();
     inflater.setInput(inBytes);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+    NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length);
     byte[] buffer = new byte[1024 * 8];
     while (!inflater.finished()) {
       int count;

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java
b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java
new file mode 100644
index 0000000..5b6e52a
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.io;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * A thread-not-safe version of ByteArrayInputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayInputStream extends ByteArrayInputStream {
+  public NonSyncByteArrayInputStream(byte[] bs) {
+    super(bs);
+  }
+
+  public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int read() {
+    return (pos < count) ? (buf[pos++] & 0xff) : -1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int read(byte b[], int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    if (pos >= count) {
+      return -1;
+    }
+
+    int avail = count - pos;
+    if (len > avail) {
+      len = avail;
+    }
+    if (len <= 0) {
+      return 0;
+    }
+    System.arraycopy(buf, pos, b, off, len);
+    pos += len;
+    return len;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long skip(long n) {
+    long k = count - pos;
+    if (n < k) {
+      k = n < 0 ? 0 : n;
+    }
+
+    pos += k;
+    return k;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int available() {
+    return count - pos;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void reset() {
+    pos = mark;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java
b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java
new file mode 100644
index 0000000..40fae6f
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+/**
+ * A thread-not-safe version of ByteArrayOutputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream {
+  public NonSyncByteArrayOutputStream(int size) {
+    super(size);
+  }
+
+  public NonSyncByteArrayOutputStream() {
+    super();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(int b) {
+    enLargeBuffer(1);
+    buf[count] = (byte) b;
+    count += 1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(byte b[], int off, int len) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    enLargeBuffer(len);
+    System.arraycopy(b, off, buf, count, len);
+    count += len;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void reset() {
+    count = 0;
+  }
+
+  public void write(DataInput in, int length) throws IOException {
+    enLargeBuffer(length);
+    in.readFully(buf, count, length);
+    count += length;
+  }
+
+  private int enLargeBuffer(int increment) {
+    int temp = count + increment;
+    int newLen = temp;
+    if (temp > buf.length) {
+      if ((buf.length << 1) > temp) {
+        newLen = buf.length << 1;
+      }
+      byte newbuf[] = new byte[newLen];
+      System.arraycopy(buf, 0, newbuf, 0, count);
+      buf = newbuf;
+    }
+    return newLen;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    out.write(buf, 0, count);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override  public byte toByteArray()[] {
+    return Arrays.copyOf(buf, count);
+  }
+
+  /**
+   * {@inheritDoc}
+   */  public int size() {
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java
b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java
new file mode 100644
index 0000000..d6302fe
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A thread-not-safe version of DataOutputStream, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataOutputStream extends DataOutputStream {
+  public NonSyncDataOutputStream(OutputStream stream) {
+    super(stream);
+  }
+
+  private void incrementWritten(int len) {
+    written += len;
+    if (written < 0) {
+      written = Integer.MAX_VALUE;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(int b) throws IOException {
+    out.write(b);
+    incrementWritten(1);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    out.write(b, off, len);
+    incrementWritten(len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 01bca8f..1dd7756 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -17,8 +17,6 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -65,6 +63,8 @@ import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -2583,7 +2583,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
             + ", use NoOpVertexManager to replace it, vertexId=" + logIdentifier);
         LOG.debug("VertexReconfigureDoneEvent=" + reconfigureDoneEvent);
       }
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream();
       try {
         reconfigureDoneEvent.toProtoStream(out);
       } catch (IOException e) {
@@ -4458,7 +4458,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
         LOG.debug("initialize NoOpVertexManager");
       }
       configurationDoneEvent = new VertexConfigurationDoneEvent();
-      configurationDoneEvent.fromProtoStream(new ByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
+      configurationDoneEvent.fromProtoStream(new NonSyncByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray()));
       String vertexName = getContext().getVertexName();
       if (getContext().getVertexNumTasks(vertexName) == -1) {
         Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism
must be called "

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
index 02728aa..4c0d201 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java
@@ -18,10 +18,7 @@
 
 package org.apache.tez.examples;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
@@ -41,6 +38,9 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -171,8 +171,8 @@ public class JoinDataGen extends TezExampleBase {
 
     public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize)
         throws IOException {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(bos);
+      NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream();
+      NonSyncDataOutputStream dos = new NonSyncDataOutputStream(bos);
       dos.writeLong(streamOutputFileSize);
       dos.writeLong(hashOutputFileSize);
       dos.close();
@@ -183,7 +183,7 @@ public class JoinDataGen extends TezExampleBase {
     @Override
     public void initialize() throws Exception {
       byte[] payload = getContext().getUserPayload().deepCopyAsArray();
-      ByteArrayInputStream bis = new ByteArrayInputStream(payload);
+      NonSyncByteArrayInputStream bis = new NonSyncByteArrayInputStream(payload);
       DataInputStream dis = new DataInputStream(bis);
       streamOutputFileSize = dis.readLong();
       hashOutputFileSize = dis.readLong();

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 6262e59..9b88c4d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -247,7 +247,7 @@ public class MRInputHelpers {
 
     ByteString.Output os = ByteString
         .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
-    oldSplit.write(new DataOutputStream(os));
+    oldSplit.write(new NonSyncDataOutputStream(os));
     ByteString splitBs = os.toByteString();
     builder.setSplitBytes(splitBs);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
index b4f3df3..fb42129 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.commons.io.IOUtils;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
 import org.apache.tez.history.parser.datamodel.BaseParser;
 import org.apache.tez.history.parser.datamodel.Constants;
 import org.apache.tez.history.parser.datamodel.DagInfo;
@@ -35,7 +36,6 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -176,7 +176,7 @@ public class ATSFileParser extends BaseParser implements ATSData {
 
   private JSONObject readJson(InputStream in) throws IOException, JSONException {
     //Read entire content to memory
-    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    final NonSyncByteArrayOutputStream bout = new NonSyncByteArrayOutputStream();
     IOUtils.copy(in, bout);
     return new JSONObject(new String(bout.toByteArray(), "UTF-8"));
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index 951ce30..9b88cfd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -29,6 +29,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -55,7 +56,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.BitSet;
@@ -337,7 +337,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
           ByteString compressedPartitionStats = proto.getPartitionStats();
           byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
               compressedPartitionStats);
-          ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
+          NonSyncByteArrayInputStream bin = new NonSyncByteArrayInputStream(rawData);
           partitionStats.deserialize(new DataInputStream(bin));
 
           parsePartitionStats(partitionStats);

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
index e25a325..78f1f3b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java
@@ -18,11 +18,11 @@
 
 package org.apache.tez.runtime.library.common.shuffle;
 
-import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 import com.google.common.base.Preconditions;
@@ -45,7 +45,7 @@ public class MemoryFetchedInput extends FetchedInput {
 
   @Override
   public InputStream getInputStream() {
-    return new ByteArrayInputStream(byteStream.getBuffer());
+    return new NonSyncByteArrayInputStream(byteStream.getBuffer());
   }
 
   public byte[] getBytes() {

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
index 12fe057..41e8432 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -27,6 +26,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@@ -38,7 +38,7 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
 @InterfaceStability.Unstable
 public class InMemoryReader extends Reader {
 
-  private static class ByteArrayDataInput extends ByteArrayInputStream implements DataInput
{
+  private static class ByteArrayDataInput extends NonSyncByteArrayInputStream implements
DataInput {
 
     public ByteArrayDataInput(byte buf[], int offset, int length) {
       super(buf, offset, length);

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
index 17d57a6..d2778d8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.slf4j.Logger;
@@ -26,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
@@ -40,7 +40,7 @@ public class InMemoryWriter extends Writer {
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
     super(null, null);
     this.out =
-      new DataOutputStream(new IFileOutputStream(arrayStream));
+      new NonSyncDataOutputStream(new IFileOutputStream(arrayStream));
   }
 
   public void append(Object key, Object value) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 897d7d7..609e9ff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -17,7 +17,6 @@
 */
 package org.apache.tez.runtime.library.common.sort.impl;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
@@ -46,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.library.common.comparator.ProxyComparator;
 import org.apache.hadoop.io.RawComparator;
@@ -863,7 +863,7 @@ public class PipelinedSorter extends ExternalSorter {
     final byte[] rawkvmeta;
     final int kvmetabase;
     final ByteBuffer kvbuffer;
-    final DataOutputStream out;
+    final NonSyncDataOutputStream out;
     final RawComparator comparator;
     final byte[] imeta = new byte[METASIZE];
 
@@ -895,7 +895,7 @@ public class PipelinedSorter extends ExternalSorter {
       kvmeta = kvmetabuffer
                 .order(ByteOrder.nativeOrder())
                .asIntBuffer();
-      out = new DataOutputStream(
+      out = new NonSyncDataOutputStream(
               new BufferStreamWrapper(kvbuffer));
       this.comparator = comparator;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 69bfdb8..873d8e1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.runtime.library.common.sort.impl.dflt;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -43,6 +42,7 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -474,7 +474,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
   /**
    * Inner class managing the spill of serialized records to disk.
    */
-  protected class BlockingBuffer extends DataOutputStream {
+  protected class BlockingBuffer extends NonSyncDataOutputStream {
 
     public BlockingBuffer() {
       super(new Buffer());

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 152096c..eff29a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.tez.runtime.library.common.writers;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -52,6 +51,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.OutputContext;
@@ -102,7 +102,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   @VisibleForTesting
   final BlockingQueue<WrappedBuffer> availableBuffers;
   private final ByteArrayOutputStream baos;
-  private final DataOutputStream dos;
+  private final NonSyncDataOutputStream dos;
   @VisibleForTesting
   WrappedBuffer currentBuffer;
   private final FileSystem rfs;
@@ -192,7 +192,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
     currentBuffer = buffers[0];
     baos = new ByteArrayOutputStream();
-    dos = new DataOutputStream(baos);
+    dos = new NonSyncDataOutputStream(baos);
     keySerializer.open(dos);
     valSerializer.open(dos);
     rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();

http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
index fe7984b..5c93e87 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java
@@ -17,12 +17,9 @@
  */
 package org.apache.tez.mapreduce.examples;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -34,6 +31,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.io.NonSyncByteArrayInputStream;
+import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
+import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.GroupInputEdge;
@@ -172,8 +172,8 @@ public class MultipleCommitsExample extends TezExampleBase {
       }
       
       public UserPayload toUserPayload() throws IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        this.write(new DataOutputStream(out));
+        NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream();
+        this.write(new NonSyncDataOutputStream(out));
         return UserPayload.create(ByteBuffer.wrap(out.toByteArray()));
       }
 
@@ -181,7 +181,7 @@ public class MultipleCommitsExample extends TezExampleBase {
           throws IOException {
         MultipleOutputProcessorConfig config = new MultipleOutputProcessorConfig();
         config.readFields(new DataInputStream(
-            new ByteArrayInputStream(payload.deepCopyAsArray())));
+            new NonSyncByteArrayInputStream(payload.deepCopyAsArray())));
         return config;
       }
     }


Mime
View raw message