hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1663195 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ graph/src/main/java/org/apache/hama/graph/
Date Mon, 02 Mar 2015 02:20:01 GMT
Author: edwardyoon
Date: Mon Mar  2 02:20:00 2015
New Revision: 1663195

URL: http://svn.apache.org/r1663195
Log:
HAMA-929: InstantiationException occurs when create a new instance of Combiner

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Mon Mar  2 02:20:00
2015
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 
 /**
  * BSPMessageBundle stores a group of messages so that they can be sent in batch
@@ -42,9 +41,6 @@ public class BSPMessageBundle<M extends
 
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
-  private BSPMessageCompressor<M> compressor = null;
-  private long threshold = 128;
-
   private String className = null;
   private int bundleSize = 0;
 
@@ -55,19 +51,6 @@ public class BSPMessageBundle<M extends
     bundleSize = 0;
   }
 
-  ByteArrayOutputStream mbos = new ByteArrayOutputStream();
-  DataOutputStream mdos = new DataOutputStream(mbos);
-  ByteArrayInputStream mbis = null;
-  DataInputStream mdis = null;
-
-  public byte[] serialize(M message) throws IOException {
-    mbos.reset();
-    message.write(mdos);
-    return mbos.toByteArray();
-  }
-
-  private byte[] msgBytes;
-
   /**
    * Add message to this bundle.
    * 
@@ -79,16 +62,6 @@ public class BSPMessageBundle<M extends
         className = message.getClass().getName();
       }
 
-      if (compressor != null) {
-        msgBytes = serialize(message);
-        if (msgBytes.length > threshold) {
-          bufferDos.writeBoolean(true);
-          msgBytes = compressor.compress(msgBytes);
-          bufferDos.writeInt(msgBytes.length);
-        } else {
-          bufferDos.writeBoolean(false);
-        }
-      }
       message.write(bufferDos);
       bundleSize++;
     } catch (IOException e) {
@@ -133,18 +106,7 @@ public class BSPMessageBundle<M extends
           }
 
           msg = ReflectionUtils.newInstance(clazz, null);
-
-          if (compressor != null && dis.readBoolean()) {
-            int length = dis.readInt();
-            msgBytes = new byte[length];
-            dis.readFully(msgBytes);
-
-            mbis = new ByteArrayInputStream(compressor.decompress(msgBytes));
-            mdis = new DataInputStream(mbis);
-            msg.readFields(mdis);
-          } else {
-            msg.readFields(dis);
-          }
+          msg.readFields(dis);
 
         } catch (IOException ie) {
           LOG.error(ie);
@@ -167,11 +129,6 @@ public class BSPMessageBundle<M extends
     return bundleSize;
   }
 
-  public void setCompressor(BSPMessageCompressor<M> compressor, long threshold) {
-    this.compressor = compressor;
-    this.threshold = threshold;
-  }
-
   /**
    * @return the byte length of messages
    * @throws IOException

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Mar  2 02:20:00
2015
@@ -249,7 +249,7 @@ public final class BSPPeerImpl<K1, V1, K
     try {
       if (splitClass != null) {
         inputSplit = (InputSplit) ReflectionUtils.newInstance(
-            getConfiguration().getClassByName(splitClass), getConfiguration());
+            conf.getClassByName(splitClass), conf);
       }
     } catch (ClassNotFoundException exp) {
       IOException wrap = new IOException("Split class " + splitClass
@@ -257,7 +257,7 @@ public final class BSPPeerImpl<K1, V1, K
       wrap.initCause(exp);
       throw wrap;
     }
-    
+
     if (inputSplit != null) {
       DataInputBuffer splitBuffer = new DataInputBuffer();
       splitBuffer.reset(split.getBytes(), 0, split.getLength());

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Mar  2 02:20:00
2015
@@ -20,7 +20,6 @@ package org.apache.hama.bsp;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
@@ -354,17 +353,9 @@ public class LocalBSPRunner implements J
       peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
           bundle.getLength());
 
-      if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
-        bundle.setCompressor(compressor,
-            conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 512));
-      }
-
-      Iterator<M> it = bundle.iterator();
-      while (it.hasNext()) {
-        MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-            1L);
-      }
+      MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+          bundle.size());
     }
 
     @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Mon Mar  2 02:20:00 2015
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
@@ -95,7 +94,7 @@ public abstract class AbstractMessageMan
 
     this.compressor = new BSPMessageCompressorFactory<M>().getCompressor(conf);
     this.outgoingMessageManager = getOutgoingMessageManager();
-    this.outgoingMessageManager.init(conf, compressor);
+    this.outgoingMessageManager.init(conf);
   }
 
   /*
@@ -256,13 +255,11 @@ public abstract class AbstractMessageMan
 
   @Override
   public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
-    if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
-      bundle.setCompressor(compressor,
-          conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
-    }
-
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, bundle.size());
     this.localQueueForNextIteration.addBundle(bundle);
+    
+    // TODO checkpoint bundle itself instead of unpacked messages. -- edwardyoon
+    // notifyReceivedMessage(bundle);
   }
 
   @SuppressWarnings("unchecked")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
Mon Mar  2 02:20:00 2015
@@ -21,17 +21,14 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.util.BSPNetUtils;
 
 public abstract class AbstractOutgoingMessageManager<M extends Writable>
     implements OutgoingMessageManager<M> {
 
   protected HamaConfiguration conf;
-  protected BSPMessageCompressor<M> compressor;
   
   protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String,
InetSocketAddress>();
   protected HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles =
 new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
@@ -48,10 +45,6 @@ public abstract class AbstractOutgoingMe
 
     if (!outgoingBundles.containsKey(targetPeerAddress)) {
       BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
-      if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
-        bundle.setCompressor(compressor,
-            conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
-      }
       outgoingBundles.put(targetPeerAddress, bundle);
     }
     return targetPeerAddress;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
Mon Mar  2 02:20:00 2015
@@ -17,6 +17,10 @@
  */
 package org.apache.hama.bsp.message;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
@@ -26,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
@@ -118,9 +123,20 @@ public final class HamaAsyncMessageManag
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
     } else {
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
-          bundle.getLength());
-      bspPeerConnection.put(bundle);
+      if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
+        ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+        DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+        bundle.write(bufferDos);
+
+        byte[] compressed = compressor.compress(byteBuffer.toByteArray());
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+            compressed.length);
+        bspPeerConnection.put(compressed);
+      } else {
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+            bundle.getLength());
+        bspPeerConnection.put(bundle);
+      }
     }
   }
 
@@ -155,6 +171,18 @@ public final class HamaAsyncMessageManag
     loopBackBundle(bundle);
   }
 
+  @Override
+  public void put(byte[] compressedBundle) throws IOException {
+    byte[] decompressed = compressor.decompress(compressedBundle);
+
+    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+    ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
+    DataInputStream dis = new DataInputStream(bis);
+    bundle.readFields(dis);
+
+    loopBackBundle(bundle);
+  }
+
   @Override
   public final long getProtocolVersion(String arg0, long arg1)
       throws IOException {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java Mon
Mar  2 02:20:00 2015
@@ -45,5 +45,7 @@ public interface HamaMessageManager<M ex
    * @param messages
    */
   public void put(BSPMessageBundle<M> messages) throws IOException;
+  
+  public void put(byte[] compressedBundle) throws IOException;
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
Mon Mar  2 02:20:00 2015
@@ -17,6 +17,10 @@
  */
 package org.apache.hama.bsp.message;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
@@ -26,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
@@ -118,14 +123,27 @@ public final class HamaMessageManagerImp
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
     } else {
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, bundle.getLength());
-      bspPeerConnection.put(bundle);
+      System.out.println(conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false));
+      
+      if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
+        ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+        DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+        bundle.write(bufferDos);
+
+        byte[] compressed = compressor.compress(byteBuffer.toByteArray());
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+            compressed.length);
+        bspPeerConnection.put(compressed);
+      } else {
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+            bundle.getLength());
+        bspPeerConnection.put(bundle);
+      }
     }
   }
 
   /**
-   * @param addr socket address to which BSP Peer Connection will be
-   *          established
+   * @param addr socket address to which BSP Peer Connection will be established
    * @return BSP Peer Connection, tried to return cached connection, else
    *         returns a new connection and caches it
    * @throws IOException
@@ -155,6 +173,18 @@ public final class HamaMessageManagerImp
     loopBackBundle(bundle);
   }
 
+  @Override
+  public final void put(byte[] compressedBundle) throws IOException {
+    byte[] decompressed = compressor.decompress(compressedBundle);
+
+    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+    ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
+    DataInputStream dis = new DataInputStream(bis);
+    bundle.readFields(dis);
+
+    loopBackBundle(bundle);
+  }
+
   @Override
   public final long getProtocolVersion(String arg0, long arg1)
       throws IOException {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
Mon Mar  2 02:20:00 2015
@@ -24,11 +24,10 @@ import java.util.Map.Entry;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 
 public interface OutgoingMessageManager<M extends Writable> {
 
-  public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor);
+  public void init(HamaConfiguration conf);
 
   public void addMessage(String peerName, M msg);
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
Mon Mar  2 02:20:00 2015
@@ -22,12 +22,11 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.ReflectionUtils;
 
 public class OutgoingPOJOMessageBundle<M extends Writable> extends
     AbstractOutgoingMessageManager<M> {
@@ -36,14 +35,16 @@ public class OutgoingPOJOMessageBundle<M
 
   @SuppressWarnings("unchecked")
   @Override
-  public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor) {
+  public void init(HamaConfiguration conf) {
     this.conf = conf;
-    this.compressor = compressor;
 
     final String combinerName = conf.get(Constants.COMBINER_CLASS);
     if (combinerName != null) {
-      this.combiner = (Combiner<M>) ReflectionUtils.newInstance(
-          conf.getClass(combinerName, Combiner.class), conf);
+      try {
+        this.combiner = (Combiner<M>) ReflectionUtils.newInstance(combinerName);
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
     }
   }
 
@@ -55,8 +56,6 @@ public class OutgoingPOJOMessageBundle<M
       BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
       bundle.addMessage(msg);
       BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
-      combined.setCompressor(compressor,
-          conf.getLong("hama.messenger.compression.threshold", 128));
       combined.addMessage(combiner.combine(bundle));
       outgoingBundles.put(targetPeerAddress, combined);
     } else {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1663195&r1=1663194&r2=1663195&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Mon Mar  2 02:20:00 2015
@@ -31,7 +31,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.message.AbstractOutgoingMessageManager;
-import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.ReflectionUtils;
 
 public class OutgoingVertexMessageManager<M extends Writable> extends
     AbstractOutgoingMessageManager<GraphJobMessage> {
@@ -44,17 +44,16 @@ public class OutgoingVertexMessageManage
 
   @SuppressWarnings("unchecked")
   @Override
-  public void init(HamaConfiguration conf,
-      BSPMessageCompressor<GraphJobMessage> compressor) {
+  public void init(HamaConfiguration conf) {
     this.conf = conf;
-    this.compressor = compressor;
 
-    if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
-        Combiner.class)) {
-
-      combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
-          .newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
-              conf);
+    final String combinerName = conf.get(Constants.COMBINER_CLASS);
+    if (combinerName != null) {
+      try {
+        combiner = (Combiner<Writable>) ReflectionUtils.newInstance(combinerName);
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
     }
   }
 



Mime
View raw message