incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1330190 - in /incubator/hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/message/
Date Wed, 25 Apr 2012 10:51:16 GMT
Author: edwardyoon
Date: Wed Apr 25 10:51:14 2012
New Revision: 1330190

URL: http://svn.apache.org/viewvc?rev=1330190&view=rev
Log:
rollback to r.1330131

Removed:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Apr 25 10:51:14 2012
@@ -16,7 +16,6 @@ Release 0.5 - April 10, 2012 
 
   IMPROVEMENTS
 
-    HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon)
     HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
     HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
     HAMA-555: Separate bin and src distributions (edwardyoon)

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Wed Apr 25 10:51:14 2012
@@ -81,11 +81,6 @@
     <description>Temporary directory on the local filesystem.</description>
   </property>
   <property>
-    <name>bsp.disk.queue.dir</name>
-    <value>${hama.tmp.dir}/messages/</value>
-    <description>Temporary directory on the local message buffer on disk.</description>
-  </property>
-  <property>
     <name>bsp.child.java.opts</name>
     <value>-Xmx512m</value>
     <description>Java opts for the groom server child processes.  

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Wed Apr 25
10:51:14 2012
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +37,6 @@ import org.apache.hama.Constants;
 import org.apache.hama.bsp.Counters.Counter;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.MessageQueue;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
@@ -165,12 +165,12 @@ public final class BSPPeerImpl<K1, V1, K
         TaskStatus.Phase.STARTING, counters));
 
     messenger = MessageManagerFactory.getMessageManager(conf);
-    messenger.init(taskId, this, conf, peerAddress);
+    messenger.init(this, conf, peerAddress);
 
     final String combinerName = conf.get("bsp.combiner.class");
     if (combinerName != null) {
-      combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
-          .getClassByName(combinerName), conf);
+      combiner = (Combiner<M>) ReflectionUtils.newInstance(
+          conf.getClassByName(combinerName), conf);
     }
 
   }
@@ -184,8 +184,8 @@ public final class BSPPeerImpl<K1, V1, K
 
     String outdir = null;
     if (conf.get("bsp.output.dir") != null) {
-      Path outputDir = new Path(conf.get("bsp.output.dir", "tmp-"
-          + System.currentTimeMillis()), Task.getOutputName(partition));
+      Path outputDir = new Path(conf.get("bsp.output.dir",
+          "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
       outdir = outputDir.makeQualified(fs).toString();
     }
     outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
@@ -247,11 +247,12 @@ public final class BSPPeerImpl<K1, V1, K
 
     checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
     if (LOG.isDebugEnabled())
-      LOG.debug(new StringBuffer(1000).append("Enabled = ").append(
-          conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)).append(
-          " checkPointInterval = ").append(checkPointInterval).append(
-          " lastCheckPointStep = ").append(lastCheckPointStep).append(
-          " getSuperstepCount() = ").append(getSuperstepCount()).toString());
+      LOG.debug(new StringBuffer(1000).append("Enabled = ")
+          .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+          .append(" checkPointInterval = ").append(checkPointInterval)
+          .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+          .append(" getSuperstepCount() = ").append(getSuperstepCount())
+          .toString());
 
     return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
         && (checkPointInterval != 0) && (((int) (getSuperstepCount() - lastCheckPointStep))
>= checkPointInterval));
@@ -293,9 +294,7 @@ public final class BSPPeerImpl<K1, V1, K
       InterruptedException {
     long startBarrier = System.currentTimeMillis();
     enterBarrier();
-    // normally all messages should been send now, finalizing the send phase
-    messenger.finishSendPhase();
-    Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
+    Iterator<Entry<InetSocketAddress, LinkedList<M>>> it = messenger
         .getMessageIterator();
 
     boolean shouldCheckPoint = false;
@@ -305,7 +304,7 @@ public final class BSPPeerImpl<K1, V1, K
     }
 
     while (it.hasNext()) {
-      Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
+      Entry<InetSocketAddress, LinkedList<M>> entry = it.next();
       final InetSocketAddress addr = entry.getKey();
       final Iterable<M> messages = entry.getValue();
 
@@ -347,44 +346,27 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   protected final void enterBarrier() throws SyncException {
-    syncClient.enterBarrier(taskId.getJobID(), taskId, currentTaskStatus
-        .getSuperstepCount());
+    syncClient.enterBarrier(taskId.getJobID(), taskId,
+        currentTaskStatus.getSuperstepCount());
   }
 
   protected final void leaveBarrier() throws SyncException {
-    syncClient.leaveBarrier(taskId.getJobID(), taskId, currentTaskStatus
-        .getSuperstepCount());
+    syncClient.leaveBarrier(taskId.getJobID(), taskId,
+        currentTaskStatus.getSuperstepCount());
   }
 
   public final void close() throws SyncException, IOException,
       InterruptedException {
-    // there are many catches, because we want to close always every component
-    // even if the one before failed.
     if (in != null) {
-      try {
-        in.close();
-      } catch (Exception e) {
-        LOG.error(e);
-      }
+      in.close();
     }
     if (outWriter != null) {
-      try {
-        outWriter.close();
-      } catch (Exception e) {
-        LOG.error(e);
-      }
+      outWriter.close();
     }
     this.clear();
-    try {
-      syncClient.close();
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    try {
-      messenger.close();
-    } catch (Exception e) {
-      LOG.error(e);
-    }
+    syncClient.close();
+
+    messenger.close();
   }
 
   @Override

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Wed Apr
25 10:51:14 2012
@@ -45,10 +45,8 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJobClient.RawSplit;
 import org.apache.hama.bsp.BSPMaster.State;
-import org.apache.hama.bsp.message.MemoryQueue;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.MessageQueue;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
@@ -334,15 +332,14 @@ public class LocalBSPRunner implements J
     @SuppressWarnings("rawtypes")
     private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager>
managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
 
-    private final HashMap<InetSocketAddress, MessageQueue<M>> localOutgoingMessages
= new HashMap<InetSocketAddress, MessageQueue<M>>();
+    private final HashMap<InetSocketAddress, LinkedList<M>> localOutgoingMessages
= new HashMap<InetSocketAddress, LinkedList<M>>();
     private static final ConcurrentHashMap<String, InetSocketAddress> socketCache =
new ConcurrentHashMap<String, InetSocketAddress>();
     private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
 
     private BSPPeer<?, ?, ?, ?, M> peer;
 
     @Override
-    public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-        Configuration conf, InetSocketAddress peerAddress) {
+    public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress
peerAddress) {
       this.peer = peer;
       managerMap.put(peerAddress, this);
     }
@@ -368,9 +365,9 @@ public class LocalBSPRunner implements J
         inetSocketAddress = BSPNetUtils.getAddress(peerName);
         socketCache.put(peerName, inetSocketAddress);
       }
-      MessageQueue<M> msgs = localOutgoingMessages.get(inetSocketAddress);
+      LinkedList<M> msgs = localOutgoingMessages.get(inetSocketAddress);
       if (msgs == null) {
-        msgs = new MemoryQueue<M>();
+        msgs = new LinkedList<M>();
       }
       msgs.add(msg);
       peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
@@ -383,13 +380,12 @@ public class LocalBSPRunner implements J
         throws IOException {
       for (M value : bundle.getMessages()) {
         managerMap.get(addr).localIncomingMessages.add(value);
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-            1L);
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
       }
     }
 
     @Override
-    public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator()
{
+    public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator()
{
       return localOutgoingMessages.entrySet().iterator();
     }
 
@@ -403,12 +399,6 @@ public class LocalBSPRunner implements J
       return localIncomingMessages.size();
     }
 
-    @Override
-    public void finishSendPhase() throws IOException {
-      // TODO Auto-generated method stub
-
-    }
-
   }
 
   public static class LocalUmbilical implements BSPPeerProtocol {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
Wed Apr 25 10:51:14 2012
@@ -24,8 +24,12 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -37,8 +41,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
-import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
+import org.apache.hama.util.BSPNetUtils;
 
 public final class AvroMessageManagerImpl<M extends Writable> extends
     CompressableMessageManager<M> implements Sender<M> {
@@ -46,24 +50,39 @@ public final class AvroMessageManagerImp
   private NettyServer server = null;
 
   private final HashMap<InetSocketAddress, Sender<M>> peers = new HashMap<InetSocketAddress,
Sender<M>>();
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String,
InetSocketAddress>();
+
+  private final HashMap<InetSocketAddress, LinkedList<M>> outgoingQueues = new
HashMap<InetSocketAddress, LinkedList<M>>();
+  private Deque<M> localQueue = new LinkedList<M>();
+  // this must be a synchronized implementation: this is accessed per RPC
+  private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
+
+  private BSPPeer<?, ?, ?, ?, M> peer;
 
   @Override
-  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-      Configuration conf, InetSocketAddress addr) {
-    super.init(attemptId, peer, conf, addr);
+  public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
+      InetSocketAddress addr) {
+    this.peer = peer;
     super.initCompression(conf);
     server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
   }
 
   @Override
   public void close() {
-    super.close();
     server.close();
   }
 
+  @Override
+  public void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+    localQueue.addAll(localQueueForNextIteration);
+    localQueueForNextIteration.clear();
+  }
+
   public void put(BSPMessageBundle<M> messages) {
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-        messages.getMessages().size());
+    peer.incrementCounter(
+        BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, messages.getMessages()
+            .size());
     Iterator<M> iterator = messages.getMessages().iterator();
     while (iterator.hasNext()) {
       this.localQueueForNextIteration.add(iterator.next());
@@ -71,6 +90,11 @@ public final class AvroMessageManagerImp
     }
   }
 
+  @Override
+  public int getNumCurrentMessages() {
+    return localQueue.size();
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
@@ -101,19 +125,42 @@ public final class AvroMessageManagerImp
     return null;
   }
 
+  @Override
+  public M getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  @Override
+  public void send(String peerName, M msg) throws IOException {
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    LinkedList<M> queue = outgoingQueues.get(targetPeerAddress);
+    if (queue == null) {
+      queue = new LinkedList<M>();
+    }
+    queue.add(msg);
+    outgoingQueues.put(targetPeerAddress, queue);
+  }
+
   private final BSPMessageBundle<M> deserializeMessage(ByteBuffer buffer)
       throws IOException {
     BSPMessageBundle<M> msg = new BSPMessageBundle<M>();
     byte[] byteArray = buffer.array();
     if (compressor == null) {
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_RECEIVED,
-          byteArray.length);
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.MESSAGE_BYTES_RECEIVED, byteArray.length);
       ByteArrayInputStream inArray = new ByteArrayInputStream(byteArray);
       DataInputStream in = new DataInputStream(inArray);
       msg.readFields(in);
     } else {
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_RECEIVED,
-          byteArray.length);
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_RECEIVED, byteArray.length);
       msg = compressor.decompressBundle(new BSPCompressedBundle(byteArray));
     }
 
@@ -128,16 +175,20 @@ public final class AvroMessageManagerImp
       msg.write(out);
       out.close();
       byte[] byteArray = outArray.toByteArray();
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
-          byteArray.length);
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, byteArray.length);
       return ByteBuffer.wrap(byteArray);
     } else {
       BSPCompressedBundle compMsgBundle = compressor.compressBundle(msg);
       byte[] data = compMsgBundle.getData();
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_SENT,
-          data.length);
+      peer.incrementCounter(
+          BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_SENT, data.length);
       return ByteBuffer.wrap(data);
     }
   }
 
+  @Override
+  public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator()
{
+    return this.outgoingQueues.entrySet().iterator();
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
Wed Apr 25 10:51:14 2012
@@ -27,7 +27,8 @@ import org.apache.hama.bsp.message.compr
  * 
  * @param <M>
  */
-public abstract class CompressableMessageManager<M extends Writable> extends AbstractMessageManager<M>
{
+public abstract class CompressableMessageManager<M extends Writable> implements
+    MessageManager<M> {
 
   protected BSPMessageCompressor<M> compressor;
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Wed Apr 25 10:51:14 2012
@@ -19,7 +19,12 @@ package org.apache.hama.bsp.message;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,8 +35,8 @@ import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
-import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
+import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.util.CompressionUtil;
 
 /**
@@ -44,14 +49,22 @@ public final class HadoopMessageManagerI
   private static final Log LOG = LogFactory
       .getLog(HadoopMessageManagerImpl.class);
 
+  private Server server = null;
+  private Configuration conf;
+
   private final HashMap<InetSocketAddress, HadoopMessageManager<M>> peers = new
HashMap<InetSocketAddress, HadoopMessageManager<M>>();
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String,
InetSocketAddress>();
 
-  private Server server = null;
+  private final HashMap<InetSocketAddress, LinkedList<M>> outgoingQueues = new
HashMap<InetSocketAddress, LinkedList<M>>();
+  private Deque<M> localQueue = new LinkedList<M>();
+  // this must be a synchronized implementation: this is accessed per RPC
+  private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
+  private BSPPeer<?, ?, ?, ?, M> peer;
 
   @Override
-  public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-      Configuration conf, InetSocketAddress peerAddress) {
-    super.init(attemptId, peer, conf, peerAddress);
+  public final void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress
peerAddress) {
+    this.peer = peer;
+    this.conf = conf;
     super.initCompression(conf);
     startRPCServer(conf, peerAddress);
   }
@@ -72,13 +85,54 @@ public final class HadoopMessageManagerI
 
   @Override
   public final void close() {
-    super.close();
     if (server != null) {
       server.stop();
     }
   }
 
   @Override
+  public final M getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  @Override
+  public final void send(String peerName, M msg) throws IOException {
+    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    LinkedList<M> queue = outgoingQueues.get(targetPeerAddress);
+    if (queue == null) {
+      queue = new LinkedList<M>();
+    }
+    queue.add(msg);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
+    outgoingQueues.put(targetPeerAddress, queue);
+  }
+
+  @Override
+  public final Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator()
{
+    return this.outgoingQueues.entrySet().iterator();
+  }
+
+  @SuppressWarnings("unchecked")
+  protected final HadoopMessageManager<M> getBSPPeerConnection(
+      InetSocketAddress addr) throws IOException {
+    HadoopMessageManager<M> peer = peers.get(addr);
+    if (peer == null) {
+      peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
+          HadoopMessageManager.versionID, addr, this.conf);
+      this.peers.put(addr, peer);
+    }
+    return peer;
+  }
+
+  @Override
   public final void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
 
@@ -101,22 +155,18 @@ public final class HadoopMessageManagerI
     }
   }
 
-  @SuppressWarnings("unchecked")
-  protected final HadoopMessageManager<M> getBSPPeerConnection(
-      InetSocketAddress addr) throws IOException {
-    HadoopMessageManager<M> peer = peers.get(addr);
-    if (peer == null) {
-      peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
-          HadoopMessageManager.versionID, addr, this.conf);
-      this.peers.put(addr, peer);
-    }
-    return peer;
+  @Override
+  public final void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+    localQueue.addAll(localQueueForNextIteration);
+    localQueueForNextIteration.clear();
   }
 
   @Override
   public final void put(M msg) {
     this.localQueueForNextIteration.add(msg);
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+    peer.incrementCounter(
+        BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
   }
 
   @Override
@@ -135,6 +185,11 @@ public final class HadoopMessageManagerI
   }
 
   @Override
+  public final int getNumCurrentMessages() {
+    return localQueue.size();
+  }
+
+  @Override
   public final long getProtocolVersion(String arg0, long arg1)
       throws IOException {
     return versionID;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
Wed Apr 25 10:51:14 2012
@@ -20,13 +20,13 @@ package org.apache.hama.bsp.message;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * This manager takes care of the messaging. It is responsible to launch a
@@ -34,16 +34,15 @@ import org.apache.hama.bsp.TaskAttemptID
  * 
  */
 public interface MessageManager<M extends Writable> {
-  
-  public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
 
   /**
-   * Init can be used to start servers and initialize internal state. If you are
-   * implementing a subclass, please call the super version of this method.
+   * Init can be used to start servers and initialize internal state.
    * 
+   * @param conf
+   * @param peerAddress
    */
-  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-      Configuration conf, InetSocketAddress peerAddress);
+  public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
+      InetSocketAddress peerAddress);
 
   /**
    * Close is called after a task ran. Should be used to cleanup things e.G.
@@ -54,6 +53,7 @@ public interface MessageManager<M extend
   /**
    * Get the current message.
    * 
+   * @return
    * @throws IOException
    */
   public M getCurrentMessage() throws IOException;
@@ -61,26 +61,25 @@ public interface MessageManager<M extend
   /**
    * Send a message to the peer.
    * 
+   * @param peerName
+   * @param msg
    * @throws IOException
    */
   public void send(String peerName, M msg) throws IOException;
 
   /**
-   * Should be called when all messages were send with send().
-   * 
-   * @throws IOException
-   */
-  public void finishSendPhase() throws IOException;
-
-  /**
    * Returns an iterator of messages grouped by peer.
    * 
+   * @return
    */
-  public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator();
+  public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator();
 
   /**
    * This is the real transferring to a host with a bundle.
    * 
+   * @param addr
+   * @param bundle
+   * @throws IOException
    */
   public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException;
@@ -93,6 +92,7 @@ public interface MessageManager<M extend
   /**
    * Gets the number of messages in the current queue.
    * 
+   * @return
    */
   public int getNumCurrentMessages();
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
(original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
Wed Apr 25 10:51:14 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.DiskQueue;
 import org.apache.hama.examples.ClassSerializePrinting;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.zookeeper.CreateMode;
@@ -49,7 +48,6 @@ public class TestBSPMasterGroomServer ex
 
   private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
   static String TMP_OUTPUT = "/tmp/test-example/";
-  public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
   static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout");
 
   private HamaConfiguration configuration;
@@ -60,7 +58,6 @@ public class TestBSPMasterGroomServer ex
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.set("bsp.local.dir", "/tmp/hama-test");
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     configuration.set("hama.sync.client.class",

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
(original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
Wed Apr 25 10:51:14 2012
@@ -30,7 +30,6 @@ import org.apache.hama.bsp.BSPMessageBun
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
-import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
 import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.bsp.messages.BooleanMessage;
@@ -46,13 +45,10 @@ public class TestAvroMessageManager exte
 
   private static final int SUM = DOUBLE_MSG_COUNT + BOOL_MSG_COUNT
       + INT_MSG_COUNT;
-  
-  public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
 
   public void testAvroMessenger() throws Exception {
     BSPMessageBundle<Writable> randomBundle = getRandomBundle();
     Configuration conf = new Configuration();
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     MessageManager<Writable> messageManager = MessageManagerFactory
         .getMessageManager(conf);
     conf.set(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
@@ -65,8 +61,8 @@ public class TestAvroMessageManager exte
 
     BSPPeer<?, ?, ?, ?, Writable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable,
NullWritable, NullWritable, Writable>(
         conf, FileSystem.get(conf), new Counters());
-    TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
-    messageManager.init(id, dummyPeer, conf, peer);
+
+    messageManager.init(dummyPeer, conf, peer);
 
     messageManager.transfer(peer, randomBundle);
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1330190&r1=1330189&r2=1330190&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
(original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
Wed Apr 25 10:51:14 2012
@@ -19,6 +19,7 @@ package org.apache.hama.bsp.message;
 
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map.Entry;
 
 import junit.framework.TestCase;
@@ -31,31 +32,12 @@ import org.apache.hama.bsp.BSPMessageBun
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
-import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestHadoopMessageManager extends TestCase {
 
-  public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
-  // increment is here to solve race conditions in parallel execution to choose
-  // other ports.
-  public static volatile int increment = 1;
-
-  public void testMemoryMessaging() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(MessageManager.QUEUE_TYPE_CLASS,
-        MemoryQueue.class.getCanonicalName());
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
-    messagingInternal(conf);
-  }
-
-  public void testDiskMessaging() throws Exception {
+  public void testMessaging() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
-    messagingInternal(conf);
-  }
-
-  private void messagingInternal(Configuration conf) throws Exception {
     conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
         "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
     MessageManager<IntWritable> messageManager = MessageManagerFactory
@@ -64,20 +46,18 @@ public class TestHadoopMessageManager ex
     assertTrue(messageManager instanceof HadoopMessageManagerImpl);
 
     InetSocketAddress peer = new InetSocketAddress(
-        BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort()
-            + (increment++));
+        BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
     BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable,
NullWritable, NullWritable, IntWritable>(
         conf, FileSystem.get(conf), new Counters());
-    TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
-    messageManager.init(id, dummyPeer, conf, peer);
+    messageManager.init(dummyPeer, conf, peer);
     String peerName = peer.getHostName() + ":" + peer.getPort();
 
     messageManager.send(peerName, new IntWritable(1337));
 
-    Iterator<Entry<InetSocketAddress, MessageQueue<IntWritable>>> messageIterator
= messageManager
+    Iterator<Entry<InetSocketAddress, LinkedList<IntWritable>>> messageIterator
= messageManager
         .getMessageIterator();
 
-    Entry<InetSocketAddress, MessageQueue<IntWritable>> entry = messageIterator
+    Entry<InetSocketAddress, LinkedList<IntWritable>> entry = messageIterator
         .next();
 
     assertEquals(entry.getKey(), peer);
@@ -97,6 +77,5 @@ public class TestHadoopMessageManager ex
     IntWritable currentMessage = messageManager.getCurrentMessage();
 
     assertEquals(currentMessage.get(), 1337);
-    messageManager.close();
   }
 }



Mime
View raw message