hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1373915 - in /hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/message/
Date Thu, 16 Aug 2012 16:51:39 GMT
Author: tjungblut
Date: Thu Aug 16 16:51:39 2012
New Revision: 1373915

URL: http://svn.apache.org/viewvc?rev=1373915&view=rev
Log:
[HAMA-593]: Improve RPC scalability (Mayank Mishra via tjungblut)

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Aug 16 16:51:39 2012
@@ -34,7 +34,8 @@ Release 0.5 - April 10, 2012 
    HAMA-595: Fix NullPointerException in Task Scheduler (surajmenon)
 
   IMPROVEMENTS
-
+   
+    HAMA-593: Improve RPC scalability (Mayank Mishra via tjungblut)
     HAMA-584: Change Pagerank IO format to human-readable text for easy debug (tjungblut
via edwardyoon)
     HAMA-590: Fix TestSubmitGraphJob tests (tjungblut)
     HAMA-582: Task's error logs should be displayed on client-end when job is failed (edwardyoon)

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Thu Aug 16 16:51:39 2012
@@ -140,6 +140,16 @@
     <value>10000</value>
     <description>The default timeout period for checking groom server health.</description>
   </property>
+
+  <property>
+    <name>hama.messenger.max.cached.connections</name>
+    <value>100</value>
+    <description>This changes the maximum number of connections that are cached
+    between the peers, normally a LRU cache is used. This affects the memory
+    consumption per task and the performance. Increasing it will give you a speed-up
+    but it trades more memory.
+    </description>
+  </property>
   
   <!--
   Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Thu Aug 16 16:51:39 2012
@@ -64,6 +64,9 @@ public abstract class AbstractMessageMan
   // the task attempt id
   protected TaskAttemptID attemptId;
 
+  // to maximum cached connections in the concrete message manager
+  protected int maxCachedConnections = 100;
+
   // List of listeners for all the sent messages
   protected Queue<MessageEventListener<M>> messageListenerQueue;
 
@@ -81,9 +84,9 @@ public abstract class AbstractMessageMan
     this.peer = peer;
     this.conf = conf;
     this.peerAddress = peerAddress;
-    localQueue = getQueue();
-    localQueueForNextIteration = getSynchronizedQueue();
-    
+    this.localQueue = getQueue();
+    this.localQueueForNextIteration = getSynchronizedQueue();
+    this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
   }
 
   /*
@@ -252,36 +255,30 @@ public abstract class AbstractMessageMan
     }
   }
 
-  
-
   @Override
   public void registerListener(MessageEventListener<M> listener)
       throws IOException {
-    if(listener != null)
+    if (listener != null)
       this.messageListenerQueue.add(listener);
-    
+
   }
 
-  @SuppressWarnings("unchecked")
   @Override
-  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws
IOException{
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
+      throws IOException {
     for (Writable message : bundle.getMessages()) {
-      loopBackMessage((M)message);
+      loopBackMessage(message);
     }
-    
+
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public void loopBackMessage(Writable message) throws IOException{
-    this.localQueueForNextIteration.add((M)message);
+  public void loopBackMessage(Writable message) throws IOException {
+    this.localQueueForNextIteration.add((M) message);
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
-    notifyReceivedMessage((M)message);
-    
+    notifyReceivedMessage((M) message);
+
   }
-  
-  
-  
-  
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
Thu Aug 16 16:51:39 2012
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -38,20 +39,39 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
+import org.apache.hama.util.LRUCache;
 
 public final class AvroMessageManagerImpl<M extends Writable> extends
     CompressableMessageManager<M> implements Sender<M> {
 
   private NettyServer server = null;
 
+  // also cache the senders, getting a new sender from a transceiver generates
+  // exceptions
   private final HashMap<InetSocketAddress, Sender<M>> peers = new HashMap<InetSocketAddress,
Sender<M>>();
+  private LRUCache<InetSocketAddress, NettyTransceiver> peersLRUCache;
 
+  @SuppressWarnings("serial")
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress addr) {
     super.init(attemptId, peer, conf, addr);
     super.initCompression(conf);
     server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
+    peersLRUCache = new LRUCache<InetSocketAddress, NettyTransceiver>(
+        maxCachedConnections) {
+      @Override
+      protected final boolean removeEldestEntry(
+          Map.Entry<InetSocketAddress, NettyTransceiver> eldest) {
+        if (size() > this.capacity) {
+          NettyTransceiver client = eldest.getValue();
+          client.close();
+          peers.remove(eldest.getKey());
+          return true;
+        }
+        return false;
+      }
+    };
   }
 
   @Override
@@ -64,21 +84,32 @@ public final class AvroMessageManagerImp
     this.loopBackMessages(messages);
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
     AvroBSPMessageBundle<M> msg = new AvroBSPMessageBundle<M>();
     msg.setData(serializeMessage(bundle));
-    Sender<M> sender = peers.get(addr);
+    Sender<M> sender = getSender(addr);
+    sender.transfer(msg);
+  }
 
-    if (sender == null) {
-      NettyTransceiver client = new NettyTransceiver(addr);
-      sender = SpecificRequestor.getClient(Sender.class, client);
+  /**
+   * @param addr, socket address to which BSP Peer Connection will be
+   *          established
+   * @return BSP Peer Connection, tried to return cached connection, else
+   *         returns a new connection and caches it
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  private final Sender<M> getSender(InetSocketAddress addr) throws IOException {
+    NettyTransceiver client = peersLRUCache.get(addr);
+    if (client == null) {
+      client = new NettyTransceiver(addr);
+      Sender<M> sender = SpecificRequestor.getClient(Sender.class, client);
+      peersLRUCache.put(addr, client);
       peers.put(addr, sender);
     }
-
-    sender.transfer(msg);
+    return peers.get(addr);
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
Thu Aug 16 16:51:39 2012
@@ -27,7 +27,8 @@ import org.apache.hama.bsp.message.compr
  * 
  * @param <M>
  */
-public abstract class CompressableMessageManager<M extends Writable> extends AbstractMessageManager<M>
{
+public abstract class CompressableMessageManager<M extends Writable> extends
+    AbstractMessageManager<M> {
 
   protected BSPMessageCompressor<M> compressor;
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Thu Aug 16 16:51:39 2012
@@ -19,7 +19,7 @@ package org.apache.hama.bsp.message;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +33,7 @@ import org.apache.hama.bsp.TaskAttemptID
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.CompressionUtil;
+import org.apache.hama.util.LRUCache;
 
 /**
  * Implementation of the {@link HadoopMessageManager}.
@@ -44,16 +45,30 @@ public final class HadoopMessageManagerI
   private static final Log LOG = LogFactory
       .getLog(HadoopMessageManagerImpl.class);
 
-  private final HashMap<InetSocketAddress, HadoopMessageManager<M>> peers = new
HashMap<InetSocketAddress, HadoopMessageManager<M>>();
-
   private Server server = null;
 
+  private LRUCache<InetSocketAddress, HadoopMessageManager<M>> peersLRUCache
= null;
+
+  @SuppressWarnings("serial")
   @Override
   public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress peerAddress) {
     super.init(attemptId, peer, conf, peerAddress);
     super.initCompression(conf);
     startRPCServer(conf, peerAddress);
+    peersLRUCache = new LRUCache<InetSocketAddress, HadoopMessageManager<M>>(
+        maxCachedConnections) {
+      @Override
+      protected final boolean removeEldestEntry(
+          Map.Entry<InetSocketAddress, HadoopMessageManager<M>> eldest) {
+        if (size() > this.capacity) {
+          HadoopMessageManager<M> proxy = eldest.getValue();
+          RPC.stopProxy(proxy);
+          return true;
+        }
+        return false;
+      }
+    };
   }
 
   private final void startRPCServer(Configuration conf,
@@ -83,7 +98,6 @@ public final class HadoopMessageManagerI
       throws IOException {
 
     HadoopMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
-
     if (bspPeerConnection == null) {
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
@@ -101,16 +115,26 @@ public final class HadoopMessageManagerI
     }
   }
 
+  /**
+   * @param addr, socket address to which BSP Peer Connection will be
+   *          established
+   * @return BSP Peer Connection, tried to return cached connection, else
+   *         returns a new connection and caches it
+   * @throws IOException
+   */
   @SuppressWarnings("unchecked")
   protected final HadoopMessageManager<M> getBSPPeerConnection(
       InetSocketAddress addr) throws IOException {
-    HadoopMessageManager<M> peer = peers.get(addr);
-    if (peer == null) {
-      peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
-          HamaRPCProtocolVersion.versionID, addr, this.conf);
-      this.peers.put(addr, peer);
+    HadoopMessageManager<M> bspPeerConnection;
+    if (!peersLRUCache.containsKey(addr)) {
+      bspPeerConnection = (HadoopMessageManager<M>) RPC.getProxy(
+          HadoopMessageManager.class, HamaRPCProtocolVersion.versionID, addr,
+          this.conf);
+      peersLRUCache.put(addr, bspPeerConnection);
+    } else {
+      bspPeerConnection = peersLRUCache.get(addr);
     }
-    return peer;
+    return bspPeerConnection;
   }
 
   @Override
@@ -135,5 +159,4 @@ public final class HadoopMessageManagerI
     return versionID;
   }
 
-  
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java Thu Aug 16
16:51:39 2012
@@ -100,7 +100,7 @@ public final class MemoryQueue<M extends
 
   @Override
   public void prepareWrite() {
-    
+
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Thu Aug
16 16:51:39 2012
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.TaskAttemptID
 public interface MessageManager<M extends Writable> {
 
   public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
+  public static final String MAX_CACHED_CONNECTIONS_KEY = "hama.messenger.max.cached.connections";
 
   /**
    * Init can be used to start servers and initialize internal state. If you are
@@ -99,8 +100,9 @@ public interface MessageManager<M extend
   /**
    * Send the messages to self to receive in the next superstep.
    */
-  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws
IOException;
-  
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
+      throws IOException;
+
   /**
    * Send the message to self to receive in the next superstep.
    */
@@ -115,6 +117,5 @@ public interface MessageManager<M extend
    */
   public void registerListener(MessageEventListener<M> listener)
       throws IOException;
-  
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java?rev=1373915&r1=1373914&r2=1373915&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java Thu Aug 16 16:51:39
2012
@@ -20,7 +20,7 @@ package org.apache.hama.bsp.message;
 import org.apache.hadoop.io.Writable;
 
 public interface Sender<M extends Writable> {
-  
+
   public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol
       .parse("{\"protocol\":\"Sender\",\"namespace\":\"de.jungblut.avro\",\"types\":[{\"type\":\"record\",\"name\":\"AvroBSPMessageBundle\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}],\"messages\":{\"transfer\":{\"request\":[{\"name\":\"messagebundle\",\"type\":\"AvroBSPMessageBundle\"}],\"response\":\"null\"}}}");
 



Mime
View raw message