incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1205819 - in /incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples: ShortestPathVertexMessage.java ShortestPaths.java
Date Thu, 24 Nov 2011 13:20:15 GMT
Author: edwardyoon
Date: Thu Nov 24 13:20:15 2011
New Revision: 1205819

URL: http://svn.apache.org/viewvc?rev=1205819&view=rev
Log:
To reduce memory usage, vertexLookup structure replaced from Map to list.

Added:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
Modified:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java?rev=1205819&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
(added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
Thu Nov 24 13:20:15 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hama.bsp.BSPMessage;
+
+public class ShortestPathVertexMessage extends BSPMessage {
+
+  ShortestPathVertex tag;
+  int data;
+
+  public ShortestPathVertexMessage() {
+    super();
+  }
+
+  public ShortestPathVertexMessage(ShortestPathVertex tag, int data) {
+    super();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tag.getName());
+    out.writeInt(tag.getWeight());
+    out.writeInt(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String name = in.readUTF();
+    int weight = in.readInt();
+    tag = new ShortestPathVertex(weight, name);
+    data = in.readInt();
+  }
+
+  @Override
+  public ShortestPathVertex getTag() {
+    return tag;
+  }
+
+  @Override
+  public Integer getData() {
+    return data;
+  }
+
+}
\ No newline at end of file

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1205819&r1=1205818&r2=1205819&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
(original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Thu Nov 24 13:20:15 2011
@@ -18,15 +18,22 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
@@ -45,7 +52,7 @@ public class ShortestPaths extends
   public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
 
   public static final String START_VERTEX = "shortest.paths.start.vertex.name";
-  private final HashMap<String, ShortestPathVertex> vertexLookupMap = new HashMap<String,
ShortestPathVertex>();
+  private final List<ShortestPathVertex> vertexLookup = new ArrayList<ShortestPathVertex>();
   private final HashMap<ShortestPathVertex, ShortestPathVertex[]> adjacencyList = new
HashMap<ShortestPathVertex, ShortestPathVertex[]>();
   private String masterTask;
 
@@ -55,13 +62,15 @@ public class ShortestPaths extends
       throws IOException, KeeperException, InterruptedException {
     boolean updated = true;
     while (updated) {
-      int updatesMade = 0;
       peer.sync();
 
-      IntegerMessage msg = null;
+      int updatesMade = 0;
+      ShortestPathVertexMessage msg = null;
       Deque<ShortestPathVertex> updatedQueue = new LinkedList<ShortestPathVertex>();
-      while ((msg = (IntegerMessage) peer.getCurrentMessage()) != null) {
-        ShortestPathVertex vertex = vertexLookupMap.get(msg.getTag());
+      while ((msg = (ShortestPathVertexMessage) peer.getCurrentMessage()) != null) {
+        int index = Collections.binarySearch(vertexLookup, msg.getTag());
+        ShortestPathVertex vertex = vertexLookup.get(index);
+
         // check if we need an distance update
         if (vertex.getCost() > msg.getData()) {
           updatesMade++;
@@ -69,7 +78,7 @@ public class ShortestPaths extends
           vertex.setCost(msg.getData());
         }
       }
-      // synchonize with all grooms if there were updates
+
       updated = broadcastUpdatesMade(peer, updatesMade);
       // send updates to the adjacents of the updated vertices
       for (ShortestPathVertex vertex : updatedQueue) {
@@ -78,26 +87,28 @@ public class ShortestPaths extends
     }
   }
 
-  @Override
   public void setup(
       BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable>
peer)
       throws IOException, KeeperException, InterruptedException {
-
     KeyValuePair<ShortestPathVertex, ShortestPathVertexArrayWritable> next = null;
+    ShortestPathVertex startVertex = null;
+
     while ((next = peer.readNext()) != null) {
+      if (next.getKey().getName().equals(
+          peer.getConfiguration().get(START_VERTEX))) {
+        next.getKey().setCost(0);
+        startVertex = next.getKey();
+      }
       adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
           .toArray());
-      vertexLookupMap.put(next.getKey().getName(), next.getKey());
+      vertexLookup.add(next.getKey());
     }
 
+    Collections.sort(vertexLookup);
     masterTask = peer.getPeerName(0);
 
     // initial message bypass
-    ShortestPathVertex startVertex = vertexLookupMap.get(peer
-        .getConfiguration().get(START_VERTEX));
-
     if (startVertex != null) {
-      startVertex.setCost(0);
       sendMessageToNeighbors(peer, startVertex);
     }
   }
@@ -109,8 +120,10 @@ public class ShortestPaths extends
     // write our map into hdfs
     for (Entry<ShortestPathVertex, ShortestPathVertex[]> entry : adjacencyList
         .entrySet()) {
-      peer.write(new Text(entry.getKey().getName()), new IntWritable(entry
-          .getKey().getCost()));
+      int cost = entry.getKey().getCost();
+      if (cost < Integer.MAX_VALUE) {
+        peer.write(new Text(entry.getKey().getName()), new IntWritable(cost));
+      }
     }
   }
 
@@ -166,14 +179,40 @@ public class ShortestPaths extends
       BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable>
peer,
       ShortestPathVertex id) throws IOException {
     ShortestPathVertex[] outgoingEdges = adjacencyList.get(id);
+
     for (ShortestPathVertex adjacent : outgoingEdges) {
-      int mod = Math.abs((adjacent.hashCode() % peer.getAllPeerNames().length));
-      peer.send(peer.getPeerName(mod), new IntegerMessage(adjacent.getName(),
+      String target = peer.getPeerName(Math.abs((adjacent.hashCode() % peer
+          .getAllPeerNames().length)));
+
+      peer.send(target, new ShortestPathVertexMessage(adjacent,
           id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
               + adjacent.getWeight()));
     }
   }
 
+  static void printOutput(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+    for (FileStatus status : stati) {
+      if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+        Path path = status.getPath();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+        Text key = new Text();
+        IntWritable value = new IntWritable();
+        int x = 0;
+        while (reader.next(key, value)) {
+          System.out.println(key.toString() + " | " + value.get());
+          x++;
+          if (x > 10) {
+            System.out.println("...");
+            break;
+          }
+        }
+        reader.close();
+      }
+    }
+  }
+
   public static void printUsage() {
     System.out.println("Usage: <startNode> <output path> <input path>");
   }
@@ -206,10 +245,10 @@ public class ShortestPaths extends
 
     long startTime = System.currentTimeMillis();
     if (bsp.waitForCompletion(true)) {
+      printOutput(conf);
       System.out.println("Job Finished in "
           + (double) (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }
-
 }



Mime
View raw message