incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1331732 - in /incubator/hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/message/
Date Sat, 28 Apr 2012 09:40:47 GMT
Author: edwardyoon
Date: Sat Apr 28 09:40:46 2012
New Revision: 1331732

URL: http://svn.apache.org/viewvc?rev=1331732&view=rev
Log:
Improve message buffering to save memory

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java   (with props)
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sat Apr 28 09:40:46 2012
@@ -16,6 +16,7 @@ Release 0.5 - April 10, 2012 
 
   IMPROVEMENTS
 
+    HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon)
     HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
     HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
     HAMA-555: Separate bin and src distributions (edwardyoon)

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Sat Apr 28 09:40:46 2012
@@ -81,6 +81,11 @@
     <description>Temporary directory on the local filesystem.</description>
   </property>
   <property>
+    <name>bsp.disk.queue.dir</name>
+    <value>${hama.tmp.dir}/messages/</value>
+    <description>Temporary directory for the local on-disk message buffer.</description>
+  </property>
+  <property>
     <name>bsp.child.java.opts</name>
     <value>-Xmx512m</value>
     <description>Java opts for the groom server child processes.  

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Sat Apr 28 09:40:46 2012
@@ -20,7 +20,6 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -37,6 +36,7 @@ import org.apache.hama.Constants;
 import org.apache.hama.bsp.Counters.Counter;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
+import org.apache.hama.bsp.message.MessageQueue;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
@@ -165,7 +165,7 @@ public final class BSPPeerImpl<K1, V1, K
         TaskStatus.Phase.STARTING, counters));
 
     messenger = MessageManagerFactory.getMessageManager(conf);
-    messenger.init(this, conf, peerAddress);
+    messenger.init(taskId, this, conf, peerAddress);
 
     final String combinerName = conf.get("bsp.combiner.class");
     if (combinerName != null) {
@@ -294,7 +294,9 @@ public final class BSPPeerImpl<K1, V1, K
       InterruptedException {
     long startBarrier = System.currentTimeMillis();
     enterBarrier();
-    Iterator<Entry<InetSocketAddress, LinkedList<M>>> it = messenger
+    // normally all messages should have been sent by now, finalizing the send phase
+    messenger.finishSendPhase();
+    Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
         .getMessageIterator();
 
     boolean shouldCheckPoint = false;
@@ -304,7 +306,7 @@ public final class BSPPeerImpl<K1, V1, K
     }
 
     while (it.hasNext()) {
-      Entry<InetSocketAddress, LinkedList<M>> entry = it.next();
+      Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
       final InetSocketAddress addr = entry.getKey();
       final Iterable<M> messages = entry.getValue();
 
@@ -357,16 +359,33 @@ public final class BSPPeerImpl<K1, V1, K
 
   public final void close() throws SyncException, IOException,
       InterruptedException {
+    // each component is closed in its own try/catch, because we always want
+    // to close every component even if closing the previous one failed.
     if (in != null) {
-      in.close();
+      try {
+        in.close();
+      } catch (Exception e) {
+        LOG.error(e);
+      }
     }
     if (outWriter != null) {
-      outWriter.close();
+      try {
+        outWriter.close();
+      } catch (Exception e) {
+        LOG.error(e);
+      }
     }
     this.clear();
-    syncClient.close();
-
-    messenger.close();
+    try {
+      syncClient.close();
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+    try {
+      messenger.close();
+    } catch (Exception e) {
+      LOG.error(e);
+    }
   }
 
   @Override

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Sat Apr 28 09:40:46 2012
@@ -45,8 +45,10 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJobClient.RawSplit;
 import org.apache.hama.bsp.BSPMaster.State;
+import org.apache.hama.bsp.message.MemoryQueue;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
+import org.apache.hama.bsp.message.MessageQueue;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
@@ -332,14 +334,15 @@ public class LocalBSPRunner implements J
     @SuppressWarnings("rawtypes")
     private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
 
-    private final HashMap<InetSocketAddress, LinkedList<M>> localOutgoingMessages = new HashMap<InetSocketAddress, LinkedList<M>>();
+    private final HashMap<InetSocketAddress, MessageQueue<M>> localOutgoingMessages = new HashMap<InetSocketAddress, MessageQueue<M>>();
     private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
     private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
 
     private BSPPeer<?, ?, ?, ?, M> peer;
 
     @Override
-    public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress peerAddress) {
+    public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
+        Configuration conf, InetSocketAddress peerAddress) {
       this.peer = peer;
       managerMap.put(peerAddress, this);
     }
@@ -365,9 +368,9 @@ public class LocalBSPRunner implements J
         inetSocketAddress = BSPNetUtils.getAddress(peerName);
         socketCache.put(peerName, inetSocketAddress);
       }
-      LinkedList<M> msgs = localOutgoingMessages.get(inetSocketAddress);
+      MessageQueue<M> msgs = localOutgoingMessages.get(inetSocketAddress);
       if (msgs == null) {
-        msgs = new LinkedList<M>();
+        msgs = new MemoryQueue<M>();
       }
       msgs.add(msg);
       peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
@@ -380,12 +383,13 @@ public class LocalBSPRunner implements J
         throws IOException {
       for (M value : bundle.getMessages()) {
         managerMap.get(addr).localIncomingMessages.add(value);
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+            1L);
       }
     }
 
     @Override
-    public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator() {
+    public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
       return localOutgoingMessages.entrySet().iterator();
     }
 
@@ -399,6 +403,12 @@ public class LocalBSPRunner implements J
       return localIncomingMessages.size();
     }
 
+    @Override
+    public void finishSendPhase() throws IOException {
+      // TODO Auto-generated method stub
+
+    }
+
   }
 
   public static class LocalUmbilical implements BSPPeerProtocol {

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1331732&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sat Apr 28 09:40:46 2012
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.BSPNetUtils;
+
+public abstract class AbstractMessageManager<M extends Writable> implements
+    MessageManager<M>, Configurable {
+
+  private static final Log LOG = LogFactory
+      .getLog(AbstractMessageManager.class);
+
+  // conf is injected via reflection of the factory
+  protected Configuration conf;
+  protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+  protected final HashMap<InetSocketAddress, MessageQueue<M>> outgoingQueues = new HashMap<InetSocketAddress, MessageQueue<M>>();
+  protected MessageQueue<M> localQueue;
+  // this must be a synchronized implementation: this is accessed per RPC
+  protected SynchronizedQueue<M> localQueueForNextIteration;
+  protected BSPPeer<?, ?, ?, ?, M> peer;
+  // the peer address of this peer
+  protected InetSocketAddress peerAddress;
+  // the task attempt id
+  protected TaskAttemptID attemptId;
+
+  @Override
+  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
+      Configuration conf, InetSocketAddress peerAddress) {
+    this.attemptId = attemptId;
+    this.peer = peer;
+    this.conf = conf;
+    this.peerAddress = peerAddress;
+    localQueue = getQueue();
+    localQueue.init(conf, attemptId);
+    localQueueForNextIteration = getSynchronizedQueue();
+    localQueueForNextIteration.init(conf, attemptId);
+  }
+
+  @Override
+  public void close() {
+    Collection<MessageQueue<M>> values = outgoingQueues.values();
+    for (MessageQueue<M> msgQueue : values) {
+      msgQueue.close();
+    }
+    localQueue.close();
+  }
+
+  @Override
+  public void finishSendPhase() throws IOException {
+    Collection<MessageQueue<M>> values = outgoingQueues.values();
+    for (MessageQueue<M> msgQueue : values) {
+      msgQueue.prepareRead();
+    }
+  }
+
+  @Override
+  public final M getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  @Override
+  public final int getNumCurrentMessages() {
+    return localQueue.size();
+  }
+
+  @Override
+  public final void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+    localQueueForNextIteration.prepareRead();
+    localQueue.prepareWrite();
+    localQueue.addAll(localQueueForNextIteration.getMessageQueue());
+    localQueue.prepareRead();
+    localQueueForNextIteration.clear();
+  }
+
+  @Override
+  public void send(String peerName, M msg) throws IOException {
+    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);
+    if (queue == null) {
+      queue = getQueue();
+    }
+    queue.add(msg);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
+    outgoingQueues.put(targetPeerAddress, queue);
+  }
+
+  @Override
+  public final Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
+    return this.outgoingQueues.entrySet().iterator();
+  }
+
+  /**
+   * Returns a new queue implementation based on what was configured. If nothing
+   * has been configured for "hama.messenger.queue.class" then the
+   * {@link MemoryQueue} is used.
+   * 
+   * @return a <b>new</b> queue implementation.
+   */
+  protected MessageQueue<M> getQueue() {
+    Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
+    @SuppressWarnings("unchecked")
+    MessageQueue<M> newInstance = (MessageQueue<M>) ReflectionUtils
+        .newInstance(queueClass, conf);
+    newInstance.init(conf, attemptId);
+    return newInstance;
+  }
+
+  protected SynchronizedQueue<M> getSynchronizedQueue() {
+    return SynchronizedQueue.synchronize(getQueue());
+  }
+
+  public final Configuration getConf() {
+    return conf;
+  }
+
+  public final void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sat Apr 28 09:40:46 2012
@@ -24,12 +24,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -41,8 +37,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
-import org.apache.hama.util.BSPNetUtils;
 
 public final class AvroMessageManagerImpl<M extends Writable> extends
     CompressableMessageManager<M> implements Sender<M> {
@@ -50,39 +46,24 @@ public final class AvroMessageManagerImp
   private NettyServer server = null;
 
   private final HashMap<InetSocketAddress, Sender<M>> peers = new HashMap<InetSocketAddress, Sender<M>>();
-  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
-
-  private final HashMap<InetSocketAddress, LinkedList<M>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<M>>();
-  private Deque<M> localQueue = new LinkedList<M>();
-  // this must be a synchronized implementation: this is accessed per RPC
-  private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
-
-  private BSPPeer<?, ?, ?, ?, M> peer;
 
   @Override
-  public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
-      InetSocketAddress addr) {
-    this.peer = peer;
+  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
+      Configuration conf, InetSocketAddress addr) {
+    super.init(attemptId, peer, conf, addr);
     super.initCompression(conf);
     server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
   }
 
   @Override
   public void close() {
+    super.close();
     server.close();
   }
 
-  @Override
-  public void clearOutgoingQueues() {
-    this.outgoingQueues.clear();
-    localQueue.addAll(localQueueForNextIteration);
-    localQueueForNextIteration.clear();
-  }
-
   public void put(BSPMessageBundle<M> messages) {
-    peer.incrementCounter(
-        BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, messages.getMessages()
-            .size());
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+        messages.getMessages().size());
     Iterator<M> iterator = messages.getMessages().iterator();
     while (iterator.hasNext()) {
       this.localQueueForNextIteration.add(iterator.next());
@@ -90,11 +71,6 @@ public final class AvroMessageManagerImp
     }
   }
 
-  @Override
-  public int getNumCurrentMessages() {
-    return localQueue.size();
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
@@ -125,42 +101,19 @@ public final class AvroMessageManagerImp
     return null;
   }
 
-  @Override
-  public M getCurrentMessage() throws IOException {
-    return localQueue.poll();
-  }
-
-  @Override
-  public void send(String peerName, M msg) throws IOException {
-    InetSocketAddress targetPeerAddress = null;
-    // Get socket for target peer.
-    if (peerSocketCache.containsKey(peerName)) {
-      targetPeerAddress = peerSocketCache.get(peerName);
-    } else {
-      targetPeerAddress = BSPNetUtils.getAddress(peerName);
-      peerSocketCache.put(peerName, targetPeerAddress);
-    }
-    LinkedList<M> queue = outgoingQueues.get(targetPeerAddress);
-    if (queue == null) {
-      queue = new LinkedList<M>();
-    }
-    queue.add(msg);
-    outgoingQueues.put(targetPeerAddress, queue);
-  }
-
   private final BSPMessageBundle<M> deserializeMessage(ByteBuffer buffer)
       throws IOException {
     BSPMessageBundle<M> msg = new BSPMessageBundle<M>();
     byte[] byteArray = buffer.array();
     if (compressor == null) {
-      peer.incrementCounter(
-          BSPPeerImpl.PeerCounter.MESSAGE_BYTES_RECEIVED, byteArray.length);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_RECEIVED,
+          byteArray.length);
       ByteArrayInputStream inArray = new ByteArrayInputStream(byteArray);
       DataInputStream in = new DataInputStream(inArray);
       msg.readFields(in);
     } else {
-      peer.incrementCounter(
-          BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_RECEIVED, byteArray.length);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_RECEIVED,
+          byteArray.length);
       msg = compressor.decompressBundle(new BSPCompressedBundle(byteArray));
     }
 
@@ -175,20 +128,16 @@ public final class AvroMessageManagerImp
       msg.write(out);
       out.close();
       byte[] byteArray = outArray.toByteArray();
-      peer.incrementCounter(
-          BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, byteArray.length);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+          byteArray.length);
       return ByteBuffer.wrap(byteArray);
     } else {
       BSPCompressedBundle compMsgBundle = compressor.compressBundle(msg);
       byte[] data = compMsgBundle.getData();
-      peer.incrementCounter(
-          BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_SENT, data.length);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_BYTES_SENT,
+          data.length);
       return ByteBuffer.wrap(data);
     }
   }
 
-  @Override
-  public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator() {
-    return this.outgoingQueues.entrySet().iterator();
-  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java Sat Apr 28 09:40:46 2012
@@ -27,8 +27,7 @@ import org.apache.hama.bsp.message.compr
  * 
  * @param <M>
  */
-public abstract class CompressableMessageManager<M extends Writable> implements
-    MessageManager<M> {
+public abstract class CompressableMessageManager<M extends Writable> extends AbstractMessageManager<M> {
 
   protected BSPMessageCompressor<M> compressor;
 

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java?rev=1331732&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java Sat Apr 28 09:40:46 2012
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A disk based queue that is backed by a sequencefile. <br/>
+ * Structure is as follows: <br/>
+ * If "bsp.disk.queue.dir" is not defined, "hama.tmp.dir" will be used instead. <br/>
+ * ${hama.tmp.dir}/diskqueue/job_id/task_id/ <br/>
+ * An ongoing sequence number is prefixed to each queue file name to prevent
+ * collisions within that directory; however, the job_id directory is never
+ * deleted, so you need a cron job to do the cleanup for you. <br/>
+ * <b>This is currently not intended to be production ready</b>
+ */
+public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
+
+  public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
+
+  private final Log LOG = LogFactory.getLog(DiskQueue.class);
+
+  private static volatile int ONGOING_SEQUENCE_NUMBER = 0;
+  private static final int MAX_RETRIES = 4;
+  private static final NullWritable NULL_WRITABLE = NullWritable.get();
+
+  private int size = 0;
+  // injected via reflection
+  private Configuration conf;
+  private FileSystem fs;
+
+  private SequenceFile.Writer writer;
+  private SequenceFile.Reader reader;
+  private Path queuePath;
+  private TaskAttemptID id;
+  private final ObjectWritable writable = new ObjectWritable();
+
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    this.id = id;
+    writable.setConf(conf);
+    try {
+      fs = FileSystem.get(conf);
+      String configuredQueueDir = conf.get(DISK_QUEUE_PATH_KEY);
+      Path queueDir = null;
+      if (configuredQueueDir == null) {
+        String hamaTmpDir = conf.get("hama.tmp.dir");
+        if (hamaTmpDir != null) {
+          queueDir = createDiskQueuePath(id, hamaTmpDir);
+        } else {
+          // use some local tmp dir
+          queueDir = createDiskQueuePath(id, "/tmp/messageStorage/");
+        }
+      } else {
+        queueDir = createDiskQueuePath(id, configuredQueueDir);
+      }
+      fs.mkdirs(queueDir);
+      queuePath = new Path(queueDir, (ONGOING_SEQUENCE_NUMBER++)
+          + "_messages.seq");
+      prepareWrite();
+    } catch (IOException e) {
+      // we can't recover if something bad happens here..
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    closeInternal(true);
+    try {
+      fs.delete(queuePath.getParent(), true);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Closes the writer and reader and, if requested, deletes the queue file;
+   * basically this should be called after the computation phase has ended.
+   */
+  private void closeInternal(boolean delete) {
+    try {
+      if (writer != null) {
+        writer.close();
+        writer = null;
+      }
+    } catch (IOException e) {
+      LOG.error(e);
+    } finally {
+      if (fs != null && delete) {
+        try {
+          fs.delete(queuePath, true);
+        } catch (IOException e) {
+          LOG.error(e);
+        }
+      }
+      if (writer != null) {
+        try {
+          writer.close();
+          writer = null;
+        } catch (IOException e) {
+          LOG.error(e);
+        }
+      }
+    }
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (IOException e) {
+      LOG.error(e);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+          reader = null;
+        } catch (IOException e) {
+          LOG.error(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void prepareRead() {
+    // make sure we've closed
+    closeInternal(false);
+    try {
+      reader = new SequenceFile.Reader(fs, queuePath, conf);
+    } catch (IOException e) {
+      // can't recover from that
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void prepareWrite() {
+    try {
+      writer = new SequenceFile.Writer(fs, conf, queuePath,
+          ObjectWritable.class, NullWritable.class);
+    } catch (IOException e) {
+      // can't recover from that
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public final void addAll(Collection<M> col) {
+    for (M item : col) {
+      add(item);
+    }
+  }
+
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    M poll = null;
+    while ((poll = otherqueue.poll()) != null) {
+      add(poll);
+    }
+  }
+
+  @Override
+  public final void add(M item) {
+    size++;
+    try {
+      writer.append(new ObjectWritable(item), NULL_WRITABLE);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  @Override
+  public final void clear() {
+    closeInternal(true);
+    size = 0;
+    init(conf, id);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public final M poll() {
+    size--;
+    int tries = 1;
+    while (tries <= MAX_RETRIES) {
+      try {
+        boolean next = reader.next(writable, NULL_WRITABLE);
+        if (next) {
+          return (M) writable.get();
+        } else {
+          closeInternal(false);
+          return null;
+        }
+      } catch (IOException e) {
+        LOG.error("Retrying for the " + tries + "th time!", e);
+      }
+      tries++;
+    }
+    throw new RuntimeException("Message couldn't be read for " + tries
+        + " times! Giving up!");
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public Iterator<M> iterator() {
+    return new DiskIterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  private class DiskIterator implements Iterator<M> {
+
+    public DiskIterator() {
+      prepareRead();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (size == 0) {
+        closeInternal(false);
+      }
+      return size != 0;
+    }
+
+    @Override
+    public M next() {
+      return poll();
+    }
+
+    @Override
+    public void remove() {
+      // no-op
+    }
+
+  }
+
+  /**
+   * Creates a generic Path based on the configured path and the task attempt id
+   * to store disk sequence files. <br/>
+   * Structure is as follows: configuredPath/diskqueue/job_id/task_id/
+   */
+  public static Path createDiskQueuePath(TaskAttemptID id, String configuredPath) {
+    return new Path(new Path(new Path(configuredPath, "diskqueue"), id
+        .getJobID().toString()), id.getTaskID().toString());
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Sat Apr 28 09:40:46 2012
@@ -19,12 +19,7 @@ package org.apache.hama.bsp.message;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Deque;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,8 +30,8 @@ import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
-import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.util.CompressionUtil;
 
 /**
@@ -49,22 +44,14 @@ public final class HadoopMessageManagerI
   private static final Log LOG = LogFactory
       .getLog(HadoopMessageManagerImpl.class);
 
-  private Server server = null;
-  private Configuration conf;
-
   private final HashMap<InetSocketAddress, HadoopMessageManager<M>> peers = new HashMap<InetSocketAddress, HadoopMessageManager<M>>();
-  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
 
-  private final HashMap<InetSocketAddress, LinkedList<M>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<M>>();
-  private Deque<M> localQueue = new LinkedList<M>();
-  // this must be a synchronized implementation: this is accessed per RPC
-  private final ConcurrentLinkedQueue<M> localQueueForNextIteration = new ConcurrentLinkedQueue<M>();
-  private BSPPeer<?, ?, ?, ?, M> peer;
+  private Server server = null;
 
   @Override
-  public final void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf, InetSocketAddress peerAddress) {
-    this.peer = peer;
-    this.conf = conf;
+  public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
+      Configuration conf, InetSocketAddress peerAddress) {
+    super.init(attemptId, peer, conf, peerAddress);
     super.initCompression(conf);
     startRPCServer(conf, peerAddress);
   }
@@ -85,54 +72,13 @@ public final class HadoopMessageManagerI
 
   @Override
   public final void close() {
+    super.close();
     if (server != null) {
       server.stop();
     }
   }
 
   @Override
-  public final M getCurrentMessage() throws IOException {
-    return localQueue.poll();
-  }
-
-  @Override
-  public final void send(String peerName, M msg) throws IOException {
-    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
-    InetSocketAddress targetPeerAddress = null;
-    // Get socket for target peer.
-    if (peerSocketCache.containsKey(peerName)) {
-      targetPeerAddress = peerSocketCache.get(peerName);
-    } else {
-      targetPeerAddress = BSPNetUtils.getAddress(peerName);
-      peerSocketCache.put(peerName, targetPeerAddress);
-    }
-    LinkedList<M> queue = outgoingQueues.get(targetPeerAddress);
-    if (queue == null) {
-      queue = new LinkedList<M>();
-    }
-    queue.add(msg);
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
-    outgoingQueues.put(targetPeerAddress, queue);
-  }
-
-  @Override
-  public final Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator() {
-    return this.outgoingQueues.entrySet().iterator();
-  }
-
-  @SuppressWarnings("unchecked")
-  protected final HadoopMessageManager<M> getBSPPeerConnection(
-      InetSocketAddress addr) throws IOException {
-    HadoopMessageManager<M> peer = peers.get(addr);
-    if (peer == null) {
-      peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
-          HadoopMessageManager.versionID, addr, this.conf);
-      this.peers.put(addr, peer);
-    }
-    return peer;
-  }
-
-  @Override
   public final void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException {
 
@@ -155,18 +101,22 @@ public final class HadoopMessageManagerI
     }
   }
 
-  @Override
-  public final void clearOutgoingQueues() {
-    this.outgoingQueues.clear();
-    localQueue.addAll(localQueueForNextIteration);
-    localQueueForNextIteration.clear();
+  @SuppressWarnings("unchecked")
+  protected final HadoopMessageManager<M> getBSPPeerConnection(
+      InetSocketAddress addr) throws IOException {
+    HadoopMessageManager<M> peer = peers.get(addr);
+    if (peer == null) {
+      peer = (HadoopMessageManager<M>) RPC.getProxy(HadoopMessageManager.class,
+          HadoopMessageManager.versionID, addr, this.conf);
+      this.peers.put(addr, peer);
+    }
+    return peer;
   }
 
   @Override
   public final void put(M msg) {
     this.localQueueForNextIteration.add(msg);
-    peer.incrementCounter(
-        BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
   }
 
   @Override
@@ -185,11 +135,6 @@ public final class HadoopMessageManagerI
   }
 
   @Override
-  public final int getNumCurrentMessages() {
-    return localQueue.size();
-  }
-
-  @Override
   public final long getProtocolVersion(String arg0, long arg1)
       throws IOException {
     return versionID;

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java?rev=1331732&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java Sat Apr 28 09:40:46 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * LinkedList backed queue structure for bookkeeping messages.
+ */
+public final class MemoryQueue<M extends Writable> implements MessageQueue<M> {
+
+  private final Deque<M> deque = new LinkedList<M>();
+  private Configuration conf;
+
+  @Override
+  public final void addAll(Collection<M> col) {
+    deque.addAll(col);
+  }
+
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    M poll = null;
+    while ((poll = otherqueue.poll()) != null) {
+      deque.add(poll);
+    }
+  }
+
+  @Override
+  public final void add(M item) {
+    deque.add(item);
+  }
+
+  @Override
+  public final void clear() {
+    deque.clear();
+  }
+
+  @Override
+  public final M poll() {
+    return deque.poll();
+  }
+
+  @Override
+  public final int size() {
+    return deque.size();
+  }
+
+  @Override
+  public final Iterator<M> iterator() {
+    return deque.iterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  // not doing much here
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public void prepareRead() {
+
+  }
+
+  @Override
+  public void prepareWrite() {
+    
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Sat Apr 28 09:40:46 2012
@@ -20,13 +20,13 @@ package org.apache.hama.bsp.message;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * This manager takes care of the messaging. It is responsible to launch a
@@ -34,15 +34,16 @@ import org.apache.hama.bsp.BSPPeer;
  * 
  */
 public interface MessageManager<M extends Writable> {
+  
+  public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
 
   /**
-   * Init can be used to start servers and initialize internal state.
+   * Init can be used to start servers and initialize internal state. If you are
+   * implementing a subclass, please call the super version of this method.
    * 
-   * @param conf
-   * @param peerAddress
    */
-  public void init(BSPPeer<?, ?, ?, ?, M> peer, Configuration conf,
-      InetSocketAddress peerAddress);
+  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
+      Configuration conf, InetSocketAddress peerAddress);
 
   /**
    * Close is called after a task ran. Should be used to cleanup things e.G.
@@ -53,7 +54,6 @@ public interface MessageManager<M extend
   /**
    * Get the current message.
    * 
-   * @return
    * @throws IOException
    */
   public M getCurrentMessage() throws IOException;
@@ -61,25 +61,26 @@ public interface MessageManager<M extend
   /**
    * Send a message to the peer.
    * 
-   * @param peerName
-   * @param msg
    * @throws IOException
    */
   public void send(String peerName, M msg) throws IOException;
 
   /**
+   * Should be called once all messages have been sent with send().
+   * 
+   * @throws IOException
+   */
+  public void finishSendPhase() throws IOException;
+
+  /**
    * Returns an iterator of messages grouped by peer.
    * 
-   * @return
    */
-  public Iterator<Entry<InetSocketAddress, LinkedList<M>>> getMessageIterator();
+  public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator();
 
   /**
    * This is the real transferring to a host with a bundle.
    * 
-   * @param addr
-   * @param bundle
-   * @throws IOException
    */
   public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
       throws IOException;
@@ -92,7 +93,6 @@ public interface MessageManager<M extend
   /**
    * Gets the number of messages in the current queue.
    * 
-   * @return
    */
   public int getNumCurrentMessages();
 

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java?rev=1331732&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java Sat Apr 28 09:40:46 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Simple queue interface.
+ */
+public interface MessageQueue<M> extends Iterable<M>, Configurable {
+
+  /**
+   * Used to initialize the queue.
+   */
+  public void init(Configuration conf, TaskAttemptID id);
+
+  /**
+   * Finally close the queue. Commonly used to free resources.
+   */
+  public void close();
+
+  /**
+   * Called to prepare a queue for reading.
+   */
+  public void prepareRead();
+
+  /**
+   * Called to prepare a queue for writing.
+   */
+  public void prepareWrite();
+
+  /**
+   * Adds a whole Java Collection to the implementing queue.
+   */
+  public void addAll(Collection<M> col);
+
+  /**
+   * Adds the other queue to this queue.
+   */
+  public void addAll(MessageQueue<M> otherqueue);
+
+  /**
+   * Adds a single item to the implementing queue.
+   */
+  public void add(M item);
+
+  /**
+   * Clears all entries in this queue.
+   */
+  public void clear();
+
+  /**
+   * Polls for the next item in the queue (FIFO).
+   * 
+   * @return a new item or null if none are present.
+   */
+  public M poll();
+
+  /**
+   * @return how many items are in the queue.
+   */
+  public int size();
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java?rev=1331732&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java Sat Apr 28 09:40:46 2012
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A global mutex based synchronized queue.
+ */
+public final class SynchronizedQueue<T> {
+
+  private final MessageQueue<T> queue;
+  private final Object mutex;
+
+  private SynchronizedQueue(MessageQueue<T> queue) {
+    this.queue = queue;
+    this.mutex = new Object();
+  }
+
+  private SynchronizedQueue(MessageQueue<T> queue, Object mutex) {
+    this.queue = queue;
+    this.mutex = mutex;
+  }
+
+  public Iterator<T> iterator() {
+    synchronized (mutex) {
+      return queue.iterator();
+    }
+  }
+
+  public void setConf(Configuration conf) {
+    synchronized (mutex) {
+      queue.setConf(conf);
+    }
+  }
+
+  public Configuration getConf() {
+    synchronized (mutex) {
+      return queue.getConf();
+    }
+  }
+
+  public void init(Configuration conf, TaskAttemptID id) {
+    synchronized (mutex) {
+      queue.init(conf, id);
+    }
+  }
+
+  public void close() {
+    synchronized (mutex) {
+      queue.close(); // close under the mutex, like every other delegate here
+    }
+  }
+
+  public void prepareRead() {
+    synchronized (mutex) {
+      queue.prepareRead();
+    }
+  }
+
+  public void addAll(Collection<T> col) {
+    synchronized (mutex) {
+      queue.addAll(col);
+    }
+  }
+
+  public void add(T item) {
+    synchronized (mutex) {
+      queue.add(item);
+    }
+  }
+
+  public void clear() {
+    synchronized (mutex) {
+      queue.clear();
+    }
+  }
+
+  public T poll() {
+    synchronized (mutex) {
+      return queue.poll(); // MessageQueue<T>.poll() yields a T, or null if empty
+    }
+  }
+
+  public int size() {
+    synchronized (mutex) {
+      return queue.size();
+    }
+  }
+
+  public MessageQueue<T> getMessageQueue() {
+    synchronized (mutex) {
+      return queue;
+    }
+  }
+
+  /*
+   * static constructor methods to be type safe
+   */
+
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+    return new SynchronizedQueue<T>(queue);
+  }
+
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
+      Object mutex) {
+    return new SynchronizedQueue<T>(queue, mutex);
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Sat Apr 28 09:40:46 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.DiskQueue;
 import org.apache.hama.examples.ClassSerializePrinting;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.zookeeper.CreateMode;
@@ -48,6 +49,7 @@ public class TestBSPMasterGroomServer ex
 
   private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
   static String TMP_OUTPUT = "/tmp/test-example/";
+  public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
   static Path OUTPUT_PATH = new Path(TMP_OUTPUT + "serialout");
 
   private HamaConfiguration configuration;
@@ -58,6 +60,7 @@ public class TestBSPMasterGroomServer ex
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.set("bsp.local.dir", "/tmp/hama-test");
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     configuration.set("hama.sync.client.class",

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Sat Apr 28 09:40:46 2012
@@ -30,6 +30,7 @@ import org.apache.hama.bsp.BSPMessageBun
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
+import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
 import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.bsp.messages.BooleanMessage;
@@ -45,10 +46,13 @@ public class TestAvroMessageManager exte
 
   private static final int SUM = DOUBLE_MSG_COUNT + BOOL_MSG_COUNT
       + INT_MSG_COUNT;
+  
+  public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
 
   public void testAvroMessenger() throws Exception {
     BSPMessageBundle<Writable> randomBundle = getRandomBundle();
     Configuration conf = new Configuration();
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     MessageManager<Writable> messageManager = MessageManagerFactory
         .getMessageManager(conf);
     conf.set(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
@@ -61,8 +65,8 @@ public class TestAvroMessageManager exte
 
     BSPPeer<?, ?, ?, ?, Writable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, Writable>(
         conf, FileSystem.get(conf), new Counters());
-
-    messageManager.init(dummyPeer, conf, peer);
+    TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
+    messageManager.init(id, dummyPeer, conf, peer);
 
     messageManager.transfer(peer, randomBundle);
 

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java?rev=1331732&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java Sat Apr 28 09:40:46 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.junit.Test;
+
+public class TestDiskQueue extends TestCase {
+
+  static Configuration conf = new Configuration();
+  static {
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY,
+        TestAvroMessageManager.TMP_OUTPUT_PATH);
+  }
+
+  @Test
+  public void testDiskQueue() throws Exception {
+    DiskQueue<IntWritable> queue = getQueue();
+    checkQueue(queue);
+    queue.close();
+  }
+
+  @Test
+  public void testMultipleIterations() throws Exception {
+    DiskQueue<IntWritable> queue = getQueue();
+    for (int superstep = 0; superstep < 15; superstep++) {
+      checkQueue(queue);
+    }
+    queue.close();
+  }
+
+  public DiskQueue<IntWritable> getQueue() {
+    TaskAttemptID id = new TaskAttemptID(new TaskID("123", 1, 2), 0);
+    DiskQueue<IntWritable> queue = new DiskQueue<IntWritable>();
+    // normally this is injected via reflection
+    queue.setConf(conf);
+    queue.init(conf, id);
+    return queue;
+  }
+
+  public void checkQueue(DiskQueue<IntWritable> queue) {
+    queue.prepareWrite();
+    for (int i = 0; i < 10; i++) {
+      queue.add(new IntWritable(i));
+    }
+
+    queue.addAll(Arrays.asList(new IntWritable(11), new IntWritable(12),
+        new IntWritable(13)));
+    assertEquals(13, queue.size());
+    queue.prepareRead();
+
+    for (int i = 0; i < 9; i++) {
+      assertEquals(i, queue.poll().get());
+    }
+
+    queue.clear();
+
+    assertEquals(0, queue.size());
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1331732&r1=1331731&r2=1331732&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Sat Apr 28 09:40:46 2012
@@ -19,7 +19,6 @@ package org.apache.hama.bsp.message;
 
 import java.net.InetSocketAddress;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map.Entry;
 
 import junit.framework.TestCase;
@@ -32,12 +31,31 @@ import org.apache.hama.bsp.BSPMessageBun
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
+import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestHadoopMessageManager extends TestCase {
 
-  public void testMessaging() throws Exception {
+  public static final String TMP_OUTPUT_PATH = "/tmp/messageQueue";
+  // increment is here to solve race conditions in parallel execution to choose
+  // other ports.
+  public static volatile int increment = 1;
+
+  public void testMemoryMessaging() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MessageManager.QUEUE_TYPE_CLASS,
+        MemoryQueue.class.getCanonicalName());
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+    messagingInternal(conf);
+  }
+
+  public void testDiskMessaging() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+    messagingInternal(conf);
+  }
+
+  private void messagingInternal(Configuration conf) throws Exception {
     conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
         "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
     MessageManager<IntWritable> messageManager = MessageManagerFactory
@@ -46,18 +64,20 @@ public class TestHadoopMessageManager ex
     assertTrue(messageManager instanceof HadoopMessageManagerImpl);
 
     InetSocketAddress peer = new InetSocketAddress(
-        BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
+        BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort()
+            + (increment++));
     BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, IntWritable>(
         conf, FileSystem.get(conf), new Counters());
-    messageManager.init(dummyPeer, conf, peer);
+    TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
+    messageManager.init(id, dummyPeer, conf, peer);
     String peerName = peer.getHostName() + ":" + peer.getPort();
 
     messageManager.send(peerName, new IntWritable(1337));
 
-    Iterator<Entry<InetSocketAddress, LinkedList<IntWritable>>> messageIterator = messageManager
+    Iterator<Entry<InetSocketAddress, MessageQueue<IntWritable>>> messageIterator = messageManager
         .getMessageIterator();
 
-    Entry<InetSocketAddress, LinkedList<IntWritable>> entry = messageIterator
+    Entry<InetSocketAddress, MessageQueue<IntWritable>> entry = messageIterator
         .next();
 
     assertEquals(entry.getKey(), peer);
@@ -77,5 +97,6 @@ public class TestHadoopMessageManager ex
     IntWritable currentMessage = messageManager.getCurrentMessage();
 
     assertEquals(currentMessage.get(), 1337);
+    messageManager.close();
   }
 }



Mime
View raw message