hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1390242 - in /hama/trunk: conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/main/java/org/apache/hama/util/ core/src/test/java/or...
Date Wed, 26 Sep 2012 00:48:59 GMT
Author: edwardyoon
Date: Wed Sep 26 00:48:58 2012
New Revision: 1390242

URL: http://svn.apache.org/viewvc?rev=1390242&view=rev
Log:
Add compressor threshold.

Modified:
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
    hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Wed Sep 26 00:48:58 2012
@@ -205,6 +205,17 @@
     <name>hama.messenger.class</name>
     <value>org.apache.hama.bsp.message.HadoopMessageManagerImpl</value>
   </property>
+
+  <property>
+    <name>hama.messenger.compression.class</name>
+    <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
+    <description>The message compression algorithm to choose.</description>
+  </property>
+  <property>
+    <name>hama.messenger.compression.threshold</name>
+    <value>1048576</value>
+    <description>The Compressor threshold sets the level at which compression begins.</description>
+  </property>
   
   <property>
     <name>hama.zookeeper.quorum</name>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Sep 26 00:48:58 2012
@@ -264,7 +264,8 @@ public class BSPJob extends BSPJobContex
   /**
    * Sets the compression codec that should be used to compress messages.
    */
-  public void setCompressionCodec(Class<? extends BSPMessageCompressor<?>> clazz)
{
+  @SuppressWarnings({ "rawtypes" })
+  public void setCompressionCodec(Class<? extends BSPMessageCompressor> clazz) {
     conf.setClass(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS, clazz,
         BSPMessageCompressor.class);
   }
@@ -396,4 +397,13 @@ public class BSPJob extends BSPJobContex
   protected void setCheckPointFlag(boolean enableCheckPoint) {
     conf.setBoolean(Constants.CHECKPOINT_ENABLED, enableCheckPoint);
   }
+
+  /**
+   * Set compression threshold in bytes.
+   * 
+   * @param ct
+   */
+  public void setCompressionThreshold(long ct) {
+    conf.setLong("hama.messenger.compression.threshold", ct);
+  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed Sep 26 00:48:58
2012
@@ -17,8 +17,10 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Wed Sep 26 00:48:58 2012
@@ -96,13 +96,14 @@ public final class HadoopMessageManagerI
   @Override
   public final void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
-
     HadoopMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
     if (bspPeerConnection == null) {
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
     } else {
-      if (compressor != null) {
+      if (compressor != null
+          && CompressionUtil.getBundleSize(bundle) > conf.getLong(
+              "hama.messenger.compression.threshold", 1048576)) {
         BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
         if (CompressionUtil.getCompressionRatio(compMsgBundle, bundle) < 1.0) {
           bspPeerConnection.put(compMsgBundle);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java Wed
Sep 26 00:48:58 2012
@@ -35,7 +35,7 @@ public class MessageManagerFactory {
       Configuration conf) throws ClassNotFoundException {
     return (MessageManager<M>) ReflectionUtils.newInstance(conf
         .getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
-            org.apache.hama.bsp.message.AvroMessageManagerImpl.class
+            org.apache.hama.bsp.message.HadoopMessageManagerImpl.class
                 .getCanonicalName())), conf);
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
Wed Sep 26 00:48:58 2012
@@ -26,7 +26,7 @@ import org.apache.hama.bsp.BSPMessageBun
  * Provides utilities for compressing and decompressing BSPMessageBundle.
  * 
  */
-public interface BSPMessageCompressor<M extends Writable> {
+public abstract class BSPMessageCompressor<M extends Writable> {
 
   public static final Log LOG = LogFactory.getLog(BSPMessageCompressor.class);
 
@@ -37,7 +37,7 @@ public interface BSPMessageCompressor<M 
    * @param bundle
    * @return
    */
-  public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle);
+  public abstract BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle);
 
   /**
    * Decompresses a BSPCompressedBundle and returns the corresponding
@@ -46,5 +46,5 @@ public interface BSPMessageCompressor<M 
    * @param compMsgBundle
    * @return
    */
-  public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle);
+  public abstract BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle);
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
Wed Sep 26 00:48:58 2012
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hama.bsp.BSPMessageBundle;
 
-public class Bzip2Compressor<M extends Writable> implements
+public class Bzip2Compressor<M extends Writable> extends
     BSPMessageCompressor<M> {
 
   private final BZip2Codec codec = new BZip2Codec();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
Wed Sep 26 00:48:58 2012
@@ -28,7 +28,7 @@ import org.apache.hama.bsp.BSPMessageBun
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;
 
-public class SnappyCompressor<M extends Writable> implements
+public class SnappyCompressor<M extends Writable> extends
     BSPMessageCompressor<M> {
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java Wed Sep 26 00:48:58
2012
@@ -50,4 +50,15 @@ public class CompressionUtil {
     return (compLen / bos.toByteArray().length);
   }
 
+  public static long getBundleSize(BSPMessageBundle<?> bundle)
+      throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    bundle.write(dos);
+
+    dos.close();
+    bos.close();
+
+    return bos.toByteArray().length;
+  }
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Wed Sep
26 00:48:58 2012
@@ -32,6 +32,7 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.message.DiskQueue;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.examples.ClassSerializePrinting;
 
 public class TestBSPMasterGroomServer extends HamaCluster {
@@ -84,6 +85,9 @@ public class TestBSPMasterGroomServer ex
     bsp.setOutputKeyClass(IntWritable.class);
     bsp.setOutputValueClass(Text.class);
     bsp.setOutputPath(OUTPUT_PATH);
+    
+    bsp.setCompressionCodec(SnappyCompressor.class);
+    bsp.setCompressionThreshold(40);
 
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);



Mime
View raw message