hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1619042 - in /hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/hama/graph/
Date Wed, 20 Aug 2014 07:36:29 GMT
Author: edwardyoon
Date: Wed Aug 20 07:36:29 2014
New Revision: 1619042

URL: http://svn.apache.org/r1619042
Log:
HAMA-914: Boolean flag (isCompressed) is required only when runtime compression is enabled.

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Aug 20 07:36:29 2014
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-914: Boolean flag (isCompressed) is required only when runtime compression is enabled
(edwardyoon)
    HAMA-910: Web UI Improvement (Victor Lee via edwardyoon)
    HAMA-909: Improve Mesos Scheduler's Fault Tolerance (Jeff Fenchel via edwardyoon)
    HAMA-823: Remove javadoc warnings (Victor Lee via edwardyoon)  

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Wed Aug 20 07:36:29 2014
@@ -284,6 +284,11 @@
   </property>
 
   <property>
+    <name>hama.messenger.runtime.compression</name>
+    <value>false</value>
+    <description>True if you want to enable runtime compression</description>
+  </property>
+  <property>
     <name>hama.messenger.compression.class</name>
     <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
     <description>The message compression algorithm to choose. Default is null.</description>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed Aug 20 07:36:29
2014
@@ -87,17 +87,20 @@ public class BSPMessageBundle<M extends 
     try {
       serialized = serialize(message);
 
-      if (compressor != null && serialized.length > threshold) {
-        bufferDos.writeBoolean(true);
-        compressed = compressor.compress(serialized);
-        bufferDos.writeInt(compressed.length);
-        bufferDos.write(compressed);
-
-        bundleLength += compressed.length;
+      if (compressor != null) {
+        if (serialized.length > threshold) {
+          bufferDos.writeBoolean(true);
+          compressed = compressor.compress(serialized);
+          bufferDos.writeInt(compressed.length);
+          bufferDos.write(compressed);
+          bundleLength += compressed.length;
+        } else {
+          bufferDos.writeBoolean(false);
+          bufferDos.write(serialized);
+          bundleLength += serialized.length;
+        }
       } else {
-        bufferDos.writeBoolean(false);
         bufferDos.write(serialized);
-
         bundleLength += serialized.length;
       }
     } catch (IOException e) {
@@ -114,7 +117,7 @@ public class BSPMessageBundle<M extends 
   public byte[] getBuffer() {
     return byteBuffer.toByteArray();
   }
-  
+
   public Iterator<M> iterator() {
     bis = new ByteArrayInputStream(byteBuffer.toByteArray());
     dis = new DataInputStream(bis);
@@ -140,10 +143,13 @@ public class BSPMessageBundle<M extends 
       @Override
       public M next() {
         boolean isCompressed = false;
-        try {
-          isCompressed = dis.readBoolean();
-        } catch (IOException e1) {
-          e1.printStackTrace();
+
+        if (compressor != null) {
+          try {
+            isCompressed = dis.readBoolean();
+          } catch (IOException e1) {
+            e1.printStackTrace();
+          }
         }
 
         Class<M> clazz = null;
@@ -152,10 +158,12 @@ public class BSPMessageBundle<M extends 
         } catch (ClassNotFoundException e) {
           LOG.error("Class was not found.", e);
         }
+
         msg = ReflectionUtils.newInstance(clazz, null);
 
         try {
           if (isCompressed) {
+            // LOG.debug(">>>>> decompressing .........");
             int length = dis.readInt();
             compressed = new byte[length];
             dis.readFully(compressed);
@@ -215,7 +223,7 @@ public class BSPMessageBundle<M extends 
   @Override
   public void readFields(DataInput in) throws IOException {
     this.bundleSize = in.readInt();
-    
+
     if (this.bundleSize > 0) {
       className = in.readUTF();
       int bytesLength = in.readInt();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Wed Aug 20 07:36:29
2014
@@ -353,8 +353,11 @@ public class LocalBSPRunner implements J
         throws IOException {
       peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
           bundle.getLength());
-      bundle.setCompressor(compressor,
-          conf.getLong("hama.messenger.compression.threshold", 512));
+
+      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+        bundle.setCompressor(compressor,
+            conf.getLong("hama.messenger.compression.threshold", 512));
+      }
 
       Iterator<M> it = bundle.iterator();
       while (it.hasNext()) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Wed Aug 20 07:36:29 2014
@@ -279,8 +279,10 @@ public abstract class AbstractMessageMan
 
   @Override
   public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
-    bundle.setCompressor(compressor,
-        conf.getLong("hama.messenger.compression.threshold", 128));
+    if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+      bundle.setCompressor(compressor,
+          conf.getLong("hama.messenger.compression.threshold", 128));
+    }
 
     Iterator<? extends Writable> it = bundle.iterator();
     while (it.hasNext()) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
Wed Aug 20 07:36:29 2014
@@ -86,8 +86,10 @@ public class OutgoingPOJOMessageBundle<M
 
     if (!outgoingBundles.containsKey(targetPeerAddress)) {
       BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
-      bundle.setCompressor(compressor,
-          conf.getLong("hama.messenger.compression.threshold", 128));
+      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+        bundle.setCompressor(compressor,
+            conf.getLong("hama.messenger.compression.threshold", 128));
+      }
       outgoingBundles.put(targetPeerAddress, bundle);
     }
     return targetPeerAddress;

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Wed Aug
20 07:36:29 2014
@@ -50,7 +50,8 @@ public class TestBSPMasterGroomServer ex
   public TestBSPMasterGroomServer() {
     configuration = new HamaConfiguration();
     configuration.set("bsp.master.address", "localhost");
-    configuration.set("hama.child.redirect.log.console", "true");
+    configuration.setBoolean("hama.child.redirect.log.console", true);
+    configuration.setBoolean("hama.messenger.runtime.compression", true);
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.set("bsp.local.dir", "/tmp/hama-test");

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1619042&r1=1619041&r2=1619042&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
Wed Aug 20 07:36:29 2014
@@ -119,7 +119,10 @@ public class OutgoingVertexMessagesManag
 
     if (!outgoingBundles.containsKey(targetPeerAddress)) {
       BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
-      bundle.setCompressor(compressor, conf.getLong("hama.messenger.compression.threshold",
128));
+      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+        bundle.setCompressor(compressor,
+            conf.getLong("hama.messenger.compression.threshold", 128));
+      }
       outgoingBundles.put(targetPeerAddress, bundle);
     }
     return targetPeerAddress;



Mime
View raw message