incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1211824 - in /incubator/hama/trunk: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Date Thu, 08 Dec 2011 11:01:07 GMT
Author: edwardyoon
Date: Thu Dec  8 11:01:07 2011
New Revision: 1211824

URL: http://svn.apache.org/viewvc?rev=1211824&view=rev
Log:
Move clearOutgoingQueues() call to bottom of sync() method.

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1211824&r1=1211823&r2=1211824&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Dec  8 11:01:07 2011
@@ -258,14 +258,14 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
       messenger.transfer(addr, bundle);
     }
 
-    // Clear outgoing queues.
-    messenger.clearOutgoingQueues();
     leaveBarrier();
     
     incrCounter(PeerCounter.SUPERSTEPS, 1);
     currentTaskStatus.setCounters(counters);
 
     umbilical.statusUpdate(taskId, currentTaskStatus);
+    // Clear outgoing queues.
+    messenger.clearOutgoingQueues();
   }
 
   private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1211824&r1=1211823&r2=1211824&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java Thu Dec  8 11:01:07 2011
@@ -39,14 +39,11 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.BooleanMessage;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.IntegerMessage;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
-import org.apache.zookeeper.KeeperException;
 
 public class ShortestPaths extends
     BSP<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> {
@@ -63,28 +60,52 @@ public class ShortestPaths extends
       throws IOException, SyncException, InterruptedException {
     boolean updated = true;
     while (updated) {
-      peer.sync();
-
       int updatesMade = 0;
+      int globalUpdateCounts = 0;
       ShortestPathVertexMessage msg = null;
       Deque<ShortestPathVertex> updatedQueue = new LinkedList<ShortestPathVertex>();
       while ((msg = (ShortestPathVertexMessage) peer.getCurrentMessage()) != null) {
-        int index = Collections.binarySearch(vertexLookup, msg.getTag());
-        ShortestPathVertex vertex = vertexLookup.get(index);
+        if (msg.getTag().getName().startsWith("updatesMade")) {
+          if (msg.getData() == -1) {
+            updated = false;
+          } else {
+            globalUpdateCounts += msg.getData();
+          }
+        } else {
+          int index = Collections.binarySearch(vertexLookup, msg.getTag());
+          ShortestPathVertex vertex = vertexLookup.get(index);
+
+          // check if we need an distance update
+          if (vertex.getCost() > msg.getData()) {
+            updatesMade++;
+            updatedQueue.add(vertex);
+            vertex.setCost(msg.getData());
+          }
+        }
+      }
+
+      LOG.info(">> previous updates counts: " + globalUpdateCounts + " at "
+          + peer.getSuperstepCount());
 
-        // check if we need an distance update
-        if (vertex.getCost() > msg.getData()) {
-          updatesMade++;
-          updatedQueue.add(vertex);
-          vertex.setCost(msg.getData());
+      if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask)
+          && peer.getSuperstepCount() > 1) {
+        for (String peerName : peer.getAllPeerNames()) {
+          peer.send(peerName, new ShortestPathVertexMessage(
+              new ShortestPathVertex((int) peer.getSuperstepCount(),
+                  "updatesMade-" + peer.getPeerName()), -1));
         }
       }
 
-      updated = broadcastUpdatesMade(peer, updatesMade);
       // send updates to the adjacents of the updated vertices
+      peer.send(masterTask, new ShortestPathVertexMessage(
+          new ShortestPathVertex((int) peer.getSuperstepCount(), "updatesMade-"
+              + peer.getPeerName()), updatesMade));
+
       for (ShortestPathVertex vertex : updatedQueue) {
         sendMessageToNeighbors(peer, vertex);
       }
+
+      peer.sync();
     }
   }
 
@@ -112,6 +133,7 @@ public class ShortestPaths extends
     if (startVertex != null) {
       sendMessageToNeighbors(peer, startVertex);
     }
+    peer.sync();
   }
 
   @Override
@@ -129,42 +151,6 @@ public class ShortestPaths extends
   }
 
   /**
-   * This method broadcasts to a master groom how many updates were made. He
-   * simply sums them up and sends a message back to the grooms if sum is
-   * greater than zero.
-   * 
-   * @param peer The peer we got through the BSP method.
-   * @param master The assigned master groom name.
-   * @param updates How many updates were made?
-   * @return True if we need another iteration, False if no updates can be made
-   *         anymore.
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  private boolean broadcastUpdatesMade(
-      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> peer,
-      int updates) throws IOException, SyncException, InterruptedException {
-    peer.send(masterTask, new IntegerMessage(peer.getPeerName(), updates));
-    peer.sync();
-    if (peer.getPeerName().equals(masterTask)) {
-      int count = 0;
-      IntegerMessage message;
-      while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
-        count += message.getData();
-      }
-
-      for (String name : peer.getAllPeerNames()) {
-        peer.send(name, new BooleanMessage("", count > 0 ? true : false));
-      }
-    }
-
-    peer.sync();
-    BooleanMessage message = (BooleanMessage) peer.getCurrentMessage();
-    return message.getData();
-  }
-
-  /**
    * This method takes advantage of our partitioning: it uses the vertexID
    * (simply hash of the name) to determine the host where the message belongs
    * to. <br/>



Mime
View raw message