hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1620823 - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/util/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/ha...
Date Wed, 27 Aug 2014 09:06:50 GMT
Author: edwardyoon
Date: Wed Aug 27 09:06:50 2014
New Revision: 1620823

URL: http://svn.apache.org/r1620823
Log:
HAMA-915: Add Kryo serializer (edwardyoon)

Added:
    hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java   (with props)
    hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java   (with props)
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/pom.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
    hama/trunk/pom.xml

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Aug 27 09:06:50 2014
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-915: Add Kryo serializer (edwardyoon)
    HAMA-726: Hama on Mesos (Jeff Fenchel via edwardyoon)
    HAMA-863: Implement SparseVector (Yexi Jiang)
 

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Wed Aug 27 09:06:50 2014
@@ -135,6 +135,10 @@
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed Aug 27 09:06:50 2014
@@ -47,36 +47,26 @@ public class BSPMessageBundle<M extends 
 
   private String className = null;
   private int bundleSize = 0;
-  private int bundleLength = 0;
 
-  ByteArrayOutputStream byteBuffer = null;
-  DataOutputStream bufferDos = null;
-
-  ByteArrayInputStream bis = null;
-  DataInputStream dis = null;
+  private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+  private final DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
 
   public BSPMessageBundle() {
-    byteBuffer = new ByteArrayOutputStream();
-    bufferDos = new DataOutputStream(byteBuffer);
-
     bundleSize = 0;
-    bundleLength = 0;
   }
 
-  ByteArrayOutputStream mbos = null;
-  DataOutputStream mdos = null;
+  ByteArrayOutputStream mbos = new ByteArrayOutputStream();
+  DataOutputStream mdos = new DataOutputStream(mbos);
   ByteArrayInputStream mbis = null;
   DataInputStream mdis = null;
 
   public byte[] serialize(M message) throws IOException {
-    mbos = new ByteArrayOutputStream();
-    mdos = new DataOutputStream(mbos);
+    mbos.reset();
     message.write(mdos);
     return mbos.toByteArray();
   }
 
-  private byte[] compressed;
-  private byte[] serialized;
+  private byte[] msgBytes;
 
   /**
    * Add message to this bundle.
@@ -85,46 +75,42 @@ public class BSPMessageBundle<M extends 
    */
   public void addMessage(M message) {
     try {
-      serialized = serialize(message);
+      if (className == null) {
+        className = message.getClass().getName();
+      }
+
+      msgBytes = serialize(message);
 
       if (compressor != null) {
-        if (serialized.length > threshold) {
+        if (msgBytes.length > threshold) {
           bufferDos.writeBoolean(true);
-          compressed = compressor.compress(serialized);
-          bufferDos.writeInt(compressed.length);
-          bufferDos.write(compressed);
-          bundleLength += compressed.length;
+          msgBytes = compressor.compress(msgBytes);
+          bufferDos.writeInt(msgBytes.length);
         } else {
           bufferDos.writeBoolean(false);
-          bufferDos.write(serialized);
-          bundleLength += serialized.length;
         }
-      } else {
-        bufferDos.write(serialized);
-        bundleLength += serialized.length;
       }
+      bufferDos.write(msgBytes);
+      bundleSize++;
     } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error(e);
     }
-
-    if (className == null) {
-      className = message.getClass().getName();
-    }
-    bundleSize++;
   }
 
   public byte[] getBuffer() {
     return byteBuffer.toByteArray();
   }
 
+  private ByteArrayInputStream bis = null;
+  private DataInputStream dis = null;
+
   public Iterator<M> iterator() {
     bis = new ByteArrayInputStream(byteBuffer.toByteArray());
     dis = new DataInputStream(bis);
 
     Iterator<M> it = new Iterator<M>() {
+      Class<M> clazz = null;
       M msg;
-      byte[] decompressed;
 
       @Override
       public boolean hasNext() {
@@ -142,43 +128,34 @@ public class BSPMessageBundle<M extends 
       @SuppressWarnings("unchecked")
       @Override
       public M next() {
-        boolean isCompressed = false;
-
-        if (compressor != null) {
-          try {
-            isCompressed = dis.readBoolean();
-          } catch (IOException e1) {
-            e1.printStackTrace();
+        try {
+          if (clazz == null) {
+            clazz = (Class<M>) Class.forName(className);
           }
-        }
 
-        Class<M> clazz = null;
-        try {
-          clazz = (Class<M>) Class.forName(className);
-        } catch (ClassNotFoundException e) {
-          LOG.error("Class was not found.", e);
-        }
+          msg = ReflectionUtils.newInstance(clazz, null);
+          boolean isCompressed = false;
 
-        msg = ReflectionUtils.newInstance(clazz, null);
+          if (compressor != null) {
+            isCompressed = dis.readBoolean();
+          }
 
-        try {
           if (isCompressed) {
-            // LOG.debug(">>>>> decompressing .........");
             int length = dis.readInt();
-            compressed = new byte[length];
-            dis.readFully(compressed);
-            decompressed = compressor.decompress(compressed);
+            msgBytes = new byte[length];
+            dis.readFully(msgBytes);
 
-            mbis = new ByteArrayInputStream(decompressed);
+            mbis = new ByteArrayInputStream(compressor.decompress(msgBytes));
             mdis = new DataInputStream(mbis);
             msg.readFields(mdis);
           } else {
             msg.readFields(dis);
           }
 
-        } catch (IOException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
+        } catch (IOException ie) {
+          LOG.error(ie);
+        } catch (ClassNotFoundException ce) {
+          LOG.error("Class was not found.", ce);
         }
 
         return msg;
@@ -206,7 +183,7 @@ public class BSPMessageBundle<M extends 
    * @throws IOException
    */
   public long getLength() {
-    return bundleLength;
+    return byteBuffer.size();
   }
 
   @Override
@@ -214,9 +191,8 @@ public class BSPMessageBundle<M extends 
     out.writeInt(bundleSize);
     if (bundleSize > 0) {
       out.writeUTF(className);
-      byte[] messages = byteBuffer.toByteArray();
-      out.writeInt(messages.length);
-      out.write(messages);
+      out.writeInt(byteBuffer.size());
+      out.write(byteBuffer.toByteArray());
     }
   }
 

Added: hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java?rev=1620823&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java Wed Aug 27 09:06:50 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class KryoSerializer {
+  public static final int BUFFER_SIZE = 1024;
+
+  private final Kryo kryo;
+  private final Class<?> clazz;
+  private final byte[] buffer = new byte[BUFFER_SIZE];
+  private final Output output = new Output(buffer, -1);
+  private final Input input = new Input(buffer);
+
+  public KryoSerializer(Class<?> clazz) {
+    kryo = new Kryo();
+    kryo.setReferences(false);
+    kryo.register(clazz);
+    this.clazz = clazz;
+  }
+
+  public byte[] serialize(Object obj) throws IOException {
+    output.setBuffer(buffer, -1);
+    kryo.writeObject(output, obj);
+    return output.toBytes();
+  }
+
+  public Object deserialize(byte[] bytes) throws IOException {
+    input.setBuffer(bytes);
+    return kryo.readObject(input, clazz);
+  }
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java?rev=1620823&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java Wed Aug 27 09:06:50 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+public class TestKryoSerializer extends TestCase {
+
+  ByteArrayOutputStream mbos = null;
+  DataOutputStream mdos = null;
+  ByteArrayInputStream mbis = null;
+  DataInputStream mdis = null;
+
+  public void testSerialization() throws Exception {
+    KryoSerializer k = new KryoSerializer(DoubleWritable.class);
+    
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < 10000000; i++) {
+      DoubleWritable x = new DoubleWritable(i + 0.2);
+      byte[] bytes = k.serialize(x);
+      DoubleWritable y = (DoubleWritable) k.deserialize(bytes);
+      assertEquals(x, y);
+    }
+    System.out.println("Finished in "
+        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+  }
+
+}

Propchange: hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Wed Aug 27 09:06:50 2014
@@ -79,7 +79,7 @@ public class PageRankTest extends TestCa
 
   private void generateTestData() {
     try {
-      FastGraphGen.main(new String[] { "400", "10", INPUT, "2" });
+      FastGraphGen.main(new String[] { "4000", "100", INPUT, "3" });
     } catch (Exception e) {
       e.printStackTrace();
     }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Wed Aug 27 09:06:50 2014
@@ -19,10 +19,6 @@ package org.apache.hama.graph;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -32,6 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.KryoSerializer;
 
 /**
  * Stores the serialized vertices into a memory-based list. It doesn't allow
@@ -50,11 +47,6 @@ public final class ListVerticesInfo<V ex
   private boolean lockedAdditions = false;
   private int index = 0;
 
-  private ByteArrayOutputStream bos = null;
-  private DataOutputStream dos = null;
-  private ByteArrayInputStream bis = null;
-  private DataInputStream dis = null;
-
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException {
@@ -130,19 +122,15 @@ public final class ListVerticesInfo<V ex
     };
   }
 
+  private final KryoSerializer kryo = new KryoSerializer(GraphJobRunner.VERTEX_CLASS);
+
   public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
-    bos = new ByteArrayOutputStream();
-    dos = new DataOutputStream(bos);
-    vertex.write(dos);
-    return bos.toByteArray();
+    return kryo.serialize(vertex);
   }
 
+  @SuppressWarnings("unchecked")
   public Vertex<V, E, M> deserialize(byte[] serialized) throws IOException {
-    bis = new ByteArrayInputStream(serialized);
-    dis = new DataInputStream(bis);
-    v = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
-
-    v.readFields(dis);
+    v = (Vertex<V, E, M>) kryo.deserialize(serialized);
     v.setRunner(runner);
     return v;
   }

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Wed Aug 27 09:06:50 2014
@@ -101,6 +101,7 @@
     <log4j.version>1.2.16</log4j.version>
     <zookeeper.version>3.4.5</zookeeper.version>
     <ant.version>1.7.1</ant.version>
+    <kryo.version>2.20</kryo.version>
   </properties>
 
   <repositories>
@@ -193,9 +194,14 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
-	<groupId>org.xerial.snappy</groupId>
-	<artifactId>snappy-java</artifactId>
-	<version>1.0.5</version>
+	    <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>1.0.5</version>
+      </dependency>
+       <dependency>
+         <groupId>com.esotericsoftware.kryo</groupId>
+         <artifactId>kryo</artifactId>
+         <version>${kryo.version}</version>
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>



Mime
View raw message