hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1673127 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/queue/ core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/apac...
Date Mon, 13 Apr 2015 07:52:46 GMT
Author: edwardyoon
Date: Mon Apr 13 07:52:45 2015
New Revision: 1673127

URL: http://svn.apache.org/r1673127
Log:
HAMA-526: Multi-threaded vertex processing

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Mon Apr 13 07:52:45 2015
@@ -18,6 +18,7 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
@@ -202,4 +203,9 @@ public interface BSPPeer<K1, V1, K2, V2,
    * @return the task id of this task.
    */
   public TaskAttemptID getTaskId();
+
+  public List<List<M>> getSubLists(int num);
+
+  public void clearIncomingMessages();
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Apr 13 07:52:45 2015
@@ -511,6 +511,11 @@ public final class BSPPeerImpl<K1, V1, K
   public final void clear() {
     messenger.clearOutgoingMessages();
   }
+  
+  @Override
+  public final void clearIncomingMessages() {
+    messenger.clearIncomingMessages();
+  }
 
   /**
    * @return the string as host:port of this Peer
@@ -668,4 +673,9 @@ public final class BSPPeerImpl<K1, V1, K
     return taskId;
   }
 
+  @Override
+  public List<List<M>> getSubLists(int num) {
+    return messenger.getSubLists(num);
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Mon Apr 13 07:52:45 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Queue;
 
@@ -121,6 +122,11 @@ public abstract class AbstractMessageMan
     return localQueue.poll();
   }
 
+  @Override
+  public List<List<M>> getSubLists(int num) {
+    return localQueue.getSubLists(num);
+  }
+  
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages()
@@ -129,6 +135,10 @@ public abstract class AbstractMessageMan
   public final int getNumCurrentMessages() {
     return localQueue.size();
   }
+  
+  public void clearIncomingMessages() {
+    localQueue.clear();
+  }
 
   /*
    * (non-Javadoc)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Mon Apr 13 07:52:45 2015
@@ -24,6 +24,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Mon Apr 13 07:52:45 2015
@@ -20,6 +20,7 @@ package org.apache.hama.bsp.message;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.io.Writable;
@@ -91,6 +92,11 @@ public interface MessageManager<M extend
   public void clearOutgoingMessages();
 
   /**
+   * Clears the incoming queue. Can be used to switch queues.
+   */
+  public void clearIncomingMessages();
+  
+  /**
    * Gets the number of messages in the current queue.
    * 
    */
@@ -122,4 +128,6 @@ public interface MessageManager<M extend
    * on.
    */
   public InetSocketAddress getListenerAddress();
+
+  public List<List<M>> getSubLists(int num);
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Mon Apr 13 07:52:45 2015
@@ -17,7 +17,9 @@
  */
 package org.apache.hama.bsp.message.queue;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
@@ -25,6 +27,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
+import com.google.common.collect.Lists;
+
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */
@@ -87,11 +91,6 @@ public final class MemoryQueue<M extends
   }
 
   @Override
-  public final Iterator<M> iterator() {
-    return deque.iterator();
-  }
-
-  @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
@@ -117,4 +116,15 @@ public final class MemoryQueue<M extends
     return this;
   }
 
+  @Override
+  public List<List<M>> getSubLists(int num) {
+    List<List<M>> subLists = new ArrayList<List<M>>();
+    subLists.add(Lists.newArrayList(deque.iterator()));
+    Iterator<BSPMessageBundle<M>> it = bundles.iterator();
+    while (it.hasNext()) {
+      subLists.add(Lists.newArrayList(it.next().iterator()));
+    }
+    return subLists;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Mon Apr 13 07:52:45 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp.message.queue;
 
+import java.util.List;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -26,8 +28,7 @@ import org.apache.hama.bsp.TaskAttemptID
 /**
  * Simple queue interface.
  */
-public interface MessageQueue<M extends Writable> extends Iterable<M>,
-    Configurable {
+public interface MessageQueue<M extends Writable> extends Configurable {
 
   public static final String PERSISTENT_QUEUE = "hama.queue.behaviour.persistent";
 
@@ -63,6 +64,8 @@ public interface MessageQueue<M extends
    */
   public void add(M item);
 
+  public List<List<M>> getSubLists(int num);
+  
   /**
    * Clears all entries in the given queue.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Mon Apr 13 07:52:45 2015
@@ -17,7 +17,7 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -45,17 +45,6 @@ public final class SingleLockQueue<T ext
 
   /*
    * (non-Javadoc)
-   * @see org.apache.hama.bsp.message.SynchronizedQueue#iterator()
-   */
-  @Override
-  public Iterator<T> iterator() {
-    synchronized (mutex) {
-      return queue.iterator();
-    }
-  }
-
-  /*
-   * (non-Javadoc)
    * @see
    * org.apache.hama.bsp.message.SynchronizedQueue#setConf(org.apache.hadoop
    * .conf.Configuration)
@@ -196,4 +185,9 @@ public final class SingleLockQueue<T ext
       queue.addAll(otherqueue);
     }
   }
+
+  @Override
+  public List<List<T>> getSubLists(int num) {
+    return queue.getSubLists(num);
+  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Mon Apr 13 07:52:45 2015
@@ -17,7 +17,7 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 
@@ -26,6 +26,8 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
+import com.google.common.collect.Lists;
+
 /**
  * Heap (Java's priority queue) based message queue implementation that supports
  * sorted receive and send.
@@ -37,11 +39,6 @@ public final class SortedMemoryQueue<M e
   private Configuration conf;
 
   @Override
-  public Iterator<M> iterator() {
-    return queue.iterator();
-  }
-
-  @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
@@ -106,4 +103,9 @@ public final class SortedMemoryQueue<M e
     return this;
   }
 
+  @Override
+  public List<List<M>> getSubLists(int num) {
+    return Lists.partition(Lists.newArrayList(queue.iterator()), num);
+  }
+
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon Apr 13 07:52:45 2015
@@ -151,6 +151,18 @@ public class TestCheckpoint extends Test
       return null;
     }
 
+    @Override
+    public List<List<Text>> getSubLists(int num) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void clearIncomingMessages() {
+      // TODO Auto-generated method stub
+      
+    }
+
   }
 
   public static class TestBSPPeer implements
@@ -307,6 +319,18 @@ public class TestCheckpoint extends Test
       return null;
     }
 
+    @Override
+    public List<List<Text>> getSubLists(int num) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void clearIncomingMessages() {
+      // TODO Auto-generated method stub
+      
+    }
+
   }
 
   public static class TempSyncClient extends BSPPeerSyncClient {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java Mon Apr 13 07:52:45 2015
@@ -173,6 +173,7 @@ public class MaxFlow {
       stepStatusDetecting(haveActivingNormalVertex);
       boolean pushStepCompleted = pushStepCompleted(haveActivingNormalVertex);
       boolean senseStepCompleted = senseStepCompleted(haveActivingNormalVertex);
+      
       if (senseStepCompleted) {
         aggregate();
         for (FloatArrayWritable msg : overFlowMsgList) {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Mon Apr 13 07:52:45 2015
@@ -135,10 +135,10 @@ public final class GraphJobMessage imple
       ByteArrayOutputStream a = new ByteArrayOutputStream();
       DataOutputStream b = new DataOutputStream(a);
       value.write(b);
+      
       byteBuffer.write(a.toByteArray());
       numOfValues++;
     } catch (IOException e) {
-      // TODO Auto-generated catch block
       e.printStackTrace();
     }
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Apr 13 07:52:45 2015
@@ -20,11 +20,9 @@ package org.apache.hama.graph;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -228,58 +226,65 @@ public final class GraphJobRunner<V exte
     }
   }
 
-  private Set<V> notComputedVertices;
-
   /**
    * Do the main logic of a superstep, namely checking if vertices are active,
    * feeding compute with messages and controlling combiners/aggregators. We
    * iterate over our messages and vertices in sorted order. That means that we
    * need to seek the first vertex that has the same ID as the iterated message.
    */
-  @SuppressWarnings("unchecked")
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    int activeVertices = 0;
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    notComputedVertices = new HashSet();
-    notComputedVertices.addAll(vertices.keySet());
+    List<Thread> runners = new ArrayList<Thread>();
 
-    Vertex<V, E, M> vertex = null;
+    List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
+        "hama.graph.thread.num", 100));
 
-    while (currentMessage != null) {
-      vertex = vertices.get((V) currentMessage.getVertexId());
+    if (subLists.size() == 0) {
+      if (currentMessage != null) {
+        List<GraphJobMessage> first = new ArrayList<GraphJobMessage>();
+        first.add(currentMessage);
+        runners.add(new ComputeReceivedMessage(first));
+      }
+    } else {
+      for (List<GraphJobMessage> subList : subLists) {
+        if (runners.size() == 0)
+          subList.add(currentMessage);
 
-      // reactivation
-      if (vertex.isHalted()) {
-        vertex.setActive();
+        runners.add(new ComputeReceivedMessage(subList));
       }
+    }
 
-      if (!vertex.isHalted()) {
-        vertex.compute((Iterable<M>) currentMessage.getIterableMessages());
-        vertices.finishVertexComputation(vertex);
-        activeVertices++;
+    for (Thread computer : runners) {
+      computer.start();
+    }
 
-        notComputedVertices.remove(vertex.getVertexID());
+    for (Thread computer : runners) {
+      try {
+        computer.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
       }
-
-      currentMessage = peer.getCurrentMessage();
     }
-    
-    for (V v : notComputedVertices) {
-      vertex = vertices.get(v);
-      if (!vertex.isHalted()) {
+
+    // After using getSubLists(), we need to clean up local vertex messages.
+    peer.clearIncomingMessages();
+
+    for (V v : vertices.getNotComputedVertices()) {
+      if (!vertices.get(v).isHalted()) {
+        Vertex<V, E, M> vertex = vertices.get(v);
         vertex.compute(Collections.<M> emptyList());
         vertices.finishVertexComputation(vertex);
-        activeVertices++;
       }
     }
 
     vertices.finishSuperstep();
-    getAggregationRunner().sendAggregatorValues(peer, activeVertices,
-        this.changedVertexCnt);
+
+    getAggregationRunner().sendAggregatorValues(peer,
+        vertices.getComputedVertices().size(), this.changedVertexCnt);
     this.iteration++;
   }
 
@@ -294,10 +299,10 @@ public final class GraphJobRunner<V exte
     vertices.startSuperstep();
 
     List<Thread> runners = new ArrayList<Thread>();
-    List<Vertex<V, E, M>> v = new ArrayList<Vertex<V, E, M>>(
-        vertices.getValues());
-    for (List<Vertex<V, E, M>> partition : Lists.partition(v, conf.getInt("hama.graph.thread.num", 30))) {
-      runners.add(new Computer(partition));
+
+    for (List<V> vLists : Lists.partition(new ArrayList<V>(vertices.keySet()),
+        conf.getInt("hama.graph.thread.num", 100))) {
+      runners.add(new Computer(vLists));
     }
 
     for (Thread computer : runners) {
@@ -317,20 +322,49 @@ public final class GraphJobRunner<V exte
     iteration++;
   }
 
+  class ComputeReceivedMessage extends Thread {
+    List<GraphJobMessage> subList;
+
+    public ComputeReceivedMessage(List<GraphJobMessage> subList) {
+      this.subList = subList;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+      try {
+        for (GraphJobMessage msg : subList) {
+          Vertex<V, E, M> vertex = vertices.get((V) msg.getVertexId());
+
+          // reactivation
+          if (vertex.isHalted()) {
+            vertex.setActive();
+          }
+
+          vertex.compute((Iterable<M>) msg.getIterableMessages());
+          vertices.finishVertexComputation(vertex);
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
   class Computer extends Thread {
-    List<Vertex<V, E, M>> partition;
+    List<V> vList;
 
-    public Computer(List<Vertex<V, E, M>> partition) {
-      this.partition = partition;
+    public Computer(List<V> vList) {
+      this.vList = vList;
     }
 
     @Override
     public void run() {
       try {
-        for (Vertex<V, E, M> v : partition) {
-          v.setup(conf);
-          v.compute(Collections.singleton(v.getValue()));
-          vertices.finishVertexComputation(v);
+        for (V v : vList) {
+          Vertex<V, E, M> vertex = vertices.get(v);
+          vertex.setup(conf);
+          vertex.compute(Collections.singleton(vertex.getValue()));
+          vertices.finishVertexComputation(vertex);
         }
       } catch (IOException e) {
         e.printStackTrace();
@@ -423,9 +457,9 @@ public final class GraphJobRunner<V exte
     while ((received = peer.getCurrentMessage()) != null) {
       addVertex((Vertex<V, E, M>) received.getVertex());
     }
+
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
-    LOG.debug("Starting Vertex processing!");
   }
 
   /**
@@ -571,9 +605,8 @@ public final class GraphJobRunner<V exte
     vertices.finishSuperstep();
   }
 
-  public void sendMessage(V dstinationVertexID, M msg) throws IOException {
-    peer.send(getHostName(dstinationVertexID), new GraphJobMessage(
-        dstinationVertexID, msg));
+  public void sendMessage(V vertexID, M msg) throws IOException {
+    peer.send(getHostName(vertexID), new GraphJobMessage(vertexID, msg));
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Mon Apr 13 07:52:45 2015
@@ -17,7 +17,7 @@
  */
 package org.apache.hama.graph;
 
-import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
@@ -36,11 +36,6 @@ public class IncomingVertexMessageManage
   private final ConcurrentLinkedQueue<GraphJobMessage> mapMessages = new ConcurrentLinkedQueue<GraphJobMessage>();
 
   @Override
-  public Iterator<GraphJobMessage> iterator() {
-    return msgPerVertex.iterator();
-  }
-
-  @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
@@ -80,6 +75,7 @@ public class IncomingVertexMessageManage
 
   @Override
   public void clear() {
+    mapMessages.clear();
     msgPerVertex.clear();
   }
 
@@ -94,7 +90,7 @@ public class IncomingVertexMessageManage
 
   @Override
   public int size() {
-    return msgPerVertex.size();
+    return msgPerVertex.size() + mapMessages.size();
   }
 
   // empty, not needed to implement
@@ -112,5 +108,9 @@ public class IncomingVertexMessageManage
     return this;
   }
 
+  @Override
+  public List<List<GraphJobMessage>> getSubLists(int num) {
+    return msgPerVertex.getSubLists(num);
+  }
 
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Mon Apr 13 07:52:45 2015
@@ -20,6 +20,7 @@ package org.apache.hama.graph;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +30,8 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.TaskAttemptID;
 
+import com.google.common.collect.Sets;
+
 /**
  * Stores the vertices into a memory-based tree map. This implementation allows
  * the runtime graph modification and random access by vertex ID.
@@ -43,6 +46,8 @@ public final class MapVerticesInfo<V ext
     implements VerticesInfo<V, E, M> {
   private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
 
+  private Set<V> computedVertices; 
+  
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException {
@@ -113,9 +118,9 @@ public final class MapVerticesInfo<V ext
   }
 
   @Override
-  public void finishVertexComputation(Vertex<V, E, M> vertex)
+  public synchronized void finishVertexComputation(Vertex<V, E, M> vertex)
       throws IOException {
-    // do nothing
+    computedVertices.add(vertex.getVertexID());
   }
 
   @Override
@@ -128,10 +133,23 @@ public final class MapVerticesInfo<V ext
 
   @Override
   public void startSuperstep() throws IOException {
+    computedVertices = new HashSet<V>();
   }
 
   @Override
   public void finishSuperstep() throws IOException {
   }
 
+  @Override
+  public Set<V> getComputedVertices() {
+    return this.computedVertices;
+  }
+  
+  public Set<V> getNotComputedVertices() {
+    return Sets.difference(vertices.keySet(), computedVertices);
+  }
+
+  public int getActiveVerticesNum() {
+    return computedVertices.size();
+  }
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java Mon Apr 13 07:52:45 2015
@@ -18,11 +18,14 @@
 package org.apache.hama.graph;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.hadoop.io.WritableComparable;
 
+import com.google.common.collect.Lists;
+
 public class MessagePerVertex {
 
   @SuppressWarnings("rawtypes")
@@ -67,4 +70,8 @@ public class MessagePerVertex {
     return (storage.size() > 0) ? storage.pollFirstEntry().getValue() : null;
   }
 
+  public List<List<GraphJobMessage>> getSubLists(int num) {
+    return Lists.partition(Lists.newArrayList(iterator()), num);
+  }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Mon Apr 13 07:52:45 2015
@@ -82,6 +82,12 @@ public interface VerticesInfo<V extends
   
   public Collection<Vertex<V, E, M>> getValues();
 
+  public Set<V> getComputedVertices();
+  
+  public Set<V> getNotComputedVertices();
+  
+  public int getActiveVerticesNum();
+  
   /**
    * Finish the additions, from this point on the implementations should close
    * the adds and throw exceptions in case something is added after this call.



Mime
View raw message