tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-596. Change MRHelpers.serializeConf* methods to use Protobuf for serialization. Contributed by Mohammad Kamrul Islam
Date Thu, 13 Feb 2014 00:08:20 GMT
Updated Branches:
  refs/heads/master c50152c71 -> ac471ada3


TEZ-596. Change MRHelpers.serializeConf* methods to use Protobuf for
serialization. Contributed by Mohammad Kamrul Islam


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ac471ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ac471ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ac471ada

Branch: refs/heads/master
Commit: ac471ada3adccb8e99c7fdd785128a0a92360e86
Parents: c50152c
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Feb 12 16:07:47 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Feb 12 16:07:47 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/common/TezUtils.java    | 147 ++++++++++++-------
 .../org/apache/tez/common/TestTezUtils.java     |  77 ++++++++++
 2 files changed, 168 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ac471ada/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index 4e433aa..3df9adb 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -18,11 +18,12 @@
 package org.apache.tez.common;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
@@ -32,8 +33,6 @@ import java.util.zip.InflaterInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -43,16 +42,14 @@ import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 
 public class TezUtils {
-  
+
   private static final Log LOG = LogFactory.getLog(TezUtils.class);
-  
-  public static void addUserSpecifiedTezConfiguration(Configuration conf) 
-      throws IOException {
+
+  public static void addUserSpecifiedTezConfiguration(Configuration conf) throws IOException {
     FileInputStream confPBBinaryStream = null;
     ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
     try {
-      confPBBinaryStream = new FileInputStream(
-          TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+      confPBBinaryStream = new FileInputStream(TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
       confProtoBuilder.mergeFrom(confPBBinaryStream);
     } finally {
       if (confPBBinaryStream != null) {
@@ -63,54 +60,101 @@ public class TezUtils {
     ConfigurationProto confProto = confProtoBuilder.build();
 
     List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
-    if(kvPairList != null && !kvPairList.isEmpty()) {
-      for(PlanKeyValuePair kvPair : kvPairList) {
+    if (kvPairList != null && !kvPairList.isEmpty()) {
+      for (PlanKeyValuePair kvPair : kvPairList) {
         conf.set(kvPair.getKey(), kvPair.getValue());
       }
     }
   }
-  
-  public static ByteString createByteStringFromConf(Configuration conf)
-      throws IOException {
+
+  /**
+   * Convert a Configuration to a compressed ByteString using Protocol Buffers.
+   *
+   * @param conf Configuration to be converted; must not be null
+   * @return PB ByteString (deflate-compressed)
+   * @throws IOException if the configuration cannot be serialized
+   */
+  public static ByteString createByteStringFromConf(Configuration conf) throws IOException {
     Preconditions.checkNotNull(conf, "Configuration must be specified");
     ByteString.Output os = ByteString.newOutput();
-    //SnappyOutputStream compressOs = new SnappyOutputStream(os);
-    DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(Deflater.BEST_SPEED));
-    DataOutputStream dos = new DataOutputStream(compressOs);
-    conf.write(dos);
-    dos.close();
+    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    try {
+      DeflaterOutputStream compressOs = new DeflaterOutputStream(os, deflater);
+      writeConfInPB(compressOs, conf);
+      // close() finishes the deflate stream; until then os holds incomplete data.
+      compressOs.close();
+    } finally {
+      // DeflaterOutputStream.close() does not end a caller-supplied Deflater.
+      deflater.end();
+    }
     return os.toByteString();
   }
 
-  public static byte[] createUserPayloadFromConf(Configuration conf)
-      throws IOException {
-    Preconditions.checkNotNull(conf, "Configuration must be specified");
-    DataOutputBuffer dob = new DataOutputBuffer();
-    conf.write(dob);
-    return compressBytes(dob.getData());
+  /**
+   * Convert a Configuration to a compressed user payload (byte[]) using Protocol Buffers.
+   *
+   * @param conf Configuration to be converted; must not be null
+   * @return compressed payload
+   * @throws IOException if the configuration cannot be serialized
+   */
+  public static byte[] createUserPayloadFromConf(Configuration conf) throws IOException {
+    return createByteStringFromConf(conf).toByteArray();
   }
 
-  public static Configuration createConfFromByteString(ByteString byteString)
-      throws IOException {
+  /**
+   * Convert a compressed ByteString back into a Configuration using Protocol Buffers.
+   *
+   * @param byteString deflate-compressed ConfigurationProto; must not be null
+   * @return a new Configuration without default resources
+   * @throws IOException if the data cannot be decompressed or parsed
+   */
+  public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
     Preconditions.checkNotNull(byteString, "ByteString must be specified");
-//    SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput());
     InflaterInputStream uncompressIs = new InflaterInputStream(byteString.newInput());
-    DataInputStream dataInputStream = new DataInputStream(uncompressIs);
+    ConfigurationProto confProto;
+    try {
+      confProto = ConfigurationProto.parseFrom(uncompressIs);
+    } finally {
+      uncompressIs.close();
+    }
     Configuration conf = new Configuration(false);
-    conf.readFields(dataInputStream);
+    readConfFromPB(confProto, conf);
     return conf;
   }
-  
-  public static Configuration createConfFromUserPayload(byte[] bb)
-      throws IOException {
-    // TODO Avoid copy ?
-    Preconditions.checkNotNull(bb, "Bytes must be specified");
-    byte[] uncompressed = uncompressBytes(bb);
-    DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(uncompressed, 0, uncompressed.length);
-    Configuration conf = new Configuration(false);
-    conf.readFields(dib);
-    return conf;
+
+  /**
+   * Convert a compressed payload (byte[]) into a Configuration using Protocol Buffers.
+   *
+   * @param bb compressed payload; must not be null
+   * @return a new Configuration without default resources
+   * @throws IOException if the data cannot be decompressed or parsed
+   */
+  public static Configuration createConfFromUserPayload(byte[] bb) throws IOException {
+    Preconditions.checkNotNull(bb, "Bytes must be specified");
+    return createConfFromByteString(ByteString.copyFrom(bb));
+  }
+
+  /** Write every entry of {@code conf} to {@code dos} as a single ConfigurationProto. */
+  private static void writeConfInPB(OutputStream dos, Configuration conf) throws IOException {
+    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+    Iterator<Entry<String, String>> iter = conf.iterator();
+    while (iter.hasNext()) {
+      Entry<String, String> entry = iter.next();
+      PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+      kvp.setKey(entry.getKey());
+      kvp.setValue(entry.getValue());
+      confProtoBuilder.addConfKeyValues(kvp);
+    }
+    ConfigurationProto confProto = confProtoBuilder.build();
+    confProto.writeTo(dos);
+  }
+
+  /** Copy every key/value pair in {@code confProto} into {@code conf}. */
+  private static void readConfFromPB(ConfigurationProto confProto, Configuration conf) {
+    List<PlanKeyValuePair> settingList = confProto.getConfKeyValuesList();
+    for (PlanKeyValuePair setting : settingList) {
+      conf.set(setting.getKey(), setting.getValue());
+    }
   }
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {
@@ -121,8 +165,8 @@ public class TezUtils {
     byte[] compressed = compressBytesInflateDeflate(inBytes);
     if (LOG.isDebugEnabled()) {
       sw.stop();
-      LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: "
-          + compressed.length + ", CompressTime: " + sw.elapsedMillis());
+      LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
+          + ", CompressTime: " + sw.elapsedMillis());
     }
     return compressed;
   }
@@ -135,21 +179,12 @@ public class TezUtils {
     byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
     if (LOG.isDebugEnabled()) {
       sw.stop();
-      LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: "
-          + uncompressed.length + ", UncompressTimeTaken: "
-          + sw.elapsedMillis());
+      LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
+          + ", UncompressTimeTaken: " + sw.elapsedMillis());
     }
     return uncompressed;
   }
-  
-//  private static byte[] compressBytesSnappy(byte[] inBytes) throws IOException {
-//    return Snappy.compress(inBytes);
-//  }
-//
-//  private static byte[] uncompressBytesSnappy(byte[] inBytes) throws IOException {
-//    return Snappy.uncompress(inBytes);
-//  }  
-  
+
   private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
     Deflater deflater = new Deflater(Deflater.BEST_SPEED);
     deflater.setInput(inBytes);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ac471ada/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
new file mode 100644
index 0000000..2d132a1
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+/** Round-trip tests for the TezUtils Configuration serialization helpers. */
+public class TestTezUtils {
+  @Test
+  public void testByteStringToAndFromConf() throws IOException {
+    Configuration conf = getConf();
+    Assert.assertEquals(6, conf.size());
+    ByteString bsConf = TezUtils.createByteStringFromConf(conf);
+    conf.clear();
+    Assert.assertEquals(0, conf.size());
+    conf = TezUtils.createConfFromByteString(bsConf);
+    Assert.assertEquals(6, conf.size());
+    checkConf(conf);
+  }
+
+  @Test
+  public void testPayloadToAndFromConf() throws IOException {
+    Configuration conf = getConf();
+    Assert.assertEquals(6, conf.size());
+    byte[] bConf = TezUtils.createUserPayloadFromConf(conf);
+    conf.clear();
+    Assert.assertEquals(0, conf.size());
+    conf = TezUtils.createConfFromUserPayload(bConf);
+    Assert.assertEquals(6, conf.size());
+    checkConf(conf);
+  }
+
+  /** Build a Configuration without default resources holding six typed entries. */
+  private Configuration getConf() {
+    Configuration conf = new Configuration(false);
+    conf.set("test1", "value1");
+    conf.setBoolean("test2", true);
+    conf.setDouble("test3", 1.2345);
+    conf.setInt("test4", 34567);
+    conf.setLong("test5", 1234567890L);
+    conf.setStrings("test6", "S1", "S2", "S3");
+    return conf;
+  }
+
+  /** Assert that {@code conf} holds the values set by {@link #getConf()}. */
+  private void checkConf(Configuration conf) {
+    Assert.assertEquals("value1", conf.get("test1"));
+    Assert.assertTrue(conf.getBoolean("test2", false));
+    Assert.assertEquals(1.2345, conf.getDouble("test3", 0), 1e-15);
+    Assert.assertEquals(34567, conf.getInt("test4", 0));
+    Assert.assertEquals(1234567890L, conf.getLong("test5", 0));
+    String[] tmp = conf.getStrings("test6");
+    Assert.assertEquals(3, tmp.length);
+    Assert.assertEquals("S1", tmp[0]);
+    Assert.assertEquals("S2", tmp[1]);
+    Assert.assertEquals("S3", tmp[2]);
+  }
+}


Mime
View raw message