hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1406895 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/
Date Thu, 08 Nov 2012 02:20:14 GMT
Author: edwardyoon
Date: Thu Nov  8 02:20:13 2012
New Revision: 1406895

URL: http://svn.apache.org/viewvc?rev=1406895&view=rev
Log:
Add getApproximateSize to BSPMessageBundle

Removed:
    hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Nov  8 02:20:13 2012
@@ -19,6 +19,7 @@ Release 0.6 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-644: Add getApproximateSize to BSPMessageBundle (edwardyoon)
    HAMA-655: Add Exception handling for parsing of vertex (edwardyoon)
    HAMA-646: Add deleting temporary files method in TestSubmitGraphJob (Yuesheng Hu via edwardyoon)
    HAMA-597: Split a GraphJobRunner into multiple classes (edwardyoon & tjungblut) 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Nov  8 02:20:13 2012
@@ -17,13 +17,16 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -58,6 +61,7 @@ public class BSPMessageBundle<M extends 
       LinkedList<M> list = new LinkedList<M>();
       list.add(message);
       messages.put(className, list);
+      list = null;
     } else {
       messages.get(className).add(message);
     }
@@ -73,6 +77,42 @@ public class BSPMessageBundle<M extends 
     return mergeList;
   }
 
+  /**
+   * @return the approximate size of bundle object
+   * @throws IOException
+   */
+  public long getApproximateSize() throws IOException {
+    int sample = 20;
+    int sum = 0;
+    int totalMsgs = 0;
+    int classNames = 0;
+    DataOutputStream dos = null;
+
+    for (Map.Entry<String, LinkedList<M>> e : messages.entrySet()) {
+      classNames += e.getKey().length();
+      LinkedList<M> c = e.getValue();
+
+      if (messages.size() == 1 && c.size() < sample) {
+        dos = new DataOutputStream(new ByteArrayOutputStream());
+        write(dos);
+        dos.close();
+        return dos.size();
+      }
+
+      totalMsgs += c.size();
+      for (int i = 0; i < sample; i++) {
+        int idx = (int) (Math.random() * (c.size() - 1));
+        dos = new DataOutputStream(new ByteArrayOutputStream());
+        c.get(idx).write(dos);
+        dos.close();
+        sum += dos.size();
+      }
+    }
+
+    int avgSize = sum / (sample * messages.size());
+    return (totalMsgs * avgSize) + classNames + 4;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     // writes the k/v mapping size

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Thu Nov  8 02:20:13 2012
@@ -32,7 +32,6 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
-import org.apache.hama.util.CompressionUtil;
 import org.apache.hama.util.LRUCache;
 
 /**
@@ -102,18 +101,12 @@ public final class HadoopMessageManagerI
           + " to transfer messages to!");
     } else {
       if (compressor != null) {
-        float bundleSize = CompressionUtil.getBundleSize(bundle);
-        if (bundleSize > conf.getLong("hama.messenger.compression.threshold",
-            1048576)) {
+        if (bundle.getApproximateSize() > conf.getLong(
+            "hama.messenger.compression.threshold", 1048576)) {
           BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
-          if (CompressionUtil.getCompressionRatio(
-              (float) compMsgBundle.getData().length, bundleSize) < 1.0) {
-            bspPeerConnection.put(compMsgBundle);
-          } else {
-            bspPeerConnection.put(bundle);
-          }
+          bspPeerConnection.put(compMsgBundle);
         } else {
-          bspPeerConnection.put(bundle); 
+          bspPeerConnection.put(bundle);
         }
       } else {
         bspPeerConnection.put(bundle);

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Thu Nov  8 02:20:13 2012
@@ -28,10 +28,26 @@ import java.util.Arrays;
 
 import junit.framework.TestCase;
 
-import org.apache.hadoop.io.BytesWritable;;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
 
 public class TestBSPMessageBundle extends TestCase {
 
+  public void testApproximateSize() throws IOException {
+    BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
+    for (int i = 0; i < 100; i++) {
+      bundle.addMessage(new IntWritable(i));
+    }
+
+    assertTrue(bundle.getApproximateSize() > 400
+        && bundle.getApproximateSize() < 500);
+
+    bundle = new BSPMessageBundle<IntWritable>();
+    bundle.addMessage(new IntWritable(1));
+    assertTrue(bundle.getApproximateSize() > 40
+        && bundle.getApproximateSize() < 50);
+  }
+
   public void testEmpty() throws IOException {
     BSPMessageBundle<BytesWritable> bundle = new BSPMessageBundle<BytesWritable>();
     // Serialize it.



Mime
View raw message