hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1559885 - in /hama/trunk: ./ c++/src/main/native/examples/conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/queue/ core/src/test/java/org/apache/ham...
Date Tue, 21 Jan 2014 01:28:58 GMT
Author: edwardyoon
Date: Tue Jan 21 01:28:58 2014
New Revision: 1559885

URL: http://svn.apache.org/r1559885
Log:
HAMA-853: Refactor Outgoing message manager (edwardyoon)

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
  (with props)
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
  (with props)
Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueueTransfer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueueTransfer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueueTransfer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransfer.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Jan 21 01:28:58 2014
@@ -23,6 +23,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-853: Refactor Outgoing message manager (edwardyoon)
    HAMA-852: Add MessageClass property in BSPJob (Martin Illecker)
    HAMA-843: Message communication overhead between master aggregation and vertex computation supersteps (edwardyoon)
    HAMA-838: Refactor aggregators (Anastasis Andronidis)

Modified: hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/matrixmultiplication.xml?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml Tue Jan 21 01:28:58
2014
@@ -58,8 +58,8 @@
     <value>org.apache.hama.commons.io.PipesKeyValueWritable</value>
   </property>
   <property>
-    <name>hama.messenger.xfer.queue.class</name>
-    <value>org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer</value>
+    <name>hama.messenger.receive.queue.class</name>
+    <value>org.apache.hama.bsp.message.queue.SortedMemoryQueue</value>
   </property>
   <property>
     <name>bsp.input.partitioner.class</name>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Jan 21 01:28:58
2014
@@ -42,7 +42,6 @@ import org.apache.hama.bsp.ft.BSPFaultTo
 import org.apache.hama.bsp.ft.FaultTolerantPeerService;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.sync.PeerSyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
@@ -90,7 +89,6 @@ public final class BSPPeerImpl<K1, V1, K
   private InetSocketAddress peerAddress;
 
   private Counters counters;
-  private Combiner<M> combiner;
 
   private FaultTolerantPeerService<M> faultToleranceService;
 
@@ -122,7 +120,8 @@ public final class BSPPeerImpl<K1, V1, K
    * @param dfs is the Hadoop FileSystem.
    * @param counters is the counters from outside.
    */
-  public BSPPeerImpl(final HamaConfiguration conf, FileSystem dfs, Counters counters) {
+  public BSPPeerImpl(final HamaConfiguration conf, FileSystem dfs,
+      Counters counters) {
     this(conf, dfs);
     this.counters = counters;
   }
@@ -169,14 +168,14 @@ public final class BSPPeerImpl<K1, V1, K
 
     // This function call may change the current peer address
     initializeMessaging();
-    
+
     conf.set(Constants.PEER_HOST, peerAddress.getHostName());
     conf.setInt(Constants.PEER_PORT, peerAddress.getPort());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Initialized Messaging service.");
     }
-    
+
     initializeIO();
     initializeSyncService(superstep, state);
 
@@ -190,12 +189,6 @@ public final class BSPPeerImpl<K1, V1, K
     setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
         stateString, peerAddress.getHostName(), phase, counters));
 
-    final String combinerName = conf.get(Constants.COMBINER_CLASS);
-    if (combinerName != null) {
-      combiner = (Combiner<M>) ReflectionUtils.newInstance(
-          conf.getClassByName(combinerName), conf);
-    }
-
     if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Fault tolerance enabled.");
@@ -372,16 +365,15 @@ public final class BSPPeerImpl<K1, V1, K
       InterruptedException {
 
     // normally all messages should been send now, finalizing the send phase
-    messenger.finishSendPhase();
-    Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
-        .getMessageIterator();
+    Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> it = messenger
+        .getOutgoingBundles();
 
     while (it.hasNext()) {
-      Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
+      Entry<InetSocketAddress, BSPMessageBundle<M>> entry = it.next();
       final InetSocketAddress addr = entry.getKey();
-      final Iterable<M> messages = entry.getValue();
 
-      final BSPMessageBundle<M> bundle = combineMessages(messages);
+      final BSPMessageBundle<M> bundle = entry.getValue();
+
       // remove this message during runtime to save a bit of memory
       it.remove();
       try {
@@ -389,10 +381,6 @@ public final class BSPPeerImpl<K1, V1, K
       } catch (Exception e) {
         LOG.error("Error while sending messages", e);
       }
-      MessageQueue<M> msgQueue = (MessageQueue<M>) messages;
-      if (msgQueue != null) {
-        msgQueue.close();
-      }
     }
 
     if (this.faultToleranceService != null) {
@@ -415,7 +403,7 @@ public final class BSPPeerImpl<K1, V1, K
     }
 
     // Clear outgoing queues.
-    messenger.clearOutgoingQueues();
+    messenger.clearOutgoingMessages();
 
     leaveBarrier();
 
@@ -437,18 +425,6 @@ public final class BSPPeerImpl<K1, V1, K
 
   }
 
-  private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
-    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
-    if (combiner != null) {
-      bundle.addMessage(combiner.combine(messages));
-    } else {
-      for (M message : messages) {
-        bundle.addMessage(message);
-      }
-    }
-    return bundle;
-  }
-
   protected final void enterBarrier() throws SyncException {
     syncClient.enterBarrier(taskId.getJobID(), taskId,
         currentTaskStatus.getSuperstepCount());
@@ -520,7 +496,7 @@ public final class BSPPeerImpl<K1, V1, K
 
   @Override
   public final void clear() {
-    messenger.clearOutgoingQueues();
+    messenger.clearOutgoingMessages();
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Tue Jan 21 01:28:58 2014
@@ -19,8 +19,6 @@ package org.apache.hama.bsp.message;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map.Entry;
@@ -32,16 +30,18 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.message.queue.SingleLockQueue;
 import org.apache.hama.bsp.message.queue.SynchronizedQueue;
-import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.ReflectionUtils;
 
 /**
  * Abstract baseclass that should contain all information and services needed
@@ -56,9 +56,8 @@ public abstract class AbstractMessageMan
 
   // conf is injected via reflection of the factory
   protected Configuration conf;
-  protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
-  protected final HashMap<InetSocketAddress, MessageQueue<M>> outgoingQueues = new HashMap<InetSocketAddress, MessageQueue<M>>();
 
+  protected OutgoingMessageManager<M> outgoingMessageManager;
   protected MessageQueue<M> localQueue;
 
   // this must be a synchronized implementation: this is accessed per RPC
@@ -81,6 +80,7 @@ public abstract class AbstractMessageMan
    * TaskAttemptID, org.apache.hama.bsp.BSPPeer,
    * org.apache.hadoop.conf.Configuration, java.net.InetSocketAddress)
    */
+  @SuppressWarnings("unchecked")
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress peerAddress) {
@@ -91,6 +91,19 @@ public abstract class AbstractMessageMan
     this.localQueue = getReceiverQueue();
     this.localQueueForNextIteration = getSynchronizedReceiverQueue();
     this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
+    this.outgoingMessageManager = getOutgoingMessageManager();
+
+    final String combinerName = conf.get(Constants.COMBINER_CLASS);
+    if (combinerName != null) {
+      try {
+        Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
+            .getClassByName(combinerName));
+        this.outgoingMessageManager.setCombiner(combiner);
+      } catch (ClassNotFoundException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
   }
 
   /*
@@ -100,10 +113,7 @@ public abstract class AbstractMessageMan
   @Override
   public void close() {
     try {
-      Collection<MessageQueue<M>> values = outgoingQueues.values();
-      for (MessageQueue<M> msgQueue : values) {
-        msgQueue.close();
-      }
+      outgoingMessageManager.clear();
       localQueue.close();
       // remove possible disk queues from the path
       try {
@@ -121,18 +131,6 @@ public abstract class AbstractMessageMan
 
   /*
    * (non-Javadoc)
-   * @see org.apache.hama.bsp.message.MessageManager#finishSendPhase()
-   */
-  @Override
-  public void finishSendPhase() throws IOException {
-    Collection<MessageQueue<M>> values = outgoingQueues.values();
-    for (MessageQueue<M> msgQueue : values) {
-      msgQueue.prepareRead();
-    }
-  }
-
-  /*
-   * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#getCurrentMessage()
    */
   @Override
@@ -154,7 +152,7 @@ public abstract class AbstractMessageMan
    * @see org.apache.hama.bsp.message.MessageManager#clearOutgoingQueues()
    */
   @Override
-  public final void clearOutgoingQueues() {
+  public final void clearOutgoingMessages() {
     if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
         && localQueue.size() > 0) {
 
@@ -199,21 +197,8 @@ public abstract class AbstractMessageMan
    */
   @Override
   public void send(String peerName, M msg) throws IOException {
-    InetSocketAddress targetPeerAddress = null;
-    // Get socket for target peer.
-    if (peerSocketCache.containsKey(peerName)) {
-      targetPeerAddress = peerSocketCache.get(peerName);
-    } else {
-      targetPeerAddress = BSPNetUtils.getAddress(peerName);
-      peerSocketCache.put(peerName, targetPeerAddress);
-    }
-    MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);
-    if (queue == null) {
-      queue = getSenderQueue();
-    }
-    queue.add(msg);
+    outgoingMessageManager.addMessage(peerName, msg);
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
-    outgoingQueues.put(targetPeerAddress, queue);
     notifySentMessage(peerName, msg);
   }
 
@@ -222,24 +207,16 @@ public abstract class AbstractMessageMan
    * @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()
    */
   @Override
-  public final Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
-    return this.outgoingQueues.entrySet().iterator();
+  public final Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getOutgoingBundles() {
+    return this.outgoingMessageManager.getBundleIterator();
   }
 
-  /**
-   * Returns a new queue implementation based on what was configured. If nothing
-   * has been configured for "hama.messenger.queue.class" then the
-   * {@link MemoryQueue} is used. If you have scalability issues, then better
-   * use {@link DiskQueue}.
-   * 
-   * @return a <b>new</b> queue implementation.
-   */
-  protected MessageQueue<M> getSenderQueue() {
+  protected OutgoingMessageManager<M> getOutgoingMessageManager() {
     @SuppressWarnings("unchecked")
-    MessageQueue<M> queue = MessageTransferQueueFactory
-        .getMessageTransferQueue(conf).getSenderQueue(conf);
-    queue.init(conf, attemptId);
-    return queue;
+    OutgoingMessageManager<M> messageManager = ReflectionUtils.newInstance(conf
+        .getClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+            OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class));
+    return messageManager;
   }
 
   /**
@@ -252,16 +229,13 @@ public abstract class AbstractMessageMan
    */
   protected MessageQueue<M> getReceiverQueue() {
     @SuppressWarnings("unchecked")
-    MessageQueue<M> queue = MessageTransferQueueFactory
-        .getMessageTransferQueue(conf).getReceiverQueue(conf);
+    MessageQueue<M> queue = ReflectionUtils.newInstance(conf.getClass(
+        MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+        MessageQueue.class));
     queue.init(conf, attemptId);
     return queue;
   }
 
-  protected SynchronizedQueue<M> getSynchronizedSenderQueue() {
-    return SingleLockQueue.synchronize(getSenderQueue());
-  }
-
   protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
     return SingleLockQueue.synchronize(getReceiverQueue());
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Tue Jan
21 01:28:58 2014
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.queue.MessageQueue;
 
 /**
  * This manager takes care of the messaging. It is responsible to launch a
@@ -36,9 +35,8 @@ import org.apache.hama.bsp.message.queue
  */
 public interface MessageManager<M extends Writable> {
 
+  public static final String OUTGOING_MESSAGE_MANAGER_CLASS = "hama.messenger.outgoing.message.manager.class";
   public static final String RECEIVE_QUEUE_TYPE_CLASS = "hama.messenger.receive.queue.class";
-  public static final String SENDER_QUEUE_TYPE_CLASS = "hama.messenger.sender.queue.class";
-  public static final String TRANSFER_QUEUE_TYPE_CLASS = "hama.messenger.xfer.queue.class";
   public static final String MAX_CACHED_CONNECTIONS_KEY = "hama.messenger.max.cached.connections";
 
   /**
@@ -75,17 +73,10 @@ public interface MessageManager<M extend
   public void send(String peerName, M msg) throws IOException;
 
   /**
-   * Should be called when all messages were send with send().
-   * 
-   * @throws IOException
-   */
-  public void finishSendPhase() throws IOException;
-
-  /**
-   * Returns an iterator of messages grouped by peer.
+   * Returns an bundle of messages grouped by peer.
    * 
    */
-  public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator();
+  public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getOutgoingBundles();
 
   /**
    * This is the real transferring to a host with a bundle.
@@ -97,7 +88,7 @@ public interface MessageManager<M extend
   /**
    * Clears the outgoing queue. Can be used to switch queues.
    */
-  public void clearOutgoingQueues();
+  public void clearOutgoingMessages();
 
   /**
    * Gets the number of messages in the current queue.

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1559885&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
(added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
Tue Jan 21 01:28:58 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+
+public interface OutgoingMessageManager<M extends Writable> {
+
+  public void setCombiner(Combiner<M> combiner);
+
+  public void addMessage(String peerName, M msg);
+
+  public void clear();
+
+  public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator();
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1559885&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
(added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
Tue Jan 21 01:28:58 2014
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.util.BSPNetUtils;
+
+public class OutgoingPOJOMessageBundle<M extends Writable> implements
+    OutgoingMessageManager<M> {
+
+  private Combiner<M> combiner;
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+  private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
+
+  @Override
+  public void setCombiner(Combiner<M> combiner) {
+    this.combiner = combiner;
+  }
+
+  @Override
+  public void addMessage(String peerName, M msg) {
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+
+    if (!outgoingBundles.containsKey(targetPeerAddress)) {
+      outgoingBundles.put(targetPeerAddress, new BSPMessageBundle<M>());
+    }
+
+    if (combiner != null) {
+      BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
+      bundle.addMessage(msg);
+      BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
+      combined.addMessage(combiner.combine(bundle.getMessages()));
+      outgoingBundles.put(targetPeerAddress, combined);
+    } else {
+      outgoingBundles.get(targetPeerAddress).addMessage(msg);
+    }
+  }
+
+  @Override
+  public void clear() {
+    outgoingBundles.clear();
+  }
+
+  @Override
+  public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator() {
+    return outgoingBundles.entrySet().iterator();
+  }
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Tue Jan
21 01:28:58 2014
@@ -90,7 +90,7 @@ public final class MemoryQueue<M extends
 
   @Override
   public void close() {
-
+    this.clear();
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
Tue Jan 21 01:28:58 2014
@@ -94,7 +94,7 @@ public final class SortedMemoryQueue<M e
 
   @Override
   public void close() {
-
+    this.clear();;
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Tue
Jan 21 01:28:58 2014
@@ -45,8 +45,7 @@ import org.apache.hama.bsp.message.io.Sp
  * 
  * @param <M>
  */
-public class SpillingQueue<M extends Writable> extends ByteArrayMessageQueue<M>
-    implements MessageTransferProtocol<M> {
+public class SpillingQueue<M extends Writable> extends ByteArrayMessageQueue<M> {
 
   private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
 
@@ -345,16 +344,6 @@ public class SpillingQueue<M extends Wri
   }
 
   @Override
-  public MessageQueue<M> getSenderQueue(Configuration conf) {
-    return this;
-  }
-
-  @Override
-  public MessageQueue<M> getReceiverQueue(Configuration conf) {
-    return this;
-  }
-
-  @Override
   public void add(BSPMessageBundle<M> bundle) {
     try {
       this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle<M>) bundle)

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue Jan 21 01:28:58
2014
@@ -52,7 +52,6 @@ import org.apache.hama.bsp.ft.AsyncRcvdM
 import org.apache.hama.bsp.ft.FaultTolerantPeerService;
 import org.apache.hama.bsp.message.MessageEventListener;
 import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.sync.BSPPeerSyncClient;
 import org.apache.hama.bsp.sync.PeerSyncClient;
 import org.apache.hama.bsp.sync.SyncEvent;
@@ -102,11 +101,7 @@ public class TestCheckpoint extends Test
     }
 
     @Override
-    public void finishSendPhase() throws IOException {
-    }
-
-    @Override
-    public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
+    public Iterator<Entry<InetSocketAddress, BSPMessageBundle<Text>>> getOutgoingBundles() {
       return null;
     }
 
@@ -118,7 +113,7 @@ public class TestCheckpoint extends Test
     }
 
     @Override
-    public void clearOutgoingQueues() {
+    public void clearOutgoingMessages() {
     }
 
     @Override

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Jan 21 01:28:58
2014
@@ -39,32 +39,32 @@ public class TestPersistQueue extends Te
 
   public void testDiskQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
-    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.DiskQueueTransfer");
+    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.DiskQueue");
 
     assertTrue(bsp.waitForCompletion(true));
   }
 
   public void testMemoryQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
-    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.MemoryQueueTransfer");
+    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.MemoryQueue");
 
     assertTrue(bsp.waitForCompletion(true));
   }
 
   public void testSortedQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
-    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer");
+    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.SortedMemoryQueue");
 
     assertTrue(bsp.waitForCompletion(true));
   }
 
   public void testSpillingQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
-    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.SpillingQueueTransfer");
+    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.SpillingQueue");
 
     assertTrue(bsp.waitForCompletion(true));
   }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
Tue Jan 21 01:28:58 2014
@@ -34,10 +34,8 @@ import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.queue.DiskQueue;
-import org.apache.hama.bsp.message.queue.DiskQueueTransfer;
-import org.apache.hama.bsp.message.queue.MemoryQueueTransfer;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.MessageTransferProtocol;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestHamaMessageManager extends TestCase {
@@ -49,8 +47,8 @@ public class TestHamaMessageManager exte
 
   public void testMemoryMessaging() throws Exception {
     HamaConfiguration conf = new HamaConfiguration();
-    conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        MemoryQueueTransfer.class, MessageTransferProtocol.class);
+    conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+        MessageQueue.class);
     conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     messagingInternal(conf);
   }
@@ -58,12 +56,13 @@ public class TestHamaMessageManager exte
   public void testDiskMessaging() throws Exception {
     HamaConfiguration conf = new HamaConfiguration();
     conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
-    conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        DiskQueueTransfer.class, MessageTransferProtocol.class);
+    conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
+        MessageQueue.class);
     messagingInternal(conf);
   }
 
-  private static void messagingInternal(HamaConfiguration conf) throws Exception {
+  private static void messagingInternal(HamaConfiguration conf)
+      throws Exception {
     conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
         "org.apache.hama.bsp.message.HamaMessageManagerImpl");
     MessageManager<IntWritable> messageManager = MessageManagerFactory
@@ -86,24 +85,24 @@ public class TestHamaMessageManager exte
     System.out.println("Peer is " + peerName);
     messageManager.send(peerName, new IntWritable(1337));
 
-    Iterator<Entry<InetSocketAddress, MessageQueue<IntWritable>>> messageIterator = messageManager
-        .getMessageIterator();
+    Iterator<Entry<InetSocketAddress, BSPMessageBundle<IntWritable>>> messageIterator = messageManager
+        .getOutgoingBundles();
 
-    Entry<InetSocketAddress, MessageQueue<IntWritable>> entry = messageIterator
+    Entry<InetSocketAddress, BSPMessageBundle<IntWritable>> entry = messageIterator
         .next();
 
     assertEquals(entry.getKey(), peer);
 
-    assertTrue(entry.getValue().size() == 1);
+    assertTrue(entry.getValue().getMessages().size() == 1);
 
     BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
-    for (IntWritable msg : entry.getValue()) {
+    for (IntWritable msg : entry.getValue().getMessages()) {
       bundle.addMessage(msg);
     }
 
     messageManager.transfer(peer, bundle);
 
-    messageManager.clearOutgoingQueues();
+    messageManager.clearOutgoingMessages();
 
     assertTrue(messageManager.getNumCurrentMessages() == 1);
     IntWritable currentMessage = messageManager.getCurrentMessage();

Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Tue Jan 21 01:28:58 2014
@@ -253,8 +253,8 @@ public class TestPipes extends HamaClust
     bsp.setPartitioner(PipesPartitioner.class);
 
     // sort sent messages
-    bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer");
+    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        "org.apache.hama.bsp.message.queue.SortedMemoryQueue");
     bsp.set("hama.mat.mult.B.path", transposedMatrixB.toString());
     return bsp;
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1559885&r1=1559884&r2=1559885&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Tue Jan 21 01:28:58 2014
@@ -32,8 +32,7 @@ import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.MessageTransferProtocol;
-import org.apache.hama.bsp.message.queue.SortedMemoryQueueTransfer;
+import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
 
 import com.google.common.base.Preconditions;
 
@@ -104,15 +103,15 @@ public class GraphJob extends BSPJob {
   }
 
   /**
-  * Custom aggregator registration. Add a custom aggregator
-  * that will aggregate massages sent from the user.
-  *
-  * @param name identifies an aggregator
-  * @param aggregatorClass the aggregator class
-  */
+   * Custom aggregator registration. Add a custom aggregator that will aggregate
+   * massages sent from the user.
+   * 
+   * @param name identifies an aggregator
+   * @param aggregatorClass the aggregator class
+   */
   @SuppressWarnings("rawtypes")
-  public void registerAggregator(String name, Class<? extends
-      Aggregator> aggregatorClass) {
+  public void registerAggregator(String name,
+      Class<? extends Aggregator> aggregatorClass) {
     String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
 
     prevAggrs += name + "@" + aggregatorClass.getName() + ";";
@@ -197,8 +196,8 @@ public class GraphJob extends BSPJob {
     }
 
     // add the default message queue to the sorted one
-    this.getConfiguration().setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
-        SortedMemoryQueueTransfer.class, MessageTransferProtocol.class);
+    this.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
+        SortedMemoryQueue.class, MessageQueue.class);
 
     super.submit();
   }



Mime
View raw message