tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject tez git commit: TEZ-3140. Reduce AM memory usage during serialization
Date Mon, 07 Mar 2016 15:57:40 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 765c75b37 -> 570a190c6


TEZ-3140. Reduce AM memory usage during serialization


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/570a190c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/570a190c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/570a190c

Branch: refs/heads/branch-0.7
Commit: 570a190c6e9362783d1e93343118d28cf1e031d5
Parents: 765c75b
Author: Rohini Palaniswamy <rohini@apache.org>
Authored: Mon Mar 7 07:57:22 2016 -0800
Committer: Rohini Palaniswamy <rohini@apache.org>
Committed: Mon Mar 7 07:57:22 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/EntityDescriptor.java    | 104 +++++++++++++++++--
 .../tez/dag/api/TestEntityDescriptor.java       |  63 +++++++++++
 3 files changed, 157 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/570a190c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73329e..2e2f852 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3140. Reduce AM memory usage during serialization
   TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.

   TEZ-3115. Shuffle string handling adds significant memory overhead
   TEZ-3149. Tez-tools: Add username in DagInfo

http://git-wip-us.apache.org/repos/asf/tez/blob/570a190c/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
index d02bddd..dcc4ebf 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
@@ -21,10 +21,11 @@ package org.apache.tez.dag.api;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-
 import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
@@ -32,7 +33,7 @@ import com.google.common.base.Preconditions;
 
 /**
  * Describes a given user code entity. Consists of the name of the class implementing
- * the user logic and a payload that can be used to configure an object instance of 
+ * the user logic and a payload that can be used to configure an object instance of
  * that class. In addition some history information can be set for logging/debugging.
   * <br>This is not supposed to be extended by users. Users are expected to use the derived
   * classes for specific entities
@@ -41,6 +42,7 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("unchecked")
 public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements Writable {
 
+  private static final int SERIALIZE_BUFFER_SIZE = 8192;
   private UserPayload userPayload = null;
   private String className;
   protected String historyText;
@@ -48,7 +50,7 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
   @Private // for Writable
   public EntityDescriptor() {
   }
-  
+
   public EntityDescriptor(String className) {
     this.className = className;
   }
@@ -91,7 +93,7 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
   public String getClassName() {
     return this.className;
   }
-  
+
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, className);
@@ -100,17 +102,25 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
     if (bb == null) {
       out.writeInt(-1);
     } else {
-      int size = bb.limit() - bb.position();
+      int size = bb.remaining();
       if (size == 0) {
         out.writeInt(-1);
       } else {
         out.writeInt(size);
-        byte[] bytes = new byte[size];
-        // This modified the ByteBuffer, and primarily works since UserPayload.getByteBuffer
-        // return a new copy each time
-        bb.get(bytes);
-        // TODO: TEZ-305 - should be more efficient by using protobuf serde.
-        out.write(bytes);
+        if (out instanceof DataOutputBuffer) {
+          DataOutputBuffer buf = (DataOutputBuffer) out;
+          buf.write(new ByteBufferDataInput(bb), size);
+        } else {
+          // This code is just for fallback in case serialization is changed to
+          // use something other than DataOutputBuffer.
+          int len;
+          byte[] buf = new byte[SERIALIZE_BUFFER_SIZE];
+          do {
+            len = Math.min(bb.remaining(), SERIALIZE_BUFFER_SIZE);
+            bb.get(buf, 0, len);
+            out.write(buf, 0, len);
+          } while (bb.remaining() > 0);
+        }
       }
       out.writeInt(userPayload.getVersion());
     }
@@ -134,4 +144,76 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
         userPayload == null ? false : userPayload.getPayload() == null ? false : true;
     return "ClassName=" + className + ", hasPayload=" + hasPayload;
   }
+
+  private static class ByteBufferDataInput implements DataInput {
+
+    private final ByteBuffer bb;
+
+    public ByteBufferDataInput(ByteBuffer bb) {
+      this.bb = bb;
+    }
+
+    @Override
+    public void readFully(byte[] b) throws IOException {
+      bb.get(b, 0, bb.remaining());
+    }
+
+    @Override
+    public void readFully(byte[] b, int off, int len) throws IOException {
+      bb.get(b, off, len);
+    }
+
+    @Override
+    public int skipBytes(int n) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public boolean readBoolean() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public byte readByte() throws IOException {
+      return bb.get();
+    }
+    @Override
+    public int readUnsignedByte() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public short readShort() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public int readUnsignedShort() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public char readChar() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public int readInt() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public long readLong() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public float readFloat() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public double readDouble() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public String readLine() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public String readUTF() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/570a190c/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java
new file mode 100644
index 0000000..1e8a99d
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestEntityDescriptor {
+
+  @Test
+  public void testEntityDescriptorHadoopSerialization() throws IOException {
+    // This tests the alternate serialization code path
+    // if the DataOutput is not DataOutputBuffer
+    Configuration conf = new Configuration(true);
+    String confVal = RandomStringUtils.random(10000, true, true);
+    conf.set("testKey", confVal);
+    UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+    InputDescriptor entityDescriptor =
+        InputDescriptor.create("inputClazz").setUserPayload(payload)
+        .setHistoryText("Bar123");
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bos);
+    entityDescriptor.write(out);
+    out.close();
+
+    InputDescriptor deserialized = InputDescriptor.create("dummy");
+    deserialized.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
+
+    Assert.assertEquals(entityDescriptor.getClassName(), deserialized.getClassName());
+    // History text is not serialized when sending to tasks
+    Assert.assertNull(deserialized.getHistoryText());
+    Assert.assertArrayEquals(payload.deepCopyAsArray(), deserialized.getUserPayload().deepCopyAsArray());
+    Configuration deserializedConf = TezUtils.createConfFromUserPayload(deserialized.getUserPayload());
+    Assert.assertEquals(confVal, deserializedConf.get("testKey"));
+  }
+
+}


Mime
View raw message