hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1560236 - in /hama/trunk/core/src/main/java/org/apache/hama/bsp/message: AbstractMessageManager.java OutgoingMessageManager.java OutgoingPOJOMessageBundle.java
Date Wed, 22 Jan 2014 02:14:22 GMT
Author: edwardyoon
Date: Wed Jan 22 02:14:22 2014
New Revision: 1560236

URL: http://svn.apache.org/r1560236
Log:
Minor refactor.

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1560236&r1=1560235&r2=1560236&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Wed Jan 22 02:14:22 2014
@@ -30,11 +30,9 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
-import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
@@ -80,7 +78,6 @@ public abstract class AbstractMessageMan
    * TaskAttemptID, org.apache.hama.bsp.BSPPeer,
    * org.apache.hadoop.conf.Configuration, java.net.InetSocketAddress)
    */
-  @SuppressWarnings("unchecked")
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress peerAddress) {
@@ -92,18 +89,7 @@ public abstract class AbstractMessageMan
     this.localQueueForNextIteration = getSynchronizedReceiverQueue();
     this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
     this.outgoingMessageManager = getOutgoingMessageManager();
-
-    final String combinerName = conf.get(Constants.COMBINER_CLASS);
-    if (combinerName != null) {
-      try {
-        Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
-            .getClassByName(combinerName));
-        this.outgoingMessageManager.setCombiner(combiner);
-      } catch (ClassNotFoundException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-    }
+    this.outgoingMessageManager.init(conf);
   }
 
   /*

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1560236&r1=1560235&r2=1560236&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java Wed Jan 22 02:14:22 2014
@@ -21,18 +21,19 @@ import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.Combiner;
 
 public interface OutgoingMessageManager<M extends Writable> {
 
-  public void setCombiner(Combiner<M> combiner);
-
+  public void init(Configuration conf);
+  
   public void addMessage(String peerName, M msg);
 
   public void clear();
 
   public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator();
 
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1560236&r1=1560235&r2=1560236&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Wed Jan 22 02:14:22 2014
@@ -22,10 +22,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.ReflectionUtils;
 
 public class OutgoingPOJOMessageBundle<M extends Writable> implements
     OutgoingMessageManager<M> {
@@ -34,13 +37,38 @@ public class OutgoingPOJOMessageBundle<M
   private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
   private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
 
+  @SuppressWarnings("unchecked")
   @Override
-  public void setCombiner(Combiner<M> combiner) {
-    this.combiner = combiner;
+  public void init(Configuration conf) {
+    final String combinerName = conf.get(Constants.COMBINER_CLASS);
+    if (combinerName != null) {
+      try {
+        Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
+            .getClassByName(combinerName));
+        this.combiner = combiner;
+      } catch (ClassNotFoundException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
   }
 
   @Override
   public void addMessage(String peerName, M msg) {
+    InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
+
+    if (combiner != null) {
+      BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
+      bundle.addMessage(msg);
+      BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
+      combined.addMessage(combiner.combine(bundle.getMessages()));
+      outgoingBundles.put(targetPeerAddress, combined);
+    } else {
+      outgoingBundles.get(targetPeerAddress).addMessage(msg);
+    }
+  }
+
+  private InetSocketAddress getSocketAddress(String peerName) {
     InetSocketAddress targetPeerAddress = null;
     // Get socket for target peer.
     if (peerSocketCache.containsKey(peerName)) {
@@ -53,16 +81,7 @@ public class OutgoingPOJOMessageBundle<M
     if (!outgoingBundles.containsKey(targetPeerAddress)) {
       outgoingBundles.put(targetPeerAddress, new BSPMessageBundle<M>());
     }
-
-    if (combiner != null) {
-      BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
-      bundle.addMessage(msg);
-      BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
-      combined.addMessage(combiner.combine(bundle.getMessages()));
-      outgoingBundles.put(targetPeerAddress, combined);
-    } else {
-      outgoingBundles.get(targetPeerAddress).addMessage(msg);
-    }
+    return targetPeerAddress;
   }
 
   @Override



Mime
View raw message