hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1663476 - in /hama/trunk: ./ conf/ core/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/test/jav...
Date Mon, 02 Mar 2015 23:33:59 GMT
Author: edwardyoon
Date: Mon Mar  2 23:33:59 2015
New Revision: 1663476

URL: http://svn.apache.org/r1663476
Log:
HAMA-928: Fix compressor bugs

Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
Modified:
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/pom.xml
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/pom.xml

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Mon Mar  2 23:33:59 2015
@@ -290,15 +290,9 @@
   </property>
   <property>
     <name>hama.messenger.compression.class</name>
-    <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
+    <value>org.apache.hama.bsp.message.compress.Bzip2Compressor</value>
     <description>The message compression algorithm to choose. Default is null.</description>
   </property>
-  <property>
-    <name>hama.messenger.compression.threshold</name>
-    <value>128</value>
-    <description>The Compressor threshold sets the level at which compression begins.
-    The default is 128 bytes.</description>
-  </property>
   
   <property>
     <name>hama.zookeeper.quorum</name>

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Mon Mar  2 23:33:59 2015
@@ -55,6 +55,11 @@
       <artifactId>commons-cli</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.9</version>
+    </dependency>
+    <dependency>
       <groupId>commons-configuration</groupId>
       <artifactId>commons-configuration</artifactId>
     </dependency>

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon Mar  2 23:33:59 2015
@@ -164,7 +164,6 @@ public interface Constants {
 
   // Other constants
   static final String MESSENGER_RUNTIME_COMPRESSION = "hama.messenger.runtime.compression";
-  static final String MESSENGER_COMPRESSION_THRESHOLD = "hama.messenger.compression.threshold";
   
   /**
    * An empty instance.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Mon Mar  2 23:33:59 2015
@@ -422,15 +422,6 @@ public class BSPJob extends BSPJobContex
     conf.setBoolean(Constants.CHECKPOINT_ENABLED, enableCheckPoint);
   }
 
-  /**
-   * Set compression threshold in bytes.
-   * 
-   * @param ct
-   */
-  public void setCompressionThreshold(long ct) {
-    conf.setLong("hama.messenger.compression.threshold", ct);
-  }
-
   public void setMessageQueueBehaviour(String queueBehaviour) {
     if (queueBehaviour.equals(MessageQueue.PERSISTENT_QUEUE))
       conf.setBoolean(MessageQueue.PERSISTENT_QUEUE, true);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Mar  2 23:33:59 2015
@@ -61,7 +61,7 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, TOTAL_MESSAGES_COMBINED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    TOTAL_DECOMPRESSED_BYTES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, TOTAL_MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, TOTAL_MESSAGES_COMBINED, TIME_IN_SYNC_MS, TOTAL_COMPRESSED_BYTES_TRANSFERED
   }
 
   private final HamaConfiguration conf;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Mar  2 23:33:59 2015
@@ -350,7 +350,7 @@ public class LocalBSPRunner implements J
     @Override
     public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
           bundle.getLength());
 
       MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Mon Mar  2 23:33:59 2015
@@ -129,11 +129,11 @@ public final class HamaAsyncMessageManag
         bundle.write(bufferDos);
 
         byte[] compressed = compressor.compress(byteBuffer.toByteArray());
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
             compressed.length);
         bspPeerConnection.put(compressed);
       } else {
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
             bundle.getLength());
         bspPeerConnection.put(bundle);
       }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Mon Mar  2 23:33:59 2015
@@ -123,20 +123,17 @@ public final class HamaMessageManagerImp
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
     } else {
-      System.out.println(conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false));
-      
       if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
         ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
         DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
         bundle.write(bufferDos);
 
         byte[] compressed = compressor.compress(byteBuffer.toByteArray());
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
-            compressed.length);
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_COMPRESSED_BYTES_TRANSFERED, compressed.length);
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_DECOMPRESSED_BYTES, byteBuffer.size());
         bspPeerConnection.put(compressed);
       } else {
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
-            bundle.getLength());
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED, bundle.getLength());
         bspPeerConnection.put(bundle);
       }
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java Mon Mar  2 23:33:59 2015
@@ -37,7 +37,7 @@ public class BSPMessageCompressorFactory
       try {
         return (BSPMessageCompressor<M>) ReflectionUtils.newInstance(conf
             .getClassByName(conf.get(COMPRESSION_CODEC_CLASS,
-                SnappyCompressor.class.getCanonicalName())), conf);
+                Bzip2Compressor.class.getCanonicalName())), conf);
       } catch (ClassNotFoundException e) {
         e.printStackTrace();
       }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java Mon Mar  2 23:33:59 2015
@@ -23,42 +23,37 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
 
 public class Bzip2Compressor<M extends Writable> extends
     BSPMessageCompressor<M> {
 
-  private final BZip2Codec codec = new BZip2Codec();
-
   @Override
   public byte[] compress(byte[] bytes) {
-    ByteArrayOutputStream bos = null;
-    CompressionOutputStream sos = null;
-    DataOutputStream dos = null;
-    byte[] compressedBytes = null;
+    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+    DataInputStream in = new DataInputStream(bis);
+
+    ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(outBuffer);
 
+    CompressorOutputStream cos = null;
     try {
-      bos = new ByteArrayOutputStream();
-      sos = codec.createOutputStream(bos);
-      dos = new DataOutputStream(sos);
-      dos.close(); // Flush the stream as no more data will be sent.
-
-      compressedBytes = bos.toByteArray();
-    } catch (IOException ioe) {
-      LOG.error("Unable to compress", ioe);
-    } finally {
-      try {
-        sos.close();
-        bos.close();
-      } catch (IOException e) {
-        LOG.warn("Failed to close compression streams.", e);
-      }
+      cos = new CompressorStreamFactory().createCompressorOutputStream("bzip2",
+          out);
+      IOUtils.copy(in, cos);
+      cos.close();
+    } catch (CompressorException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
-    return compressedBytes;
+
+    return outBuffer.toByteArray();
   }
 
   /**
@@ -70,29 +65,23 @@ public class Bzip2Compressor<M extends W
    */
   @Override
   public byte[] decompress(byte[] compressedBytes) {
-    ByteArrayInputStream bis = null;
-    CompressionInputStream sis = null;
-    DataInputStream dis = null;
-    byte[] bytes = null;
+    ByteArrayInputStream bis = new ByteArrayInputStream(compressedBytes);
+    DataInputStream in = new DataInputStream(bis);
 
+    ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(outBuffer);
     try {
-      bis = new ByteArrayInputStream(compressedBytes);
-      sis = codec.createInputStream(bis);
-      dis = new DataInputStream(sis);
-      bytes = IOUtils.toByteArray(dis);
-    } catch (IOException ioe) {
-      LOG.error("Unable to decompress.", ioe);
-    } finally {
-      try {
-        dis.close();
-        sis.close();
-        bis.close();
-      } catch (IOException e) {
-        LOG.warn("Failed to close decompression streams.", e);
-      }
-    }
 
-    return bytes;
+      final CompressorInputStream cin = new CompressorStreamFactory()
+          .createCompressorInputStream("bzip2", in);
+      IOUtils.copy(cin, out);
+      in.close();
+    } catch (CompressorException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    return outBuffer.toByteArray();
   }
 
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Mon Mar  2 23:33:59 2015
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
 import org.apache.hama.examples.ClassSerializePrinting;
 
 public class TestBSPMasterGroomServer extends HamaCluster {
@@ -85,8 +85,7 @@ public class TestBSPMasterGroomServer ex
     bsp.setOutputValueClass(Text.class);
     bsp.setOutputPath(OUTPUT_PATH);
 
-    bsp.setCompressionCodec(SnappyCompressor.class);
-    bsp.setCompressionThreshold(40);
+    bsp.setCompressionCodec(Bzip2Compressor.class);
 
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Mon Mar  2 23:33:59 2015
@@ -22,11 +22,13 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Iterator;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hama.bsp.BSPMessageBundle;
 
 public class TestBSPMessageCompressor extends TestCase {
 
@@ -37,33 +39,35 @@ public class TestBSPMessageCompressor ex
 
     assertNull(compressor);
     configuration.setClass(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
-        SnappyCompressor.class, BSPMessageCompressor.class);
+        Bzip2Compressor.class, BSPMessageCompressor.class);
     compressor = new BSPMessageCompressorFactory<IntWritable>()
         .getCompressor(configuration);
 
     assertNotNull(compressor);
 
-    IntWritable a = new IntWritable(123);
-    IntWritable b = new IntWritable(321);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(bos);
-    a.write(dos);
-    b.write(dos);
+    BSPMessageBundle<IntWritable> a = new BSPMessageBundle<IntWritable>();
+    for (int i = 0; i < 10000; i++) {
+      a.addMessage(new IntWritable(i));
+    }
+
+    ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+    DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+    a.write(bufferDos);
 
-    byte[] x = bos.toByteArray();
-
-    byte[] compressed = compressor.compress(x);
+    byte[] compressed = compressor.compress(byteBuffer.toByteArray());
+    assertTrue(byteBuffer.size() > compressed.length);
     byte[] decompressed = compressor.decompress(compressed);
 
     ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
-    DataInputStream dis = new DataInputStream(bis);
+    DataInputStream in = new DataInputStream(bis);
 
-    IntWritable c = new IntWritable();
-    c.readFields(dis);
-    assertEquals(123, c.get());
-
-    IntWritable d = new IntWritable();
-    d.readFields(dis);
-    assertEquals(321, d.get());
+    BSPMessageBundle<IntWritable> b = new BSPMessageBundle<IntWritable>();
+    b.readFields(in);
+    Iterator<IntWritable> it = b.iterator();
+    int counter = 0;
+    while (it.hasNext()) {
+      assertTrue(it.next().get() == counter);
+      counter++;
+    }
   }
 }

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Mon Mar  2 23:33:59 2015
@@ -39,7 +39,6 @@ import org.apache.hama.bsp.ClusterStatus
 import org.apache.hama.bsp.FileOutputFormat;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.bsp.sync.SyncException;
 
 public class PiEstimator {
@@ -117,8 +116,6 @@ public class PiEstimator {
     HamaConfiguration conf = new HamaConfiguration();
 
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
-    bsp.setCompressionCodec(SnappyCompressor.class);
-    bsp.setCompressionThreshold(40);
 
     // Set the job name
     bsp.setJobName("Pi Estimation Example");

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java Mon Mar  2 23:33:59 2015
@@ -30,7 +30,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.ml.semiclustering.SemiClusterMessage;
 import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
@@ -52,8 +52,7 @@ public class SemiClusterJobDriver {
       InterruptedException, ClassNotFoundException {
     GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
 
-    semiClusterJob.setCompressionCodec(SnappyCompressor.class);
-    semiClusterJob.setCompressionThreshold(10);
+    semiClusterJob.setCompressionCodec(Bzip2Compressor.class);
 
     semiClusterJob
         .setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java Mon Mar  2 23:33:59 2015
@@ -19,9 +19,7 @@
 package org.apache.hama.examples;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
@@ -45,7 +43,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.ml.semiclustering.SemiClusterMessage;
 import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
@@ -53,7 +51,6 @@ import org.apache.hama.ml.semiclustering
 import org.apache.hama.ml.semiclustering.SemiClusteringVertex;
 import org.junit.Test;
 
-@SuppressWarnings("unused")
 public class SemiClusterMatchingTest extends TestCase {
   private static String INPUT = "src/test/resources/semiclustering.txt";
   private static String OUTPUT = "/tmp/graph-semiCluster";
@@ -196,8 +193,7 @@ public class SemiClusterMatchingTest ext
       GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
       semiClusterJob.setMaxIteration(15);
 
-      semiClusterJob.setCompressionCodec(SnappyCompressor.class);
-      semiClusterJob.setCompressionThreshold(10);
+      semiClusterJob.setCompressionCodec(Bzip2Compressor.class);
 
       semiClusterJob
           .setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Mar  2 23:33:59 2015
@@ -36,7 +36,7 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
 import org.apache.hama.commons.io.TextArrayWritable;
 import org.apache.hama.graph.example.PageRank;
 import org.apache.hama.graph.example.PageRank.PagerankSeqReader;
@@ -85,7 +85,7 @@ public class TestSubmitGraphJob extends
     // set the defaults
     bsp.setMaxIteration(30);
 
-    bsp.setCompressionCodec(SnappyCompressor.class);
+    bsp.setCompressionCodec(Bzip2Compressor.class);
     bsp.setAggregatorClass(AverageAggregator.class);
 
     bsp.setInputFormat(SequenceFileInputFormat.class);

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Mar  2 23:33:59 2015
@@ -92,6 +92,7 @@
     <commons-lang.version>2.6</commons-lang.version>
     <commons-httpclient.version>3.0.1</commons-httpclient.version>
     <commons-io.version>2.4</commons-io.version>
+    <commons-compress.version>1.9</commons-compress.version>
     <hadoop.version>1.2.0</hadoop.version>
     <protobuf.version>2.5.0</protobuf.version>
     <jetty.version>6.1.14</jetty.version>
@@ -219,6 +220,11 @@
         <version>${commons-cli.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-compress</artifactId>
+        <version>${commons-compress.version}</version>
+      </dependency>
+      <dependency>
         <groupId>commons-configuration</groupId>
         <artifactId>commons-configuration</artifactId>
         <version>${commons-configuration.version}</version>



Mime
View raw message