incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1291977 - in /incubator/hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/message/c...
Date Tue, 21 Feb 2012 20:09:01 GMT
Author: tjungblut
Date: Tue Feb 21 20:09:00 2012
New Revision: 1291977

URL: http://svn.apache.org/viewvc?rev=1291977&view=rev
Log:
[HAMA-367]: Runtime Compression of BSP Messages to Improve the Performance


Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPCompressBundle.java
  (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
  (with props)
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/pom.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
    incubator/hama/trunk/pom.xml

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Feb 21 20:09:00 2012
@@ -4,6 +4,7 @@ Release 0.5 - Unreleased
 
   NEW FEATURES
 
+   HAMA-367: Runtime Compression of BSP Messages to Improve the Performance (Apurv Verma via tjungblut)
    HAMA-501: Add Avro RPC (tjungblut)
    HAMA-456: Add basic Graph interfaces and GraphJobRunner (edwardyoon)
 

Modified: incubator/hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/pom.xml?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/core/pom.xml (original)
+++ incubator/hama/trunk/core/pom.xml Tue Feb 21 20:09:00 2012
@@ -41,6 +41,16 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.iq80.snappy</groupId>
+      <artifactId>snappy</artifactId>
+      <version>0.2</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>10.0.1</version>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
     </dependency>

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
Tue Feb 21 20:09:00 2012
@@ -23,8 +23,9 @@ import org.apache.avro.specific.Specific
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hadoop.io.Writable;
 
-public final class AvroBSPMessageBundle<M extends Writable> extends SpecificRecordBase implements
-    SpecificRecord {
+@SuppressWarnings("deprecation")
+public final class AvroBSPMessageBundle<M extends Writable> extends
+    SpecificRecordBase implements SpecificRecord {
   public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema
       .parse("{\"type\":\"record\",\"name\":\"AvroBSPMessage\",\"namespace\":\"de.jungblut.avro\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}");
   @Deprecated
@@ -87,16 +88,17 @@ public final class AvroBSPMessageBundle<
    * AvroBSPMessage instance
    */
   public final static AvroBSPMessageBundle.Builder newBuilder(
-      AvroBSPMessageBundle other) {
+      AvroBSPMessageBundle<?> other) {
     return new AvroBSPMessageBundle.Builder(other);
   }
 
   /**
    * RecordBuilder for AvroBSPMessage instances.
    */
-  public final static class Builder extends
-      org.apache.avro.specific.SpecificRecordBuilderBase<AvroBSPMessageBundle>
-      implements org.apache.avro.data.RecordBuilder<AvroBSPMessageBundle> {
+  public final static class Builder
+      extends
+      org.apache.avro.specific.SpecificRecordBuilderBase<AvroBSPMessageBundle<?>>
+      implements org.apache.avro.data.RecordBuilder<AvroBSPMessageBundle<?>> {
 
     private java.nio.ByteBuffer data;
 
@@ -111,7 +113,7 @@ public final class AvroBSPMessageBundle<
     }
 
     /** Creates a Builder by copying an existing AvroBSPMessage instance */
-    private Builder(AvroBSPMessageBundle other) {
+    private Builder(AvroBSPMessageBundle<?> other) {
       super(AvroBSPMessageBundle.SCHEMA$);
       if (isValidValue(fields[0], other.data)) {
         data = (java.nio.ByteBuffer) clone(other.data);
@@ -154,9 +156,10 @@ public final class AvroBSPMessageBundle<
     }
 
     @Override
-    public final AvroBSPMessageBundle build() {
+    public final AvroBSPMessageBundle<?> build() {
       try {
-        AvroBSPMessageBundle record = new AvroBSPMessageBundle();
+        @SuppressWarnings("rawtypes")
+        AvroBSPMessageBundle<?> record = new AvroBSPMessageBundle();
         record.data = fieldSetFlags[0] ? this.data
             : (java.nio.ByteBuffer) getDefaultValue(fields[0]);
         return record;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
Tue Feb 21 20:09:00 2012
@@ -36,17 +36,14 @@ import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.avro.ipc.specific.SpecificResponder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.util.BSPNetUtils;
 
-public class AvroMessageManagerImpl<M extends Writable> implements MessageManager<M>, Sender<M> {
-
-  private static final Log LOG = LogFactory
-      .getLog(AvroMessageManagerImpl.class);
+public class AvroMessageManagerImpl<M extends Writable> extends
+    CompressableMessageManager<M> implements Sender<M> {
 
   private NettyServer server = null;
 
@@ -60,6 +57,7 @@ public class AvroMessageManagerImpl<M ex
 
   @Override
   public void init(Configuration conf, InetSocketAddress addr) {
+    super.initCompression(conf);
     server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
   }
 
@@ -86,6 +84,7 @@ public class AvroMessageManagerImpl<M ex
     return localQueue.size();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
@@ -98,7 +97,7 @@ public class AvroMessageManagerImpl<M ex
       sender = (Sender<M>) SpecificRequestor.getClient(Sender.class, client);
       peers.put(addr, sender);
     }
-    
+
     sender.transfer(msg);
   }
 
@@ -122,7 +121,6 @@ public class AvroMessageManagerImpl<M ex
 
   @Override
   public void send(String peerName, M msg) throws IOException {
-    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
     InetSocketAddress targetPeerAddress = null;
     // Get socket for target peer.
     if (peerSocketCache.containsKey(peerName)) {
@@ -142,22 +140,30 @@ public class AvroMessageManagerImpl<M ex
   private final BSPMessageBundle<M> deserializeMessage(ByteBuffer buffer)
       throws IOException {
     BSPMessageBundle<M> msg = new BSPMessageBundle<M>();
-
-    ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
-    DataInputStream in = new DataInputStream(inArray);
-    msg.readFields(in);
+    if (compressor == null) {
+      ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
+      DataInputStream in = new DataInputStream(inArray);
+      msg.readFields(in);
+    } else {
+      msg = compressor
+          .decompressBundle(new BSPCompressedBundle(buffer.array()));
+    }
 
     return msg;
   }
 
   private final ByteBuffer serializeMessage(BSPMessageBundle<M> msg)
       throws IOException {
-    ByteArrayOutputStream outArray = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(outArray);
-    msg.write(out);
-    out.close();
-    System.out.println("serialized " + outArray.size() + " bytes");
-    return ByteBuffer.wrap(outArray.toByteArray());
+    if (compressor == null) {
+      ByteArrayOutputStream outArray = new ByteArrayOutputStream();
+      DataOutputStream out = new DataOutputStream(outArray);
+      msg.write(out);
+      out.close();
+      return ByteBuffer.wrap(outArray.toByteArray());
+    } else {
+      BSPCompressedBundle compMsgBundle = compressor.compressBundle(msg);
+      return ByteBuffer.wrap(compMsgBundle.getData());
+    }
   }
 
   @Override

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
+
+/**
+ * Abstract message layer that can be used to compress messages.
+ * 
+ * @param <M>
+ */
+public abstract class CompressableMessageManager<M extends Writable> implements
+    MessageManager<M> {
+
+  protected BSPMessageCompressor<M> compressor;
+
+  protected void initCompression(Configuration conf) {
+    compressor = new BSPMessageCompressorFactory<M>().getCompressor(conf);
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
Tue Feb 21 20:09:00 2012
@@ -19,13 +19,15 @@ package org.apache.hama.bsp.message;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 
 /**
  * Hadoop RPC Interface for messaging.
  * 
  */
-public interface HadoopMessageManager<M extends Writable> extends HamaRPCProtocolVersion {
+public interface HadoopMessageManager<M extends Writable> extends
+    HamaRPCProtocolVersion {
 
   /**
    * This method puts a message for the next iteration. Accessed concurrently
@@ -43,4 +45,12 @@ public interface HadoopMessageManager<M 
    */
   public void put(BSPMessageBundle<M> messages);
 
+  /**
+   * This method puts a compressed message bundle for the next iteration.
+   * Accessed concurrently from protocol, this must be synchronized internally.
+   * 
+   * @param compMsgBundle
+   */
+  public void put(BSPCompressedBundle compMsgBundle);
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Tue Feb 21 20:09:00 2012
@@ -33,14 +33,16 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.CompressionUtil;
 
 /**
  * Implementation of the {@link HadoopMessageManager}.
  * 
  */
-public final class HadoopMessageManagerImpl<M extends Writable> implements MessageManager<M>,
-    HadoopMessageManager<M> {
+public final class HadoopMessageManagerImpl<M extends Writable> extends
+    CompressableMessageManager<M> implements HadoopMessageManager<M> {
 
   private static final Log LOG = LogFactory
       .getLog(HadoopMessageManagerImpl.class);
@@ -48,7 +50,7 @@ public final class HadoopMessageManagerI
   private Server server = null;
   private Configuration conf;
 
-  private final HashMap<InetSocketAddress, HadoopMessageManager> peers = new HashMap<InetSocketAddress, HadoopMessageManager>();
+  private final HashMap<InetSocketAddress, HadoopMessageManager<M>> peers = new HashMap<InetSocketAddress, HadoopMessageManager<M>>();
   private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String,
InetSocketAddress>();
 
   private final HashMap<InetSocketAddress, LinkedList<M>> outgoingQueues = new
HashMap<InetSocketAddress, LinkedList<M>>();
@@ -59,6 +61,7 @@ public final class HadoopMessageManagerI
   @Override
   public final void init(Configuration conf, InetSocketAddress peerAddress) {
     this.conf = conf;
+    super.initCompression(conf);
     startRPCServer(conf, peerAddress);
   }
 
@@ -112,11 +115,12 @@ public final class HadoopMessageManagerI
     return this.outgoingQueues.entrySet().iterator();
   }
 
-  protected final HadoopMessageManager getBSPPeerConnection(
+  @SuppressWarnings("unchecked")
+  protected final HadoopMessageManager<M> getBSPPeerConnection(
       InetSocketAddress addr) throws IOException {
-    HadoopMessageManager peer = peers.get(addr);
+    HadoopMessageManager<M> peer = peers.get(addr);
     if (peer == null) {
-      peer = (HadoopMessageManager) RPC.getProxy(HadoopMessageManager.class,
+      peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
           HadoopMessageManager.versionID, addr, this.conf);
       this.peers.put(addr, peer);
     }
@@ -127,13 +131,22 @@ public final class HadoopMessageManagerI
   public final void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
 
-    HadoopMessageManager bspPeerConnection = this.getBSPPeerConnection(addr);
+    HadoopMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
 
     if (bspPeerConnection == null) {
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
     } else {
-      bspPeerConnection.put(bundle);
+      if (compressor != null) {
+        BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
+        if (CompressionUtil.getCompressionRatio(compMsgBundle, bundle) < 1.0) {
+          bspPeerConnection.put(compMsgBundle);
+        } else {
+          bspPeerConnection.put(bundle);
+        }
+      } else {
+        bspPeerConnection.put(bundle);
+      }
     }
   }
 
@@ -157,6 +170,14 @@ public final class HadoopMessageManagerI
   }
 
   @Override
+  public final void put(BSPCompressedBundle compMsgBundle) {
+    BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
+    for (M message : bundle.getMessages()) {
+      this.localQueueForNextIteration.add(message);
+    }
+  }
+
+  @Override
   public final int getNumCurrentMessages() {
     return localQueue.size();
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
Tue Feb 21 20:09:00 2012
@@ -22,7 +22,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class MessageManagerFactory<M extends Writable> {
-  public static final String MESSAGE_MANAGER_CLASS = "hama.messanger.class";
+  public static final String MESSAGE_MANAGER_CLASS = "hama.messenger.class";
 
   /**
    * Returns a messenger via reflection based on what was configured.
@@ -30,6 +30,7 @@ public class MessageManagerFactory<M ext
    * @param conf
    * @return
    */
+  @SuppressWarnings("unchecked")
   public MessageManager<M> getMessageManager(Configuration conf)
       throws ClassNotFoundException {
     return (MessageManager<M>) ReflectionUtils.newInstance(conf

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A compressed representation of BSPMessageBundle.
+ * 
+ */
+public final class BSPCompressedBundle implements Writable{
+
+  private byte[] data;
+
+  public BSPCompressedBundle(){		
+  }
+
+  public BSPCompressedBundle(byte[] data){
+    this.data = data;
+  }
+
+  public byte[] getData() {
+    return data;
+  }
+
+  public void setData(byte[] data) {
+    this.data = data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(data.length);
+    out.write(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {		
+    int len = in.readInt();
+    data    = new byte[len];
+    in.readFully(data);
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPCompressedBundle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.compress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+/**
+ * Provides utilities for compressing and decompressing BSPMessageBundle.
+ * 
+ */
+public interface BSPMessageCompressor<M extends Writable> {
+
+  public static final Log LOG = LogFactory.getLog(BSPMessageCompressor.class);
+
+  /**
+   * Compresses the BSPMessageBundle and returns it in form of a
+   * BSPCompressedBundle.
+   * 
+   * @param bundle
+   * @return
+   */
+  public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle);
+
+  /**
+   * Decompresses a BSPCompressedBundle and returns the corresponding
+   * BSPMessageBundle.
+   * 
+   * @param compMsgBundle
+   * @return
+   */
+  public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle);
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.compress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BSPMessageCompressorFactory<M extends Writable> {
+
+  public static final String COMPRESSION_CODEC_CLASS = "hama.messenger.compression.class";
+
+  /**
+   * Returns a compressor via reflection based on what was configured.
+   * 
+   * @param conf
+   * @return
+   */
+  @SuppressWarnings("unchecked")
+  public BSPMessageCompressor<M> getCompressor(Configuration conf) {
+    try {
+      return (BSPMessageCompressor<M>) ReflectionUtils.newInstance(conf
+          .getClassByName(conf.get(COMPRESSION_CODEC_CLASS,
+              SnappyCompressor.class.getCanonicalName())), conf);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.compress;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+public class Bzip2Compressor<M extends Writable> implements
+    BSPMessageCompressor<M> {
+
+  private final BZip2Codec codec = new BZip2Codec();
+
+  public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
+    BSPCompressedBundle compMsgBundle = null;
+    ByteArrayOutputStream bos = null;
+    CompressionOutputStream sos = null;
+    DataOutputStream dos = null;
+
+    try {
+      bos = new ByteArrayOutputStream();
+      sos = codec.createOutputStream(bos);
+      dos = new DataOutputStream(sos);
+
+      bundle.write(dos);
+      dos.close(); // Flush the stream as no more data will be sent.
+
+      byte[] data = bos.toByteArray();
+      compMsgBundle = new BSPCompressedBundle(data);
+
+    } catch (IOException ioe) {
+      LOG.error("Unable to compress", ioe);
+    } finally {
+      try {
+        sos.close();
+        bos.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close compression streams.", e);
+      }
+    }
+    return compMsgBundle;
+  }
+
+  /**
+   * Decompresses a BSPCompressedBundle and returns the corresponding
+   * BSPMessageBundle.
+   * 
+   * @param compMsgBundle
+   * @return
+   */
+  public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
+    ByteArrayInputStream bis = null;
+    CompressionInputStream sis = null;
+    DataInputStream dis = null;
+    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+
+    try {
+      byte[] data = compMsgBundle.getData();
+      bis = new ByteArrayInputStream(data);
+      sis = codec.createInputStream(bis);
+      dis = new DataInputStream(sis);
+
+      bundle.readFields(dis);
+
+    } catch (IOException ioe) {
+      LOG.error("Unable to decompress.", ioe);
+    } finally {
+      try {
+        dis.close();
+        sis.close();
+        bis.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close decompression streams.", e);
+      }
+    }
+
+    return bundle;
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.compress;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * BSPMessageCompressor implementation that serializes message bundles through
+ * Snappy compressed streams.
+ */
+public class SnappyCompressor<M extends Writable> implements
+    BSPMessageCompressor<M> {
+
+  /**
+   * Serializes the given bundle through a Snappy output stream and wraps the
+   * compressed bytes in a BSPCompressedBundle.
+   *
+   * @param bundle the message bundle to compress.
+   * @return the compressed bundle, or null if an IOException occurred while
+   *         compressing (the error is logged).
+   */
+  public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
+    BSPCompressedBundle compMsgBundle = null;
+    ByteArrayOutputStream bos = null;
+    SnappyOutputStream sos = null;
+    DataOutputStream dos = null;
+
+    try {
+      bos = new ByteArrayOutputStream();
+      sos = new SnappyOutputStream(bos);
+      dos = new DataOutputStream(sos);
+
+      bundle.write(dos);
+      dos.close(); // Flush the stream as no more data will be sent.
+
+      byte[] data = bos.toByteArray();
+      compMsgBundle = new BSPCompressedBundle(data);
+
+    } catch (IOException ioe) {
+      LOG.error("Unable to compress", ioe);
+    } finally {
+      closeStreams("Failed to close compression streams.", sos, bos);
+    }
+    return compMsgBundle;
+  }
+
+  /**
+   * Decompresses a BSPCompressedBundle and returns the corresponding
+   * BSPMessageBundle.
+   *
+   * @param compMsgBundle the compressed bundle whose data is read back through
+   *          a Snappy input stream.
+   * @return the bundle read from the decompressed data. If an IOException
+   *         occurs it is logged and the bundle is returned as far as it was
+   *         populated (possibly empty).
+   */
+  public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
+    ByteArrayInputStream bis = null;
+    SnappyInputStream sis = null;
+    DataInputStream dis = null;
+    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+
+    try {
+      byte[] data = compMsgBundle.getData();
+      bis = new ByteArrayInputStream(data);
+      sis = new SnappyInputStream(bis);
+      dis = new DataInputStream(sis);
+
+      bundle.readFields(dis);
+
+    } catch (IOException ioe) {
+      LOG.error("Unable to decompress.", ioe);
+    } finally {
+      closeStreams("Failed to close decompression streams.", dis, sis, bis);
+    }
+
+    return bundle;
+  }
+
+  /**
+   * Closes the given streams in order. Streams that were never created (null)
+   * are skipped so that a failure during stream construction does not turn
+   * into a NullPointerException in the caller's finally block, and a failing
+   * close does not prevent the remaining streams from being closed.
+   *
+   * @param failureMessage message logged as a warning when a close fails.
+   * @param streams the streams to close; individual entries may be null.
+   */
+  private static void closeStreams(String failureMessage,
+      java.io.Closeable... streams) {
+    for (java.io.Closeable stream : streams) {
+      if (stream == null) {
+        continue;
+      }
+      try {
+        stream.close();
+      } catch (IOException e) {
+        LOG.warn(failureMessage, e);
+      }
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java Tue
Feb 21 20:09:00 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
+
+public class CompressionUtil {
+
+  /**
+   * Calculates the compression ratio, i.e. the length of the compressed data
+   * divided by the number of bytes the uncompressed bundle serializes to. A
+   * compression ratio of less than 1 is desirable.
+   *
+   * @param compMsgBundle the compressed bundle; its data length is the
+   *          numerator.
+   * @param bundle the uncompressed bundle; it is serialized into a temporary
+   *          buffer to measure the denominator.
+   * @return compressed length divided by uncompressed serialized length.
+   * @throws IOException if serializing the bundle fails.
+   */
+  public static float getCompressionRatio(BSPCompressedBundle compMsgBundle,
+      BSPMessageBundle<?> bundle) throws IOException {
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    bundle.write(dos);
+
+    // Closing the data stream flushes it into (and closes) the byte buffer.
+    dos.close();
+
+    float compLen = compMsgBundle.getData().length;
+
+    // size() reports the number of bytes written without copying the whole
+    // buffer the way toByteArray() does.
+    return (compLen / bos.size());
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPCompressBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPCompressBundle.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPCompressBundle.java
(added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPCompressBundle.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.compress;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import junit.framework.TestCase;
+
+public class TestBSPCompressBundle extends TestCase {
+
+  /**
+   * Writes a BSPCompressedBundle into a byte buffer, reads it back into a
+   * fresh instance and checks that the payload survived the round trip.
+   */
+  public void testBundle() throws Exception {
+    BSPCompressedBundle original = new BSPCompressedBundle(
+        "Hello World!".getBytes());
+
+    // Serialize the bundle.
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    original.write(new DataOutputStream(buffer));
+
+    // Deserialize it into an empty bundle.
+    BSPCompressedBundle restored = new BSPCompressedBundle();
+    restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer
+        .toByteArray())));
+
+    assertEquals("Hello World!", new String(restored.getData()));
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPCompressBundle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1291977&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
(added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
Tue Feb 21 20:09:00 2012
@@ -0,0 +1,37 @@
+package org.apache.hama.bsp.message.compress;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.IntegerMessage;
+
+public class TestBSPMessageCompressor extends TestCase {
+
+  /**
+   * Compresses a bundle of n IntegerMessages with the compressor obtained from
+   * the factory, decompresses it again and verifies that all n messages come
+   * back in their original order.
+   */
+  public void testCompression() {
+    BSPMessageCompressor<IntegerMessage> compressor = new BSPMessageCompressorFactory<IntegerMessage>()
+        .getCompressor(new Configuration());
+
+    int n = 20;
+    BSPMessageBundle<IntegerMessage> bundle = new BSPMessageBundle<IntegerMessage>();
+
+    for (int i = 1; i <= n; i++) {
+      bundle.addMessage(new IntegerMessage("" + i, i));
+    }
+
+    BSPCompressedBundle compBundle = compressor.compressBundle(bundle);
+    assertNotNull("Compression returned no bundle.", compBundle);
+
+    BSPMessageBundle<IntegerMessage> uncompBundle = compressor
+        .decompressBundle(compBundle);
+
+    int i = 1;
+    for (BSPMessage msg : uncompBundle.getMessages()) {
+      assertEquals(i, msg.getData());
+      i++;
+    }
+
+    // Without this check an empty decompressed bundle would skip the loop
+    // above and let the test pass without verifying anything.
+    assertEquals(n, i - 1);
+
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1291977&r1=1291976&r2=1291977&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Tue Feb 21 20:09:00 2012
@@ -101,6 +101,16 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>org.iq80.snappy</groupId>
+        <artifactId>snappy</artifactId>
+        <version>0.2</version>
+      </dependency>
+      <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+        <version>10.0.1</version> 
+      </dependency>    
+      <dependency>
         <groupId>commons-logging</groupId>
         <artifactId>commons-logging</artifactId>
         <version>${commons-logging.version}</version>



Mime
View raw message